天天看點

網易遊戲基于 Flink 的流式 ETL 建設

網易遊戲資深開發工程師林小鉑為大家帶來網易遊戲基于 Flink 的流式 ETL 建設的介紹。内容包括:
  1. 業務背景
  2. 專用 ETL
  3. EntryX 通用 ETL
  4. 調優實踐
  5. 未來規劃

一. 業務背景

網易遊戲 ETL 服務概況

網易遊戲的基礎資料主要日志方式采集,這些日志通常是非結構化或半結構化資料,需要經過資料內建 ETL 才可以入庫至實時或離線的資料倉庫。此後,業務使用者才可以友善地用 SQL 完成大部分資料計算,包括實時的 Flink SQL 和離線的 Hive 或 Spark。

網易遊戲基于 Flink 的流式 ETL 建設

網易遊戲資料內建的資料流與大多數公司大同小異,主要有遊戲用戶端日志、遊戲服務端日志和其他周邊基礎的日志,比如 Nginx access log、資料庫日志等等。這些日志會被采集到統一的 Kafka 資料管道,然後經由 ETL 入庫服務寫入到 Hive 離線資料倉庫或者 Kafka 實時資料倉庫。

這是很常見的架構,但在我們在需求方面是有一些比較特殊的情況。

網易遊戲流式 ETL 需求特點

網易遊戲基于 Flink 的流式 ETL 建設

首先,不同于網際網路、金融等行業基本常用 MySQL、Postgres 等的關系型資料庫,遊戲行業常常使用 MongoDB 這類 schema-free 的文檔型資料庫。這給我們 ETL 服務帶來的問題是并沒有一個線上業務的準确的 schema 可以依賴,在實際資料進行中,多字段或少字段,甚至一個字段因為玩法疊代變更為完全不同的格式,這樣的情況都是可能發生的。這樣的資料異構問題給我們 ETL 的資料清洗帶來了比較高的成本。

其次,也是由于資料庫選型的原因,大部分業務的資料庫模式都遵循了反範式設計,會刻意以複雜内嵌的字段來避免表間的 join。這種情況給我們帶來的一個好處是,在資料內建階段我們不需要去實時地去 join 多個資料流,壞處則是資料結構可能會非常複雜,多層嵌套十分常見。

然後,由于近年來實時數倉的流行,我們也同樣在逐漸建設實時資料倉庫,是以複用現有的 ETL 管道,提取轉換一次,加載到實時離線兩個資料倉庫,成為一個很自然的發展方向。

最後,我們的日志類型多且變更頻繁,比如一個玩法複雜的遊戲,可能有 1,000 個以上的日志類型,每兩周可能就會有一次發版。在這樣的背景下 ETL 出現異常資料是不可避免的。是以我們需要提供完善的異常處理,讓業務可以及時得知資料異常和通過流程修複資料。

日志分類及特點

網易遊戲基于 Flink 的流式 ETL 建設

為了更好地針對不同業務使用模式優化,我們對不同日志類型的業務提供了不同的服務。我們的日志通常分為三個類型:營運日志、業務日志和程式日志。

營運日志記錄的是玩家行為事件,比如登入帳号、領取禮包等。這類日志是最為重要日志,有固定的格式,也就是特定 header + json 的文本格式。資料的主要用途是做資料報表、資料分析還有遊戲内的推薦,比如玩家的組隊比對推薦。

業務日志記錄的是玩家行為以外的業務事件,這個就比較廣泛,比如 Nginx access log、CDN 下載下傳日志等等,這些完全沒有固定格式,可能是二進制也可能是文本。主要用途類似于營運日志,但更加豐富和定制化。

程式日志記錄是程式的運作情況,也就是平時我們通過日志架構打的 INFO、ERROR 這類日志。程式日志主要用途是檢索定位運作問題,通常是寫入 ES,但有時數量過大或者需要提取名額分析時,也會寫入資料倉庫。

網易遊戲 ETL 服務剖析

