深入解析 Flink 的算子链机制( 二 )

<>();}}可见 , 该方法会先计算出 StreamGraph 中各个节点的哈希码作为唯一标识 , 并创建一个空的 Map 结构保存即将被链在一起的算子的哈希码 , 然后调用 setChaining() 方法 , 如下源码所示 。
private void setChaining(Map hashes, List> legacyHashes, Map> chainedOperatorHashes) {for (Integer sourceNodeId : streamGraph.getSourceIDs()) {createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);}}可见是逐个遍历 StreamGraph 中的 Source 节点 , 并调用 createChain() 方法 。 createChain() 是逻辑计划层创建算子链的核心方法 , 完整源码如下 , 有点长 。
private List createChain(Integer startNodeId,Integer currentNodeId,Map hashes,List> legacyHashes,int chainIndex,Map> chainedOperatorHashes) {if (!builtVertices.contains(startNodeId)) {List transitiveOutEdges = new ArrayList();List chainableOutputs = new ArrayList();List nonChainableOutputs = new ArrayList();StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);for (StreamEdge outEdge : currentNode.getOutEdges()) {if (isChainable(outEdge, streamGraph)) {chainableOutputs.add(outEdge);} else {nonChainableOutputs.add(outEdge);}}for (StreamEdge chainable : chainableOutputs) {transitiveOutEdges.addAll(createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));}for (StreamEdge nonChainable : nonChainableOutputs) {transitiveOutEdges.add(nonChainable);createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);}List> operatorHashes =chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());byte[] primaryHashBytes = hashes.get(currentNodeId);OperatorID currentOperatorId = new OperatorID(primaryHashBytes);for (Map legacyHash : legacyHashes) {operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));}chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));if (currentNode.getInputFormat() != null) {getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());}if (currentNode.getOutputFormat() != null) {getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());}StreamConfig config = currentNodeId.equals(startNodeId)? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes): new StreamConfig(new Configuration());setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);if (currentNodeId.equals(startNodeId)) {config.setChainStart();config.setChainIndex(0);config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());config.setOutEdgesInOrder(transitiveOutEdges);config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());for (StreamEdge edge : transitiveOutEdges) {connect(startNodeId, edge);}config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));} else {chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap());config.setChainIndex(chainIndex);StreamNode node = streamGraph.getStreamNode(currentNodeId);config.setOperatorName(node.getOperatorName());chainedConfigs.get(startNodeId).put(currentNodeId, config);}config.setOperatorID(currentOperatorId);if (chainableOutputs.isEmpty()) {config.setChainEnd();}return transitiveOutEdges;} else {return new ArrayList<>();}}先解释一下方法开头创建的 3 个 List 结构:

  • transitiveOutEdges:当前算子链在 JobGraph 中的出边列表 , 同时也是 createChain() 方法的最终返回值;
  • chainableOutputs:当前能够链在一起的 StreamGraph 边列表;
  • nonChainableOutputs:当前不能够链在一起的 StreamGraph 边列表 。
接下来 , 从 Source 开始遍历 StreamGraph 中当前节点的所有出边 , 调用 isChainable() 方法判断是否可以被链在一起(这个判断逻辑稍后会讲到) 。 可以链接的出边被放入 chainableOutputs 列表 , 否则放入 nonChainableOutputs 列表 。
对于 chainableOutputs 中的边 , 就会以这些边的直接下游为起点 , 继续递归调用createChain() 方法延展算子链 。 对于 nonChainableOutputs 中的边 , 由于当前算子链的延展已经到头 , 就会以这些“断点”为起点 , 继续递归调用 createChain() 方法试图创建新的算子链 。 也就是说 , 逻辑计划中整个创建算子链的过程都是递归的 , 亦即实际返回时 , 是从 Sink 端开始返回的 。
然后要判断当前节点是不是算子链的起始节点 。 如果是 , 则调用 createJobVertex()方法为算子链创建一个 JobVertex( 即 JobGraph 中的节点) , 也就形成了我们在Web UI 中看到的 JobGraph 效果: