jobmanager
在job finish的時候會彙總accumulator的值,
在client請求accumulation時,
消息傳到job manager
executegraph
擷取accumulator的值
execution的accumulator聚合,
具體merge的邏輯,
收到task發來的heartbeat,其中附帶accumulators
根據jobid,更新到executiongraph
根據executionattemptid, 更新execution中
對于execution,隻要狀态不是結束,就直接更新
再看taskmanager如何更新accumulator,并發送heartbeat,
可以看到會把每個running task的accumulators放到accumulatorevents,然後通過heartbeat消息發出
而task的accumlators是通過,task.getaccumulatorregistry.getsnapshot得到
snapshot的邏輯也很簡單,
最後,我們如何将統計資料累加到accumulator上的?
直接看看flink内部的accumulator是如何更新的,都是通過這個reporter來更新的
何處調用到這個report的接口,
對于in, 在反序列化到record的時候會統計bytesin和recordsin
是以對于out,反之則序列化的時候寫入