天天看點

Flink在唯品會的實踐

本文來自于王新春在2018年7月29日 Flink China社群線下 Meetup·上海站的分享。王新春目前在唯品會負責實時平台相關内容,主要包括實時計算架構和提供實時基礎資料,以及機器學習平台的工作。之前在美團點評,也是負責大資料平台工作。他已經在大資料實時處理方向積累了豐富的工作經驗。
           

本文主要内容主要包括以下幾個方面:

  1. 唯品會實時平台現狀
  2. Flink在唯品會的實踐
  3. Flink On K8S
  4. 後續規劃

一、唯品會實時平台現狀

目前在唯品會實時平台并不是一個統一的計算架構,而是包括 Storm,Spark,Flink 在内的三個主要計算架構。由于曆史原因,目前在 Storm 平台上的 job 數量是最多的,但是從去年開始,業務重心逐漸切換到 Flink 上面,是以今年在 Flink 上面的應用數量有了大幅增加。

實時平台的核心業務包含八大部分:實時推薦作為電商的重點業務,包含多個實時特征;大促看闆,包含各種次元的統計名額(例如:各種次元的訂單、UV、轉化率、漏鬥等),供上司層、營運、産品決策使用;實時資料清洗,從使用者埋點收集來資料,進行實時清洗和關聯,為下遊的各個業務提供更好的資料;此外還有網際網路金融、安全風控、與友商比價等業務,以及 Logview、Mercury、Titan 作為内部服務的監控系統、VDRC 實時資料同步系統等。

實時平台的職責主要包括實時計算平台和實時基礎資料。實時計算平台在 Storm、Spark、Flink 等計算架構的基礎上,為監控、穩定性提供了保障,為業務開發提供了資料的輸入與輸出。實時基礎資料包含對上遊埋點的定義和規範化,對使用者行為資料、MySQL 的 Binlog 日志等資料進行清洗、打寬等處理,為下遊提供品質保證的資料。

在架構設計上,包括兩大資料源。一種是在App、微信、H5等應用上的埋點資料,原始資料收集後發送到在kafka中;另一種是線上實時資料的 MySQL Binlog 日志。資料在計算架構裡面做清洗關聯,把原始的資料通過實時ETL為下遊的業務應用(包括離線寬表等)提供更易于使用的資料。

二、Flink在唯品會的實踐

場景一:Dataeye實時看闆

Dataeye 實時看闆是支援需要對所有的埋點資料、訂單資料等進行實時計算時,具有資料量大的特點,并且需要統計的次元有很多,例如全站、二級平台、部類、檔期、人群、活動、時間次元等,提高了計算的複雜程度,統計的資料輸出名額每秒鐘可以達到幾十萬。

以 UV 計算為例,首先對 Kafka 内的埋點資料進行清洗,然後與Redis資料進行關聯,關聯好的資料寫入Kafka中;後續 Flink 計算任務消費 Kafka 的關聯資料。通常任務的計算結果的量也很大(由于計算次元和名額特别多,可以達到上千萬),資料輸出通過也是通過 Kafka 作為緩沖,最終使用同步任務同步到 HBase 中,作為實時資料展示。同步任務會對寫入 HBase 的資料限流和同類型的名額合并,保護 HBase。與此同時還有另一路計算方案作為容災。

在以 Storm 進行計算引擎中進行計算時,需要使用 Redis 作為中間狀态的存儲,而切換到 Flink 後,Flink 自身具備狀态存儲,節省了存儲空間;由于不需要通路 Redis,也提升了性能,整體資源消耗降低到了原來的1/3。

在将計算任務從 Storm 逐漸遷移到Flink的過程中,對兩路方案先後進行遷移,同時将計算任務和同步任務分離,緩解了資料寫入 HBase 的壓力。

切換到 Flink 後也需要對一些問題進行追蹤和改進。對于 FlinkKafkaConsumer,由于業務原因對 kafka 中的 Aotu Commit 進行修改,以及對 offset 的設定,需要自己實作支援 kafka 叢集切換的功能。對不帶 window 的state 資料需要手動清理。還有計算架構的通病——資料傾斜問題需要處理。同時對于同步任務追數問題,Storm可以從 Redis 中取值,Flink 隻能等待。

場景二:Kafka資料落地HDFS

之前都是通過 Spark Streaming 的方式去實作,現在正在逐漸切換到 Flink 上面,通過 OrcBucketingTableSink 将埋點資料落地到 HDFS上 的 Hive 表中。在 Flink 進行中單 Task Write 可達到3.5K/s左右,使用 Flink 後資源消耗降低了90%,同時将延遲30s降低到了3s以内。目前還在做 Flink 對 Spark Bucket Table 的支援。

場景三:實時的ETL

對于 ETL 處理工作而言,存在的一個痛點就是字典表存儲在 HDFS 中,并且是不斷變化的,而實時的資料流需要與字典表進行 join。字典表的變化是由離線批處理任務引起的,目前的做法是使用ContinuousFileMonitoringFunction 和 ContinuousFileReaderOperator 定時監聽 HDFS 資料變化,不斷地将新資料刷入,使用最新的資料去做 join 實時資料。

