天天看點

網易雲音樂基于 Flink + Kafka 的實時數倉建設實踐

(一)流平台通用架構

目前流平台通用的架構一般來說包括消息隊列、計算引擎和存儲三部分,通用架構如下圖所示。用戶端或者 web 的 log 日志會被采集到消息隊列;計算引擎實時計算消息隊列的資料;實時計算結果以 Append 或者 Update 的形式存放到實時存儲系統中去。

目前,我們常用的消息隊列是 Kafka,計算引擎一開始我們采用的是 Spark Streaming,随着 Flink 在流計算引擎的優勢越來越明顯,我們最終确定了 Flink 作為我們統一的實時計算引擎。

(二)為什麼選 Kafka?

Kafka 是一個比較早的消息隊列,但是它是一個非常穩定的消息隊列,有着衆多的使用者群體,網易也是其中之一。我們考慮 Kafka 作為我們消息中間件的主要原因如下:

高吞吐,低延遲:每秒幾十萬 QPS 且毫秒級延遲;

高并發:支援數千用戶端同時讀寫;

容錯性,可高性:支援資料備份,允許節點丢失;

可擴充性:支援熱擴充,不會影響目前線上業務。

(三)為什麼選擇 Flink?

Apache Flink 是近年來越來越流行的一款開源大資料流式計算引擎,它同時支援了批處理和流處理,考慮 Flink 作為我們流式計算引擎的主要因素是:

高吞吐,低延遲,高性能;

高度靈活的流式視窗;

狀态計算的 Exactly-once 語義;

輕量級的容錯機制;

支援 EventTime 及亂序事件;

流批統一引擎。

(四)Kafka + Flink 流計算體系

基于 Kafka 和 Flink 的在消息中間件以及流式計算方面的耀眼表現,于是産生了圍繞 Kafka 及 Flink 為基礎的流計算平台體系,如下圖所示:基于 APP、web 等方式将實時産生的日志采集到 Kafka,然後交由 Flink 來進行常見的 ETL,全局聚合以及Window 聚合等實時計算。

(五)網易雲音樂使用 Kafka 的現狀

目前我們有 10+個 Kafka 叢集,各個叢集的主要任務不同,有些作為業務叢集,有些作為鏡像叢集,有些作為計算叢集等。目前 Kafka 叢集的總節點數達到 200+,單 Kafka 峰值 QPS 400W+。目前,網易雲音樂基于 Kafka+Flink 的實時任務達到了 500+。

基于以上情況,我們想要對 Kafka+Flink 做一個平台化的開發,減少使用者的開發成本和運維成本。實際上在 2018 年的時候我們就開始基于 Flink 做一個實時計算平台,Kafka 在其中發揮着重要作用,今年,為了讓使用者更加友善、更加容易的去使用 Flink 和 Kafka,我們進行了重構。

基于 Flink 1.0 版本我們做了一個 Magina 版本的重構,在 API 層次我們提供了 Magina SQL 和 Magina SDK 貫穿 DataStream 和 SQL 操作;然後通過自定義 Magina SQL Parser 會把這些 SQL 轉換成 Logical Plan,在将 LogicalPlan 轉化為實體執行代碼,在這過程中會去通過 catalog 連接配接中繼資料管理中心去擷取一些中繼資料的資訊。我們在 Kafka 的使用過程中,會将 Kafka 中繼資料資訊登記到中繼資料中心,對實時資料的通路都是以流表的形式。在 Magina 中我們對 Kafka 的使用主要做了三部分的工作:

叢集 catalog 化;

Topic 流表化;

Message Schema 化。

網易雲音樂基于 Flink + Kafka 的實時數倉建設實踐

使用者可以在中繼資料管理中心登記不同的表資訊或者 catalog 資訊等,也可以在 DB 中建立和維護 Kafka 的表,使用者在使用的過程隻需要根據個人需求使用相應的表即可。下圖是對 Kafka 流表的主要引用邏輯。

(一)在解決問題中發展

Kafka 在實時數倉使用的過程中,我們遇到了不同的問題,中間也嘗試了不同的解決辦法。

在平台初期, 最開始用于實時計算的隻有兩個叢集,且有一個采集叢集,單 Topic 資料量非常大;不同的實時任務都會消費同一個大資料量的 Topic,Kafka 叢集 IO 壓力異常大;

是以,在使用的過程發現 Kafka 的壓力異常大,經常出現延遲、I/O 飙升。

我們想到把大的 Topic 進行實時分發來解決上面的問題,基于 Flink 1.5 設計了如下圖所示的資料分發的程式,也就是實時數倉的雛形。基于這種将大的 Topic 分發成小的 Topic 的方法,大大減輕了叢集的壓力,提升了性能,另外,最初使用的是靜态的分發規則,後期需要添加規則的時候要進行任務的重新開機,對業務影響比較大,之後我們考慮了使用動态規則來完成資料分發的任務。

解決了平台初期遇到的問題之後,在平台進階過程中 Kafka 又面臨新的問題:

雖然進行了叢集的擴充,但是任務量也在增加,Kafka 叢集壓力仍然不斷上升;

叢集壓力上升有時候出現 I/O 相關問題,消費任務之間容易互相影響;

使用者消費不同的 Topic 過程沒有中間資料的落地,容易造成重複消費;

任務遷移 Kafka 困難。

