天天看點

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

Flink在快手應用場景

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

快手計算鍊路是從 DB/Binlog 以及 WebService Log 實時入到 Kafka 中,然後接入 Flink 做實時計算,其中包括實時 ETL、實時分析、Interval Join 以及實時訓練,最後的結果存到 Druid、ES 或者 HBase 裡面,後面接入一些資料應用産品;同時這一份 Kafka 資料實時 Dump 一份到 Hadoop 叢集,然後接入離線計算。

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

Flink 在快手應用的類别主要分為三大類:

  • 80% 統計監控:實時統計,包括各項資料的名額,監控項報警,用于輔助業務進行實時分析和監控;
  • 15% 資料處理:對資料的清洗、拆分、Join 等邏輯處理,例如大 Topic 的資料拆分、清洗;
  • 5% 資料處理:實時業務處理,針對特定業務邏輯的實時處理,例如實時排程;
Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

Flink在快手應用的典型場景包括:

  • 快手是分享短視訊跟直播的平台,快手短視訊、直播的品質監控是通過 Flink 進行實時統計,比如直播觀衆端、主播端的播放量、卡頓率、開播失敗率等跟直播品質相關的多種監控名額;
  • 使用者增長分析,實時統計各投放管道拉新情況,根據效果實時調整各管道的投放量;
  • 實時資料處理,廣告展現流、點選流實時 Join,用戶端日志的拆分等;
  • 直播 CDN 排程,實時監控各 CDN 廠商品質,通過 Flink 實時訓練調整各個 CDN 廠商流量配比;

Flink叢集規模

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

快手目前叢集規模有1500台左右,作業數量大約是500左右,日處理條目數總共有1.7萬億,峰值處理條目數大約是3.7千萬。叢集部署都是On Yarn模式,分為離線叢集和實時叢集兩類叢集,其中離線叢集混合部署,機器通過标簽進行實體隔離,實時叢集是 Flink 專用叢集,針對隔離性、穩定性要求極高的業務部署。

快手Flink技術演進

快手 Flink 技術演進主要分為三部分:

  • 基于特定場景優化,包括 Interval Join 場景優化;
  • 穩定性改進,包括資料源控速、JobManager 穩定性、作業頻繁失敗;
  • 平台建設;

場景優化

Interval Join應用場景

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

Interval Join 在快手的一個應用場景是廣告展現點選流實時 Join 場景:打開快手 App 可能會收到廣告服務推薦的廣告視訊,使用者有時會點選展現的廣告視訊。這樣在後端形成兩份資料流,一份是廣告展現日志,一份是用戶端點選日志。這兩份資料需進行實時 Join,将 Join 結果作為樣本資料用于模型訓練,訓練出的模型會被推送到線上的廣告服務。

該場景下展現以後 20 分鐘的點選被認為是有效點選,實時 Join 邏輯則是點選資料 Join 過去 20 分鐘展現。其中,展現流的資料量相對比較大,20 分鐘資料在 1 TB 以上。最初實時 Join 過程是業務自己實作,通過 Redis 緩存廣告展現日志,Kafka 延遲消費用戶端點選日志實作 Join 邏輯,該方式缺點是實時性不高,并且随着業務增長需要堆積更多機器,運維成本非常高。基于 Flink 使用 Interval Join 完美契合此場景,并且實時性高,能夠實時輸出 Join 後的結果資料,對業務來說維護成本非常低,隻需要維護一個 Flink 作業即可。

Interval Join場景優化

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

Interval Join原理:

Flink 實作Interval join的原理:兩條流資料緩存在内部State中,任意一資料到達,擷取對面流相應時間範圍資料,執行 joinFunction進行Join。随着時間的推進,State中兩條流相應時間範圍的資料會被清理。

在前面提到的廣告應用場景Join過去20分鐘資料,假設兩個流的資料完全有序到達,Stream A作為展現流緩存過去20分鐘資料,Stream B 作為點選流每來一條資料到對面Join過去20分鐘資料即可。

Flink 實作 Interval Join:

KeyedStreamA.intervalJoin(KeyedStreamB)

.between(Time.minutes(0),Time.minutes(20))
     .process(joinFunction)           

狀态存儲政策選擇

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

關于狀态存儲政策選擇,生産環境狀态存儲Backend有兩種方式:

  • FsStateBackend:State存儲在記憶體,Checkpoint時持久化到HDFS;
  • RocksDBStateBackend:State存儲在RocksDB 執行個體,可增量Checkpoint,适合超大State。在廣告場景下展現流20分鐘資料有1 TB以上,從節省記憶體等方面綜合考慮,快手最終選擇的是RocksDBStateBackend;

在 Interval join場景下,RocksDB狀态存儲方式是将兩個流的資料存在兩個Column Family裡,RowKey根據keyGroupId+joinKey+ts方式組織。

RocksDB通路性能問題

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

Flink作業上線遇到的第一個問題是RocksDB通路性能問題,表現為:

  • 作業在運作一段時間之後出現反壓,吞吐下降;
  • 通過 Jstack 發現程式邏輯頻繁處于 RocksDB get 請求處;
  • 通過 Top 發現存在單線程 CPU 持續被打滿;

進一步對問題分析,發現:該場景下,Flink 内部基于 RocksDB State 狀态存儲時,擷取某個 Join key 值某段範圍的資料,是通過字首掃描的方式擷取某個 Join key 字首的 entries 集合,然後再判斷哪些資料在相應的時間範圍内。字首掃描的方式會導緻掃描大量的無效資料,掃描的資料大多緩存在 PageCache 中,在 Decode 資料判斷資料是否為 Delete 時,消耗大量 CPU。

以上圖場景為例,藍色部分為目标資料,紅色部分為上下邊界之外的資料,字首掃描時會過多掃描紅色部分無用資料,在對該大量無效資料做處理時,将單線程 CPU 消耗盡。

針對RocksDB通路性能優化

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

快手在Interval join該場景下對RocksDB的通路方式做了以下優化:

  • 在Intervaljoin 場景下,是可以精确的确定需通路的資料邊界範圍。是以用全 Key 範圍掃描代替字首掃描,精确拼出查詢上下邊界 Full Key 即 keyGroupId+joinKey+ts[lower,upper];
  • 範圍查詢 RocksDB ,可以更加精确 Seek 到上下邊界,避免無效資料掃描和校驗;

優化後的效果:P99 查詢時延性能提升 10 倍,即 nextKey 擷取 RocksDB 一條資料, P99 時延由 1000 毫秒到 100 毫秒以内。作業吞吐反壓問題進而得到解決。

RocksDB 磁盤壓力問題

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

Flink 作業上線遇到的第二個問題是随着業務的增長, RocksDB 所在磁盤壓力即将達到上限,高峰時磁盤 util 達到 90%,寫吞吐在 150 MB/s。詳細分析發現,該問題是由以下幾個原因疊加導緻:

  • Flink機器選型為計算型,大記憶體、單塊HDD盤,在叢集規模不是很大的情況下,單個機器會有4-5個該作業Container,同時使用一塊 HDD盤;
  • RocksDB背景會頻繁進行Compaction有寫放大情況,同時Checkpoint也在寫磁盤;

針對RocksDB磁盤壓力,快手内部做了以下優化:

  • 針對RocksDB參數進行調優,目的是減少Compaction IO量。優化後IO總量有一半左右的下降;
  • 為更加友善的調整RocksDB參數,在 Flink 架構層新增 Large State RocksDB配置套餐。同時支援 RocksDBStateBackend 自定義配置各種RocksDB參數;

未來計劃,考慮将State用共享存儲的方式存儲,進一步做到減少IO總量,并且快速Checkpoint和恢複。

穩定性改進

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

首先介紹下視訊品質監控排程應用背景,有多個Kafka Topic存儲短視訊、直播相關品質日志,包括短視訊上傳/下載下傳、直播觀衆端日志,主播端上報日志等。Flink Job讀取相應Topic資料實時統計各類名額,包括播放量、卡頓率、黑屏率以及開播失敗率等。名額資料會存到 Druid提供後續相應的報警監控以及多元度的名額分析。同時還有一條流是進行直播CDN排程,也是通過Flink Job實時訓練、調整各 CDN廠商的流量配比。

以上Kafka Topic資料會同時落一份到Hadoop叢集,用于離線補資料。實時計算跟離線補資料的過程共用同一份Flink代碼,針對不同的資料源,分别讀 Kafka資料或HDFS資料。

資料源控速

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

視訊應用場景下遇到的問題是:作業 DAG 比較複雜,同時從多個 Topic 讀取資料。一旦作業異常,作業失敗從較早狀态恢複,需要讀取部分曆史資料。此時,不同 Source 并發讀取資料速度不可控,會導緻 Window 類算子 State 堆積、作業性能變差,最終導緻作業恢複失敗。另外,離線補資料,從不同 HDFS 檔案讀資料同樣會遇到讀取資料不可控問題。在此之前,實時場景下臨時解決辦法是重置 GroupID 丢棄曆史資料,使得從最新位置開始消費。

針對該問題我們希望從源頭控制多個 Source 并發讀取速度,是以設計了從 Source 源控速的政策。

Source 控速政策

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

Source 控速政策是 :

  • SourceTask共享速度狀态提供給JobManager;
  • JobManager引入SourceCoordinator,該Coordinator擁有全局速度視角,制定相應的政策,并将限速政策下發給 SourceTask;
  • SourceTask根據JobManager下發的速度調節資訊執行相應控速邏輯
  • 一個小細節是 DAG 圖有子圖的話, 不同子圖 Source 源之間互相不影響;

Source控速政策詳細細節

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

SourceTask共享狀态

  • SourceTask定期彙報狀态給JobManager,預設10s間隔;
  • 彙報内容為;

協調中心SourceCoordinator

  • 限速門檻值:最快并發Watermark - 最慢并發 Watermark > ∆t(預設5分鐘)。隻要在達到限速門檻值情況下,才進行限速政策制定;
  • 全局預測:各并發 targetWatermark=base+speed*time;Coordinator 先進行全局預測,預測各并發接下來時間間隔能運作到的 Watermark位置;
  • 全局決策:targetWatermark = 預測最慢 Watermark+∆t/2;Coordinator 根據全局預測結果,取預測最慢并發的 Watermark 值再浮動一個範圍作為下個周期全局限速決策的目标值;
  • 限速資訊下發:。将全局決策的資訊下發給所有的 Source task,限速資訊包括下一個目标的時間和目标的Watermark位置;

以上圖為例,A時刻,4個并發分别到達如圖所示位置,為A+interval的時刻做預測,圖中藍色虛線為預測各并發能夠到達的位置,選擇最慢的并發的Watermark位置,浮動範圍值為Watermark + ∆t/2 的時間,圖中鮮紅色虛線部分為限速的目标 Watermark,以此作為全局決策發給下遊Task。

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

SourceTask 限速控制

  • SourceTask 擷取到限速資訊後,進行限速控制;
  • 以 KafkaSource 為例,KafkaFetcher 擷取資料時,根據限速資訊 Check 目前進度,确定是否需要限速等待;

該方案中,還有一些其他考慮,例如:

  • 時間屬性:隻針對 EventTime 情況下進行限速執行;
  • 開關控制:支援作業開關控制是否開啟 Source 限速政策;
  • DAG 子圖 Source 源之間互相不影響;
  • 是否會影響 CheckPoint Barrier 下發;
  • 資料源發送速度不恒定,Watermark 突變情況;

Source 控速結果

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

拿線上作業,使用 Kafka 從最早位置(2 days ago)開始消費。如上圖,不限速情況下 State 持續增大,最終作業挂掉。使用限速政策後,最開始 State 有緩慢上升,但是 State 大小可控,最終能平穩追上最新資料,并 State 持續在 40 G 左右。

JobManager穩定性

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

