flink - accumulator

JobManager

When a job finishes, the JobManager aggregates the accumulator values.

When a client requests the accumulator results,

the message is delivered to the JobManager.

ExecutionGraph

Retrieving the accumulator values:

The accumulators of the individual executions are aggregated.

The actual merge logic works entry by entry.
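
The merge step can be sketched roughly as follows. This is a minimal illustration, not Flink's own code: `SimpleAccumulator`, `LongSum`, and `AccumulatorMerge` are hypothetical stand-ins, and the assumption is that accumulators are keyed by name and that an entry seen for the first time is copied into the target.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an accumulator; not Flink's actual interface.
interface SimpleAccumulator {
    long getValue();
    void merge(SimpleAccumulator other);
    SimpleAccumulator copy();
}

// Hypothetical accumulator that sums long values.
class LongSum implements SimpleAccumulator {
    private long sum;

    LongSum(long initial) { this.sum = initial; }

    public long getValue() { return sum; }

    public void merge(SimpleAccumulator other) { sum += other.getValue(); }

    public SimpleAccumulator copy() { return new LongSum(sum); }
}

class AccumulatorMerge {
    // Merges every entry of toMerge into target, matching entries by name.
    static void mergeInto(Map<String, SimpleAccumulator> target,
                          Map<String, SimpleAccumulator> toMerge) {
        for (Map.Entry<String, SimpleAccumulator> entry : toMerge.entrySet()) {
            SimpleAccumulator existing = target.get(entry.getKey());
            if (existing == null) {
                // First occurrence: store a copy so the source map is never mutated.
                target.put(entry.getKey(), entry.getValue().copy());
            } else {
                // Already present: fold the other value into the existing one.
                existing.merge(entry.getValue());
            }
        }
    }
}
```

Calling `mergeInto` once per execution, always with the same target map, yields the job-wide totals while leaving each execution's own map unchanged.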

When a heartbeat arrives from a task, it carries that task's accumulators.

Based on the JobID, they are applied to the matching ExecutionGraph.

Based on the ExecutionAttemptID, they are then written into the matching Execution.

For an Execution, as long as its state is not a finished one, the accumulators are simply overwritten with the latest values.
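
A minimal sketch of that rule, under stated assumptions: `AttemptState` and `AttemptAccumulators` are hypothetical names, the set of states is simplified, and the accumulators are reduced to plain `long` counters. It only illustrates the "overwrite unless already finished" check described above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified set of states for one execution attempt.
enum AttemptState {
    RUNNING(false), FINISHED(true), CANCELED(true), FAILED(true);

    private final boolean terminal;

    AttemptState(boolean terminal) { this.terminal = terminal; }

    boolean isTerminal() { return terminal; }
}

// Hypothetical holder for the accumulators of one execution attempt.
class AttemptAccumulators {
    private final Object lock = new Object();
    private AttemptState state = AttemptState.RUNNING;
    private Map<String, Long> accumulators = Collections.emptyMap();

    void setState(AttemptState newState) {
        synchronized (lock) {
            state = newState;
        }
    }

    // Overwrites the stored values unless the attempt has already ended.
    // Returns true if the update was applied.
    boolean update(Map<String, Long> latest) {
        synchronized (lock) {
            if (state.isTerminal()) {
                return false; // late heartbeat: keep the final values
            }
            accumulators = new HashMap<>(latest);
            return true;
        }
    }

    Map<String, Long> get() {
        synchronized (lock) {
            return new HashMap<>(accumulators);
        }
    }
}
```

Guarding the check and the write with the same lock keeps a late heartbeat from overwriting the final values of an attempt that has just finished.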

Next, let's look at how the TaskManager updates accumulators and sends the heartbeat.

The accumulators of every running task are put into accumulatorEvents, which is then sent out with the heartbeat message.

Each task's accumulators are obtained through task.getAccumulatorRegistry.getSnapshot.
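
The collection step might look roughly like this. It is a sketch only: `RunningTask`, `AccumulatorEvent`, and `HeartbeatBuilder` are hypothetical types, the attempt id is a plain string, and the list passed in is assumed to contain only tasks that are currently running.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical running task that can hand out a copy of its counters.
class RunningTask {
    final String attemptId;
    private final Map<String, Long> counters;

    RunningTask(String attemptId, Map<String, Long> counters) {
        this.attemptId = attemptId;
        this.counters = counters;
    }

    Map<String, Long> snapshot() {
        return new HashMap<>(counters);
    }
}

// Hypothetical heartbeat entry: one attempt id plus that attempt's values.
class AccumulatorEvent {
    final String attemptId;
    final Map<String, Long> values;

    AccumulatorEvent(String attemptId, Map<String, Long> values) {
        this.attemptId = attemptId;
        this.values = values;
    }
}

class HeartbeatBuilder {
    // Builds one event per running task; the list travels with the heartbeat.
    static List<AccumulatorEvent> collect(List<RunningTask> runningTasks) {
        List<AccumulatorEvent> events = new ArrayList<>();
        for (RunningTask task : runningTasks) {
            events.add(new AccumulatorEvent(task.attemptId, task.snapshot()));
        }
        return events;
    }
}
```

Because each event carries its attempt id, the receiving side can route the values to the right execution.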

The snapshot logic is also simple.
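
One plausible shape for such a snapshot, assuming it amounts to copying the current values under a lock: `CounterRegistry` is a hypothetical class, not Flink's AccumulatorRegistry, and the accumulators are again reduced to `long` counters.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry of named counters belonging to one task.
class CounterRegistry {
    private final Map<String, Long> counters = new HashMap<>();

    // Adds delta to the named counter, creating it on first use.
    synchronized void add(String name, long delta) {
        counters.merge(name, delta, Long::sum);
    }

    // Returns a read-only copy; later add() calls do not change it.
    synchronized Map<String, Long> getSnapshot() {
        return Collections.unmodifiableMap(new HashMap<>(counters));
    }
}
```

Copying matters here because the task keeps updating its counters while the heartbeat message is being built and sent.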

Finally, how do the statistics actually get added to an accumulator?

Let's look directly at how Flink's internal accumulators are updated: they are all updated through a reporter.

Where are the reporter's report methods called?

For input, bytesIn and recordsIn are counted when data is deserialized into a record.

For output it is the reverse: the counts are written when a record is serialized.
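
To make the in/out symmetry concrete, here is a small self-contained sketch. `IoReporter`, `CountingReporter`, and `RecordCodec` are hypothetical, and the wire format (a 4-byte length prefix followed by a UTF-8 payload) is an assumption chosen for the example rather than Flink's actual serialization.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reporter interface with one method per counter.
interface IoReporter {
    void reportNumRecordsIn(long n);
    void reportNumBytesIn(long n);
    void reportNumRecordsOut(long n);
    void reportNumBytesOut(long n);
}

// Hypothetical reporter that adds every reported value to a plain field.
class CountingReporter implements IoReporter {
    long recordsIn, bytesIn, recordsOut, bytesOut;

    public void reportNumRecordsIn(long n) { recordsIn += n; }
    public void reportNumBytesIn(long n) { bytesIn += n; }
    public void reportNumRecordsOut(long n) { recordsOut += n; }
    public void reportNumBytesOut(long n) { bytesOut += n; }
}

// Hypothetical codec: each record is a 4-byte length prefix plus UTF-8 bytes.
class RecordCodec {
    private final IoReporter reporter;

    RecordCodec(IoReporter reporter) { this.reporter = reporter; }

    // Output side: counts are reported at the point of serialization.
    byte[] serialize(String record) {
        byte[] payload = record.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length).put(payload);
        reporter.reportNumBytesOut(buf.capacity());
        reporter.reportNumRecordsOut(1);
        return buf.array();
    }

    // Input side: counts are reported at the point of deserialization.
    // Assumes data holds only complete records.
    List<String> deserializeAll(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        reporter.reportNumBytesIn(data.length);
        List<String> records = new ArrayList<>();
        while (buf.remaining() >= 4) {
            int length = buf.getInt();
            byte[] payload = new byte[length];
            buf.get(payload);
            records.add(new String(payload, StandardCharsets.UTF_8));
            reporter.reportNumRecordsIn(1);
        }
        return records;
    }
}
```

Hooking the counters into the (de)serialization path means user functions never have to report I/O statistics themselves.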
