本文整理自 58 同城實時計算平台負責人馮海濤在 Flink Forward Asia 2020 分享的議題《Flink 在 58 同城應用與實踐》,内容包括:
- 實時計算平台架構
- 實時 SQL 建設
- Storm 遷移 Flink 實踐
- 一站式實時計算平台
- 後續規劃
GitHub 位址
https://github.com/apache/flink歡迎大家給 Flink 點贊送 star~
一、實時計算平台架構
實時計算平台的定位是為 58 集團海量資料提供高效、穩定的實時計算一站式服務。一站式服務主要分為三個方向:
- 第一個方向是實時資料存儲,主要負責為線上業務接入提供高速度的實時存儲能力。
- 第二是實時資料計算,主要為海量資料的處理提供分布式計算架構。
- 第三是實時資料分發,主要負責将計算後的資料分發到後續的實時存儲,供上層應用。
平台建設主要分為兩個部分:
- 第一部分是基礎能力建設,目前主要包括 Kafka 叢集、storm 叢集、 Flink 叢集、SparkStreaming 叢集。
- 另一部分是平台化建設,主要是包括兩點:
-
第一個是資料分發,我們的資料分發是基于 Kafka Connect 打造的一個平台,目标是實作異構資料源的內建與分發。在實際使用資料場景過程中,經常需要将不同的資料源彙聚到一起進行計算分析。
傳統方式可能需要針對不同的存儲采用不同的資料同步方案。我們的資料分發是通過提供一套完整的架構,實作不同資料源的內建和分發。
- 第二個是我們基于 Flink 打造的一站式實時計算平台,後文會有詳細的介紹。
-
上圖是我們的實時計算平台的架構。
- 在實時資料接入這部分,我們采用的是 Kafka,binlog 提供 canal 和 debezium 兩種方式進行接入。
- 在業務日志這部分,我們主要采用 flume 進行線上業務的 log 的采集。
- 在實時計算引擎這部分,根據開源社群發展以及使用者的需求,從最早的 Storm 到後來引入 SparkStreaming,以及現在主流的 Flink。
- 在實時存儲這部分,為了滿足多元化的實時需求,我們支援 Kafka、Druid、Hbase、ES、ClickHouse。
- 同時在計算架構之上,我們建設了一些管理平台,比如叢集管理,它主要負責叢集的擴容,穩定性的管理。
- 另一個是 Nightfury,主要負責叢集治理,包括資料接入、權限治理、資源管理等等。
我們在業務發展過程中,引入了 Flink 計算架構。首先從業務來說,58 是一個一站式生活服務平台,包含很多業務線。随着業務的發展,資料量越來越大,場景越來越豐富,需要一個更加強大的計算架構來滿足使用者的需求。
- 第一個場景是實時 ETL,主要是針對原始日志進行資訊轉化,結構化處理,運用于後續計算,需要高吞吐低延遲的計算能力。
- 第二塊是實時數倉,它作為離線數倉的一個補充,主要是提升一些實時名額的時效性。第三種場景是實時監控,它需要比較靈活的時間視窗支援。
- 最後一種場景是實時資料流分析,比如說,資料亂序的處理、中間狀态的管理、Exactly once 語義保障。
我們前期基于 Storm 和 SparkStreaming 建構的計算叢集在很大程度上并不能滿足這些場景需求。于是對 Flink 進行了調研,發現 Flink 不論是在計算性能,還是流資料特性支援上,都展現出了非常大的優勢。是以,我們決定采用 Flink 作為主流的計算架構。
上圖是我們 Flink 叢集的建設情況。Flink 作為實時計算架構,經常需要 7×24 小時的可用性。我們在建設底層叢集的時候,需要考慮高可用的架構。
- 首先在部署模式上,主要是采用 Flink On YARN,實作叢集的高可用。
- 在底層的 HDFS 上,采用 HDFS federation 機制,既可以避免離線叢集的抖動對實時這邊造成影響,同時也減少了維護的 HDFS 數量。
- 在叢集隔離上,主要是采用 Node Labe 機制,就可以實作把重要業務運作在一些指定節點上。同時在這個基礎之上,引入了 Cgroup,對 CPU 進行隔離,避免任務間的 CPU 搶占。
- 在管理層面,不同的業務送出到不同的隊列進行管理,避免業務間的資源搶占。
- 在計算場景上,根據不同的計算場景,比如說計算型、IO 型,會送出到不同的節點,進而提升整個叢集的資源使用率。
Flink 計算架構在 58 經曆了大概兩年多的發展。目前我們的叢集有 900 多台機器,2000 多個實時任務,每天處理大概 2.5 萬億的實時資料,資料量峰值達到了 3000 萬每秒。
二、實時 SQL 建設
1. 實時 SQL 演進
SQL 程式設計具有低門檻、自動優化、版本統一等特點。同時 Flink SQL 作為實時數倉的主要工具,是我們在建設 Flink 平台時考慮的一個主要方向。
我們最早上線的 Flink 是基于 1.6 版本的,當時這個版本隻支援 DML,我們在當時的版本基礎上進行了一些擴充,主要是在 DDL 文法上的擴充支援。在使用者使用層面,為了簡化 DDL 的定義,也通過一個配置化的方式來實作自動生成 DDL。在開發的時候,提供可視化開發的功能和線上調試的功能。
随着社群的開源,我們将 Flink SQL 切換到了社群版本,之後也更新相關的版本,以及合并比較多的社群版本特性,比如說 Blink 相關、批流合一、對 Hive 的支援。
最後針對 Flink SQL 這塊的實時數倉,也做了一些數倉化的工作,主要包括中繼資料管理、血緣關系、數倉分層、權限管理等等。
2. 存儲擴充
關于存儲擴充這一塊,最開始我們是基于 Flink 自己實作的一套 DDL。随着社群開源,切換到社群的 Flink SQL 版本,然後在上面做了一些擴充,主要有幾個方面:
- 第一,打通了主流存儲和内部的實時存儲。比如說,在源表上支援了内部的 wmb,它是一個分布式消息隊列。在維表上支援這種 redis,内部的 wtable。在結果表上支援了 ClickHouse,redis,以及我們内部的 wtable;
- 第二,定制 format 支援。因為在實際業務中,很多資料格式并不是标準的,沒法通過 DDL 來定義一個表。我們提供了一種通用的方式,可以采用一個字段來代表一條日志,讓使用者可以通過 udf 去自定義,并解析一條日志。
- 最後,在 source 和 sink DDL 定義基礎上,增加了并發度的設定。這樣使用者就可以更靈活地控制任務的并發。
3. 性能優化
關于性能優化,主要是兩方面:
- 第一個是對 Blink 特性的引進,Blink 提供了大量的特性,比如通過 mini batch 的處理方式,提高任務的吞吐。通過 local global 兩階段聚合,緩解資料熱點問題。還有通過 emit,增強視窗的功能。把這些功能內建到我們的計算平台,使用者通過一些按鈕可以直接打開。
- 另一個是對異步 lO 的應用。在實時數倉化建設過程中,維表之間的關聯是比較大的應用場景,經常因為維表的性能導緻整個任務的吞吐不高。是以我們增加了一個異步 IO 的機制,主要有兩種實作:
- 一種針對目标存儲支援異步 client,直接基于異步 client 來實作。比如 MySQL 和 redis。
-
另一種不支援異步 client 的,我們就借助現成的機制來模拟,同時在這個基礎之上增加了一套緩存的機制,避免所有的資料直接查詢到目标存儲,減少目标存儲的壓力。同時在緩存基礎上,也增加 LRU 機制,更加靈活的控制整個緩存。
同樣,資料寫入這一塊遇到大并發量寫入的時候,盡量提高并發來解決寫入性的問題,這樣就會導緻整個任務的 CPU 使用率比較低,是以就采用單并發度多線程的寫入機制,它的實作是在 sink 算子裡面增加一個 buffer,資料流入到 sink 之後會首先寫入到 buffer,然後會啟動多線程機制去消費這個 buffer,最終寫到存儲裡面。
4. 數倉化建設
實時數倉作為 Flink 的一個比較典型的應用場景,相較于離線數倉它可能存在一些平台化不完善的方面:
- 首先,中繼資料管理功能不完善。
- 然後,Flink SQL 這一塊,對于每個任務我們都可能需要重新定義一個資料表。并且由于資料沒有分層的概念,導緻任務比較獨立,煙囪式開發,資料和資源使用率比較低下。
- 另外,也缺乏資料血緣資訊。
為了提升實時數倉建設的效率,我們提供了面向數倉化實時 SQL 能力,在數倉設計,任務開發,平台化管理方面全面對齊離線數倉的建設模式。
4.1 數倉化
數倉化主要是參考離線數倉的模型,對我們實時數倉這一塊進行模型建設。
比如說,最原始的資料會進入ODS 層,經過一些清洗落入到行為明細層,之後會拆分到具體的主題明細層,然後再将一些相關的維表資訊進行計算,再到彙總層,最終提供給最上層的應用,包括一些實時報表,Ad-hoc 查詢等。
4.2 數倉平台
實時數倉目前主要還是基于這種 Lambda 架構來進行平台化的建設。
- 首先,在中繼資料管理這一塊,Flink 預設采用記憶體對中繼資料進行管理,我們就采用了 HiveCatalog 機制對庫表進行持久化。
- 同時我們在資料庫的權限管理上,借助 Hive ACL 來進行權限管理。
- 有了中繼資料持久化之後,就可以提供全局的中繼資料檢索。
- 同時任務模式就可以由傳統的 DDL+DML 簡化為 DML。
- 最後,我們也做了血緣關系,主要是在 Flink SQL 送出過程中,自動發現 SQL 任務血緣依賴關系。
三、Storm 遷移 Flink 實踐
1. Flink 與 Storm 對比
Flink 相對于 Storm 來說,有比較多的優勢。
- 在資料保障上,Flink 支援 Exactly once 語義,在吞吐量、資源管理、狀态管理,使用者越來越多的基于 Flink 進行開發。
- 而 Storm 對使用者來說,程式設計模型簡單,開發成本高,流式計算特性缺乏,吞吐低無法滿足性能。在平台側,獨立叢集多、運維困難、任務缺少平台化管理、使用者體驗差。
是以我們決定遷移到 Flink。
2. Flink-Storm 工具
在 Storm 遷移到 Flink 的時候,如果讓使用者重新基于 Flink 進行邏輯開發,可能需要比較大的工作量。是以我們對 Flink 進行了調研,發現有個 Flink-Storm 工具。它實作了将 Storm Topology 轉到 Flink Topology。比如說,把 spout 轉換到 Flink 的 source function,把 bolt 轉換到 Transform 和 sink function。
在使用的過程中我們也發現一些問題,Flink-Storm 工具無法支援 Yarn 模式, 缺少 Storm 引擎功能,最後還有一個比較大的問題,我們的 storm 在發展過程中維護了很多版本,但是 Flink-Storm 工具隻支援基于一個版本進行開發。于是,我們做了一些改進。
3. 對 Flink-Storm 的改進
3.1 消息保障
Storm 有三個特點:
- 第一,ack 機制;
- 第二,依賴 zookeeper;
- 第三,at least once 語義保障。
我們做了四點改進:
- 第一,Flink-Storm 去掉 ack 支援;
- 第二,KafkaSpout 實作 CheckpointListener;
- 第三,KafkaSpout 實作 CheckpointedFunction;
- 第四,Flink-Storm 打開 checkpoint。
3.2 對 Storm 定時器的支援
在早期版本裡面其實是沒有視窗機制的,我們借助 Storm 定時機制來實作視窗計算。它的機制是這樣的,Storm 引擎會定時向 bolt 裡面發送一個系統信号,使用者就可以通過這個系統信号進行一個切分,模拟視窗操作。
同樣,Flink 也沒有這樣一個定時器的機制,于是我們就考慮從 Flink-Storm 層面來實作,改造了 BoltWrapper 類,它作為 bolt 類的一個封裝,實作機制跟 bolt 是一樣的,包括 5 點:
- 初始化 open 方式啟動異步線程。
- 模拟構造 tick 的 StreamRecord;
- 調用 processeElement 函數發送 tuple;
- 頻率由外部參數全局控制;
- close 中關閉線程。
3.3 Storm on Yarn
Storm on yarn 并不是直接送出到 YARN 叢集,它隻是送出到 local 或者 stand alone 的模式。Flink on yarn 主要是提供了 ClusterClient 這樣一個代理,實作方式有三個步驟:
- 初始化 YarnClusterConfiguration Flink 配置 執行 jar 包 / 資源配置 加載 classpath;
- 啟動 yarn client;
- 複用 Flink on yarn 機制 deploy 轉換後的 jobGraph。
4. 任務遷移
在完善上述的一些改進之後,遷移就比較容易了。首先我們會把改造後的版本打包,上傳到公司的私服上。然後使用者在他的工程裡面隻需要引入 jar 包。在代碼這一塊,隻需要将原來基于 storm 的送出方式改造成基于 Flink 的送出方式,邏輯是完全不用動的。在任務部署模式這一塊,也提供了 Flink 送出的模式,這樣一個腳本可以實作 Flink Perjob 模式。
總結一下,除了一些比較極端的複雜情況,基本上做到了無縫遷移所有的任務。遷移到 Flink 之後,大部分任務的延遲都降低到毫秒級别,整個吞吐提升 3~5 倍。同時,整體資源節省了大概 40%,約等于 80 台機器。完成了 5 個 storm 叢集完全下線,實作了任務平台化管理。
四、一站式實時計算平台
1. Wstream 平台
我們為了提升管理效率而打造了 Wstream 平台,它建構在底層引擎和上層應用之間,對使用者可以屏蔽底層的叢集資訊,比如跨機房多叢集的一些資訊。
- 在任務接入方式上,支援 Flink Jar,Flink SQL,Flink-Storm,PyFlink 這 4 種方式,來滿足多元化的使用者需求。
- 在産品功能上,主要支援了任務管理、任務的建立、啟動删除等。
- 另外,為了更好的讓使用者管理自己的任務和對任務進行問題定位,我們也提供了一個監控告警和任務診斷的系統。
- 針對數倉,提供了一些數倉平台化的功能,包括權限管理、血緣關系等等。
- 針對 Flink SQL 也提供了調試探查的功能。
使用者可以在 Wstream 平台之上很好的去建構他們的應用。
2. 狀态管理
狀态作為 Flink 一個比較重要的特性,在實際場景中有大量的應用。使用者在使用平台的時候,沒法跟底層的 Flink 工具進行互動,于是我們就将底層的一些能力進行了內建。
- 在任務儲存方面,支援 Checkpoint,Savepoint,Cancel With Savepoint。
- 在容錯方面,支援 allowNonRestoredState,跳過無法恢複的狀态。
- 在分析方面,支援 Queryable State 實時查詢,基于離線的 State Processor 的分析方式,我們會幫使用者把這個狀态下載下傳進行分析。
對于整個任務狀态管理來說,我們通過 jobgraph 設定定向到指定 Hdfs 目錄,進行統一目錄管理。在狀态小檔案這塊,控制并發度,jobgraph 優化,checkpoint 間隔時間,保留版本數量。
3. SQL 調試
針對 Flink SQL,我們也提供了一些調試功能。這裡主要包括兩塊:
- 第一,文法層面的功能包括:
- 智能提示;
- 文法校驗;
- 轉換 graph 邏輯校驗。
- 第二,邏輯層面的功能包括:
- 模拟輸入,DataGen 自定義資料源;
- 結果輸出,Print 重定向到标準輸出。
這樣我們可以更友善的對整個業務邏輯進行調試。
4. 任務監控
關于任務監控,對于 Flink 實時計算任務來說,我們主要關心的是任務的穩定性、性能方面、以及業務邏輯是否符合預期。對于如何監控這些名額,主要包括 4 個層面:
- 第一個是 Flink 自帶的 Flink-metrics,提供大量的資訊,比如流量資訊、狀态資訊、反壓、檢查點、CPU、網絡等等;
- 第二個是 yarn 層面,提供運作時長、任務狀态;
- 第三,從 kafka 層面提供消息堆積;
- 最後,通過使用者自定義的一些 metrics,我們可以了解業務邏輯是否符合預期。
5. 監控體系
為了采集這些名額,我們也基于 Prometheus 搭建了一套監控體系。對于所有的 Flink 任務,會實時将 metrics 推到 pushgateway,然後會将收集到的名額推到 Prometheus,這一塊我們主要是采用的 federation 的機制。所有子節點負責名額采集,之後彙聚到一個中心節點,由中心節點統一對外提供服務。最終可以實作整個名額的計算和告警。
6. 監控告警
有了上面這些名額之後,我們在告警這一塊就可以比較友善。針對實時計算比較關注的任務穩定性方面,我們可以從 Topic 消息消費堆積、任務計算 qps 波動、Flink task Restart、Flink Checkpoint failed、任務失敗、延遲等資訊來觀察整個任務的運作情況。
7. 名額可視化
在名額可視化這一塊,主要是兩個層面:
- 第一個層面是 Job 層面,這一塊主要是把一些比較核心的名額彙聚到我們的實時計算平台。比如說,qps 資訊、輸入輸出的資訊、延遲的資訊等等;
- 對于更底層的 task 級别的 metrics,通過 Grafana 可以了解具體的一些task資訊,比如流量資訊、反壓資訊等。
五、後續規劃
我們的後續規劃,主要包括 4 個方面:
- 第一個是社群比較流行的批流合一。因為我們目前這個實時架構大部分還是基于 Lambda 架構,這種架構會帶來很大的維護工作量,是以我們也希望借助批流合一的能力來簡化架構;
- 第二個是資源調優,因為作為流式計算來說,缺少一些動态資源管理的機制,是以我們也希望有手段來進行這樣一些調優;
- 第三個是智能監控,我們目前的監控和告警是事後的,希望有某種方式在任務出現問題之前進行預警;
- 最後是擁抱社群的新能力,包括對新場景的探索。
熱點推薦
Flink Forward Asia 2021 正式啟動!議題火熱征集中! 30 萬獎金等你來!第三屆 Apache Flink 極客挑戰賽暨 AAIG CUP 報名開始更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群
第一時間擷取最新技術文章和社群動态,請關注公衆号~
活動推薦
阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:
99 元試用
實時計算Flink版(包年包月、10CU)即有機會獲得 Flink 獨家定制T恤;另包 3 個月及以上還有 85 折優惠!
了解活動詳情:
https://www.aliyun.com/product/bigdata/sc