網易遊戲基于 Flink 的流式 ETL 建設

針對這些日志分類,我們具體提供了三類 ETL 入庫的服務。首先是營運日志專用的 ETL,這會根據營運日志的模式進行定制化。然後是通用的面向文本日志的 EntryX ETL 服務,它會服務于營運日志以外的所有日志。最後是 EntryX 無法支援的特殊 ETL 需求,比如有加密或者需要進行特殊轉換的資料,這種情況下我們就會針對性地開發 ad-hoc 作業來處理。

二. 營運日志專用 ETL

營運日志 ETL 發展曆程

網易遊戲基于 Flink 的流式 ETL 建設

營運日志 ETL 服務有着一個比較久的曆史。大概在 2013 年,網易遊戲就建立了基于 Hadoop Streaming + Python 預處理/後處理的第一版離線 ETL 架構。這套架構是平穩運作了多年。

在 2017 年的時候,随着 Spark Streaming 的嶄露頭角,我們開發了基于 Spark Streaming 的第二個版本,相當于一個 POC,但因為微批調優困難且小檔案多等問題沒有上線應用。

時間來到 2018 年,當時 Flink 已經比較成熟,我們也決定将業務遷移到 Flink 上,是以我們很自然地開發了基于 Flink DataStream 的第三版營運日志 ETL 服務。這裡面比較特殊的一點就是,因為長久以來我們業務方積累了很多 Python 的 ETL 腳本,然後新版最重要的一點就是要支援這些 Python UDF 的無縫遷移。

營運日志 ETL 架構

接下來看下兩個版本的架構對比。

網易遊戲基于 Flink 的流式 ETL 建設

在早期 Hadoop Streaming 的版本裡面,資料首先會被 dump 到 HDFS 上,然後 Hadoop Streaming 啟動 Mapper 來讀取資料并通過标準輸入的方式傳遞給 Python 腳本。Python 腳本裡面會分為三個子產品:首先預處理 UDF,這裡通常會進行基于字元串的替換,一般用作規範化資料,比如有些海外合作廠商的時間格式可能跟我們不同,那麼就可以在這裡進行統一。預處理完的資料會進入通用的解析/轉換子產品,這裡我們會根據營運日志的格式來解析資料,并進行通用轉換,比如濾掉測試服資料。通用子產品之後,最後還有一個後處理子產品進行針對字段的轉換,比如常見的匯率轉換。之後資料會通過标準輸出傳回給 Mapper,然後 Mapper 再将資料批量寫到 Hive 目錄中。

我們用 Flink 重構後,資料源就由 HDFS 改為直接對接 Kafka,而 IO 子產品則用 Flink 的 Source/Sink Operator 來代替原本的 Mapper,然後中間通用子產品可以直接重寫為 Java,剩餘的預處理和後處理則是我們需要支援 Python UDF 的地方。

Python UDF 實作

網易遊戲基于 Flink 的流式 ETL 建設

在具體實作上,我們在 Flink ProcessFunction 之上加入了 Runner 層,Runner 層負責跨語言的執行。技術選型上是選了 Jython,而沒有選擇 Py4j,主要因為 Jython 可以直接在 JVM 裡面去完成計算,不需要額外啟動 Python 程序,這樣開發和運維管理成本都比較低。而 Jython 帶來的限制,比如不支援 pandas 等基于 c 的庫,這些對于我們的 Python UDF 來說都是可接受的。

整個調用鍊是,ProcessFunction 在 TaskManager 被調用時會在 open 函數延遲初始化 Runner,這是因為 Jython 是不可序列化的。Runner 初始化時會負責資源準備,包括将依賴的子產品加入 PYTHONPATH,然後根據配置反射調用 UDF 函數。

調用時,對于預處理 UDF Runner 會把字元串轉化為 Jython 的 PyUnicode 類型,而對于後處理 UDF 則會把解析後的 Map 對象轉為 Jython 的 PyDcitionary,分别作為兩者的輸入。UDF 可以調用其他子產品進行計算,最終傳回 PyObject,然後 Runner 再将其轉換成 Java String 或者 Map,傳回給 ProcessFunction 輸出。

