天天看點

數倉大法好!跨境電商 Shopee 的實時數倉之路

作者:黃良輝

本文講述 Flink 在 Shopee 新加坡資料組(Shopee Singapore Data Team)的應用實踐,主要内容包括:
  • 實時數倉建設背景
  • Flink 在實時資料數倉建設中結合 Druid、Hive 的應用場景
  • 實時任務監控
  • Streaming SQL 平台化
  • Streaming Job 管理
  • 未來規劃優化方向

建設背景

Shopee 是東南亞與台灣領航電商平台,覆寫新加坡、馬來西亞、菲律賓、台灣、印度尼西亞、泰國及越南七大市場,同時在中國深圳、上海和香港設立跨境業務辦公室。

  • Shopee在2020年第一季的總訂單量高達4.298億,同比增長111.2%。
  • 根據App Annie, Shopee在 2020年第一季強勢跻身全球購物類 App下載下傳量前三名。
  • 同時斬獲東南亞及台灣市場購物類 App 年度總下載下傳量、平均月活數、安卓使用總時長三項冠軍,并領跑東南亞兩大頭部市場,拿下印尼及越南年度購物類 App 下月活量雙冠王。

其中包括訂單商品、物流,支付,數字産品等各方面的業務。為了支援這些網際網路化産品,應對越來的越多的業務挑戰,于是我們進行了資料倉庫的設計和架建構設。

資料倉庫挑戰

目前随着業務發展,資料規模的膨脹和商務智能團隊對實時需求的不斷增長,業務挑戰越來越大:

  • 業務次元

    而言,業務需求越來越複雜,有需要明細資料查詢,又有實時各種次元聚合報表,實時标簽教育訓練和查詢需求。同時大量業務共享了一些業務邏輯,造成大量業務耦合度高,重複開發。
  • 平台架構

    而言,目前任務越來越多,管理排程,資源管理,資料品質異常監控等也越來越重要。實時化也越來急迫,目前大量業務還是離線任務形式,導緻淩晨服務負載壓力巨大,同時基于 T+1(天、小時級)架構業務無法滿足精細化、實時化營運需要。
  • 技術實作

    而言,現在實時業務大量采用 Spark Structured Streaming 實作,嚴重依賴 HBase 做 Stateful 需求,開發複雜;在異常故障事故,Task 失敗,缺乏 Exactly Once 特性支援,資料易丢失、重複。

為了解決上述問題,于是開始了 Flink 實時數倉的探索。

資料倉庫架構

為了支援這些網際網路化産品不斷增長的的資料和複雜的業務,Shopee 建構如下圖資料倉庫架構,從下到上層來看:

數倉大法好!跨境電商 Shopee 的實時數倉之路
  • 最底層是

    資料收集層

    ,這一層負責實時資料,包括 Binlog、Service Log, Tracking Service Log,經過 Real-time Ingestion 團隊資料将會被收集到 Kafka 、Hbase 中。Auto-Ingestion 團隊負責資料庫數離線日常收集到 HDFS。
  • 然後是

    存儲層

    ,這層主要是 Kafka 儲存實時消息,加上 HDFS 儲存 Hive 資料存儲等,HBase 儲存次元資料。
  • 存儲層

    上面是

    Spark, Flink 計算引擎, Presto SQL 查詢引擎

  • 排程管理層

    ,各種資源管理,任務管理,任務排程,管理各種 Spark,Flink 任務。
  • 資源管理層

    上一層是 OLAP

    資料存儲層

    ,Druid 用于存儲時間序列資料,Phoenix(HBase)存儲聚合報表資料、次元表資料、标簽資料,Elastic Search 存儲需要多元度字段索引的資料如廣告資料、使用者畫像等。
  • 最上層是

    應用層

    ,資料報表,資料業務服務,使用者畫像等。

Flink 實時資料數倉實踐

數倉大法好!跨境電商 Shopee 的實時數倉之路

