privateWatermarkGaugeExposingOutput createOutputCollector(StreamTask, ?> containingTask,StreamConfig operatorConfig,Map chainedConfigs,ClassLoader userCodeClassloader,Map> streamOutputs,List> allOperators) {List>, StreamEdge>> allOutputs = new ArrayList<>(4);// create collectors for the network outputsfor (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {@SuppressWarnings("unchecked")RecordWriterOutput output = (RecordWriterOutput) streamOutputs.get(outputEdge);allOutputs.add(new Tuple2<>(output, outputEdge));}// Create collectors for the chained outputsfor (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {int outputId = outputEdge.getTargetId();StreamConfig chainedOpConfig = chainedConfigs.get(outputId);WatermarkGaugeExposingOutput output = createChainedOperator(containingTask,chainedOpConfig,chainedConfigs,userCodeClassloader,streamOutputs,allOperators,outputEdge.getOutputTag());allOutputs.add(new Tuple2<>(output, outputEdge));}// 以下略......}
该方法从上一节提到的 StreamConfig 中分别取出出边和链接边的数据 , 并创建各自的 Output 。 出边的 Output 就是将数据发往算子链之外下游的 RecordWriterOutput , 而链接边的输出要靠 createChainedOperator() 方法 。
private WatermarkGaugeExposingOutput> createChainedOperator(StreamTask, ?> containingTask,StreamConfig operatorConfig,Map chainedConfigs,ClassLoader userCodeClassloader,Map> streamOutputs,List> allOperators,OutputTag outputTag) {// create the output that the operator writes to first. this may recursively create more operatorsWatermarkGaugeExposingOutput> chainedOperatorOutput = createOutputCollector(containingTask,operatorConfig,chainedConfigs,userCodeClassloader,streamOutputs,allOperators);// now create the operator and give it the output collector to write its output toStreamOperatorFactory chainedOperatorFactory = operatorConfig.getStreamOperatorFactory(userCodeClassloader);OneInputStreamOperator chainedOperator = chainedOperatorFactory.createStreamOperator(containingTask, operatorConfig, chainedOperatorOutput);allOperators.add(chainedOperator);WatermarkGaugeExposingOutput> currentOperatorOutput;if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);}else {TypeSerializer inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);}// wrap watermark gauges since registered metrics must be uniquechainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);return currentOperatorOutput;}
我们一眼就可以看到 , 这个方法递归调用了上述 createOutputCollector() 方法 , 与逻辑计划阶段类似 , 通过不断延伸 Output 来产生 chainedOperator(即算子链中除了headOperator 之外的算子) , 并逆序返回 , 这也是 allOperators 数组中的算子顺序为倒序的原因 。
chainedOperator 产生之后 , 将它们通过 ChainingOutput 连接起来 , 形成如下图所示的结构 。
文章插图
最后来看看 ChainingOutput.collect() 方法是如何输出数据流的 。
@Overridepublic void collect(StreamRecord record) {if (this.outputTag != null) {// we are only responsible for emitting to the main inputreturn;}pushToOperator(record);}@Overridepublic void collect(OutputTag outputTag, StreamRecord record) {if (this.outputTag == null || !this.outputTag.equals(outputTag)) {// we are only responsible for emitting to the side-output specified by our// OutputTag.return;}pushToOperator(record);}protected void pushToOperator(StreamRecord record) {try {// we know that the given outputTag matches our OutputTag so the record// must be of the type that our operator expects.@SuppressWarnings("unchecked")StreamRecord castRecord = (StreamRecord) record;numRecordsIn.inc();operator.setKeyContextElement1(castRecord);operator.processElement(castRecord);}catch (Exception e) {throw new ExceptionInChainedOperatorException(e);}}
可见是通过调用链接算子的 processElement() 方法 , 直接将数据推给下游处理了 。 也就是说 , OperatorChain 完全可以看做一个由 headOperator 和 streamOutputs组成的单个算子 , 其内部的 chainedOperator 和 ChainingOutput 都像是被黑盒遮蔽 , 同时没有引入任何 overhead 。
打通了算子链在执行层的逻辑 , 看官应该会明白 chainEntryPoint 的含义了 。 由于它位于递归返回的终点 , 所以它就是流入算子链的起始 Output , 即上图中指向 headOperator 的 RecordWriterOutput 。