上一篇笔记看到restorestate时找不到operatorid的问题:
Cannotmapcheckpoint/savepointstateforoperator77fec41789154996bfa76055dea29472tothenewprogram
这些数字的变化乍看非常奇怪,这篇笔记尝试分析下这些数字是如何生成的并且修复上个例子。
1.唯一OperatorIDOperatorID对应的就是StreamGraph里的节点id,不过在Transformation-StreamGraph阶段,都还是自增的id。在生成JobGraph时,会首先为每个StreamNode生成hashID,代码入口在createJobGraph,通过得到。
定义唯一id最简单朴素高效的方式大概就是UUID,但考虑最基础的情况,如果只是程序重启或者某个transform内部修改,id可以保持不变,也就是当这个JobGraph没有发生改变时节点id无须改变。当节点只有后续节点改变,且后续节点改变对该节点没有影响时,该节点的id也无须变化。
因此,节点的id取决于3个因素:
节点在图中的位置,用于判断前序节点的部分图是否发生了变化
输入节点的id,判断输入节点是否发生变化
输出节点(chainable)的个数
其实不同的用途,id取决的因素也会不同。这里也只是大概的猜测,flink从通用性的角度,实现上也会更严格些,感兴趣的可以直接看下traverseStreamGraphAndGenerateHashes这块代码。
为了提高易用性,flink还提供了手动设置AssigningOperatorIDs的方式,uid接口用于生成节点id.
2.traverseStreamGraphAndGenerateHashes我们模仿traverseStreamGraphAndGenerateHashes实现一个简单的版本:
MicroStreamGraph记录了影响hash值的几个元素,如果没有设置uid,通过(输入节点id,chainable的出边个数,节点位置)计算,如果设置了uid,则基于设置值计算。
其中构造了两个StreamNodes,对应前面的两个例子:StreamWithStateSample、WordCount
objectJobVertexGeneratorextsApp{valhashFunction=_128(0)valhashes=[Int,Array[Byte]]//Plan//{"nodes":[{"id":1,"type":"Source:CustomSource","pact":"DataSource","contents":"Source:CustomSource","parallelism":4},{"id":2,"type":"Map","pact":"Operator","contents":"Map","parallelism":4,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Map","pact":"Operator","contents":"Map","parallelism":4,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink:","pact":"DataSink","contents":"Sink:","parallelism":4,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}caseclassMicroStreamGraph(nodeId:Int,userSpecifiedUID:String,chainableOutEdgeCnt:Int,inEdgeNodeIds:List[Int])//(MicroStreamGraph(1,"source_uid",1,[Int]),MicroStreamGraph(2,null,0,List(1)),MicroStreamGraph(4,"count_uid",1,List(2)),MicroStreamGraph(5,null,0,List(4)))/*//(MicroStreamGraph(1,null,1,[Int]),MicroStreamGraph(2,null,0,List(1)),MicroStreamGraph(4,null,0,List(2)),MicroStreamGraph(5,null,0,List(4)))*///{==null=//=()()(0).foreach(_=())valhash=().asBytes(){inEdgeNodeId=valinEdgeNodeHash=hashes(inEdgeNodeId)println(s"inEdgeNodeHash:${byteToHexString(inEdgeNodeHash)}")(0).foreach(i=hash(i)=(hash(i)*37^inEdgeNodeHash(i)).toByte)}println(s"hash:${byteToHexString(hash)}")(,hash)!=null=////=()(,("UTF-8"))(,().asBytes())}(streamNode=println(s"${}${byteToHexString(hashes())}"))}对于第一个StreamNodes,输出结果为
164248066b88fd35e9203cd469ffb4a532d216482dd1005af6d275607ff9eabe2c477fec41789154996bfa76055dea294725f0bb9ed0d20321fef7413e1942e21550
可以看到这里跟读取State时得到的operatorID是一致的。也可以确认operatorID:77fec41789154996bfa76055dea29472的statesize=2114,对应的就是map(newCountFunction)。
对于第二个StreamNodes,输出结果为
1cbc357ccb763df2852fee8c4fc7d55f227df19f87deec5680128845fd9a6ca18d49dd63673dd41ea021b896d5203f3ba7c51a936cb48657826a536f331e9fb33b5e
跟这里的日志一致。
注:搜cbc357ccb763df2852fee8c4fc7d55f2,大部分都是关于flink的结果
事实上,启动StreamWithStateSample的日志里也记录了非uid的hash值(对于uid的hash值,flink默认不会记录到日志)
'd216482dd1005af6d275607ff9eabe2c'fornode'Map-2'{id:2,parallelism:1,userfunction:$anon$4}'f0bb9ed0d20321fef7413e1942e21550'fornode'Sink:'{id:5,parallelism:1,userfunction:}到了这里,关于OperatorID报错的修复也就比较简单了,增加相同的uid即可。
//(newMockKafkaSource).print()(newMockKafkaSource).uid("source_uid").print()注:对比flink源码里的java实现,scala的这个例子虽然代码行数更少,但是可读性并不高。源码里的这段逻辑并不简单,相比之前State的代码实现要优雅很多
3.OperatorID应用3.1.Stateflink支持有状态的计算,状态是跟每一个JobVertexID关联的,通过接口声明也可以说明这点。从state恢复时也依赖逐个遍历且映射每个OperatorID.
3.2.metrics比如我们通过RESTAPI查看例子里的job状态,可以看到这么一组vertices:
{"vertices":[{"id":"64248066b88fd35e9203cd469ffb4a53","name":"Source:CustomSource-Map",},{"id":"77fec41789154996bfa76055dea29472","name":"Map-Sink:",}]}也就是实际运行时节点chain优化为了两条:

metrics看到的节点都是每条chain的首节点,开启了Latencytracking后,也可以看到每个OperatorID之间的latency,形如「__subtask___subtask__p99」.
4.总结StreamGraph生成JobGraph的过程中,会为每个节点生成hashid,生成规则为:
如果指定了uid,则使用uidhash结果,如果确认state可以复用,就可以手动指定。
如果没有指定uid,则跟输入节点的个数以及输入节点是否发生变化,所在图节点的位置,以及可chain的出边个数有关。
JobGraphchain时,符合条件的节点被chain到首节点,相关监控都使用首节点id.当然全部节点id也都记录下来,用于latency这类metrics,state存储等。例如在latencymetrci的命名结构__subtask___subtask__p99」,其中XY都是对应的VertexID,如果能够使用uidname等可读性明显更高一些,最开始接触flink时在社区里也提过疑问,不过没有回应,后来看到LatencyMetricscopeshouldincludeoperatornames,flink的owner暂时不打算增加这个易用性的功能。
5.RefAddoperatornametolatencymetrics
LatencyMetricscopeshouldincludeoperatornames
latencymetrics里使用operatorname