天天看點

Flink sql 之 兩階段聚合與 TwoStageOptimizedAggregateRule(源碼分析)

本文源碼基于flink1.14

上一篇文章分析了《flink的minibatch微批處理》的源碼

乘熱打鐵分析一下兩階段聚合的源碼,因為使用兩階段要先開啟minibatch,至于為什麼後面會分析到

兩階段聚合的原理,還是簡單提一下

如下圖,當聚合發生熱點的時候,可以在聚合前,先進行一個本地的聚合,先減小資料量,後接正常的資料交換以後聚合,來達到一個解熱點的目的,

Flink sql 之 兩階段聚合與 TwoStageOptimizedAggregateRule(源碼分析)

先來看下兩階段聚合的Calcite優化rule

Flink sql 之 兩階段聚合與 TwoStageOptimizedAggregateRule(源碼分析)

 看下什麼情況會比對上

Flink sql 之 兩階段聚合與 TwoStageOptimizedAggregateRule(源碼分析)

并且在onmatch方法中會判斷開啟了minibatch,以及二階段聚合的時候會調用

來看下具體邏輯match方法

Flink sql 之 兩階段聚合與 TwoStageOptimizedAggregateRule(源碼分析)

整個兩階段聚合會将原來的一個StreamPhysicalGroupAggregate實體節點,轉換成一個

StreamPhysicalLocalGroupAggregate本地聚合節點 + StreamPhysicalGlobalGroupAggregate聚合節點

來看下這個新添加的StreamPhysicalLocalGroupAggregate本地聚合算子的計算邏輯是什麼樣子的

Flink sql 之 兩階段聚合與 TwoStageOptimizedAggregateRule(源碼分析)

StreamExecLocalGroupAggragate就是StreamPhysicalLocalGroupAggregate本地聚合具體的ExecNode節點了

來看下具體的operator

Flink sql 之 兩階段聚合與 TwoStageOptimizedAggregateRule(源碼分析)

看到這裡是不是看到了熟悉的 MapBundleOperator ,如果看過上一篇minibatch優化的就知道,兩階段送出也是使用的這個有界operator作為抽象

在了解一下這個MapBundleOperator

Flink sql 之 兩階段聚合與 TwoStageOptimizedAggregateRule(源碼分析)

就是每來一條資料,都會調用傳入的fun的addInput方法

然後把每個key的結果put儲存在一個本地變量,就是個map<Rowdata,Rowdata>裡面

然後調用自己的trigger觸發器,當這條資料可以觸發觸發器就會調用finishBundle

這裡說到觸發器,回到初始化mapBundle的時候通過createMiniBatchTrigger建立的一個minibatch的觸發器,看看具體邏輯

Flink sql 之 兩階段聚合與 TwoStageOptimizedAggregateRule(源碼分析)

其實就是一個普通的count觸發器,觸發條件就是直接使用的minibatch配置的size參數,  是以這裡知道了為什麼兩階段送出要先開minibatch了

先看下每來一條資料會觸發的addInput方法,在來看看攢一個批次後觸發的finishBundle

minibatch會包裝成一個MiniBatchLocalGroupAggFunction這個funtion的addInput來看看

Flink sql 之 兩階段聚合與 TwoStageOptimizedAggregateRule(源碼分析)

就是來一條資料直接調用聚合函數的accumulate直接計算結果了,雖然計算結果但是還沒有往下遊發送

 來看下當攢一批後,集體是怎麼往下遊發送的 finishBundle 方法

Flink sql 之 兩階段聚合與 TwoStageOptimizedAggregateRule(源碼分析)

 結果都已經計算好了,攢一個批次還能幹嘛,就是把目前的計算結果往下遊發送呗

那整個二次聚合的優化就講完了

總結一下

sql會将agg拆成 localminiagg + agg

先在本地聚合localConbine一遍,再往下遊發送

下遊就正常聚合,優化了熱點的問題

繼續閱讀