營運日志 ETL 運作時

網易遊戲基于 Flink 的流式 ETL 建設

剛剛是 UDF 子產品的局部視圖,我們再來看下整體的 ETL 作業視圖。首先在我們提供了通用的 Flink jar,當我們生成并送出 ETL 作業到作業平台時,排程器會執行通用的 main 函數建構 Flink JobGraph。這時會從我們的配置中心,也就是 ConfigServer,拉取 ETL 配置。ETL 配置中包含使用到的 Python 子產品,後端服務會掃描其中引用到的其他子產品,把它們統一作為資源檔案通過 YARN 分發功能上傳到 HDFS 上。在 Flink JobManager 和 TaskManager 啟動時,這些 Python 資源會被 YARN 自動同步到工作目錄上備用。這就是整個作業初始化的過程。

然後因為 ETL 規則的小變更是很頻繁的,比如新增一個字段或者變更一下過濾條件,如果我們每次變更都需要重新開機作業,那麼作業重新開機帶來的不可用時間會對我們的下遊使用者造成比較糟糕的體驗。是以,我們對變更進行了分類,對于一些不影響 Flink JobGraph 的輕量級變更支援熱更新。實作的方式是每個 TaskManager 啟動一個熱更新線程,定時輪詢配置中心同步配置。

三. EntryX 通用 ETL

接下來介紹我們的通用 ETL 服務 EntryX。這裡的通用可以分為兩層意義,首先是資料格式上的通用,支援非結構化到結構化的各種文本資料,其次是使用者群體的通用,目标使用者覆寫資料分析、資料開發等傳統使用者,和業務程式、策劃這些資料背景較弱的使用者。

EntryX 基本概念

網易遊戲基于 Flink 的流式 ETL 建設

先介紹 EntryX 的三個基本概念,Source、StreamingTable 和 Sink。使用者需要分别配置這個三個子產品,系統會根據這些自動生成 ETL 作業。

Source 是 ETL 作業的輸入源,通常是從業務端采集而來的原始日志 topic,或者是經過分發過濾後的 topic。這些 topic 可能隻包含一種日志,但更多情況下會包含多種異構日志。

接下來 StreamingTable,一個比較通俗的名稱就是流表。流表定義了 ETL 管道的主要中繼資料,包括如何轉換資料,還有根據轉換好的資料定義的流表 schema,将資料 schema 化。流表 schema 是最為關鍵的概念,它相當于 Table DDL,主要包括字段名、字段資料類型、字段限制和表屬性等。為了更友善對接上下遊,流表 schema 使用的是自研的 SQL-Like 的類型系統,裡面會支援我們一些拓展的資料類型,比如 JSON 類型。

最後 Sink 負責流表到目标存儲的實體表的映射,比如映射到目标 Hive 表。這裡主要需要 schema 的映射關系,比如流表哪個字段映射到目标表哪個字段,流表哪個字段用作目标 Hive 表分區字段。在底層,系統會自動根據 schema 映射關系來提取字段,并将資料轉換為目标表的存儲格式,加載到目标表。

EntryX ETL 管道

網易遊戲基于 Flink 的流式 ETL 建設

再來看下 EntryX ETL 管道的具體實作。藍色部分是外部存儲系統,而綠色部分則是 EnrtyX 的内部子產品。

資料首先從對接采集的原始資料 Topic 流入,經過 Source 攝入到 Filter。Filter 負責根據關鍵詞過濾資料,通常來說我們要求過濾完的資料是有相同 schema 的。經過這兩步資料完成 Extract,來到 Transform 階段。

