FlinkSQL 动态加载 UDF 实现思路( 二 )
代码调整后存在的问题
文章插图
运行结果:代码调整后 , 在本地 IDEA 运行程序(即 , 启动了 Mini Cluster集群)是可以成功运行的 。 但是当发布到远程 Flink 集群上时(采用 Flink on K8S,Session Cluster 部署模式) , 会出现找不到 UDF 异常 , 如下:
Caused by: java.lang.ClassNotFoundException: flinksql.function.udf.ReturnSelf
分析:这是由于 Flink 的部署方式有多种 。 在本地运行的启动的是 MiniCluster , 即 JobManager 和 TaskManager 在同一个JVM 进程中 。 而我们在远程部署 Flink on Kubernetes 的 Session Cluster 集群 JobManager 和 TaskManager 是不同的 JVM 进程 。
文章插图
在 Session 模式下 , 客户端在 main() 方法开始执行直到 env.execute() 方法之前需要完成以下三件事情
- 获取作业所需的依赖项
- 通过执行环境分析并取得逻辑计划 , 即StreamGraph→JobGraph
- 将依赖项和JobGraph上传到集群中
而当在远程集群时 , 客户端实现动态加载 Jar 后将依赖项和 JobGraph 提交给 JobMananger , 但是由于 JobMananger 和 TaskMananger 是处于不同的 JVM进程中 , 且没有对自定义 UDF Jar URL 进行分发 , 这会让 TaskMananger 在运行任务时出现 Class Not Found 异常 , 这是因为 TaskMananger 没有进行类加载 , JVM 中没有 returnSelf 类所导致 。
解决 UDF Jar 分发的思路基于以上问题我们查阅了一些相关资料及阅读源码 , 以以下三点为条件
- 基于采用 Session 模式部署
- 基于 REST API 提交 Job 而不采用命令行方式
- 不改动 Flink 源码
文章插图
但是我们平台由于采用 REST API , 而提交 Job 的 API 并没有提供该参数 , 所以在不改变 Flink 源码的前提下进行源码研究 , 最后发现可以在 main 中将 UDF Jar 的 URL 加到配置项 pipeline.classpaths 中 , 也就是曲线救国实现了 -C 的效果 。 在 main 中增加以下代码片段:
Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");configurationField.setAccessible(true);Configuration o = (Configuration)configurationField.get(bsEnv);Field confData = http://kandian.youth.cn/index/Configuration.class.getDeclaredField("confData");confData.setAccessible(true);Map temp = (Map)confData.get(o);List jarList = new ArrayList<>();jarList.add(funJarPath);temp.put("pipeline.classpaths",jarList);
完整代码public static void main(String[] args) throws Exception {//创建流运行时环境StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();//采用BlinkPlannerEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();//创建StreamTable环境StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);bsEnv.setParallelism(1);// 动态加载String funJarPath = "UDF jar 在 OSS 中所在的 URL 路径";loadJar(new URL(funJarPath));Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");configurationField.setAccessible(true);Configuration o = (Configuration)configurationField.get(bsEnv);Field confData = http://kandian.youth.cn/index/Configuration.class.getDeclaredField("confData");confData.setAccessible(true);Map temp = (Map)confData.get(o);List jarList = new ArrayList<>();jarList.add(funJarPath);temp.put("pipeline.classpaths",jarList);bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");bsTableEnv.executeSql("CREATE TABLE sourceTable (" +"f_sequence INT," +"f_random INT," +"f_random_str STRING," +"ts AS localtimestamp," +"WATERMARK FOR ts AS ts" +") WITH (" +"'connector' = 'datagen'," +"'rows-per-second'='5'," +"'fields.f_sequence.kind'='sequence'," +"'fields.f_sequence.start'='1'," +"'fields.f_sequence.end'='1000'," +"'fields.f_random.min'='1'," +"'fields.f_random.max'='1000'," +"'fields.f_random_str.length'='10'" +")");bsTableEnv.executeSql("CREATE TABLE sinktable (" +"f_random_str STRING" +") WITH (" +"'connector' = 'print'" +")");bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");}//动态加载Jarpublic static void loadJar(URL jarUrl) {//从URLClassLoader类加载器中获取类的addURL方法Method method = null;try {method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);} catch (NoSuchMethodException | SecurityException e1) {e1.printStackTrace();}// 获取方法的访问权限boolean accessible = method.isAccessible();try {//修改访问权限为可写if (accessible == false) {method.setAccessible(true);}// 获取系统类加载器URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();//jar路径加入到系统url路径里method.invoke(classLoader, jarUrl);} catch (Exception e) {e.printStackTrace();} finally {method.setAccessible(accessible);}}
- Windows11|Windows 11 操作系统已经完全取消了 Windows 8 时代的动态磁贴
- 7nm|支持8K/120Hz动态补帧!联发科发布首款7nm电视芯片
- 耳机|Xiaomi真无线降噪耳机3Pro,动态降噪,静听曼妙
- 索尼|索尼A7M4测试:高像素高解析力,动态范围妥协
- 人工智能|IEEE Fellow姚新:在多目标动态优化问题中,演化计算仍有独特优势
- 达晨财智|融资丨「世通亨奇」获达晨财智领投近亿元A轮融资,采用动态本体技术分析数据信息
- 按钮|Axure9 打开同一个页面时显示页面中不同的动态面板
- 豆瓣|从圈子到话题,从帖子到动态,表达更自由,互动更稀缺
- 造车|百度造车最新进展:集度模拟样车已进入动态测试阶段
- 百度|百度造车最新进展:集度模拟样车已进入动态测试阶段