天天看點

大資料平台的資料同步服務實踐

作者:閃念基因

引言

在大資料系統中,我們往往無法直接對線上系統中的資料直接進行檢索和計算。線上系統所使用關系型資料庫、緩存資料庫存儲資料的方式都非常不同,很多存儲系統并不适合分析型(OLAP)的查詢,也不允許分析查詢影響到線上業務的穩定性。從數倉建設的角度思考,資料倉庫需要依賴于穩定和規範的資料源,資料需要經過采集加工後才能真正被數倉所使用。推動資料同步服務的平台化,才有可能從源頭規範資料的産出。資料同步服務不像資料挖掘一樣可以直接産生價值,但它更像是連接配接線上系統和離線系統的高速公路,好的同步工具可以很大程度上提升資料開發的效率。

本文主要介紹知乎在資料同步這方面的建設,工具選型和平台化的實踐。

業務場景及架構

由于線上業務的資料庫在知乎内部還是以 MySQL 為主,在資料同步的資料源方面主要考慮 MySQL 和 Hive 的互相同步,後續可以考慮支援 HBase。早期資料同步使用 Oozie + Sqoop 來完成,基本滿足業務需求。但是随着資料同步任務的不斷變多,出現了很多重複同步的例子,對同步任務的負載管理也是空白。淩晨同步資料高峰導緻 MySQL 不斷報警,DBA 苦不堪言。對于業務來說,哪些表已經被同步了,哪些表還沒有也是一個黑盒子,依賴其他業務方的資料都隻能靠口頭的約定。為了解決這些問題,決定對資料同步做一個統一的平台,簡化同步任務的配置,排程平衡負載,管理元資訊等等。到現在為止,資料同步平台支撐了上千張表的同步,每天同步的資料量超過 10TB。

技術選型

資料同步工具市面上有很多解決方案,面向批的主要有 和阿裡開源的 ,其他商業的資料同步工具不在本文讨論範圍。下面主要對比這兩種資料同步工具。

Sqoop

Pros:

  • 基于 MapReduce 實作,容易并行和利用現有叢集的計算資源
  • 和 Hive 相容性好,支援 Parquet,ORC 等格式
  • 支援自動遷移 Schema
  • 社群強大,遇到的問題容易解決

Cons:

  • 支援的資料源不算太豐富(比如 ES),擴充難度大
  • 不支援限速,容易對 MySQL 造成壓力

DataX

Pros:

  • 支援的資料源豐富尤其是支援從非關系型資料庫到關系型資料庫的同步
  • 支援限速
  • 擴充友善,插件開發難度低

Cons:

  • 需要額外的運作資源,當任務比較多的時候費機器
  • 沒有原生支援導出到 Hive,需要做很多額外的工作才能滿足需求

考慮到同步本身要消耗不少的計算和帶寬資源,Sqoop 可以更好的利用 Hadoop 叢集的資源,而且和 Hive 适配的更好,最終選擇了 Sqoop 作為資料同步的工具。

平台化及實踐

平台化的目标是建構一個相對通用的資料同步平台,更好的支援新業務的接入,和公司内部的系統內建,滿足業務需求。平台初期設計的目标有以下幾個:

  • 簡單的任務配置界面,友善新的任務接入
  • 監控和報警
  • 屏蔽 MySQL DDL 造成的影響
  • 可擴充新資料源

整體系統架構如下圖:

大資料平台的資料同步服務實踐

API Server 用于提供使用者界面和 RESTFul API。資料源中心存儲資料源資訊,并從真實的資料源定期更新,保持比較新的資料。Scheduler 負責規劃任務的執行資源,保護 MySQL 叢集避免負載過高。Worker 真實的執行任務,分布在多個節點上。

簡化任務接入

平台不應該要求每個使用者都了解底層資料同步的原理,對使用者而言,應該是描述資料源 (Source) 和目标存儲 (Sink),還有同步周期等配置。所有提供的同步任務應該經過稽核,防止未經許可的資料被同步,或者同步配置不合理,增加平台負擔。最後暴露給使用者的 UI 大概如下圖。

大資料平台的資料同步服務實踐

增量同步

