深入解析 Flink 的算子链机制( 三 )


深入解析 Flink 的算子链机制文章插图
最后 , 还需要将各个节点的算子链数据写入各自的 StreamConfig 中 , 算子链的起始节点要额外保存下 transitiveOutEdges 。 StreamConfig 在后文的物理执行阶段会再次用到 。
形成算子链的条件来看看 isChainable() 方法的代码 。由此可得 , 上下游算子能够 chain 在一起的条件还是非常苛刻的(老生常谈了) , 列举如下:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);StreamOperatorFactory headOperator = upStreamVertex.getOperatorFactory();StreamOperatorFactory outOperator = downStreamVertex.getOperatorFactory();return downStreamVertex.getInEdges().size() == 1}

  • 上下游算子实例处于同一个 SlotSharingGroup 中(之后再提);
  • 下游算子的链接策略(ChainingStrategy)为 ALWAYS ——既可以与上游链接 , 也可以与下游链接 。 我们常见的 map()、filter() 等都属此类;
  • 上游算子的链接策略为 HEAD 或 ALWAYS 。 HEAD 策略表示只能与下游链接 , 这在正常情况下是 Source 算子的专属;
  • 两个算子间的物理分区逻辑是 ForwardPartitioner, 可参见之前写过的《聊聊Flink DataStream 的八种物理分区逻辑》;
  • 两个算子间的 shuffle 方式不是批处理模式;
  • 上下游算子实例的并行度相同;
  • 没有禁用算子链 。
禁用算子链用户可以在一个算子上调用 startNewChain() 方法强制开始一个新的算子链 , 或者调用 disableOperatorChaining() 方法指定它不参与算子链 。 代码位于 SingleOutputStreamOperator 类中 , 都是通过改变算子的链接策略实现的 。
@PublicEvolvingpublic SingleOutputStreamOperator disableChaining() {return setChainingStrategy(ChainingStrategy.NEVER);}@PublicEvolvingpublic SingleOutputStreamOperator startNewChain() {return setChainingStrategy(ChainingStrategy.HEAD);}如果要在整个运行时环境中禁用算子链 , 调用 StreamExecutionEnvironment.disableOperatorChaining() 方法即可 。
物理计划中的算子链在 JobGraph 转换成 ExecutionGraph 并交由 TaskManager 执行之后 , 会生成调度执行的基本任务单元 ——StreamTask , 负责执行具体的 StreamOperator 逻辑 。 在StreamTask.invoke() 方法中 , 初始化了状态后端、checkpoint 存储和定时器服务之后 , 可以发现:
operatorChain = new OperatorChain<>(this, recordWriters);headOperator = operatorChain.getHeadOperator();构造出了一个 OperatorChain 实例 , 这就是算子链在实际执行时的形态 。 解释一下OperatorChain 中的几个主要属性 。
private final StreamOperator[] allOperators;private final RecordWriterOutput[] streamOutputs;private final WatermarkGaugeExposingOutput> chainEntryPoint;private final OP headOperator;
  • headOperator:算子链的第一个算子 , 对应 JobGraph 中的算子链起始节点;
  • allOperators:算子链中的所有算子 , 倒序排列 , 即 headOperator 位于该数组的末尾;
  • streamOutputs:算子链的输出 , 可以有多个;
  • chainEntryPoint:算子链的“入口点” , 它的含义将在后文说明 。
由上可知 , 所有 StreamTask 都会创建 OperatorChain 。 如果一个算子无法进入算子链 , 也会形成一个只有 headOperator 的单个算子的 OperatorChain 。
OperatorChain 构造方法中的核心代码如下 。
for (int i = 0; i < outEdgesInOrder.size(); i++) {StreamEdge outEdge = outEdgesInOrder.get(i);RecordWriterOutput streamOutput = createStreamOutput(recordWriters.get(i),outEdge,chainedConfigs.get(outEdge.getSourceId()),containingTask.getEnvironment());this.streamOutputs[i] = streamOutput;streamOutputMap.put(outEdge, streamOutput);}// we create the chain of operators and grab the collector that leads into the chainList> allOps = new ArrayList<>(chainedConfigs.size());this.chainEntryPoint = createOutputCollector(containingTask,configuration,chainedConfigs,userCodeClassloader,streamOutputMap,allOps);if (operatorFactory != null) {WatermarkGaugeExposingOutput> output = getChainEntryPoint();headOperator = operatorFactory.createStreamOperator(containingTask, configuration, output);headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());} else {headOperator = null;}// add head operator to end of chainallOps.add(headOperator);this.allOperators = allOps.toArray(new StreamOperator[allOps.size()]);首先会遍历算子链整体的所有出边 , 并调用 createStreamOutput() 方法创建对应的下游输出 RecordWriterOutput 。 然后就会调用 createOutputCollector() 方法创建物理的算子链 , 并返回 chainEntryPoint , 这个方法比较重要 , 部分代码如下 。