針對以上問題,我們進行了如下圖所示的 Kafka 叢集隔離和資料分層處理。其過程簡單來說,将叢集分成 DS 叢集、日志采集叢集、分發叢集,資料通過分發服務分發到 Flink 進行處理,然後通過資料清洗進入到 DW 叢集,同時在 DW 寫的過程中會同步到鏡像叢集,在這個過程中也會利用 Flink 進行實時計算的統計和拼接,并将生成的 ADS 資料寫入線上 ADS 叢集和統計 ADS 叢集。通過上面的過程,確定了對實時計算要求比較高的任務不會受到統計報表的影響。

網易雲音樂基于 Flink + Kafka 的實時數倉建設實踐

通過上面的過程,確定了對實時計算要求比較高的任務不會受到統計報表的影響。但是我們分發了不同的叢集以後就不可避免的面臨新的問題:

如何感覺 Kafka 叢集狀态?

如何快速分析 Job 消費異常?

針對上面兩個問題,我們做了一個 Kafka 監控系統,其監控分為如下兩個次元,這樣在出現異常的時候就可以進行具體判斷出現問題的詳細情況:

叢集概況的監控:可以看到不同叢集對應的 Topic 數量以及運作任務數量,以及每個 Topic消費任務資料量、資料流入量、流入總量和平均每條資料大小;

名額監控:可以看到 Flink 任務以及對應的Topic、GroupID、所屬叢集、啟動時間、輸入帶寬、InTPS、OutTPS、消費延遲以及 Lag 情況。

(二)Flink + Kafka 在 Lambda 架構下的運用

流批統一是目前非常火的概念,很多公司也在考慮這方面的應用,目前常用的架構要麼是 Lambda 架構,要麼是 Kappa 架構。對于流批統一來講需要考慮的包括存儲統一和計算引擎統一,由于我們目前基建沒有統一的存儲,那麼我們隻能選擇了 Lamda 架構。

下圖是基于 Flink 和 Kafka 的 Lambda 架構在雲音樂的具體實踐,上層是實時計算,下層是離線計算,橫向是按計算引擎來分,縱向是按實時數倉來區分。

網易雲音樂基于 Flink + Kafka 的實時數倉建設實踐

在具體的應用過程中,我們也遇到了很多問題,最主要的兩個問題是:

多 Sink 下 Kafka Source 重複消費問題;

同交換機流量激增消費計算延遲問題。

(一)多 Sink 下 Kafka Source 重複消費問題

Magina 平台上支援多 Sink,也就是說在操作的過程中可以将中間的任意結果插入到不同的存儲中。這個過程中就會出現一個問題,比如同一個中間結果,我們把不同的部分插入到不同的存儲中,那麼就會有多條 DAG,雖然都是臨時結果,但是也會造成 Kafka Source 的重複消費,對性能和資源造成極大的浪費。

于是我們想,是否可以避免臨時中間結果的多次消費。在 1.9 版本之前,我們進行了 StreamGraph 的重建,将三個 DataSource 的 DAG 進行了合并;在 1.9 版本,Magina 自己也提供了一個查詢和 Source 合并的優化;但是我們發現如果是在同一個 data update 中有對同一個表的多個 Source 的引用,它自己會合并,但是如果不是在同一個 data update 中,是不會立即合并的,于是在 1.9 版本之後中我們對 modifyOperations 做了一個 buffer 來解決這個問題。

網易雲音樂基于 Flink + Kafka 的實時數倉建設實踐

(二)同交換機流量激增消費計算延遲問題

這個問題是最近才出現的問題,也可能不僅僅是同交換機,同機房的情況也可能。在同一個交換機下我們部署了很多機器,一部分機器部署了 Kafka 叢集,還有一部分部署了 Hadoop 叢集。在 Hadoop 上面我們可能會進行 Spark、Hive 的離線計算以及 Flink 的實時計算,Flink 也會消費 Kafka 進行實時計算。在運作的過程中我們發現某一個任務會出現整體延遲的情況,排查過後沒有發現其他的異常,除了交換機在某一個時間點的浏覽激增,進一步排查發現是離線計算的浏覽激增,又因為同一個交換機的帶寬限制,影響到了 Flink 的實時計算。

網易雲音樂基于 Flink + Kafka 的實時數倉建設實踐

為解決這個問題,我們就考慮要避免離線叢集和實時叢集的互相影響,去做交換機部署或者機器部署的優化,比如離線叢集單獨使用一個交換機,Kafka 和 Flink 叢集也單獨使用一個交換機,從硬體層面保證兩者之間不會互相影響。

Q1:Kafka 在實時數倉中的資料可靠嗎?

A1:這個問題的答案更多取決于對資料準确性的定義,不同的标準可能得到不同的答案。自己首先要定義好資料在什麼情況下是可靠的,另外要在處理過程中有一個很好的容錯機制。

Q2:我們在學習的時候如何去學習這些企業中遇到的問題?如何去積累這些問題?

A2:個人認為學習的過程是問題推動,遇到了問題去思考解決它,在解決的過程中去積累經驗和自己的不足之處。

Q3:你們在處理 Kafka 的過程中,異常的資料怎麼處理,有檢測機制嗎?

A3:在運作的過程中我們有一個分發的服務,在分發的過程中我們會根據一定的規則來檢測哪些資料是異常的,哪些是正常的,然後将異常的資料單獨分發到一個異常的 Topic 中去做查詢等,後期使用者在使用的過程中可以根據相關名額和關鍵詞到異常的 Topic 中去檢視這些資料。

感謝大家看到這裡,文章有不足,歡迎大家指出