Apache Hudi与Apache Flink集成( 二 )
5. 实现示例1) HoodieTable
/** * Abstract implementation of a HoodieTable. * * @paramSub type of HoodieRecordPayload * @param Type of inputs * @param Type of keys * @param Type of outputs */public abstract class HoodieTable implements Serializable {protected final HoodieWriteConfig config;protected final HoodieTableMetaClient metaClient;protected final HoodieIndex index;public abstract HoodieWriteMetadata upsert(HoodieEngineContext context, String instantTime,I records);public abstract HoodieWriteMetadata insert(HoodieEngineContext context, String instantTime,I records);public abstract HoodieWriteMetadata bulkInsert(HoodieEngineContext context, String instantTime,I records, Option> bulkInsertPartitioner);......}
HoodieTable 是 hudi的核心抽象之一 , 其中定义了表支持的insert,upsert,bulkInsert等操作 。 以 upsert 为例 , 输入数据由原先的 JavaRDD inputRdds 换成了 I records, 运行时 JavaSparkContext jsc 换成了 HoodieEngineContext context.从类注释可以看到 T,I,K,O分别代表了hudi操作的负载数据类型、输入数据类型、主键类型以及输出数据类型 。 这些泛型将贯穿整个抽象层 。
2) HoodieEngineContext
/** * Base class contains the context information needed by the engine at runtime. It will be extended by different * engine implementation if needed. */public abstract class HoodieEngineContext {public abstract List map(List data, SerializableFunction func, int parallelism);public abstract List flatMap(List data, SerializableFunction> func, int parallelism);public abstract void foreach(List data, SerializableConsumer consumer, int parallelism);......}
HoodieEngineContext 扮演了 JavaSparkContext 的角色 , 它不仅能提供所有 JavaSparkContext能提供的信息 , 还封装了 map,flatMap,foreach等诸多方法 , 隐藏了JavaSparkContext#map(),JavaSparkContext#flatMap(),JavaSparkContext#foreach()等方法的具体实现 。以map方法为例 , 在Spark的实现类 HoodieSparkEngineContext中 , map方法如下:
@Overridepublic List map(List data, SerializableFunction func, int parallelism) {return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();}
在操作List的引擎中其实现可以为(不同方法需注意线程安全问题 , 慎用parallel()):@Overridepublic List map(List data, SerializableFunction func, int parallelism) {return data.stream().parallel().map(func::apply).collect(Collectors.toList());}
注:map函数中抛出的异常 , 可以通过包装SerializableFunction func解决.这里简要介绍下 SerializableFunction:
@FunctionalInterfacepublic interface SerializableFunction extends Serializable {O apply(I v1) throws Exception;}
该方法实际上是 java.util.function.Function 的变种 , 与java.util.function.Function 不同的是 SerializableFunction可以序列化 , 可以抛异常 。 引入该函数是因为JavaSparkContext#map()函数能接收的入参必须可序列 , 同时在hudi的逻辑中 , 有多处需要抛异常 , 而在Lambda表达式中进行 try catch 代码会略显臃肿 , 不太优雅 。6.现状和后续计划6.1 工作时间轴2020年4月 , T3出行(杨华@vinoyang , 王祥虎@wangxianghu)和阿里巴巴的同学(李少锋@leesf)以及若干其他小伙伴一起设计、敲定了该解耦方案;
2020年4月 , T3出行(王祥虎@wangxianghu)在内部完成了编码实现 , 并进行了初步验证 , 得出方案可行的结论;
2020年7月 , T3出行(王祥虎@wangxianghu)将该设计实现和基于新抽象实现的Spark版本推向社区(HUDI-1089);
2020年9月26日 , 顺丰科技基于T3内部分支修改完善的版本在 Apache Flink Meetup(深圳站)公开PR, 使其成为业界第一个在线上使用Flink将数据写hudi的企业 。
2020年10月2日 , HUDI-1089 合并入hudi主分支 , 标志着hudi-spark解耦完成 。
6.2 后续计划1)推进hudi和flink集成
将flink与hudi的集成尽快推向社区,在初期 , 该特性可能只支持kafka数据源 。
2)性能优化
为保证hudi-spark版本的稳定性和性能 , 此次解耦没有太多考虑flink版本可能存在的性能问题 。
- C++|嵌入式开发:C++中的结构与类
- 苏宁易购|透视2022年家电市场 头部家电品牌与苏宁易购敲定提升策略
- 摄像头|李彦宏《智能交通》与百度Apollo,还是小瞧它的胃口了!
- 智能网联汽车|我国智慧城市基础设施与智能网联汽车协同发展第二批试点城市公布
- 原创|别花冤枉钱,我教你怎么样给电脑装系统,安装版与Ghost都不难!
- 华为|12月刚开始,手机圈就传来两个重磅消息,与iPhone、华为有关
- 腾讯云|白银市政府与腾讯云达成战略合作
- 华为|Windows 11明年将让用户自定义开始菜单,显示更多应用与推荐项目
- playstation5|一加10 Pro传言消息汇总,有一点与往年不同
- |与任正非并称“二任”,把千亿公司给国家,堪称中国“并购之王”