Flink 1.11:流批一体 Hive 数仓

导读:Flink 1.11 中流计算结合 Hive 批处理数仓 , 给离线数仓带来 Flink 流处理实时且 Exactly-once 的能力 。
文章摘取自Flink中文社区:“深度解读 Flink 1.11:流批一体 Hive 数仓”作者:李劲松 CREATE TABLE hive_table (user_id STRING,order_amount DOUBLE) PARTITIONED BY (dt STRING,hour STRING) STORED AS PARQUET TBLPROPERTIES (-- 使用partition中抽取时间 , 加上watermark决定partiton commit的时机'sink.partition-commit.trigger'='partition-time',-- 配置hour级别的partition时间抽取策略 , 这个例子中dt字段是yyyy-MM-dd格式的天 , hour是0-23的小时 , timestamp-pattern定义了如何从这两个partition字段推出完整的timestamp'partition.time-extractor.timestamp-pattern'=’$dt $hour:00:00’,-- 配置dalay为小时级 , 当 watermark > partition时间 + 1小时 , 会commit这个partition'sink.partition-commit.delay'='1 h',-- partitiion commit的策略是:先更新metastore(addPartition) , 再写SUCCESS文件'sink.partition-commit.policy.kind’='metastore,success-file') SET table.sql-dialect=default;CREATE TABLE kafka_table (user_id STRING,order_amount DOUBLE,log_ts TIMESTAMP(3),WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND) -- 可以结合Table Hints动态指定table properties [3]INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;2、Hive streaming source
Hive 数仓中存在大量的 ETL 任务 , 这些任务往往是通过调度工具来周期性的运行 , 这样做主要有两个问题:

  • 实时性不强 , 往往调度最小是小时级 。
  • 流程复杂 , 组件多 , 容易出现问题 。
针对这些离线的 ETL 作业 , Flink 1.11 为此开发了实时化的 Hive 流读 , 支持:
  • Partition 表 , 监控 Partition 的生成 , 增量读取新的 Partition 。
  • 非 Partition 表 , 监控文件夹内新文件的生成 , 增量读取新的文件 。
你甚至可以使用10分钟级别的分区策略 , 使用 Flink 的 Hive streaming source 和Hive streaming sink 可以大大提高 Hive 数仓的实时性到准实时分钟级 [4][5] , 在实时化的同时 , 也支持针对 Table 全量的 Ad-hoc 查询 , 提高灵活性 。
SELECT * FROM hive_table/*+ OPTIONS('streaming-source.enable'=’true’,'streaming-source.consume-start-offset'='2020-05-20') */;【Flink 1.11:流批一体 Hive 数仓】3、实时数据关联 Hive 表
在 Flink 与 Hive 集成的功能发布以后 , 我们收到最多的用户反馈之一就是希望能够将 Flink 的实时数据与离线的 Hive 表进行关联 。 因此 , 在 Flink 1.11 中 , 我们支持将实时表与 Hive 表进行 temporal join [6] 。 沿用 Flink 官方文档中的例子 , 假定 Orders 是实时表 , 而 LatestRates 是一张 Hive 表 , 用户可以通过以下语句进行temporal join:
SELECTo.amout, o.currency, r.rate, o.amount * r.rateFROMOrders AS oJOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS rON r.currency = o.currency与 Hive 表进行 temporal join 目前只支持 processing time , 我们会把 Hive 表的数据缓存到内存中 , 并按照固定的时间间隔去更新缓存的数据 。 用户可以通过参数“lookup.join.cache.ttl” 来控制缓存更新的间隔 , 默认间隔为一个小时 。
“lookup.join.cache.ttl” 需要配置到 Hive 表的 property 当中 , 因此每张表可以有不同的配置 。 另外 , 由于需要将整张 Hive 表加载到内存中 , 因此目前只适用于 Hive 表较小的场景 。
Hive 增强1、Hive Dialect 语法兼容
Flink on Hive 用户并不能很好的使用 DDL , 主要是因为:
  • Flink 1.10 中进一步完善了 DDL , 但由于 Flink 与 Hive 在元数据语义上的差异 , 通过 Flink DDL 来操作 Hive 元数据的可用性比较差 , 仅能覆盖很少的应用场景 。
  • 使用 Flink 对接 Hive 的用户经常需要切换到 Hive CLI 来执行 DDL 。
针对上述两个问题 , 我们提出了 FLIP-123 [7] , 通过 Hive Dialect 为用户提供 Hive语法兼容 。 该功能的最终目标 , 是为用户提供近似 Hive CLI/Beeline 的使用体验 , 让用户无需在 Flink 和 Hive 的 CLI 之间进行切换 , 甚至可以直接迁移部分 Hive 脚本到 Flink 中执行 。
在 Flink 1.11中 , Hive Dialect 可以支持大部分常用的 DDL , 比如 CREATE/ALTER TABLE、CHANGE/REPLACE COLUMN、ADD/DROP PARTITION 等等 。 为此 , 我们为 Hive Dialect 实现了一个独立的 parser , Flink 会根据用户指定的 Dialect 决定使用哪个 parser 来解析 SQL 语句 。 用户可以通过配置项“ table.sql-dialect ” 来指定使用的 SQL Dialect 。 它的默认值为 “default” , 即 Flink 原生的 Dialect , 而将其设置为 “hive” 时就开启了 Hive Dialect 。 对于 SQL 用户 , 可以在 yaml 文件中设置“table.sql-dialect” 来指定 session 的初始 Dialect , 也可以通过 set 命令来动态调整需要使用的 Dialect , 而无需重启 session 。