天天看點

distinct去重多個字段_Flink去重第二彈:SQL去重

在Flink去重第一彈:MapState去重中介紹了使用編碼方式完成去重,但是這種方式開發周期比較長,我們可能需要針對不同的業務邏輯實作不同的編碼,對于業務開發來說也需要熟悉Flink編碼,也會增加相應的成本,我們更多希望能夠以sql的方式提供給業務開發完成自己的去重邏輯。本篇介紹如何使用sql方式完成去重。

為了與離線分析保持一緻的分析語義,Flink SQL 中提供了distinct去重方式,使用方式:

SELECT 
           

表示了簡單的對裝置ID進行去重,得到一個明細結果,那麼我們在使用distinct來統計去重結果通常有兩種方式, 仍然以統計每日網站uv為例:

第一種方式
SELECT datatime,count(DISTINCT devId) FROM pv group by datatime
           

該語義表示計算網頁每日的uv數量,其内部核心實作主要依靠DistinctAccumulator與CountAccumulator,DistinctAccumulator 内部包含一個map結構,key 表示的是distinct的字段,value表示重複的計數,CountAccumulator就是一個計數器的作用,這兩部分都是作為動态生成聚合函數的中間結果accumulator,透過之前的聚合函數的分析可知中間結果是存儲在狀态裡面的,也就是容錯并且具有一緻性語義的

其處理流程是:

1. 将devId 添加到對應的DistinctAccumulator對象中,首先會判斷map中是否存在該devId, 不存在則插入map中并且将對應value記1,并且傳回True;存在則将對應的value+1更新到map中,并且傳回False

2. 隻有當傳回True時才會對CountAccumulator做累加1的操作,以此達到計數目的

第二種方式
select count(*) ,datatime from (
                    select distinct devId,datatime from pv ) a
                        group by datatime
           

内部是一個對devId,datatime 進行distinct的計算,在flink内部會轉換為以devId,datatime進行分組的流并且進行聚合操作,在内部會動态生成一個聚合函數,該聚合函數createAccumulators方法生成的是一個Row(0) 的accumulator 對象,其accumulate方法是一個空實作,也就是該聚合函數每次聚合之後傳回的結果都是Row(0),通過之前對sql中聚合函數的分析(可檢視GroupAggProcessFunction函數源碼), 如果聚合函數處理前後得到的值相同那麼可能會不發送該條結果也可能發送一條撤回一條新增的結果,但是其最終的效果是不會影響下遊計算的,在這裡我們簡單了解為在處理相同的devId,datatime不會向下遊發送資料即可,也就是每一對devId,datatime隻會向下遊發送一次資料;

外部就是一個簡單的按照時間次元的計數計算,由于内部每一組devId,datatime 隻會發送一次資料到外部,那麼外部對應datatime次元的每一個devId都是唯一的一次計數,得到的結果就是我們需要的去重計數結果。

兩種方式對比

1. 這兩種方式最終都能得到相同的結果,但是經過分析其在内部實作上差異還是比較大,第一種在分組上選擇datatime ,内部使用的累加器DistinctAccumulator 每一個datatime都會與之對應一個對象,在該次元上所有的裝置id, 都會存儲在該累加器對象的map中,而第二中選擇首先細化分組,使用datatime+devId分開存儲,然後外部使用時間次元進行計數,簡單歸納就是:

第一種: datatime->Value{devI1,devId2....}

第二種: datatime+devId->row(0)

聚合函數中accumulator 是存儲在ValueState中的,第二種方式的key會比第一種方式數量上多很多,但是其ValueState占用空間卻小很多,而在實際中我們通常會選擇Rocksdb方式作為狀态後端,rocksdb中value大小是有上限的,第一種方式很容易到達上限,那麼使用第二種方式會更加合适;

2. 這兩種方式都是全量儲存裝置資料的,會消耗很大的存儲空間,但是我們的計算通常是帶有時間屬性的,那麼可以通過配置StreamQueryConfig設定狀态ttl。

以上就是關于distinct實作去重計數的實作分析,如有不對之處還望指正。

distinct去重多個字段_Flink去重第二彈:SQL去重