FlinkSQL 动态加载 UDF 实现思路

导读: 最近在对 Flink 进行平台化 , 基于 REST API 构建一个平台实现通过纯 SQL 化编写和管理 Job 。 尽管 Flink官方希望用户将所有的依赖和业务逻辑打成一个fat jar , 这样方便提交 。 但我们在开发的过程中想对用户自定义 UDF Jar 进行管理 , 想将 UDF Jar 存储管理在阿里云 OSS, 在 Job 中通过动态加载的方式将 UDF Jar 加载进来 , 取代之前将 UDF 和 Job 打成一个 fat jar 的方式 。 下面将从几点展开讨论:

  • 将 UDF 写到 Job 中并打成一个 fat jar 的实现方式
  • 动态加载 UDF Jar 代码调整
  • 代码调整后存在的问题
  • 解决 UDF Jar URL 分发的思路
环境
  • Flink 1.11.2
  • 部署方式:Flink on Kubernetes
  • 部署模式: Session Cluster
将 UDF 写到 Job 中并打成一个 fat jar 的方式下面是一个简单采用 FlinkSQL 编写 Job 的例子 。 使用 datagen 连接器作为 Source 生成数据 ,print 作为 Sink 将结果打印到控制台 。 自定义的一个简单 UDF自定义函数(returnSelf) 。
public static void main(String[] args) throws Exception {//创建流运行时环境StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();//采用BlinkPlannerEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();//创建StreamTable环境StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);bsEnv.setParallelism(1);bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");bsTableEnv.executeSql("CREATE TABLE sourceTable (" +"f_sequence INT," +"f_random INT," +"f_random_str STRING," +"ts AS localtimestamp," +"WATERMARK FOR ts AS ts" +") WITH (" +"'connector' = 'datagen'," +"'rows-per-second'='5'," +"'fields.f_sequence.kind'='sequence'," +"'fields.f_sequence.start'='1'," +"'fields.f_sequence.end'='1000'," +"'fields.f_random.min'='1'," +"'fields.f_random.max'='1000'," +"'fields.f_random_str.length'='10'" +")");bsTableEnv.executeSql("CREATE TABLE sinktable (" +"f_random_str STRING" +") WITH (" +"'connector' = 'print'" +")");bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");}要将该 Job 提交给远程 Flink 集群时 , 我们需要将 Job(包括自定义 UDF) 打成一个 fat Jar 。 但这并不是我们期望的操作 , 由于打成 fat jar 会显得比较臃肿 , 同时不方便管理 UDF Jar, 有些 UDF 具有通用性 , 可复用 。 所以我们希望将自定义的UDF Jar 独立出来保存管理 , 并在 Job 中通过动态加载的方式使用 , 如下图:
FlinkSQL 动态加载 UDF 实现思路文章插图
动态加载 UDF Jar 代码调整
  • 将 returnSelf 并独立打成一个 UDF Jar 上传到阿里云OSS 。
  • 在 Job 的 main() 方法中新增动态加载的代码
public static void main(String[] args) throws Exception {//创建流运行时环境StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();//采用BlinkPlannerEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();//创建StreamTable环境StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);bsEnv.setParallelism(1);// 动态加载String funJarPath = "UDF jar 在 OSS 中所在的 URL 路径";loadJar(new URL(funJarPath));bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");bsTableEnv.executeSql("CREATE TABLE sourceTable (" +"f_sequence INT," +"f_random INT," +"f_random_str STRING," +"ts AS localtimestamp," +"WATERMARK FOR ts AS ts" +") WITH (" +"'connector' = 'datagen'," +"'rows-per-second'='5'," +"'fields.f_sequence.kind'='sequence'," +"'fields.f_sequence.start'='1'," +"'fields.f_sequence.end'='1000'," +"'fields.f_random.min'='1'," +"'fields.f_random.max'='1000'," +"'fields.f_random_str.length'='10'" +")");bsTableEnv.executeSql("CREATE TABLE sinktable (" +"f_random_str STRING" +") WITH (" +"'connector' = 'print'" +")");bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");}//动态加载Jarpublic static void loadJar(URL jarUrl) {//从URLClassLoader类加载器中获取类的addURL方法Method method = null;try {method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);} catch (NoSuchMethodException | SecurityException e1) {e1.printStackTrace();}// 获取方法的访问权限boolean accessible = method.isAccessible();try {//修改访问权限为可写if (accessible == false) {method.setAccessible(true);}// 获取系统类加载器URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();//jar路径加入到系统url路径里method.invoke(classLoader, jarUrl);} catch (Exception e) {e.printStackTrace();} finally {method.setAccessible(accessible);}}修改后 , 我们将 UDF jar 存放到 OSS 中进行管理 。 当 Job 需要依赖某个 UDF 时 , 只需要通过动态加载就可以完成 。 动态加载使用 URLClassLoader 实现 , 使用被管理于 OSS 的 UDF Jar 的 URL 将 Jar 加载进 JVM 中 , 并取得 returnSelf 类 。