作者:董亭亭、徐明
摘要:作為短視訊分享跟直播的平台,快手有諸多業務場景應用了 Flink,包括短視訊、直播的品質監控、使用者增長分析、實時資料處理、直播 CDN 排程等。此次主要介紹在快手使用 Flink 在實時多元分析場景的應用與優化。主要内容包括:Tips:點選下方連結可檢視作者原版PPT及分享視訊~ https://ververica.cn/developers/flink-forward-asia-2019/
- Flink 在快手應用場景及規模
- 快手實時多元分析平台
- SlimBase-更省 IO、嵌入式共享 state 存儲
首先看 Flink 在快手的應用場景和規模。
1. 快手應用場景
快手計算鍊路是從 DB/Binlog 以及 WebService Log 實時入到 Kafka 中,然後接入 Flink 做實時計算,其中包括實時數倉、實時分析以及實時訓練,最後的結果存到 Druid、Kudu、HBase 或者 ClickHouse 裡面;同時 Kafka 資料實時 Dump 一份到 Hadoop 叢集,然後通過 Hive、MapReduce 或者 Spark 來做離線計算;最終實時計算和離線計算的結果資料會用内部自研 BI 工具 KwaiBI 來展現出來。
Flink 在快手典型的應用場景主要分為三大類:
- 80% 統計監控:實時統計,包括各項資料的名額,監控項報警,用于輔助業務進行實時分析和監控;
- 15% 資料處理:對資料的清洗、拆分、Join 等邏輯處理,例如大 Topic 的資料拆分、清洗;
- 5% 資料處理:實時業務處理,針對特定業務邏輯的實時處理,例如實時排程。
Flink 在快手應用的典型場景案例包括:
- 快手是分享短視訊跟直播的平台,快手短視訊、直播的品質監控是通過 Flink 進行實時統計,比如直播觀衆端、主播端的播放量、卡頓率、開播失敗率等跟直播品質相關的多種監控名額;
- 使用者增長分析,實時統計各投放管道拉新情況,根據效果實時調整各管道的投放量;
- 實時資料處理,廣告展現流、點選流實時 Join,用戶端日志的拆分等;
- 直播 CDN 排程,實時監控各 CDN 廠商品質,通過 Flink 實時訓練調整各個 CDN 廠商流量配比。
2. Flink 叢集規模
快手目前叢集規模有 1500 台左右,日處理條目數總共有3萬億,峰值處理條目數大約是 3億/s 左右。叢集部署都是 On Yarn 模式,實時叢集和離線叢集混合部署,通過 Yarn 标簽進行實體隔離,實時叢集是 Flink 專用叢集,針對隔離性、穩定性要求極高的業務部署。注:本文所涉及資料僅代表嘉賓分享時的資料。
此處重點和大家分享下快手的實時多元分析平台。
1. 快手實時多元分析場景
快手内部有這樣的應用場景,每天的資料量在百億級别,業務方需要在資料中任選五個以内的次元組合進行全維的模組化進而計算累計的 PV ( Page View 通路量 )、UV ( Unique Visitor 獨立訪客 )、新增或者留存等這樣的名額,然後名額的計算結果要實時進行圖形化報表展示供給業務分析人員進行分析。
2. 方案選型
現在社群已經有一些 OLAP 實時分析的工具,像 Druid 和 ClickHouse;目前快手采用的是 Flink+Kudu 的方案,在前期調研階段對這三種方案從計算能力、分組聚合能力、查詢并發以及查詢延遲四個方面結合實時多元查詢業務場景進行對比分析:
- 計算能力方面:多元查詢這種業務場景需要支援 Sum、Count 和 count distinct 等能力,而 Druid 社群版本不支援 count distinct,快手内部版本支援數值類型、但不支援字元類型的 count distinct;ClickHouse 本身全都支援這些計算能力;Flink 是一個實時計算引擎,這些能力也都具備。
- 分組聚合能力方面:Druid 的分組聚合能力一般,ClickHouse 和 Flink 都支援較強的分組聚合能力。
- 查詢并發方面:ClickHouse 的索引比較弱,不能支援較高的查詢并發,Druid 和 Flink 都支援較高的并發度,存儲系統 Kudu,它也支援強索引以及很高的并發。
- 查詢延遲方面:Druid 和 ClickHouse 都是在查詢時進行現計算,而 Flink+Kudu 方案,通過 Flink 實時計算後将名額結果直接存儲到 Kudu 中,查詢直接從 Kudu 中查詢結果而不需要進行計算,是以查詢延遲比較低。
采用 Flink+Kudu 的方案主要思想是借鑒了 Kylin 的思路,Kylin 可以指定很多元度和名額進行離線的預計算然後将預計算結果存儲到 Hbase 中;快手的方案是通過 Flink 實時計算名額,再實時地寫到 Kudu 裡面。
3. 方案設計
實時多元分析的整體的流程為:
- 使用者在快手自研的 BI 分析工具 KwaiBI 上配置 Cube 資料立方體模型,指定次元列和名額列以及基于名額做什麼樣的計算;
- 配置過程中選擇的資料表是經過處理過後存儲在實時數倉平台中的資料表;
- 然後根據配置的計算規則通過 Flink 任務進行模組化名額的預計算,結果存儲到 Kudu 中;
- 最後 KwaiBI 從 Kudu 中查詢資料進行實時看闆展示。
接下來詳細介紹一下實時多元分析的主要子產品。
■ 資料預處理
KwaiBI 配置次元模組化時選擇的資料表,是經過提前預處理的:
- 首先内部有一個元資訊系統,在元資訊系統中提供統一的 schema 服務,所有的資訊都被抽象為邏輯表;
- 例如 Kafka 的 topic、Redis、Hbase 表等中繼資料資訊都抽取成 schema 存儲起來;
- 快手 Kafka 的實體資料格式大部分是 Protobuf 和 Json 格式,schema 服務平台也支援将其映射為邏輯表;
- 使用者隻需要将邏輯表建好之後,就可以在實時數倉對資料進行清洗和過濾。
■ 模組化計算名額
資料預處理完成後,最重要的步驟是進行模組化名額計算,此處支援 Cube、GroupingSet 方式次元組合來計算小時或者天累計的 UV ( Unique Visitor )、新增和留存等名額,可以根據使用者配置按固定時間間隔定期輸出結果;次元聚合邏輯中,通過逐層降維計算的方式會讓 DAG 作業圖十分複雜,如上圖右上角模型所示;是以快手設計了兩層降維計算模型,分為全次元層和剩餘次元層,這樣既利用了全次元層的聚合結果又簡化了 DAG 作業圖。
以 UV 類名額計算舉例,兩個黃色虛線框分别對應兩層計算子產品:全維計算和降維計算。
- 全維計算分為兩個步驟,為避免資料傾斜問題,首先是次元打散預聚合,将相同的次元值先哈希打散一下。因為 UV 名額需要做到精确去重,是以采用 Bitmap 進行去重操作,每分鐘一個視窗計算出增量視窗内資料的 Bitmap 發送給第二步按次元全量聚合;在全量聚合中,将增量的 Bitmap 合并到全量 Bitmap 中最終得出準确的 UV 值。然而有人會有問題,針對使用者 id 這種的數值類型的可以采用此種方案,但是對于 deviceid 這種字元類型的資料應該如何處理?實際上在源頭,資料進行次元聚合之前,會通過字典服務将字元類型的變量轉換為唯一的 Long 類型值,進而通過 Bitmap 進行去重計算 UV。
- 降維計算中,通過全維計算得出的結果進行預聚合然後進行全量聚合,最終将結果進行輸出。
再重點介紹下,模組化名額計算中的幾個關鍵點。在模組化名額計算中,為了避免次元資料傾斜問題,通過預聚合 ( 相同次元 hash 打散 ) 和全量聚合 ( 相同次元打散後聚合 ) 兩種方式來解決。
為了解決 UV 精确去重問題,前文有提到,使用 Bitmap 進行精确去重,通過字典服務将 String 類型資料轉換成 Long 類型資料進而便于存儲到 Bitmap 中,因為統計 UV 要統計曆史的資料,比如說按天累計,随着時間的推移,Bitmap 會越來越大,在 Rocksdb 狀态存儲下,讀寫過大的 KV 會比較耗性能,是以内部自定義了一個 BitmapState,将 Bitmap 進行分塊存儲,一個 blockid 對應一個局部的 bitmap,這樣在 RocksDB 中存儲時,一個 KV 會比較小,更新的時候也隻需要根據 blockid 更新局部的 bitmap 就可以而不需要全量更新。
接下來,看新增類的名額計算,和剛剛 UV 的不同點是需要判斷是否為新增使用者,通過異步地通路外部的曆史使用者服務進行新增使用者判斷,再根據新增使用者流計算新增 UV,這塊計算邏輯和 UV 計算一緻。
然後,再來看留存類名額計算,與 UV 計算不同的時候,不僅需要當天的資料還需要前一天的曆史資料,這樣才能計算出留存率,内部實作的時候是采用雙 buffer state 存儲,在計算的時候将雙 buffer 資料相除就可以計算出留存率。
■ Kudu 存儲
最後經過上面的計算邏輯後,會将結果存儲到 Kudu 裡面,其本身具有低延遲随機讀寫以及快速列掃描等特點,很适合實時互動分析場景;在存儲方式上,首先對次元進行編碼,然後按時間+次元組合+次元值組合作為主鍵,最終按次元組合、次元值組合、時間進行分區,這樣有利于提高查詢的效率快速擷取到資料。
4. KwaiBI 展示
界面為配置 Cube 模型的截圖,配置一些列并指定類型,再通過一個 SQL 語句來描述名額計算的邏輯,最終結果也會通過 KwaiBI 展示出來。
接下來介紹一種比 RocksDB 更省 IO、嵌入式的共享 state 存儲引擎:SlimBase。
1. 面臨的挑戰
首先看一下 Flink 使用 RocksDB 遇到的問題,先闡述一下快手的應用場景、廣告展現點選流實時 Join 場景:打開快手 App 可能會收到廣告服務推薦的廣告視訊,使用者可能會點選展現的廣告視訊。
這樣的行為在後端會形成兩份資料流,一份是廣告展現日志,一份是用戶端點選日志。這兩份資料進行實時 Join,并将 Join 結果作為樣本資料用于模型訓練,訓練出的模型會被推送到線上的廣告服務。
該場景下展現以後20分鐘的點選被認為是有效點選,實時 Join 邏輯則是點選資料 Join 過去20分鐘内的展現。其中,展現流的資料量相對比較大,20分鐘資料在 1TB 以上。檢查點設定為五分鐘,Backend 選擇 RocksDB。
在這樣的場景下,面臨着磁盤 IO 開銷70%,其中50%開銷來自于 Compaction;在 Checkpoint 期間,磁盤 IO 開銷達到了100%,耗時在1~5分鐘,甚至會長于 Checkpoint 間隔,業務能明顯感覺到反壓。經過分析找出問題:
- 首先,在 Checkpoint 期間會産生四倍的大規模資料拷貝,即:從 RocksDB 中全量讀取出來然後以三副本形式寫入到 HDFS 中;
- 其次,對于大規模資料寫入,RocksDB 的預設 Level Compaction 會有嚴重的 IO 放大開銷。
2. 解決方案
由于出現上文闡述的問題,開始尋找解決方案,整體思路是在資料寫入時直接落地到共享存儲中,避免 Checkpoint 帶來的資料拷貝問題。手段是嘗試使用更省 IO 的 Compaction,例如使用 SizeTieredCompation 方式,或者利用時序資料的特點使用并改造 FIFOCompaction。綜合比較共享存儲、SizeTieredCompation、基于事件時間的 FIFOCompaction 以及技術棧四個方面得出共識:HBase 代替 RocksDB 方案。
- 共享存儲方面,HBase 支援, RocksDB 不支援
- SizeTieredCompation 方面,RocksDB 預設不支援,HBase 預設支援
- 基于事件時間下推的 FIFOCompaction 方面,RocksDB 不支援,但 HBase 開發起來比較簡單
- 技術棧方面,RocksDB 使用 C++,HBase 使用 java,HBase 改造起來更友善
但是 HBase 有些方面相比 RocksDB 較差:
- HBase 是一個依賴 zookeeper、包含 Master 和 RegionServer 的重量級分布式系統;而 RocksDB 僅是一個嵌入式的 Lib 庫,很輕量級。
- 在資源隔離方面,HBase 比較困難,記憶體和 cpu 被多個 Container 共享;而 RocksDB 比較容易,記憶體和 cpu 伴随 Container 天生隔離。
- 網絡開銷方面,因為 HBase 是分布式的,所有比嵌入式的 RocksDB 開銷要大很多。
綜合上面幾點原因,快手達成了第二個共識,将 HBase 瘦身,改造為嵌入式共享存儲系統。
3. 實作方案
接下來介紹一下将 HBase 改造成 SlimBase 的實作方案,主要是分為兩層:
- 一層是 SlimBase 本身,包含三層結構:Slim HBase、擴充卡以及接口層;
- 另一層是 SlimBaseStateBackend,主要包含 ListState、MapState、ValueState 和 ReduceState。
後面将從 HBase 瘦身、适配并實作操作接口以及實作 SlimBaseStateBackend 三個步驟分别進行詳細介紹。
■ HBase 瘦身
先講 HBase 瘦身,主要從減肥和增瘦兩個步驟,在減肥方面:
- 先對 HBase 進行減裁,去除 client、zookeeper 和 master,僅保留 RegionServer
- 再對 RegionServer 進行剪裁,去除 ZK Listener、Master Tracker、Rpc、WAL 和 MetaTable
- 僅保留 RegionServer 中的 Cache、Memstore、Compaction、Fluster 和 Fs
在增瘦方面:
- 将原來 Master 上用于清理 Hfile 的 HFileCleaner 遷移到 RegionServer 上
- RocksDB 支援讀放大寫的 merge 接口,但是 SlimBase 是不支援的,是以要實作 merge 的接口
接口層主要有以下三點實作:
- 仿照 RocksDB,邏輯視圖分為兩級:DB 和 ColumnFamily
- 支援一些基本的接口:put/get/delete/merge 和 snapshot
- 額外支援了 restore 接口,用于從 snapshot 中恢複
适配層主要有以下兩個概念:
- 一個 SlimBase 适配為 Hbase 的 namespace
- 一個 SlimBase 的 ColumnFamily 适配為 HBase 的 table
SlimBaseStateBackend 實作上主要展現在兩個方面:
- 一是多種 States 實作,支援多種資料結構,ListState、MapState、ValueState 和 ReduceState
- 二是改造 Snapshot 和 Restore 的流程,從下面的兩幅圖可以看出,SlimBase 在磁盤 IO 上節省了大量的資源,避免了多次的 IO 的問題。
4. 測試結論
上線對比測試後,得出測試結論:
- Checkpoint 和 Restore 的時延從分鐘級别降到秒級。
- 磁盤 IO 下降了66%
- 磁盤寫吞吐下降50%
- CPU 開銷下降了33%
5. 後期優化
目前用的 Compaction 政策是 SizeTieredCompaction,後期要實作基于 OldestUnexpiredTime 的 FiFOCompaction 政策,目标是做到無磁盤 IO 開銷。
FiFOCompaction 是一種基于 TTL 的無 IO 的 Compaction 政策;OldestUnexpiredTime 是指例如設定 OldestUnexpiredTime=t2,表示 t2 時刻前的資料全部過期,可以被 Compaction 清理,基于時間點的 FIFOCompaction 理論上可以做到無磁盤 IO 開銷。
後續還有四點優化,前三點是基于 HBase 的優化,最後是針對 HDFS 做的優化:
- SlimBase 使用 InMemoryCompaction,降低記憶體 Flush 和 Compaction 開銷
- SlimBase 支援 prefixBloomFilter,提高 Scan 性能
- SlimBase 支援短路讀
- HDFS 副本落盤改造:非本地副本使用 DirectIO 直接落盤,提高本地讀 pagecache 命中率;此條主要是在測試使用時發現單副本比多副本讀寫效率高這一問題
6. 未來規劃
從語言、存儲、壓縮政策、事件事件下推、垃圾回收、檢查點時間、重加載時間七個方面來看,SlimBase 都比 RocksDB 更适合快手實時計算任務的開發,未來的規劃是對 Slimbase 的性能做進一步優化,願景是将快手 Flink 上的所有業務場景全部用 SlimBase 替代掉 RocksDB。
作者介紹:
董亭亭,快手大資料架構團隊,實時計算引擎團隊負責人。目前負責 Flink 引擎在快手公司内的研發和應用實踐。2013 年畢業于大連理工大學,曾就職于奇虎360,58集團,接觸過的領域包括:分布式計算、排程、分布式存儲等。
徐明,快手大資料架構研發工程師。畢業于南開大學,目前在快手資料架構團隊,負責 HBase 引擎及周邊生态維護和研發。