我們計劃做更加通用的方式,去支援 Hive 表和 stream 的 join,實作Hive表資料變化之後,資料自動推送的效果。

三、Flink On K8S

在唯品會内部有一些不同的計算架構,有實時計算的,有機器學習的,還有離線計算的,是以需要一個統一的底層架構來進行管理,是以将 Flink 遷移到了 K8S 上。

在 K8S 上使用了思科的網絡元件,每個docker容器都有獨立的 ip,對外也是可見的。實時平台的融合器整體架構如下圖所示。

唯品會在K8S上的實作方案與 Flink 社群提供的方案差異還是很大的。唯品會使用 K8S StatefulSet 模式部署,内部實作了cluster相關的一些接口。一個job對應一個mini cluster,并且支援HA。對于Flink來說,使用 StatefulSet 的最大的原因是 pod 的 hostname 是有序的;這樣潛在的好處有:

1.hostname為-0和-1的pod可以直接指定為jobmanager;可以使用一個statefulset啟動一個cluster,而deployment必須2個;Jobmanager和TaskManager分别獨立的deployment。

  1. pod由于各種原因fail後,由于StatefulSet重新拉起的pod的hostname不變,叢集recover的速度理論上可以比deployment更快(deployment每次主機名随機)。

鏡像的docker entrypoint腳本裡面需要設定的環境變量設定說明:

環境變量名稱 參數 示例内容 說明
JOB_MANGER_HOSTS StatefulSet.name-0,StatefulSet.name-1 flink-cluster-0,flink-cluster-1 JM的主機名,短主機名;可以不用FQDN
FLINK_CLUSTER_IDENT namespace/StatefulSet.name default/flink-cluster 用來做zk ha設定和hdfs checkpiont的根目錄
TASK_MANAGER_NUMBER_OF_TASK_SLOTS containers.resources.cpu.limits 2 TM的slot數量,根據resources.cpu.limits來設定
FLINK_ZK_QUORUM env:FLINK_ZK_QUORUM 10.198.199.112:2181 HA ZK的位址
JOB_MANAGER_HEAP_MB env:JOB_MANAGER_HEAP_MBvalue:containers.resources.memory.limit -1024 4096 JM的Heap大小,由于存在堆外記憶體,需要小于container.resources.memory.limits;否則容易OOM kill
TASK_MANAGER_HEAP_MB env:TASK_MANAGER_HEAP_MB value: containers.resources.memory.limit -1024 TM的Heap大小,由于存在Netty的堆外記憶體,需要小于container.resources.memory.limits;否則容易OOM kill

對應 Flink 叢集所依賴的 HDFS 等其他配置,則通過建立 configmap 來管理和維護。

kubectl create configmap hdfs-conf --from-file=hdfs-site.xml --from-file=core-site.xml

四、後續計劃

目前實時系統,機器學習平台要處理的資料分布在各種資料存儲元件中,如Kafka、Redis、Tair和HDFS等,如何友善高效的通路,處理,共享這些資料是一個很大的挑戰,對于目前的資料通路和解析常常需要耗費很多的精力,主要的痛點包括:

  1. 對于Kafka,Redis,Tair中的 binary(PB/Avro等格式)資料,使用者無法快速直接的了解資料的 schema 與資料内容,采集資料内容及與寫入者的溝通成本很高。
  2. 由于缺少獨立的統一資料系統服務,對Kafka,Redis,Tair等中的binary資料通路需要依賴寫入者提供的資訊,如proto生成類,資料格式wiki定義等,維護成本高,容易出錯。
  3. 缺乏 relational schema 使得使用者無法直接基于更高效易用的 SQL 或 LINQ 層 API 開發業務。
  4. 無法通過一個獨立的服務友善的釋出和共享資料。
  5. 實時資料無法直接提供給Batch SQL引擎使用。
  6. 此外,對于目前大部分的資料源的通路也缺少審計,權限管理,通路監控,跟蹤等特性。

UDM(統一資料管理系統) 包括 Location Manager, Schema Metastore 以及 Client Proxy 等子產品,主要的功能包括:

  1. 提供從名字到位址的映射服務,使用者通過抽象名字而不是具體位址通路資料。
  2. 使用者可以友善的通過Web GUI界面友善的檢視資料Schema,探查資料内容。
  3. 提供支援審計,監控,溯源等附加功能的Client API Proxy。
  4. 在Spark/Flink/Storm等架構中,以最适合使用的形式提供這些資料源的封裝。

UDM的整體架構如下圖所示。

UDM的使用者包括實時,機器學習以及離線平台中資料的生産者和使用者。在使用Sql API或Table API的時候,首先完成Schema的注冊,之後使用Sql進行開發,降低了開發代碼量。

以Spark通路Kafka PB資料的時序圖來說明UDM的内部流程

在Flink中,使用UDMExternalCatalog來打通Flink計算架構和UDM之間的橋梁,通過實作ExternalCatalog的各個接口,以及實作各自資料源的TableSourceFactory,完成Schema和接入管控等各項功能。

繼續閱讀