目前在 Shopee Data Team 主要從資料分庫 Binlog 以及 Tracking Service 同步到 Kafka 叢集中,通過 Flink/Spark 計算,包含實時訂單商品銷量優惠活動分析,訂單物流分析、産品使用者标新、使用者印象行為分析,電商活動遊戲營運分析等。最後的結果存到 Druid、 HBase、 HDFS 等,後面接入一些資料應用産品。目前已經有不少核心作業從 Spark Structured Streaming 遷移到 Flink Streaming 實作。

Flink 與 Druid 結合的實時數倉應用

在實時訂單銷量分析産品中,通過 Flink 處理訂單流,将處理後的明細資料實時注入Druid,達到公司實時營運活動分析等用途。

我們使用 T-1(天)的 Lambda 架構來實時和曆史訂單資料産品分析,Flink 隻處理實時今天的訂單資料,每日會定時将昨日的資料通過離線任務索引到 Druid 中覆寫修正實時資料的微小誤差。整體的 Flink 實時處理流程如下圖,從上中下看共三條流水線:

數倉大法好!跨境電商 Shopee 的實時數倉之路

第一條流水線,通過 Kafka 接入 訂單 Binlog 事件。

  • 首先,解析反序列化訂單事件,通過訂單時間過濾無效訂單,隻保留今日訂單。通過訂單主鍵

    KeyBy

    進入

    ProcessWindowFunction

    ,因為上遊資料是 Binlog 會有重複訂單事件,是以會通過 ValueState 來對訂單進行去重。
  • 然後,通過查詢 HBase (Phoenix 表)進行Enrichment 次元字段,從 Phoenix 表中擷取訂單商品資訊,分類,使用者資訊等。
  • 最後,通過判斷是否所有字段成功關聯,如果所有字段都關聯成功将會把消息打入下遊kafka,并實時注入到 Druid;如果有字段關聯失敗将會把訂單事件通過

    Side Output

    進入另一個 Slow Kafka Topic,以便處理異常訂單。

第二條流水線比較複雜,通過多個實時任務将各分表

Slave Binlog

同步到 Hbase Phoenix 表,以便做成實時訂單流的次元表。目前遇到比較多問題還是經常 Binlog 延遲等問題,以及資料熱點問題。

第三條流基本與第一條類似,類似消息隊列中的 dead message 異常處理。因為大量次元表依賴,不能保證 Phoenix 都在訂單被處理前就被同步到 Phoenix 表,比如新訂單商品,新使用者,新店鋪,新分類,新商品等。是以我們引入一條實時 backfill 處理流将會對第一條主流,處理失敗的訂單重複處理,直到所有字段都關聯成功才會進入下遊 Druid。

另外為了避免一些過期消息進入死循環,同樣有個事件過濾視窗,保證隻保留今日的訂單事件在流水線中被處理。不同的是,因為需要區分付款訂單和未付款訂單事件類型(可能一個訂單有兩個狀态事件,當使用者下單時,會有一個下單事件,當使用者完成支付會有一個支付完成事件),是以将訂單是否被處理狀态放在enrichment之後标記重複成功。

訂單事件狀态維護

因為上遊資料源是 Binlog,是以随着訂單狀态的更新,會有大量的訂單重複事件。

通過使用 Flink State 功能儲存在記憶體中(FsSateBackend),以 ValueState 來标記訂單是否被處理,通過設定 TTL,保證訂單狀态儲存24小時過期,現在活動高峰期大概2G State,平均每個TaskManager大約100M State。Checkpoint interval 設定為10秒一次,HDFS 負載并不高。同時因為流使用了視窗和自定義 Trigger,導緻 State 需要緩沖少量視窗資料。視窗的使用将會在

Enrihcment 流程

優化部分詳細說明。

Enrichment 流程優化