對于資料量非常大的資料源,如果每次同步都是全量,對于 MySQL 的壓力會特别大,同步需要的時間也會很長。是以需要一種可以每次隻同步新增資料的機制,減少對于 MySQL 端的壓力。但是增量同步不是沒有代價的,它要求業務在設計業務邏輯和表結構的時候,滿足下面任意條件:

  • 隻插入新資料,不做删除和修改(類似日志)
  • 隻有插入和更新操作,删除操作通過一個标志位的更新做軟删除代替,同時資料庫有一個字段用來标記該行記錄最後更新的時間戳

如果滿足上面條件,資料量比較大的表就可以采用增量同步的方式拉取。小資料量的表不需要考慮增量同步,因為資料和合并也需要時間,如果收益不大就不應該引入額外的複雜性。一個經驗值是行數 <= 2000w 的都屬于資料量比較小的表,具體還取決于存儲的資料内容(比如有很多 Text 類型的字段)。

處理 Schema 變更

做資料同步永遠回避不掉的一個問題就是 Schema 的變更,對 MySQL 來說,Schema 變更就是資料庫的 DDL 操作。資料同步平台應該盡可能屏蔽 MySQL DDL 對同步任務的影響,并且對相容的變更,及時變更推送到目标存儲。

資料同步平台會通過資料源中心定時的掃描每個同步任務上遊的資料源,儲存目前 Schema 的快照,如果發現 Schema 發生變化,就通知下遊做出一樣的變更。絕大部分的 DDL 還是增加字段,對于這種情況資料同步平台可以很好屏蔽變更對數倉的影響。對于删除字段的操作原則上禁止的,如果一定要做,需要走變更流程,通知到依賴該表的業務方,進行 Schema 同步的調整。

和排程平台的內建

MySQL 的資料通常會作為後續 ETL 的資料源,位于整個資料鍊路的最頂端。知乎内部自研了離線任務排程器,根據資料的依賴關系自動解析任務的依賴,按照合理的順序啟動 ETL 任務。資料同步平台和排程平台打通後,可以在每個同步任務結束後,通知排程器啟動下遊的後繼任務,而不用依賴平台和使用者口頭約定啟動時間。如果資料同步出現延時,排程器也可以很好的屏蔽這個問題。待資料同步恢複後,資料鍊路也随之恢複。

監控和報警

根據 USE 原則,大概整理出下面幾個需要監控的名額:

  • MySQL 機器的負載,IOPS,出入帶寬
  • 排程隊列長度,Yarn 送出隊列長度
  • 任務執行錯誤數

報警更多是針對隊列飽和度和同步錯誤進行的

平台優化和實踐

資源管理

當同步任務越來越多時,單純的按照任務啟動時間來觸發同步任務已經不能滿足需求。資料同步應該保證對于線上業務沒有影響,在此基礎上速度越快越好。這裡本質上是讓 Sqoop 充分又不過度利用 MySQL 的 IOPS,快速拉取資料同時避免資源過度競争。為了避免資料同步對線上服務的影響,對于需要資料同步的 MySQL 單獨建立一個從節點,隔離線上流量,隻提供給資料同步和業務離線查詢使用。除此之外,需要一個排程政策來決定一個任務何時執行。由于任務的總數量并不多,但是每個任務可能會執行非常長的時間,對排程器的壓力并不大,最終決定采用類似 YARN 的一個中央式的資源排程器,排程器的狀态都持久化在資料庫中,友善重新開機或者故障恢複。最終架構圖如下

大資料平台的資料同步服務實踐

最終任務的排程流程如下:

  1. 每個 MySQL 執行個體是排程器的一個隊列,根據同步的元資訊決定該任務屬于哪個隊列
  2. 根據要同步資料量預估資源消耗,向排程器申請該隊列對應大小的資源
  3. 排程器将任務送出到執行隊列,沒有意外的話會立刻開始執行
  4. Monitor 定時向排程器彙報 MySQL 節點的負載,如果負載過高就停止向該隊列送出新的任務
  5. 任務結束後向排程器釋放資源

從早期依靠 Crontab 排程任務到引入排程器,MySQL 叢集的資源被更充分的利用。在資料同步高峰期基本不會出現負載空置的情況,任務的平均執行時間隻有原先的一半。對 DBA 來說,MySQL 叢集的負載報警也大幅減少。

存儲格式

