FlinkSQL 动态加载 UDF 实现思路
导读: 最近在对 Flink 进行平台化 , 基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job 。 尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar , 这样方便提交 。 但我们在开发的过程中想对用户自定义 UDF Jar 进行管理 , 想将 UDF Jar 存储管理在阿里云 OSS, 在 Job 中通过动态加载的方式将 UDF Jar 加载进来 , 取代之前将 UDF 和 Job 打成一个 fat jar 的方式 。 下面将从几点展开讨论:
- 将 UDF 写到 Job 中并打成一个 fat jar 的实现方式
- 动态加载 UDF Jar 代码调整
- 代码调整后存在的问题
- 解决 UDF Jar URL 分发的思路
- Flink 1.11.2
- 部署方式:Flink on Kubernetes
- 部署模式: Session Cluster
public static void main(String[] args) throws Exception {//创建流运行时环境StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();//采用BlinkPlannerEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();//创建StreamTable环境StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);bsEnv.setParallelism(1);bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");bsTableEnv.executeSql("CREATE TABLE sourceTable (" +"f_sequence INT," +"f_random INT," +"f_random_str STRING," +"ts AS localtimestamp," +"WATERMARK FOR ts AS ts" +") WITH (" +"'connector' = 'datagen'," +"'rows-per-second'='5'," +"'fields.f_sequence.kind'='sequence'," +"'fields.f_sequence.start'='1'," +"'fields.f_sequence.end'='1000'," +"'fields.f_random.min'='1'," +"'fields.f_random.max'='1000'," +"'fields.f_random_str.length'='10'" +")");bsTableEnv.executeSql("CREATE TABLE sinktable (" +"f_random_str STRING" +") WITH (" +"'connector' = 'print'" +")");bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");}
要将该 Job 提交给远程 Flink 集群时 , 我们需要将 Job(包括自定义 UDF) 打成一个 fat Jar 。 但这并不是我们期望的操作 , 由于打成 fat jar 会显得比较臃肿 , 同时不方便管理 UDF Jar, 有些 UDF 具有通用性 , 可复用 。 所以我们希望将自定义的UDF Jar 独立出来保存管理 , 并在 Job 中通过动态加载的方式使用 , 如下图:文章插图
动态加载 UDF Jar 代码调整
- 将 returnSelf 并独立打成一个 UDF Jar 上传到阿里云OSS 。
- 在 Job 的 main() 方法中新增动态加载的代码
public static void main(String[] args) throws Exception {//创建流运行时环境StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();//采用BlinkPlannerEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();//创建StreamTable环境StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);bsEnv.setParallelism(1);// 动态加载String funJarPath = "UDF jar 在 OSS 中所在的 URL 路径";loadJar(new URL(funJarPath));bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");bsTableEnv.executeSql("CREATE TABLE sourceTable (" +"f_sequence INT," +"f_random INT," +"f_random_str STRING," +"ts AS localtimestamp," +"WATERMARK FOR ts AS ts" +") WITH (" +"'connector' = 'datagen'," +"'rows-per-second'='5'," +"'fields.f_sequence.kind'='sequence'," +"'fields.f_sequence.start'='1'," +"'fields.f_sequence.end'='1000'," +"'fields.f_random.min'='1'," +"'fields.f_random.max'='1000'," +"'fields.f_random_str.length'='10'" +")");bsTableEnv.executeSql("CREATE TABLE sinktable (" +"f_random_str STRING" +") WITH (" +"'connector' = 'print'" +")");bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");}//动态加载Jarpublic static void loadJar(URL jarUrl) {//从URLClassLoader类加载器中获取类的addURL方法Method method = null;try {method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);} catch (NoSuchMethodException | SecurityException e1) {e1.printStackTrace();}// 获取方法的访问权限boolean accessible = method.isAccessible();try {//修改访问权限为可写if (accessible == false) {method.setAccessible(true);}// 获取系统类加载器URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();//jar路径加入到系统url路径里method.invoke(classLoader, jarUrl);} catch (Exception e) {e.printStackTrace();} finally {method.setAccessible(accessible);}}
修改后 , 我们将 UDF jar 存放到 OSS 中进行管理 。 当 Job 需要依赖某个 UDF 时 , 只需要通过动态加载就可以完成 。 动态加载使用 URLClassLoader 实现 , 使用被管理于 OSS 的 UDF Jar 的 URL 将 Jar 加载进 JVM 中 , 并取得 returnSelf 类 。
- Windows11|Windows 11 操作系统已经完全取消了 Windows 8 时代的动态磁贴
- 7nm|支持8K/120Hz动态补帧!联发科发布首款7nm电视芯片
- 耳机|Xiaomi真无线降噪耳机3Pro,动态降噪,静听曼妙
- 索尼|索尼A7M4测试:高像素高解析力,动态范围妥协
- 人工智能|IEEE Fellow姚新:在多目标动态优化问题中,演化计算仍有独特优势
- 达晨财智|融资丨「世通亨奇」获达晨财智领投近亿元A轮融资,采用动态本体技术分析数据信息
- 按钮|Axure9 打开同一个页面时显示页面中不同的动态面板
- 豆瓣|从圈子到话题,从帖子到动态,表达更自由,互动更稀缺
- 造车|百度造车最新进展:集度模拟样车已进入动态测试阶段
- 百度|百度造车最新进展:集度模拟样车已进入动态测试阶段