深入解析 Flink 的算子链机制( 三 )
文章插图
最后 , 还需要将各个节点的算子链数据写入各自的 StreamConfig 中 , 算子链的起始节点要额外保存下 transitiveOutEdges 。 StreamConfig 在后文的物理执行阶段会再次用到 。
形成算子链的条件来看看 isChainable() 方法的代码 。由此可得 , 上下游算子能够 chain 在一起的条件还是非常苛刻的(老生常谈了) , 列举如下:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);StreamOperatorFactory> headOperator = upStreamVertex.getOperatorFactory();StreamOperatorFactory> outOperator = downStreamVertex.getOperatorFactory();return downStreamVertex.getInEdges().size() == 1}
- 上下游算子实例处于同一个 SlotSharingGroup 中(之后再提);
- 下游算子的链接策略(ChainingStrategy)为 ALWAYS ——既可以与上游链接 , 也可以与下游链接 。 我们常见的 map()、filter() 等都属此类;
- 上游算子的链接策略为 HEAD 或 ALWAYS 。 HEAD 策略表示只能与下游链接 , 这在正常情况下是 Source 算子的专属;
- 两个算子间的物理分区逻辑是 ForwardPartitioner, 可参见之前写过的《聊聊Flink DataStream 的八种物理分区逻辑》;
- 两个算子间的 shuffle 方式不是批处理模式;
- 上下游算子实例的并行度相同;
- 没有禁用算子链 。
@PublicEvolvingpublic SingleOutputStreamOperator disableChaining() {return setChainingStrategy(ChainingStrategy.NEVER);}@PublicEvolvingpublic SingleOutputStreamOperator startNewChain() {return setChainingStrategy(ChainingStrategy.HEAD);}
如果要在整个运行时环境中禁用算子链 , 调用 StreamExecutionEnvironment.disableOperatorChaining() 方法即可 。物理计划中的算子链在 JobGraph 转换成 ExecutionGraph 并交由 TaskManager 执行之后 , 会生成调度执行的基本任务单元 ——StreamTask , 负责执行具体的 StreamOperator 逻辑 。 在StreamTask.invoke() 方法中 , 初始化了状态后端、checkpoint 存储和定时器服务之后 , 可以发现:
operatorChain = new OperatorChain<>(this, recordWriters);headOperator = operatorChain.getHeadOperator();
构造出了一个 OperatorChain 实例 , 这就是算子链在实际执行时的形态 。 解释一下OperatorChain 中的几个主要属性 。private final StreamOperator>[] allOperators;private final RecordWriterOutput>[] streamOutputs;private final WatermarkGaugeExposingOutput> chainEntryPoint;private final OP headOperator;
- headOperator:算子链的第一个算子 , 对应 JobGraph 中的算子链起始节点;
- allOperators:算子链中的所有算子 , 倒序排列 , 即 headOperator 位于该数组的末尾;
- streamOutputs:算子链的输出 , 可以有多个;
- chainEntryPoint:算子链的“入口点” , 它的含义将在后文说明 。
OperatorChain 构造方法中的核心代码如下 。
for (int i = 0; i < outEdgesInOrder.size(); i++) {StreamEdge outEdge = outEdgesInOrder.get(i);RecordWriterOutput> streamOutput = createStreamOutput(recordWriters.get(i),outEdge,chainedConfigs.get(outEdge.getSourceId()),containingTask.getEnvironment());this.streamOutputs[i] = streamOutput;streamOutputMap.put(outEdge, streamOutput);}// we create the chain of operators and grab the collector that leads into the chainList> allOps = new ArrayList<>(chainedConfigs.size());this.chainEntryPoint = createOutputCollector(containingTask,configuration,chainedConfigs,userCodeClassloader,streamOutputMap,allOps);if (operatorFactory != null) {WatermarkGaugeExposingOutput> output = getChainEntryPoint();headOperator = operatorFactory.createStreamOperator(containingTask, configuration, output);headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());} else {headOperator = null;}// add head operator to end of chainallOps.add(headOperator);this.allOperators = allOps.toArray(new StreamOperator>[allOps.size()]);
首先会遍历算子链整体的所有出边 , 并调用 createStreamOutput() 方法创建对应的下游输出 RecordWriterOutput 。 然后就会调用 createOutputCollector() 方法创建物理的算子链 , 并返回 chainEntryPoint , 这个方法比较重要 , 部分代码如下 。
- 高像素|加持高像素只为解析力?vivo S7丛林秘境展对样张细节的要求更严苛
- 启动|拼多多深入布局母婴产业带 补贴+直播启动“母婴产品溯源”行动
- 《深入理解Java虚拟机》:对象创建、布局和访问全过程
- 用了就停不下来,解析全网视频,不仅免费还能下载
- 深入理解Netty编解码、粘包拆包、心跳机制
- 详解mysql执行计划
- 在美国当快递小哥赚钱吗?西瓜视频解析除了努力,运气也很重要
- 标识|食品行业工业互联网标识解析二级节点、“星火·链网”骨干节点在漯河上线
- 《深入理解Java虚拟机》:锁优化
- Lock、Synchronized锁区别解析