Apache Hudi 与 Apache Flink 集成( 二 )
- source 接收 Kafka 数据 , 转换成 List;
- InstantGeneratorOperator 生成全局唯一的 instant.当上一个 instant 未完成或者当前批次无数据时 , 不创建新的 instant;
- KeyBy partitionPath 根据 partitionPath 分区 , 避免多个子任务写同一个分区;
- WriteProcessOperator 执行写操作 , 当当前分区无数据时 , 向下游发送空的结果数据凑数;
- CommitSink 接收上游任务的计算结果 , 当收到 parallelism 个结果时 , 认为上游子任务全部执行完成 , 执行 commit.
5. 实现示例1) HoodieTable
/** * Abstract implementation of a HoodieTable. * * @paramSub type of HoodieRecordPayload * @param Type of inputs * @param Type of keys * @param Type of outputs */public abstract class HoodieTable implements Serializable {protected final HoodieWriteConfig config;protected final HoodieTableMetaClient metaClient;protected final HoodieIndex index;public abstract HoodieWriteMetadata upsert(HoodieEngineContext context, String instantTime,I records);public abstract HoodieWriteMetadata insert(HoodieEngineContext context, String instantTime,I records);public abstract HoodieWriteMetadata bulkInsert(HoodieEngineContext context, String instantTime,I records, Option> bulkInsertPartitioner);......}
HoodieTable 是 Hudi 的核心抽象之一 , 其中定义了表支持的 insert,upsert,bulkInsert 等操作 。 以 upsert 为例 , 输入数据由原先的 JavaRDD inputRdds 换成了 I records, 运行时 JavaSparkContext jsc 换成了 HoodieEngineContext context.从类注释可以看到 T,I,K,O 分别代表了 Hudi 操作的负载数据类型、输入数据类型、主键类型以及输出数据类型 。 这些泛型将贯穿整个抽象层 。
2) HoodieEngineContext
/** * Base class contains the context information needed by the engine at runtime. It will be extended by different * engine implementation if needed. */public abstract class HoodieEngineContext {public abstract List map(List data, SerializableFunction func, int parallelism);public abstract List flatMap(List data, SerializableFunction> func, int parallelism);public abstract void foreach(List data, SerializableConsumer consumer, int parallelism);......}
HoodieEngineContext 扮演了 JavaSparkContext 的角色 , 它不仅能提供所有 JavaSparkContext 能提供的信息 , 还封装了 map,flatMap,foreach 等诸多方法 , 隐藏了 JavaSparkContext#map(),JavaSparkContext#flatMap(),JavaSparkContext#foreach() 等方法的具体实现 。以 map 方法为例 , 在 Spark 的实现类 HoodieSparkEngineContext 中 , map 方法如下:
@Overridepublic List map(List data, SerializableFunction func, int parallelism) {return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();}
在操作 List 的引擎中其实现可以为(不同方法需注意线程安全问题 , 慎用 parallel()):@Overridepublic List map(List data, SerializableFunction func, int parallelism) {return data.stream().parallel().map(func::apply).collect(Collectors.toList());}
注:map 函数中抛出的异常 , 可以通过包装 SerializableFunction func 解决.这里简要介绍下 SerializableFunction:
@FunctionalInterfacepublic interface SerializableFunction extends Serializable {O apply(I v1) throws Exception;}
该方法实际上是 java.util.function.Function 的变种 , 与java.util.function.Function 不同的是 SerializableFunction 可以序列化 , 可以抛异常 。 引入该函数是因为 JavaSparkContext#map() 函数能接收的入参必须可序列 , 同时在hudi的逻辑中 , 有多处需要抛异常 , 而在 Lambda 表达式中进行 try catch 代码会略显臃肿 , 不太优雅 。6.现状和后续计划6.1 工作时间轴
2020 年 4 月 , T3 出行(杨华@vinoyang , 王祥虎@wangxianghu)和阿里巴巴的同学(李少锋@leesf)以及若干其他小伙伴一起设计、敲定了该解耦方案;
2020 年 4 月 , T3 出行(王祥虎@wangxianghu)在内部完成了编码实现 , 并进行了初步验证 , 得出方案可行的结论;
2020 年 7 月 , T3 出行(王祥虎@wangxianghu)将该设计实现和基于新抽象实现的 Spark 版本推向社区(HUDI-1089);
2020 年 9 月 26 日 , 顺丰科技基于 T3 内部分支修改完善的版本在 Apache Flink Meetup(深圳站)公开 PR, 使其成为业界第一个在线上使用 Flink 将数据写 Hudi 的企业 。
- 创意|wacom one万与创意数位屏测评
- 黑莓(BB.US)盘前涨逾32%,将与亚马逊开发智能汽车数据平台|美股异动 | US
- 巅峰|realme巅峰之作:120Hz+陶瓷机身+5000mAh 做到了颜值与性能并存
- 抖音小店|抖音进军电商,短视频的商业模式与变现,创业者该如何抓住机遇?
- YFI正式宣布与Sushiswap合作|金色DeFi日报 | 合作
- 小店|抖音小店无货源是什么?与传统模式有什么区别?
- 星期一|亚马逊:黑五与网络星期一期间 第三方卖家销售额达到48亿美元
- 迁徙|网红迁徙记:哪里才是奶与蜜之地?
- 与用户|掌握好这4个步骤,实现了规模性的盈利
- 按键|苹果与宜家合作智能家居快捷按键,定价9.99美元