FlinkSQL 动态加载 UDF 实现思路( 二 )
修改后 , 我们将 UDF jar 存放到 OSS 中进行管理 。 当 Job 需要依赖某个 UDF 时 , 只需要通过动态加载就可以完成 。 动态加载使用 URLClassLoader 实现 , 使用被管理于 OSS 的 UDF Jar 的 URL 将 Jar 加载进 JVM 中 , 并取得 returnSelf 类 。
代码调整后存在的问题
文章插图
运行结果:代码调整后 , 在本地 IDEA 运行程序(即 , 启动了 Mini Cluster集群)是可以成功运行的 。 但是当发布到远程 Flink 集群上时(采用 Flink on K8S,Session Cluster 部署模式) , 会出现找不到 UDF 异常 , 如下:
Caused by: java.lang.ClassNotFoundException: flinksql.function.udf.ReturnSelf
分析:这是由于 Flink 的部署方式有多种 。 在本地运行的启动的是 MiniCluster , 即 JobManager 和 TaskManager 在同一个JVM 进程中 。 而我们在远程部署 Flink on Kubernetes 的 Session Cluster 集群 JobManager 和 TaskManager 是不同的 JVM 进程 。
文章插图
在 Session 模式下 , 客户端在 main() 方法开始执行直到 env.execute() 方法之前需要完成以下三件事情
- 获取作业所需的依赖项
- 通过执行环境分析并取得逻辑计划 , 即StreamGraph→JobGraph
- 将依赖项和JobGraph上传到集群中
而当在远程集群时 , 客户端实现动态加载 Jar 后将依赖项和 JobGraph 提交给 JobMananger , 但是由于 JobMananger 和 TaskMananger 是处于不同的 JVM进程中 , 且没有对自定义 UDF Jar URL 进行分发 , 这会让 TaskMananger 在运行任务时出现 Class Not Found 异常 , 这是因为 TaskMananger 没有进行类加载 , JVM 中没有 returnSelf 类所导致 。
解决 UDF Jar 分发的思路基于以上问题我们查阅了一些相关资料及阅读源码 , 以以下三点为条件
- 基于采用 Session 模式部署
- 基于 REST API 提交 Job 而不采用命令行方式
- 不改动 Flink 源码
文章插图
但是我们平台由于采用 REST API , 而提交 Job 的 API 并没有提供该参数 , 所以在不改变 Flink 源码的前提下进行源码研究 , 最后发现可以在 main 中将 UDF Jar 的 URL 加到配置项 pipeline.classpaths 中 , 也就是曲线救国实现了 -C 的效果 。 在 main 中增加以下代码片段:
Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");configurationField.setAccessible(true);Configuration o = (Configuration)configurationField.get(bsEnv);Field confData = http://kandian.youth.cn/index/Configuration.class.getDeclaredField("confData");confData.setAccessible(true);Map temp = (Map)confData.get(o);List jarList = new ArrayList<>();jarList.add(funJarPath);temp.put("pipeline.classpaths",jarList);
- 动态降噪+双设备连接,华为FreeBuds Pro上手评
- 网络比15年前更慢错误更多?开发者加载了100万个网站实测
- 算法萌新如何学好动态规划(3)
- 大神已提取出一加8T的动态壁纸:Android 8.0+设备均可使用
- 关于边缘计算与网络动态加速
- PS5系统更新带来动态调整游戏机的风扇速度特性 以提升散热
- “会员配送费更贵”,美团外卖回应了
- Google Photos丰富Memories功能:新增循环显示图片的动态壁纸功能
- Firefox 83将默认启用Warp更新:大幅提升响应时间和加载速度
- 当“一兆难求”遇上动态频谱共享,爱立信与四川电信的5G实践