作者 | 汪磊(網易雲音樂 / 資料平台開發專家)
整理 | 楊濤(Flink 社群志願者)
如何基于 Flink 的新 API 更新實時數倉架構?
背景介紹
網易雲音樂從 2018 年開始搭建實時計算平台,到目前為止已經發展至如下規模:
- 機器數量:130+
- 單 Kafka 峰值 QPS:400W+
- 線上運作任務數:500+
- 開發者:160+
- 業務覆寫:線上業務支援,實時報表統計,實時特征處理,實時索引支援
- 2020 年 Q1 任務數增長 100%,處于高速發展中
這是網易雲音樂實時數倉 18 年的版本,基于 Flink 1.7 版本開發,當時 Flink SQL 的整體架構也還不是很完善。我們使用了 Antlr (通用的程式設計語言解析器,它隻需編寫名為 G4 的文法檔案,即可自動生成解析的代碼,并且以統一的格式輸出,處理起來非常簡單。由于 G4 檔案是通過開發者自行定制的,是以由 Antlr 生成的代碼也更加簡潔和個性化)自定義了一些 DDL 完善了維表 Join 的文法。通過 Antlr 完成文法樹的解析以後,再通過 CodeGen(根據接口文檔生成代碼)技術去将整個 SQL 代碼生成一個 Jar 包,然後部署到 Flink 叢集上去。
此時還沒有統一的中繼資料管理系統。在 JAR 包任務的開發上, 我們也沒有任何架構的限制,平台也很難知道 JAR 的任務上下遊以及相關業務的重要性和優先級。這套架構我們跑了将近一年的時間,随着任務越來越多,我們發現了以下幾個問題:
重複的資料了解
由于沒有進行統一的中繼資料管理,每個任務的代碼裡面都需要預先定義 DDL 語句,然後再進行 Select 等業務邏輯的開發;消息的中繼資料不能複用,每個開發都需要進行重複的資料了解,需要了解資料從哪裡來、資料如何解析、資料的業務含義是什麼;整個過程需要多方溝通,整體還存在了解錯誤的風險;也缺乏統一的管理系統去查找自己想要的資料。
和官方版本越走越遠
由于早期版本很多 SQL 的文法都是我們自己自定義的,随着 Flink 本身版本的完善,文法和官方版本差别越來越大,功能完善性上也漸漸跟不上官方的版本,易用性自然也越來越差。如果你本身就是一名熟知 Flink SQL 的開發人員,可能還需要重新學習我們平台自己的文法,整體不是很統一,有些問題也很難在網際網路上找到相關的資料,隻能靠運維來解決。
任務運維問題
SQL 任務沒有統一的中繼資料管理、上下遊的資料源沒有統一的登記、JAR 包任務沒有統一的架構限制、平台方很難跟蹤整個平台資料流的走向,我們不知道平台上運作的幾百個任務分别是幹什麼的,哪些任務讀了哪個資料源?輸出了什麼資料源?任務的種類是什麼?是線上的,測試的,重要的還是不重要的。沒有這些資料的支撐,導緻整個運維工作非常局限。
網易雲音樂的業務發展非常快,資料量越來越大,線上庫和一些其它的庫變更十分頻繁,相關的實時任務也要跟着業務架構的調整,變更相關資料源的位址。此時我們就需要知道哪些任務用到了相關的資料源,如果平台沒有能力很快篩選出相關任務,整個流程處理起來就十分繁雜了。
首先,需要聯系平台所有的開發者确認是否有相關任務的資料源,整個流程非常浪費時間,而且還有可能産生疏漏;其次,假設出現平台流量激增,做運維工作時,如果我們不知道任務在幹什麼,自然也不能知道的任務的重要性,不知道哪些任務可以限流,哪些任務可以做暫時性的停止,哪些任務要重點保障。
實時數倉建設
帶着這些問題,我們開始進行新版本的建構工作。
- 在 Flink 1.9 版本以後,Flink 有了重大變化,重構了 Catalog 的 API,這和之前我們做的離線方向的工作有一定的契合。在離線的生态上,網易雲音樂有着一套非常完整的服務體系,打通中繼資料中心和 Spark SQL,可以通過 Spark SQL 連接配接中繼資料中心的中繼資料,進行異構資料源的聯邦查詢以及資料傳輸工作;
- 同樣基于 Flink 1.10,我們利用新的 Catalog 的 API 實作了一個中繼資料中心的 Catalog。将中繼資料中心作為 Flink SQL 的底層中繼資料元件,實作了 Kafka 到中繼資料中心任一資料源的實時的資料傳輸,以及 Redis、HBase、Kudu 等資料源的維表 JOIN 的實作;
- 除了純 SQL 的開發方式外,我們還提供了一套 SDK,讓使用者可以通過 SQL 加代碼混合使用的方式來實作自己的業務邏輯,提升整個 Flink API 的易用性,大大降低使用者的開發門檻,提升了平台對任務的管控能力;
- 有了統一的中繼資料的管理以及 SDK 的開發方式,血緣收集也變得水到渠成,有了上下遊資料的走向資訊,平台也很容易通過資料源的業務屬性來判斷任務的重要性。
中繼資料中心
不知道大家有沒有用過 Apache Atlas、Netflix 的 Metacat 等工具,網易雲音樂的中繼資料中心顧名思義就是一個中繼資料管理的程式,用于管理網易雲音樂所有資料源的中繼資料。你有可能在實際的開發中用到 Oracle、Kudu、Hive 等工具,也有可能是自研的分布式資料庫。如果沒有統一的中繼資料管理,我們很難知道我們有哪些資料,資料是如何流轉的,也很難快速找到自己想要的資料。
将它們統一管理的好處是,可以通過中繼資料中心快速找到自己想要的資料,了解資料表的連接配接資訊、schema 資訊,字段的業務含義,以及所有表的資料來源和走向。
我們的中繼資料中心系統有以下幾個特點:
1. 擴充性強:中繼資料中心系統理論上是可以管理所有的資料存儲中間件的,每個存儲中間件都可以通過插件的方式熱部署擴充上去,目前我們已經支援了雲音樂内部幾乎所有的存儲中間件;
2. 下推查詢:對于自身有中繼資料系統的存儲中間件,如剛剛提到的 Oracle、Kudu 、Hive 等,我們采用的是下推查詢的方式,直接去查詢它們的中繼資料的資料庫,擷取到相應的中繼資料資訊,這樣就不會存在中繼資料不一緻的問題;
3. Nest 中繼資料登記:對于像 Kafka、RocketMQ 這種自身并不存在中繼資料體系的,中繼資料中心内部有一個内嵌的中繼資料子產品 Nest,Nest 參考了 Hive 中繼資料的實作,使用者可以手動登記相關資料的 Schema 資訊;
4. 統一的類型系統:為了更好的管理不同類型的的資料源,友善外部查詢引擎對接,中繼資料中心有一套完善的類型系統,使用者在實作不同資料源的插件時需要實作自身類型體系到中繼資料類型的映射關系;
5. 中繼資料檢索:我們會定期用全量數和增量的方式将中繼資料同步到 ES 當中,友善使用者快速查找自己想要的資料;
6. 完善的血緣功能:隻要将任務的上下遊按照指定的格式上報到中繼資料中心,就可以通過它提供的血緣接口去拿到整個資料流的血緣鍊路。
建設流程
需要進行的工作包括:
- 使用中繼資料中心的 API 實作 Flink Catalog API。
- 中繼資料中心到 Flink 系統的資料類型轉換,因為中繼資料中有一套統一的類型系統,隻需要處理 Flink 的類型系統到中繼資料類型系統的映射即可,不需要關心具體資料源的類型的轉換。
- 資料源屬性和表屬性的轉換,Flink 中表的屬性決定了它的源頭、序列化方式等,但是中繼資料中心也有自己的一套屬性,是以需要手動轉換一些屬性資訊,主要是一些屬性 key 的對齊問題。
- 血緣解析上報。
- 序列化格式完善。
- Table Connector 的完善,完善常用的存儲中間件的 Table Connector,如 Kudu、網易内部的 DDB 以及雲音樂自研的 Nydus 等。
- 提供 SDK 的開發方式:SDK 開發類似于 Spark SQL 的開發方式,通過 SQL 讀取資料,做一些簡單的邏輯處理,然後轉換成 DataStream,利用底層 API 實作一些複雜的資料轉換邏輯,最後再通過 SQL 的方式 sink 出去。簡單來說就是,SQL 加代碼混編的方式,提升開發效率,讓開發專注于業務邏輯實作,同時保證血緣的完整性和便利性,且充分利用了中繼資料。
完成以上工作後,整體基本就能實作我們的預期。
在一個 Flink 任務的開發中,涉及的資料源主要有三類:
- 流式資料:來自 Kafka 或者 Nydus,可以作為源端和目标端;
- 維表 JOIN 資料:來自 HBase 、Redis、JDBC 等,這個取決于我們自己實作了哪些;
- 落地資料源:一般為 MySQL、HBase、Kudu、JDBC 等,在流處理模式下通常作為目标端。
對于流式資料,我們使用中繼資料中心自帶的中繼資料系統 Nest 登記管理(參考右上角的圖);對于維表以及落地資料源等,可以直接通過中繼資料中心擷取庫表 Schema 資訊,無需額外的 Schema 登記,隻需要一次性登記下資料源連接配接資訊即可(參考右下角的圖)。整體對應我們系統中數倉子產品的中繼資料管理、資料源登記兩個頁面。
完成登記工作以後,我們可以通過catalog.[table]等方式通路任一進制資料中心中登記的表,進行 SQL 開發工作。其中 Catalog 是在資料源登記時登記的名字;db 和 table 是相應資料源自身的 DB 和 Table,如果是 MySQL 就是 MySQL 自身中繼資料中的 DB 和 Table。
最終效果可以參考左下角讀取實時表資料寫入 Kudu 的的例子,其中紅框部分是一個 Kudu 資料表,在使用前隻需要登記相關連接配接資訊即可,無需登記表資訊,平台會從中繼資料中心擷取。
ABTest 項目實踐
項目說明
ABTest 是目前各大網際網路公司用來評估前端改動或模型上線效果的一種有效手段,它主要涉及了兩類資料:第一個是使用者分流資料,一個 AB 實驗中使用者會被分成很多組;然後就是相關名額統計資料,我們通過統計不同分組的使用者在相應場景下名額的好壞,來判斷相關政策的好壞。這兩類資料被分為兩張表:
- 使用者分流表:dt 表示時間,os 表示作業系統。ab_id 是某個 ABTest 的 id 号,group_id 就是分組 id ,group_type 分為兩種,對照組指的是 ABTest 裡面的基準,而實驗組即是這次 ABTest 需要去評估的這批資料。userId 就是使用者 id 了。
- 名額統計表:根據 dt、os 等不同次元來統計每個使用者的有效播放,曝光,點選率等名額,metric、metric_ext 組合成一個具體含義。
在早期版本中,我們使用 Spark 按照小時粒度完成從 ODS 到 DWD 層資料清洗工作,生成使用者分流表和名額統計表。然後再使用 Spark 關聯這兩張表的資料将結果寫入到 Kudu 當中,再使用 Impala 系統對接,供使用者進行查詢。
這套方案的最大的問題是延遲太高,往往需要延遲一到兩個小時,有些甚至到第二天才能看到結果。對于延遲歸檔的資料也不能及時對結果進行修正。
這個方案對我們的業務方比如算法來說,上線一個模型需要等到兩個小時甚至第二天才能看到線上的效果,試錯成本太高,是以後來使用新版的實時倉開發了一套實時版本。
如上圖所示,是我們實時版本 ABTest 的資料走向,我們整體采用了 Lambda 架構:
- 第一步:使用 Flink 訂閱 ODS 原始的資料日志,處理成 DWD 層的資料分流表和名額統計表,同時也将實時的 DWD 層資料同步到相同結構的 Hive 表當中。DWD 層處理的目的是将業務資料清洗處理成業務能看懂的資料,沒有聚合操作,實作比較簡單。但是流資料歸檔到 Hive 的過程中需要注意小檔案問題,檔案落地的頻率越高,延遲越低,同時落地的小檔案也會越多,是以需要在技術和需求上權衡這個問題。同時在下方,我們也會有一條離線的資料流來處理同樣的過程,這個離線不是必須的,如果業務方對資料的準确性要求非常高,我們需要用離線處理做一次修正,解決資料重複問題。這一步還涉及到一個埋點的複雜問題,如果一個名額的埋點非常複雜,比如需要依賴時間順序路徑的歸因,而且本身用戶端日志的延遲程度也非常不可靠的話,離線的修複政策就更加有必要了。
- 第二步:DWS 層處理,讀取第一步生成的 DWD 的流表資料使用 Flink 按照天和小時的次元做全局聚合,這一步利用了 Flink 狀态計算的特點将中間結果維護在 RocksDB 的狀态當中。然後使用 RetractionSink 将結果資料不斷寫入到 Kudu ,生成一個不斷修正的 DWS 層聚合資料。同樣我們也會使用 Spark 做一套同樣邏輯的計算曆史資料來做資料的修正。
- 這個步驟涉及到幾個問題:
- Flink 大狀态的運維和性能問題:為了解決了這個問題,我們使用 SSD 的機器專門用來運作這種大狀态的任務,保障 RocksDB 狀态的吞吐性能;
- Kudu 的 Update 性能問題:這裡做了一些 minibatch 的的優化降低 Kudu 寫入的壓力;
- Lambda 架構的運維成本:實時離線兩套代碼運維成本比較高。
- 第三步:結果資料對接
對于實時的結果資料我們使用 Impala 直接關聯使用者分流表和名額資料表,實時計算出結果回報給使用者;
對于 T+1 的曆史資料,因為資料已經落地,并且不會再變了,是以為了降低 Impala 的壓力,我們使用 Spark 将結果提前計算好存在 Kudu 的結果表中,然後使用 Impala 直接查詢出計算好的結果資料。
批流一體
前面介紹的 ABTest 實時化整個實作過程就是一套完整的批流一體 Lambda 架構的實作。ODS 和 DWD 層既可以訂閱通路,也可以批量讀取。DWD 層落地在支援更新操作的 Kudu 當中,和上層 OLAP 引擎對接,為使用者提供實時的結果。目前實作上還有一些不足,但是未來批流一體的努力方向應該能看得比較清楚了。
我們認為批流一體主要分以下三個方面:
■ 1. 結果的批流一體
使用資料的人不需要關心資料是批處理還是流處理,在送出查詢的那一刻,拿到的結果就應該是截止到目前這一刻最新的統計結果,對于最上層使用者來說沒有批和流的概念。
■ 2. 存儲的批流一體
上面的 ABTest 例子中我們已經看到 DWD、DWS 層資料的存儲上還有很多不足,業界也有一些相應解決方案等待去嘗試,我們希望的批流一體存儲需要以下幾個特性:
- 同時提供增量訂閱讀取以及批量讀取的能力,如 Apache Pulsar,我們可以批量讀取它裡面的歸檔資料,也可以通過 Flink 訂閱它的流式資料,解決 DWD 層兩套存儲的問題。
- 高性能的實時 / 批量 append 和 update 能力,讀寫互不影響,提供類似于 MVCC 的機制,類似于 Kudu 這種,但是性能需要更加強悍來解決 DWS 層存儲的問題。
- 和 OLAP 引擎的對接能力,比如 Impala、Presto 等,并且如果想要提升查詢效率可能還要考慮到列式存儲,具備較強的 scan 或者 filter 能力,來滿足上層使用者對業務結果資料查詢效率的訴求。
■ 3. 計算引擎的批流一體
做到一套代碼解決批流統一場景,降低開發運維成本,這個也是 Flink 正在努力的方向,未來我們也會在上面做一些嘗試。
一個小調研
為了更好的了解大家實時計算的需求,完善 Flink 的功能及使用體驗,請大家幫忙填寫一個小小的問卷,您的兩分鐘将使 Flink 更強大!
問卷填寫連結:
https://c.tb.cn/F3.ZkPTrj