Flink 1.11:流批一体 Hive 数仓
导读:Flink 1.11 中流计算结合 Hive 批处理数仓 , 给离线数仓带来 Flink 流处理实时且 Exactly-once 的能力 。
文章摘取自Flink中文社区:“深度解读 Flink 1.11:流批一体 Hive 数仓”作者:李劲松 CREATE TABLE hive_table (user_id STRING,order_amount DOUBLE) PARTITIONED BY (dt STRING,hour STRING) STORED AS PARQUET TBLPROPERTIES (-- 使用partition中抽取时间 , 加上watermark决定partiton commit的时机'sink.partition-commit.trigger'='partition-time',-- 配置hour级别的partition时间抽取策略 , 这个例子中dt字段是yyyy-MM-dd格式的天 , hour是0-23的小时 , timestamp-pattern定义了如何从这两个partition字段推出完整的timestamp'partition.time-extractor.timestamp-pattern'=’$dt $hour:00:00’,-- 配置dalay为小时级 , 当 watermark > partition时间 + 1小时 , 会commit这个partition'sink.partition-commit.delay'='1 h',-- partitiion commit的策略是:先更新metastore(addPartition) , 再写SUCCESS文件'sink.partition-commit.policy.kind’='metastore,success-file') SET table.sql-dialect=default;CREATE TABLE kafka_table (user_id STRING,order_amount DOUBLE,log_ts TIMESTAMP(3),WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND) -- 可以结合Table Hints动态指定table properties [3]INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;2、Hive streaming source
Hive 数仓中存在大量的 ETL 任务 , 这些任务往往是通过调度工具来周期性的运行 , 这样做主要有两个问题:
- 实时性不强 , 往往调度最小是小时级 。
- 流程复杂 , 组件多 , 容易出现问题 。
- Partition 表 , 监控 Partition 的生成 , 增量读取新的 Partition 。
- 非 Partition 表 , 监控文件夹内新文件的生成 , 增量读取新的文件 。
SELECT * FROM hive_table/*+ OPTIONS('streaming-source.enable'=’true’,'streaming-source.consume-start-offset'='2020-05-20') */;
【Flink 1.11:流批一体 Hive 数仓】3、实时数据关联 Hive 表在 Flink 与 Hive 集成的功能发布以后 , 我们收到最多的用户反馈之一就是希望能够将 Flink 的实时数据与离线的 Hive 表进行关联 。 因此 , 在 Flink 1.11 中 , 我们支持将实时表与 Hive 表进行 temporal join [6] 。 沿用 Flink 官方文档中的例子 , 假定 Orders 是实时表 , 而 LatestRates 是一张 Hive 表 , 用户可以通过以下语句进行temporal join:
SELECTo.amout, o.currency, r.rate, o.amount * r.rateFROMOrders AS oJOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS rON r.currency = o.currency
与 Hive 表进行 temporal join 目前只支持 processing time , 我们会把 Hive 表的数据缓存到内存中 , 并按照固定的时间间隔去更新缓存的数据 。 用户可以通过参数“lookup.join.cache.ttl” 来控制缓存更新的间隔 , 默认间隔为一个小时 。“lookup.join.cache.ttl” 需要配置到 Hive 表的 property 当中 , 因此每张表可以有不同的配置 。 另外 , 由于需要将整张 Hive 表加载到内存中 , 因此目前只适用于 Hive 表较小的场景 。
Hive 增强1、Hive Dialect 语法兼容
Flink on Hive 用户并不能很好的使用 DDL , 主要是因为:
- Flink 1.10 中进一步完善了 DDL , 但由于 Flink 与 Hive 在元数据语义上的差异 , 通过 Flink DDL 来操作 Hive 元数据的可用性比较差 , 仅能覆盖很少的应用场景 。
- 使用 Flink 对接 Hive 的用户经常需要切换到 Hive CLI 来执行 DDL 。
在 Flink 1.11中 , Hive Dialect 可以支持大部分常用的 DDL , 比如 CREATE/ALTER TABLE、CHANGE/REPLACE COLUMN、ADD/DROP PARTITION 等等 。 为此 , 我们为 Hive Dialect 实现了一个独立的 parser , Flink 会根据用户指定的 Dialect 决定使用哪个 parser 来解析 SQL 语句 。 用户可以通过配置项“ table.sql-dialect ” 来指定使用的 SQL Dialect 。 它的默认值为 “default” , 即 Flink 原生的 Dialect , 而将其设置为 “hive” 时就开启了 Hive Dialect 。 对于 SQL 用户 , 可以在 yaml 文件中设置“table.sql-dialect” 来指定 session 的初始 Dialect , 也可以通过 set 命令来动态调整需要使用的 Dialect , 而无需重启 session 。
- Flink中parallelism并行度和slot槽位的理解
- 网易云音乐基于Flink实时数仓实践
- flink消费kafka的offset与checkpoint
- 畅玩光追游戏 四款RTX3080显卡11.11力推
- 华为云“11.11上云嘉年华”狂欢倒计时,不玩套路福利送到底
- 京东11.11宣传片刷屏,媒体发起热爱联盟力挺京东11.11全球热爱季
- Apache Hudi 与 Apache Flink 集成
- 基于Flink+ClickHouse打造轻量级点击流实时数仓
- Lyft 基于 Flink 的大规模准实时数据分析平台
- 未来科技圈是什么样儿?20家品牌大佬坐镇京东11.11有话说