在 Enrichment 步驟, 業務邏輯複雜,存在大量 IO,我們做了大量改進優化。

  • 首先,從 HBase 表關聯字段,通過增加

    Local RLU Memeory Cache

    層,減少 Hbase 的通路量,加速關聯;對 HBase Row Key Salt Bucket 避免訂單商品表通路熱點問題。
  • 第二,HBase 表直接通路層(Service)通過 Google Guice 管理依賴友善配置管理,記憶體 Cache 關聯等。
  • 第三,由于商品表和訂單商品同步到 HBase 有一定延遲,導緻大量的訂單事件進入 Slow Kafka topic,是以通過設定視窗和自定義 Trigger 保證訂單數量到一定數量或者視窗逾時才觸發視窗資料的處理,優化後能保證98%的訂單在主流被成功處理。
  • 最後,在訂單關聯訂單商品時,考慮過使用

    Interval Join

    來做,但是由于一個訂單有多條訂單商品資訊,加上上遊是 Binlog 事件,以及其他次元表資料延遲問題,導緻業務邏輯複雜,而且計算産出資料儲存在 Druid 隻能支援增量更新。是以選擇了使用 HBase 存儲來關聯訂單商品資訊,附加慢消息處理流來解決資料延遲問題。

資料品質保障和監控

目前将 Checkpoint 設定為

exactly once

模式,并開啟了

Kafka exactly once

生産者模式,通過

Two Phase Commit

功能保證資料的一緻性,避免 task 失敗,job 重新開機時導緻資料丢失。

監控方面,通過監控 Upstream Kafka Topic,以及 HBase 表寫入更新狀态,結合下遊 Druid 資料延遲監控,做到 end-to-end 的 lag 名額監控。通過 Flink Metric Report 彙報 Hbase 通路性能名額,緩存大小,延遲訂單數量等來對 Flink job 具體步驟性能分析。

Flink 與 Hive 結合的實時數倉應用

在訂單物流實時分析業務,接入 Binlog event 實作支援點更新的物流分析,使用

Flink Retract Stream

功能來支援每當訂單和物流有最新狀态變化事件就觸發下遊資料更新。

通過

Interval Join

訂單流和物流流,并使用 Rocksdb State 與 Incremental Checkpoint 來維護最近七天的狀态資料,從 Hbase 來增加使用者次元資訊等,次元字段 enrihcment 通過 Local LRU Memory Cache 層來優化查詢,最後定時從 Hbase 導出到 HDFS。

數倉大法好!跨境電商 Shopee 的實時數倉之路

現在将 Flink 任務産生的訂單物流事件儲存 HBase 來支援記錄級别的點更新,每小時從 HBase 導出到 HDFS 結果,通過 Presto 接入來做實時分析。HBase 導出到HDFS,通過對 Hbase Row Key Salt Bucket 避免熱點問題,優化減小 Region Size(預設10G)來減少導出時間。但是資料現在延遲還是比較嚴重,在一個半小時左右,而且鍊路繁瑣。将來考慮加入

Apache Hudi

組建接入 Presto,将延遲降到半小時内。

Streaming SQL 應用與管理

目前 Shopee 有大量的實時需求通過 SQL 實作,應用場景主要是應用層實時彙總資料報表、次元表更新等。業務通過 SDK 和一站式網站管理兩種方式實作。一是以 SDK 形式提供支援,使用者可以通過引入 JAR 依賴進行二次項目開發。二是制作了相關網站,通過以任務形式,使用者建立任務編輯儲存 SQL 來實作業務需求,目前支援如下:

  • 任務清單、分組管理,支援重新開機,停止,禁用任務功能。
  • 任務支援 crontab 規則定時執行排程模式和 Streaming 模式。
  • JAR 資源管理,任務自定義 JAR 引用,以便重複使用 UDF 等。
  • 通用 SQL 資源管理,任務引入共享 SQL 檔案,避免重複 SQL 邏輯、重複定義 View 以及環境配置等。
  • 使用者分組權限管理。
  • 內建 Garafna 做任務延遲報警。

下面是部分任務組織UI化形式:

數倉大法好!跨境電商 Shopee 的實時數倉之路

