FlinkSQL 动态加载 UDF 实现思路( 二 )


代码调整后存在的问题
FlinkSQL 动态加载 UDF 实现思路文章插图
运行结果:代码调整后 , 在本地 IDEA 运行程序(即 , 启动了 Mini Cluster集群)是可以成功运行的 。 但是当发布到远程 Flink 集群上时(采用 Flink on K8S,Session Cluster 部署模式) , 会出现找不到 UDF 异常 , 如下:
Caused by: java.lang.ClassNotFoundException: flinksql.function.udf.ReturnSelf分析:这是由于 Flink 的部署方式有多种 。 在本地运行的启动的是 MiniCluster , 即 JobManager 和 TaskManager 在同一个JVM 进程中 。 而我们在远程部署 Flink on Kubernetes 的 Session Cluster 集群 JobManager 和 TaskManager 是不同的 JVM 进程 。
FlinkSQL 动态加载 UDF 实现思路文章插图
在 Session 模式下 , 客户端在 main() 方法开始执行直到 env.execute() 方法之前需要完成以下三件事情

  • 获取作业所需的依赖项
  • 通过执行环境分析并取得逻辑计划 , 即StreamGraph→JobGraph
  • 将依赖项和JobGraph上传到集群中
只有在这些都完成之后 , 才会通过env.execute() 方法触发 Flink 运行时真正地开始执行作业 。 所以在本地运行的 Mini Cluster , 因为都处于同一个 JVM 进程 , 客户端运行 main() 方法进行动态加载后将依赖项和 JobGraph 提交给 JobMananger 再由 TaskManager 执行 Job 。
而当在远程集群时 , 客户端实现动态加载 Jar 后将依赖项和 JobGraph 提交给 JobMananger , 但是由于 JobMananger 和 TaskMananger 是处于不同的 JVM进程中 , 且没有对自定义 UDF Jar URL 进行分发 , 这会让 TaskMananger 在运行任务时出现 Class Not Found 异常 , 这是因为 TaskMananger 没有进行类加载 , JVM 中没有 returnSelf 类所导致 。
解决 UDF Jar 分发的思路基于以上问题我们查阅了一些相关资料及阅读源码 , 以以下三点为条件
  • 基于采用 Session 模式部署
  • 基于 REST API 提交 Job 而不采用命令行方式
  • 不改动 Flink 源码
分析:官网提供了一个 -C 参数 , 大致用法就是把用户自定义 Jar 放到一个 JobMananger 和 TaskMananger 都能访问到的存储地方 , 然后通过命令行方式启动 Job 时使用 -C 参数 , 后面加上自定义 Jar 的URLs 就可以实现分发 。
FlinkSQL 动态加载 UDF 实现思路文章插图
但是我们平台由于采用 REST API , 而提交 Job 的 API 并没有提供该参数 , 所以在不改变 Flink 源码的前提下进行源码研究 , 最后发现可以在 main 中将 UDF Jar 的 URL 加到配置项 pipeline.classpaths 中 , 也就是曲线救国实现了 -C 的效果 。 在 main 中增加以下代码片段:
Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");configurationField.setAccessible(true);Configuration o = (Configuration)configurationField.get(bsEnv);Field confData = http://kandian.youth.cn/index/Configuration.class.getDeclaredField("confData");confData.setAccessible(true);Map temp = (Map)confData.get(o);List jarList = new ArrayList<>();jarList.add(funJarPath);temp.put("pipeline.classpaths",jarList);完整代码public static void main(String[] args) throws Exception {//创建流运行时环境StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();//采用BlinkPlannerEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();//创建StreamTable环境StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);bsEnv.setParallelism(1);// 动态加载String funJarPath = "UDF jar 在 OSS 中所在的 URL 路径";loadJar(new URL(funJarPath));Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");configurationField.setAccessible(true);Configuration o = (Configuration)configurationField.get(bsEnv);Field confData = http://kandian.youth.cn/index/Configuration.class.getDeclaredField("confData");confData.setAccessible(true);Map temp = (Map)confData.get(o);List jarList = new ArrayList<>();jarList.add(funJarPath);temp.put("pipeline.classpaths",jarList);bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");bsTableEnv.executeSql("CREATE TABLE sourceTable (" +"f_sequence INT," +"f_random INT," +"f_random_str STRING," +"ts AS localtimestamp," +"WATERMARK FOR ts AS ts" +") WITH (" +"'connector' = 'datagen'," +"'rows-per-second'='5'," +"'fields.f_sequence.kind'='sequence'," +"'fields.f_sequence.start'='1'," +"'fields.f_sequence.end'='1000'," +"'fields.f_random.min'='1'," +"'fields.f_random.max'='1000'," +"'fields.f_random_str.length'='10'" +")");bsTableEnv.executeSql("CREATE TABLE sinktable (" +"f_random_str STRING" +") WITH (" +"'connector' = 'print'" +")");bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");}//动态加载Jarpublic static void loadJar(URL jarUrl) {//从URLClassLoader类加载器中获取类的addURL方法Method method = null;try {method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);} catch (NoSuchMethodException | SecurityException e1) {e1.printStackTrace();}// 获取方法的访问权限boolean accessible = method.isAccessible();try {//修改访问权限为可写if (accessible == false) {method.setAccessible(true);}// 获取系统类加载器URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();//jar路径加入到系统url路径里method.invoke(classLoader, jarUrl);} catch (Exception e) {e.printStackTrace();} finally {method.setAccessible(accessible);}}