“为什么我的 Flink 作业 Web UI 中只显示出了一个框 , 并且 Records Sent 和Records Received 指标都是 0 ?是我的程序写得有问题吗?”
Flink 算子链简介笔者在 Flink 社区群里经常能看到类似这样的疑问 。 这种情况几乎都不是程序有问题 , 而是因为 Flink 的 operator chain ——即算子链机制导致的 , 即提交的作业的执行计划中 , 所有算子的并发实例(即 sub-task )都因为满足特定条件而串成了整体来执行 , 自然就观察不到算子之间的数据流量了 。
当然上述是一种特殊情况 。 我们更常见到的是只有部分算子得到了算子链机制的优化 , 如官方文档中出现过多次的下图所示 , 注意 Source 和 map() 算子 。
文章插图
【深入解析 Flink 的算子链机制】算子链机制的好处是显而易见的:所有 chain 在一起的 sub-task 都会在同一个线程(即 TaskManager 的 slot)中执行 , 能够减少不必要的数据交换、序列化和上下文切换 , 从而提高作业的执行效率 。
文章插图
铺垫了这么多 , 接下来就通过源码简单看看算子链产生的条件 , 以及它是如何在 Flink Runtime 中实现的 。
逻辑计划中的算子链对 Flink Runtime 稍有了解的看官应该知道 , Flink 作业的执行计划会用三层图结构来表示 , 即:
- StreamGraph —— 原始逻辑执行计划
- JobGraph —— 优化的逻辑执行计划(Web UI 中看到的就是这个)
- ExecutionGraph —— 物理执行计划
算子链是在优化逻辑计划时加入的 , 也就是由 StreamGraph 生成 JobGraph 的过程中 。 那么我们来到负责生成 JobGraph 的 o.a.f.streaming.api.graph.StreamingJobGraphGenerator 类 , 查看其核心方法 createJobGraph() 的源码 。
private List createChain(Integer startNodeId,Integer currentNodeId,Map hashes,List