目前平台隻支援 Spark SQL 實作 Stream SQL,使用 Hive 存儲中繼資料,通過關聯次元表 JOIN Apache Phoenix 等外部表和外部服務實作 enrichment 等功能。通過對比 Flink SQL 與 Spark SQL,發現 Spark SQL 不少缺點:

  • Spark SQL 視窗函數種類少,沒有 Flink 的支援靈活,導緻大量聚合任務無法通過平台 SQL 化。
  • Spark Stateful 狀态控制差,沒有 Flink Rocksdb State 增量狀态支援。
  • Spark 關聯次元表時,以前在每次 micro-batch 中都需要加載全量次元表,現在已經改為 GET 方式,Lookup 性能方面已經有提升不少,但還是沒有像 Flink 異步 Lookup 那樣的異步功能,提高性能。
  • 沒有 Flink Snapshot 和 Two Phase Commit 功能的支援,導緻任務重新開機,失敗恢複會出現資料不一緻,失去準确性。

Spark SQL 支援還是有很多局限性,目前正在做 Flink SQL 需求導入評估階段,并計劃在 Stream SQL Platform 接入 Flink SQL 的支援。來滿足公司越來越複雜使用者畫像标簽标注和簡單實時業務 SQL 化,減少業務開發成本。同時需要引入更好的 UDF 管理方式,內建中繼資料服務簡化開發。

Shopee Data Team 擁有大量的實時任務是通過 Jar 包釋出的,目前在 Job 管理上通過網站頁面化,來減少 Job 維護成本。目前支援環境管理,任務管理,任務應用配置管理,和任務監控報警。

環境管理

目前可以配置 Flink / Spark Bin 路徑來支援不同的 Flink/Spark 版本,來支援 Flink 更新帶來的多版本問題,并支援一些顔色高亮來區分不同環境。

數倉大法好!跨境電商 Shopee 的實時數倉之路

任務管理

現在支援實時任務的環境檢索,狀态檢索,名字檢索等。支援重新開機,禁用,配置任務參數等。任務支援從 checkpoint/savepoint 恢複,停止任務自動儲存 savepoint,從 kafka timestamp 啟動。

數倉大法好!跨境電商 Shopee 的實時數倉之路

任務配置管理

同時實時任務也支援配置記憶體,CPU 等 Flink Job 運作參數、JAR 依賴配置等。目前支援預覽,編輯更新等,通過 Jekins CICD 內建與人工幹預結果,來完成 Job 的部署更新。

數倉大法好!跨境電商 Shopee 的實時數倉之路

任務應用配置管理

任務應用配置是使用 HOCON 配置格式支援,目前支援共享配置內建,并通過配置名約定将 Checkpoint 路徑自動綁定到配置中。網站支援預覽模式,編輯模式,配置高亮等,将來會內建配置版本復原等功能。

數倉大法好!跨境電商 Shopee 的實時數倉之路

任務監控報警

對于任務監控方面,現在支援任務異常處理報警。異常處理支援自動挂起失敗的任務,并從上次最新 checkpoint 恢複;通過 Flink REST API 檢測 Flink Job 狀态,來避免 Flink Job 異常造成的假活狀态。出現任務重新開機,異常情況會通過郵件等方式給任務負責人發報警,未來打算在網站內建 Grafana/Promethus 等監控工具來完成任務監控自動化等。

未來規劃

總體而言,Flink 在 Shopee 從 2019 年底開始調研,到項目落地不到半年時間,已經完成業務大量需求導入評估,對 Exactly Once,Kafka Exactly Once Semantics,Two Phase Commit,Interval Join,Rocksdb/FS State 一系列的功能進行了驗證。在未來規劃上:

  • 首先,會嘗試更多的實時任務 Flink SQL 化,進一步實作流批統一;
  • 其次,會對目前大量 Spark structured Streaming Job 遷移到 Flink 實作,并對新業務進行 Flink 探索。
  • 在 Streaming SQL Platform 也會加入 Flink SQL 支援,來解決目前平台遇到一些性能瓶頸和業務支援局限性。

作者簡介:

黃良輝,2019 年加入 Shopee,在 Shoppe Data Team 負責實時資料業務和資料産品開發。

繼續閱讀