<>();}}可见 , 该方法会先计算出 StreamGraph 中各个节点的哈希码作为唯一标识 , 并创建一个空的 Map 结构保存即将被链在一起的算子的哈希码 , 然后调用 setChaining() 方法 , 如下源码所示 。
private void setChaining(Map hashes, List
可见是逐个遍历 StreamGraph 中的 Source 节点 , 并调用 createChain() 方法 。 createChain() 是逻辑计划层创建算子链的核心方法 , 完整源码如下 , 有点长 。
private List createChain(Integer startNodeId,Integer currentNodeId,Map hashes,List
先解释一下方法开头创建的 3 个 List 结构:
- transitiveOutEdges:当前算子链在 JobGraph 中的出边列表 , 同时也是 createChain() 方法的最终返回值;
- chainableOutputs:当前能够链在一起的 StreamGraph 边列表;
- nonChainableOutputs:当前不能够链在一起的 StreamGraph 边列表 。
接下来 , 从 Source 开始遍历 StreamGraph 中当前节点的所有出边 , 调用 isChainable() 方法判断是否可以被链在一起(这个判断逻辑稍后会讲到) 。 可以链接的出边被放入 chainableOutputs 列表 , 否则放入 nonChainableOutputs 列表 。
对于 chainableOutputs 中的边 , 就会以这些边的直接下游为起点 , 继续递归调用createChain() 方法延展算子链 。 对于 nonChainableOutputs 中的边 , 由于当前算子链的延展已经到头 , 就会以这些“断点”为起点 , 继续递归调用 createChain() 方法试图创建新的算子链 。 也就是说 , 逻辑计划中整个创建算子链的过程都是递归的 , 亦即实际返回时 , 是从 Sink 端开始返回的 。
然后要判断当前节点是不是算子链的起始节点 。 如果是 , 则调用 createJobVertex()方法为算子链创建一个 JobVertex( 即 JobGraph 中的节点) , 也就形成了我们在Web UI 中看到的 JobGraph 效果: