本文介紹了 SmartNews 利用 Flink 加速 Hive 日表的生産,将 Flink 無縫地內建到以 Airflow 和 Hive 為主的批處理系統的實踐。詳細介紹過程中遇到的技術挑戰和應對方案,以供社群分享。主要内容為:
- 項目背景
- 問題的定義
- 項目的目标
- 技術選型
- 技術挑戰
- 整體方案及挑戰應對
- 項目成果和展望
- 後記
GitHub 位址
https://github.com/apache/flink歡迎大家給 Flink 點贊送 star~
一、項目背景
SmartNews 是一家機器學習驅動的網際網路公司。自 2012 年于日本東京成立,并在美國和中國設有辦公室。經過 8 年多的發展,SmartNews 已經成長為日本排名第一,美國成長最快的新聞類應用,覆寫全球超過 150 多個國家市場。據 2019 年初統計,SmartNews 的 iOS 和 Android 版本全球累計下載下傳量已經超過 5000 萬次。
SmartNews 在過去 9 年的時間,基于 Airflow, Hive, EMR 等技術棧建構了大量的資料集。随着資料量的增長,這些離線表的處理時間在逐漸拉長。另外,随着業務方疊代節奏的加快,對表的實時性也提出了更高的要求。是以,SmartNews 内部發起了 Speedy Batch 的項目,以加快現有離線表生産效率。
本次分享便是 Speedy Batch 項目中的一個例子,加速使用者行為 (actions) 表的實踐。
APP 端上報的使用者行為日志,每日通過 Hive 作業生成日表,這個表是許多其他表的源頭,至關重要。這個作業需要運作 3 個小時,進而拉高了許多下遊表的延遲 (Latency),明顯影響資料科學家、産品經理等使用者的使用體驗。是以我們需要對這些作業進行提速,讓各個表能更早可用。
公司業務基本上都在公有雲上,伺服器的原始日志以檔案形式上傳至雲存儲,按日分區;目前的作業用 Airflow 排程到 EMR 上運作,生成 Hive 日表,資料存儲在雲存儲。
二、問題的定義
1. 輸入
新聞伺服器每隔 30 秒上傳一個原始日志檔案,檔案上傳至相應日期和小時的雲存儲目錄。
2. 輸出
原始日志經過 ETL 處理之後,按日 (dt) 和行為 (action) 兩級分區輸出。action 種類約 300 個,不固定,常有增減。
3. 使用者
對這個表的使用是廣泛的,多途徑的。有從 Hive 裡查詢,也有從 Presto,Jupyter 和 Spark 裡查詢,我們甚至不能确定以上就是全部的通路途徑。
三、項目的目标
- 将 actions 表的時延從 3 小時縮短至 30 分鐘;
- 對下遊使用者保持透明。透明又分兩個方面:
- 功能方面:使用者無需修改任何代碼,做到完全無感
- 性能方面:新項目産生的表,不應該導緻下遊讀取時的性能下降
四、技術選型
在本項目之前,同僚已經對該作業做了多輪次改進,效果不是很顯著。
嘗試過的方案包括增加資源,投入更多的機器,但遇到了雲存儲的 IOPS 限制:每個 prefix 最多支援 3000 個并發讀寫,這個問題在輸出階段尤為明顯,即多個 reducer 同時向同一個 action 子目錄輸出的時候,容易碰到這個限制。另外還嘗試了按小時預處理,然後到每日淩晨再合并成日表,但合并過程亦耗時較多,整體時延還是在 2.5 小時左右,效果不夠顯著。
鑒于伺服器端的日志是近實時上傳至雲存儲,團隊提出了流式處理的思路,摒棄了批作業等待一天、處理 3 小時的模式,而是把計算分散在一整天,進而降低當天結束後的處理用時。團隊對 Flink 有比較好的背景,加上 Flink 近期對 Hive 的改進較多,是以決定采用基于 Flink 的方案。
五、技術挑戰
挑戰是多方面的。
1. 輸出 RC 檔案格式
目前 Hive 表的檔案格式為 RCFile,為了保證對使用者的透明,我們隻能在現有的 Hive 表上做 in-place 的 upgrade,也就是我們得重用目前表,那麼 Flink 輸出的檔案格式也得符合 RCFile 格式,因為一張 Hive 表隻能有一個格式。
RCFile 屬于 bulk format (相對應的是 row format),在每次 checkpoint 時必須一次性輸出。如果我們選擇 5 分鐘一次 checkpoint,那麼每個 action 每 5 分鐘必須輸出一個檔案,這會大量增加結果檔案數,進而影響下遊的讀取性能。特别是對于低頻 action,檔案數會上百倍的增加。我們了解了 Flink 的檔案合并功能,但那是在一個 checkpoint 内多個 sink 資料的合并,這并不能解決我們的問題,我們需要的是跨 checkpoint 的檔案合并。
團隊考慮過以 row format (e.g. CSV) 輸出,然後實作自定義的 Hive SerDe,使之相容 RCFile 和 CSV。但很快我們放棄了這個設想,因為那樣的話,需要為每個查詢場景實作這個 Hybrid 的 SerDe,例如需要為 Presto 實作,為 Spark 實作,等等。
- 一方面我們沒法投入這麼多資源;
-
另一方面那種方案也是使用者有感的,畢竟使用者還是需要安裝這個自定義的 SerDe。
我們之前提出了生成一個新格式的表,但也因為對使用者不夠透明而被否決。
2. Partition 的可感覺性和完整性
如何讓下遊作業能感覺到當天這個 partition 已經 ready?actions 表分兩級 partition, dt 和 action。action 屬于 Hive 的 dynamic partition,數量多且不固定。目前 Airflow 下遊作業是等待 insert_actions 這個 Hive 任務完成後,再開始執行的。這個沒問題,因為 insert_actions 結束時,所有 action 的 partition 都已經 ready 了。但對于 Flink 作業來說,沒有結束的信号,它隻能往 Hive 裡面送出一個個的 partition,如 dt=2021-05-29/action=refresh。因為 action 數量多,送出 partition 的過程可能持續數分鐘,是以我們也不能讓 Airflow 作業去感覺 dt 級别的 partition,那樣很可能在隻有部分 action 的情況下觸發下遊。
3. 流式讀取雲存儲檔案
項目的輸入是不斷上傳的雲存儲檔案,并非來自 MQ (message queue)。Flink 支援 FileStreamingSource,可以流式的讀入檔案,但那是基于定時 list 目錄以發現新的檔案。但這個方案不适合我們的場景,因為我們的目錄太大,雲存儲 list 操作根本無法完成。
4. Exactly Once 保證
鑒于 actions 表的重要性,使用者無法接受任何的資料丢失或者重複,是以整個方案需要保證恰好一次的處理。
六、整體方案及挑戰應對
1. 輸出 RCFile 并且避免小檔案
我們最終選擇的方案是分兩步走,第一個 Flink 作業以 json (row format) 格式輸出,然後用另外一個 Flink 作業去做 Json 到 RC 格式的轉化。以此解決 Flink 不能愉快的輸出合适大小 RC 檔案的問題。
輸出 json 的中間結果,這樣我們可以通過 Rolling Policy 控制輸出檔案的大小,可以跨多個 checkpoint 攢成足夠大,或者時間足夠長,然後再輸出到雲存儲。這裡 Flink 其實利用的是雲存儲的 Multi Part Upload (MPU) 的功能,即每次 checkpoint Flink 也是把目前 checkpoint 攢下來的資料上傳至 雲存儲,但輸出的不是檔案,而是一個 part。最後當多個 part 達到大小或者時間要求,就可以調用雲存儲的接口将多個 part 合并成一個檔案,這個合并操作在雲存儲端完成,應用端無需再次讀取這個 part 到本地合并然後再上傳。而 Bulk format 均需要一次性全局處理,是以無法分段上傳然後合并,必須一次性全部上傳。
當第二個作業感覺到一個新的 json 檔案上傳後,加載它,轉化成 RCFile,然後上傳到最終的路徑。這個過程帶來的延遲較小,一個檔案可以控制在 10s 以内,這是可以接受的。
2. 優雅的感覺輸入檔案
輸入端,沒有采用 Flink 的 FileStreamingSource,而是采用雲存儲的 event notification 來感覺新檔案的産生,接受到這個通知後再主動去加載檔案。
3. Partition 的可感覺性和完整性
輸出端,我們輸出 dt 級别的 success file,來讓下遊可靠地感覺日表的 ready。我們實作自定義的 StreamingFileWriter,使之輸出 partitionCreated 和 partitionInactive 的信号,并且通過實作自定義的 PartitionCommitter,來基于上述信号判斷日表的結束。
其機制如下,每個雲存儲 writer 開始寫某個 action,會發出一個 partitionCreated 信号,當它結束時又發出 partitionInactive 信号。PartitionCommitter 判斷某一天之内是否所有的 partittion 都 inactive 了,如果是,則一天的資料都處理了,輸出 dt 級别的 success file,在 Airflow 通過感覺這個檔案來判斷 Flink 是否完成了日表的處理。
4. Exactly Once
雲存儲的 event notification 提供 At Least once 保證。Flink 作業内對檔案級别進行去重,作業采用 Exactly Once 的 checkpoint 設定,雲存儲檔案輸出基于 MPU 機制等價于支援 truncate,是以雲存儲輸出等價于幂等,是以等價于端到端的 Exactly Once。
七、項目成果和展望
項目已經上線,時延維持在 34 分鐘上下,其中包括 15 分鐘的等待遲到檔案。
- 第一個 Flink 作業需要 8 分鐘左右完成 checkpoint 和輸出,json 轉 rc 作業需要 12 分鐘完成全部處理。我們可以把這個時間繼續壓縮,但是綜合時效性和成本,我們選擇目前的狀态。
- json 轉 rc 作業耗時比當初的預想的要大,因為上遊作業最後一個 checkpoint 輸出太多的檔案,導緻整體耗時長,這個可以通過增加作業的并發度線性的下降。
- 輸出的檔案數比批作業輸出的檔案數有所增加,增加 50% 左右。這是流式處理于批處理的劣勢,流式處理需要在時間到達時就輸出一個檔案,而此時檔案大小未必達到預期。好在這個程度的檔案數增加不明顯影響下遊的性能。
- 做到了下遊的完全透明,整個上線前後,沒有收到任何使用者異常回報。
該項目讓我們在生産環境驗證了利用流式處理架構 Flink 來無縫介入批處理系統,實作使用者無感的局部改進。将來我們将利用同樣的技術,去加速更多其他的 Hive 表的生産,并且廣泛提供更細粒度 Hive 表示的生産,例如小時級。另一方面,我們将探索利用 data lake 來管理批流一體的資料,實作技術棧的逐漸收斂。
八、後記
由于采用完全不同的計算架構,且需要與批處理系統完全保持一緻,團隊踩過不少的坑,限于篇幅,無法一一列舉。是以我們挑選幾個有代表的問題留給讀者思考:
- 為了驗證新作業産出的結果與原來 Hive 産出一緻,我們需要對比兩者的輸出。那麼,如何才能高效的比較兩個 Hive 表的一緻性呢?特别是每天有百億級資料,每條有數百個字段,當然也包含複雜類型 (array, map, array等)。
- 兩個 Flink 作業的 checkpoint 模式都必須是 Exactly Once 嗎?哪個可以不是,哪個必須是?
- StreamFileWriter 隻有在 checkpoint 時才接受到 partitionCreated 和 partitionInactive 信号,那麼我們可以在它的 snapshotState() 函數裡面輸出給下遊 (下遊會儲存到 state) 嗎?
- 最後一問:你們有更好的方案可供我們參考嗎?
更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群
第一時間擷取最新技術文章和社群動态,請關注公衆号~
活動推薦
阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:
99 元試用
實時計算Flink版(包年包月、10CU)即有機會獲得 Flink 獨家定制T恤;另包 3 個月及以上還有 85 折優惠!
了解活動詳情:
https://www.aliyun.com/product/bigdata/sc