Hive 預設的格式是 Textfile,這是一種類似 CSV 的存儲方式,所有資料以文本的形式存儲,但是對于 OLAP 查詢來說壓縮比太低,查詢性能不好。通常我們會選擇一些列式存儲提高存儲和檢索的效率。Hive 中比較成熟的列式存儲格式有 Parquet 和 ORC。這兩個存儲的查詢性能相差不大,但是 ORC 和 Hive 內建更好而且對于非嵌套資料結構查詢性能是優于 Parquet 的。但是知乎内部因為也用了 Impala,早期的 Impala 版本不支援 ORC 格式的檔案,為了相容 Impala 最終選擇了 Parquet 作為預設的存儲格式。

關于列式存儲的原理和 Benchmark,可以參考這個 Slide

針對不同的資料源選擇合适的并發數

Sqoop 是基于 MapReduce 實作的,送出任務前先會生成 MapReduce 代碼,然後送出到 Hadoop 叢集。Job 整體的并發度就取決于 Mapper 的個數。Sqoop 預設的并發數是 4,對于資料量比較大的表的同步顯然是不夠的,對于資料量比較小的任務又太多了,這個參數一定要在運作時根據資料源的元資訊去動态決定。

優化 避免任務啟動對 HDFS 的壓力

在平台上線後,随着任務越來越多,發現如果 HDFS 的性能出現抖動,對同步任務整體的執行時間影響非常大,導緻夜間的很多後繼任務受到影響。開始推測是資料寫入 HDFS 性能慢導緻同步出現延時,但是任務大多數會卡在送出階段。随着進一步排查,發現 MapReduce 為了解決不同作業依賴問題,引入了 Distributed Cache 機制可以将 Job 依賴的 lib 上傳到 HDFS,然後再啟動作業。Sqoop 也使用了類似的機制,會依賴 Hive 的相關 lib,這些依賴加起來有好幾十個檔案,總大小接近 150MB,雖然對于 HDFS 來說是很小數字,但是當同步任務非常多的時候,叢集一點點的性能抖動都會導緻排程器的吞吐大幅度下降,最終同步的産出會有嚴重延時。最後的解決方法是将 Sqoop 安裝到叢集中,然後通過 Sqoop 的參數 --skip-distcache避免在任務送出階段上傳依賴的 jar。

關閉推測執行(Speculative Execution)

所謂推測執行是這樣一種機制:在叢集環境下運作 MapReduce,一個 job 下的多個 task 執行速度不一緻,比如有的任務已經完成,但是有些任務可能隻跑了10%,這些任務将會成為整個 job 的短闆。推測執行會對運作慢的 task 啟動備份任務,然後以先運作完成的 task 的結果為準,kill 掉另外一個 task。這個政策可以提升 job 的穩定性,在一些極端情況下加快 job 的執行速度。

Sqoop 預設的分片政策是按照資料庫的主鍵和 Mapper 數量來決定每個分片拉取的資料量。如果主鍵不是單調遞增或者遞增的步長有大幅波動,分片就會出現資料傾斜。對于一個資料量較大的表來說,适度的資料傾斜是一定會存在的情況,當 Mapper 結束時間不均而觸發推測執行機制時,MySQL 的資料被重複且并發的讀取,占用了大量 io 資源,也會影響到其他同步的任務。在一個 Hadoop 叢集中,我們仍然認為一個節點不可用導緻整個 MapReduce 失敗仍然是小機率事件,對這種錯誤,在排程器上增加重試就可以很好的解決問題而不是依賴推測執行機制。

展望

資料同步發展到比較多的任務後,新增的同步任務越來越多,删除的速度遠遠跟不上新增的速度,總體來說同步的壓力會越來越大,需要一個更好的機制去發現無用的同步任務并通知業務删除,減輕平台的壓力。

另外就是資料源的支援不夠,Hive 和 HBase、ElasticSearch 互通已經成了一個呼聲很強烈的需求。Hive 雖然可以通過挂外部表用 SQL 的方式寫入資料,但是效率不高有很難控制并發,很容易影響到線上叢集,需要有更好的實作方案才能在生産環境真正的運作起來。

另外這裡沒有談到的一個話題就是流式資料如何做同步,一個典型的場景就是 Kafka 的日志實時落地然後實時進行 OLAP 的查詢,或者通過 MySQL binlog 實時更新 Kudu 或者 ElasticSearch。關于這塊的基礎設定知乎也在快速建設中,非常歡迎感興趣同學投履歷到 [email protected] ,加入知乎大資料架構組。

參考資料

作者:lfyzjck

出處:https://zhuanlan.zhihu.com/p/50343423

繼續閱讀