Transform 第一步是解析資料,也就是這裡的 Parser。Parser 支援 JSON/Regex/Csv 三種解析,基本可以覆寫所有案例。第二步是對資料進行轉換,這是由 Extender 負責的。Extender 通過内置函數或 UDF 計算衍生字段,最常見的是将 JSON 對象拉平展開,提取出内嵌字段。最後是 Formatter,Formatter 會根據之前使用者定義的字段邏輯類型,将字段的值轉為對應的實體類型。比如一個邏輯類型為 BIGINT 的字段,我們在這裡會統一轉為 Java long 的實體類型。

資料完成 Transform 之後來到最後的 Load 階段。Load 第一步是決定資料應該加載到哪個表。Splitter 子產品會根據每個表的入庫條件(也就是一個表達式)來分流資料,然後再到第二步的 Loader 來負責将資料寫到具體的外部存儲系統。目前我們支援 Hive/Kafka 兩種存儲,Hive 支援 Text/Parquet/JSON 三種格式,而 Kafka 支援 JSON 和 Avro 兩種格式。

實時離線統一 Schema

網易遊戲基于 Flink 的流式 ETL 建設

在 Entryx 的設計裡資料可以被寫入實時和離線兩個資料倉庫,也就是說同一份資料,但在不同的存儲系統中以不同格式表示。從 Flink SQL 的角度來說是 schema 部分相同,但 connector 和 format 不同的兩個表。而 schema 部分經常會随業務變更,而 connector 和 format(也就是存儲系統和存儲格式)是相對穩定的。那麼一個很自然的想法就是,能不能将 schema 部分提取出來獨立維護?實際上,這個抽象的 schema 已經存在了,就是我們在 ETL 提取的流表 schema。

在 EntryX 裡面,流表 schema 是與序列化器、存儲系統無關的 schema,作為 Single Source of Truth。基于流表 schema,加上存儲系統資訊和存儲格式資訊,我們就可以衍生出具體的實體表的 DDL。目前我們主要是支援 Hive/Kafka,如果之後要拓展至支援 ES/HBase 表也是非常友善。

實時資料倉庫內建

網易遊戲基于 Flink 的流式 ETL 建設

EntryX 一個重要的定位是作為實時倉庫的統一入口。剛剛其實已經多次提到 Kafka 表,但還沒有說實時數倉是怎麼做的。實時數倉的常見問題是 Kafka 并沒有原生支援 schema 中繼資料的持久化。目前社群的主流解決方案是基于 Hive MetaStore 來儲存 Kafka 表的中繼資料,并複用 HiveCatalog 來直接對接到 Flink SQL。

但這對于我們來說使用 Hive MetaStore 主要有幾個問題:一是在實時作業裡引入 Hive 依賴并與 Hive 耦合,這是很重的依賴,導緻定義的表很難被其他元件複用,包括 Flink DataStream 使用者;二是我們已經有 Kafka SaaS 平台 Avatar 來管理實體 schema,比如 Avro schema,如果再引入 Hive MetaStore 會導緻中繼資料的割裂。是以,我們是拓展了 Avatar 平台的 schema 注冊中心,同時支援邏輯 schema 和實體 schema。

那麼實時數倉和 EntryX 的內建關系是:首先我們有 EntryX 的流表 schema,在建立 Sink 的時候調用 Avatar 的 schema 接口,根據映射關系生成邏輯 schema,而 Avatar 再根據 Flink SQL 類型與實體類型的映射關系生成 topic 的實體 schema。

與 Avatar schema 注冊中心配套的還有我們自研的 KafkaCatalog,它負責讀取 topic 的邏輯和實體 schema 來生成 Flink SQL 的 TableSource 或 TableSink。而對于一些 Flink SQL 以外的使用者,比如 Flink DataStream API 的使用者,他們也可以直接讀取實體 schema 來享受到資料倉庫的便利。

EntryX 運作時

和營運日志 ETL 類似,在 EntryX 運作時,系統會基于通用的 jar 和配置生成 Flink 作業,但這裡有兩種情況需要特别處理。

網易遊戲基于 Flink 的流式 ETL 建設