關于JobManager穩定性,遇到了兩類Case,表現均為:JobManager在大并發作業場景WebUI卡頓明顯,作業排程會逾時。進一步分析了兩種場景下的問題原因:

  • 場景一,JobManager 記憶體壓力大問題。JobManager 需要控制删除已完成的 Checkpoint 在 HDFS 上的路徑。在 NameNode 壓力大時,Completed CheckPoint 路徑删除慢,導緻 CheckPoint Path 在記憶體中堆積。原來删除某一次 Checkpoint 路徑政策為:每删除目錄下一個檔案,需 List 該目錄判斷是否為空,如為空将目錄删除。在大的 Checkpoint 路徑下, List 目錄操作為代價較大的操作。針對該邏輯進行優化,删除檔案時直接調用 HDFS delete(path,false) 操作,語義保持一緻,并且開銷小;
  • 場景二,該Case發生在Yarn Cgroup功能上線之後,JobManager G1 GC過程變慢導緻阻塞應用線程。AppMaster申請CPU個數寫死為1,在上線Cgroup之後可用的CPU資源受到限制。解決該問題的方法為,支援AppMaster申請CPU個數參數化配置;

作業頻繁失敗

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

機器故障造成作業頻繁失敗,具體的場景也有兩種:

  • 場景一:磁盤問題導緻作業持續排程失敗。磁盤出問題導緻一些 Buffer 檔案找不到。又因為 TaskManager 不感覺磁盤健康狀況,會頻繁排程作業到該 TaskManager,作業頻繁失敗;
  • 場景二:某台機器有問題導緻 TaskManager 在某台機器上頻繁出 Core,陸續配置設定新的 TaskManager 到這台機器上,導緻作業頻繁失敗;

針對機器故障問題解決方法:

  • 針對磁盤問題,TaskManager 增加 DiskChecker 磁盤健康檢查,發現磁盤有問題 TaskManager 自動退出;
  • 針對有些機器頻繁出現 TaskManager 出現問題,根據一定的政策将有問題機器加到黑名單中,然後通過軟黑名單機制,告知 Yarn 盡量不要排程 Container 到該機器;

平台化建設

平台建設:

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

快手的平台化建設主要展現在青藤作業托管平台。通過該平台可進行作業操作、作業管理以及作業詳情檢視等。作業操作包括送出、停止作業。作業管理包括管理作業存活、性能報警,自動拉起配置等;詳情檢視,包括檢視作業的各類 Metric 等。

上圖為青藤作業托管平台的一些操作界面。

問題定位流程優化:

Flink在快手的應用實踐與技術演進之路Flink在快手應用場景Flink叢集規模快手Flink技術演進場景優化Interval Join應用場景Interval Join場景優化Interval Join原理:狀态存儲政策選擇RocksDB通路性能問題針對RocksDB通路性能優化RocksDB 磁盤壓力問題穩定性改進資料源控速JobManager穩定性作業頻繁失敗平台化建設平台建設:問題定位流程優化:未來計劃

我們也經常需要給業務分析作業性能問題,幫助業務 debug 一些問題,過程相對繁瑣。是以該部分我們也做了很多工作,盡量提供更多的資訊給業務,友善業務自主分析定位問題。

  • 将所有Metric入Druid,通過Superset可從各個次元分析作業各項名額;
  • 針對Flink的WebUI做了一些完善,支援Web實時列印jstack,Web DAG為各Vertex增加序号,Subtask資訊中增加各并發 SubtaskId;
  • 豐富異常資訊提示,針對機器當機等特定場景資訊進行明确提示;
  • 新增各種Metric;

未來計劃

未來規劃主要分為兩個部分:

  • 目前在建設的 Flink SQL 相關工作。因為 SQL 能夠減少使用者開發的成本,包括我們現在也在對接實時數倉的需求,是以 Flink SQL 是我們未來計劃的重要部分之一;
  • 我們希望進行一些資源上的優化。目前業務在提作業時存在需求資源及并發預估不準确的情況,可能會過多申請資源導緻資源浪費。另外如何提升整體叢集資源的使用率問題,也是接下來需要探索的問題;

備注:本文轉載自

https://www.infoq.cn/article/sEMcN3uK-3jk9EBCiUuS

繼續閱讀