首先是一個 Kafka topic 往往有幾十甚至上千種日志,那麼對應其實有也幾十甚至上千的流表,如果每個流表都單獨運作在一個作業裡,那麼一個 topic 會可能會被讀上千遍,這是非常大的浪費。是以,在作業運作時提供一個優化政策,可以将同個 source 的不同流表合并到一個作業裡跑。比如圖中,某個手遊上傳了 3 種日志到 Kafka,使用者分别配置了玩家注冊、玩家登入、領取禮包三個流表,那麼我們可以這三個流表合并起來到一個作業,共享同一個 Kafka Source。

另外的一個優化是,一般情況下我們可以按照之前“提取轉換一次,加載一次”的思路來将資料同時寫到 Hive 和 Kafka,但是由于 Hive 或者說 HDFS 畢竟是離線系統,實時性比較差,寫入在一些負載比較高的 HDFS 老叢集經常會出現反壓,同時阻塞上遊,導緻 Kafka 的寫入也受到影響。在這種情況下,我們通常要分離加載到實時和離線的 ETL 管道,具體會取決于業務的 SLA 還有 HDFS 的性能。

四.調優實踐

接下來給大家分享下我們在 ETL 建設中的調優實踐經驗。

HDFS 寫入調優

網易遊戲基于 Flink 的流式 ETL 建設

首先是 HDFS 寫入的調優。流式寫入 HDFS 場景中老生常談的一個問題便是小檔案過多。通常來說小檔案和實時性是魚與熊掌不可兼得。如果要延遲低,那麼我們需要頻繁地滾動檔案來送出資料,必然導緻小檔案過多。

小檔案過多主要造成兩個問題:一從 HDFS 叢集管理角度看,小檔案會占用大量的檔案數和 block 數,浪費 NameNode 記憶體;二是從使用者角度看,讀寫效率都會降低,因為寫的時候要更頻繁地調用 RPC 和 flush 資料,造成更多的阻塞,有時甚至造成 checkpoint 逾時,而讀時則需要打開更多的檔案才能讀完資料。

HDFS 寫入調優 - 資料流預分區

我們在優化小檔案問題時做的一點調優是對資料流先做一遍預分區,具體來說,便是在 Flink 作業内部先基于目标 Hive 表進行一次 keyby 分區,讓同一個表的資料盡量集中在少數的幾個 subtask 上。

網易遊戲基于 Flink 的流式 ETL 建設

舉個例子,假設 Flink 作業并行度為 n,而目标 Hive 分區數為 m 個。因為每個 subtask 都有可能讀到任意分區的資料,在預設的各 subtask 完全并行的情況下,每個 subtask 都會寫所有分區,造成總體的寫入檔案數是 n * m。假設 n 是 100,m 是 1000,按 10 分鐘滾一次檔案算,每天會造成 14,400,000 個檔案,這對于很多老叢集來說是非常大的壓力。

如果經過資料流分區的優化之後,我們就可以限制住 Flink 并行度帶來的增長。比如我們 keyby hive 表字段,并加入範圍為 0-s 整數的鹽來避免資料傾斜,那麼分區最多會被 s 個 subtask 讀寫。假設 s 是 5,比起原先 n 是 100,那麼我們就将原本的檔案數降低為原來 20 分之一。

基于 OperatorState 的 SLA 統計

網易遊戲基于 Flink 的流式 ETL 建設

第二個我想分享的是我們的 SLA 統計工具。背景是我們的使用者經常會通過 Web UI 來進行調試和問題的排查,比如不同 subtask 的輸入輸出數目,但這些 metric 會因為作業重新開機或者 failover 而重置,是以我們開發了基于 OperatorState 的 SLA-Utils 工具來統計資料的輸入和分類輸出。這個工具設計得非常輕量級,可以很容易內建到我們自己的服務或者使用者的作業裡面。

在 SLA-Utils 裡面,我們支援了三種 metric。首先是标準的 metric,有 recordsIn/recordsOut/recordsDropped/recordsErrored,分别對應輸入記錄數/正常輸出記錄數/被過濾掉的記錄數/處理異常的記錄數。通常來說 recordsIn 就等于後面三者的總和。第二種使用者可以自定義的 metric,通常可以用于記錄更詳細的原因,比如是 recordsEventTimeDropped 代表資料是因為 event time 被過濾的。

那麼上述兩種 metric 靜态的,也就是說 metric key 在作業運作前就要确定,此外 SLA-Utils 還支援在運作時動态注冊的 TTL metric。這種 metric 通常有動态生成的日期作為字首,在經過 TTL 的時間之後被自動清理。TTL metric 主要可以用于做天級别時間視窗的統計。這裡比較特别的一點是,因為 OperatorState 是不支援 TTL 的,SLA-Utils 是在每次進行 checkpoint 快照的時候進行一次過濾,剔除掉過期的 metric,以實作 TTL 的效果。

那麼在 State 儲存了 SLA 名額之後要做的就是暴露給使用者。我們目前的做法是通過 Accumulater 的方式來暴露,優點是 Web UI 有支援,開箱即用,同時 Flink 可以自動合并不同的 subtask 的 metric。缺點在于沒有辦法利用 metric reporter 來 push 到監控系統,同時因為 Acuumulater 是不能在運作時動态登出的,是以使用 TTL metric 會有記憶體洩漏的風險。是以,在未來我們也考慮支援 metric group 來避免這些問題。

資料容錯及恢複

最後再分享下我們在資料容錯和恢複上的實踐。

網易遊戲基于 Flink 的流式 ETL 建設

以很多最佳實踐相似,我們用 SideOutput 來收集 ETL 各環節中出錯的資料,彙總到一個統一的錯誤流。錯誤記錄中包含我們預設的錯誤碼、原始輸入資料以及錯誤類和錯誤資訊。一般情況下,錯誤資料會被分類寫入 HDFS,使用者通過監控 HDFS 目錄可以得知資料是否正常。

網易遊戲基于 Flink 的流式 ETL 建設

那麼存儲好異常資料後,下一步就是要恢複資料。這通常有兩種情況。

一是資料格式異常,比如日志被截斷導緻不完整或者時間戳不符合約定格式,這種情況下我們一般通過離線批作業來修複資料,重新回填到原有的資料管道。

二是 ETL 管道異常,比如資料實際的 schema 有變更但流表配置沒有更新,可能會導緻某個字段都是空值,這時我們的處理辦法是:首先更新線上的流表配置為最新,保證不再産生更多異常資料,這時 Hive 裡面仍有部分分區是異常的。然後,我們釋出一個獨立的補數作業來專門修複異常的資料,輸出的資料會寫到一個臨時的目錄,并在 hive metastore 上切換 partition 分區的 location 來替換掉原來的異常目錄。是以這樣的一個補數流程對離線查詢的使用者來說是透明的。最後我們再在合适的時間替換掉異常分區的資料并恢複 location。

五.未來規劃

最後介紹下我們的未來規劃。

網易遊戲基于 Flink 的流式 ETL 建設
  • 第一個是資料湖的支援。目前我們的日志絕大多數都是 append 類型,不過随着 CDC 和 Flink SQL 業務的完善,我們可能會有更多的 update、delete 的需求,是以資料湖是一個很好的選擇。
  • 第二個會提供更加豐富的附加功能,比如實時的資料去重和小檔案的自動合并。這兩個都是對業務方非常實用的功能。
  • 最後是一個支援 PyFlink。目前我們的 Python 支援隻覆寫到資料內建階段,後續資料倉庫的 Python 支援我們是希望通過 PyFlink 來實作。

最新活動推薦

僅需99元即可體驗阿裡雲基于 Apache Flink 建構的企業級産品-實時計算 Flink 版!點選下方連結了解活動詳情:

https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506
網易遊戲基于 Flink 的流式 ETL 建設