天天看點

一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐

1

背景

随着阿裡雲Flink執行個體的遷移下雲以及新增需求接入,自建Flink平台規模逐漸壯大,目前總計已超4萬核運作在自建的K8S叢集中,然而 Flink 任務數的增加,特别是大狀态任務,每次Checkpoint 時會産生脈沖式帶寬占用,峰值流量超過100Gb/s,早期使用阿裡雲OSS作為Checkpoint資料存儲,單個Bucket 每 1P資料量隻有免費帶寬10Gb/s,超出部分單獨計費,目前規模每月需要增加1x w+/月。

為了控制這部分成本,得物開展了自建HDFS在Flink Checkpoint場景下的落地工作,實作年度成本節省xxx萬元。

此次分享自建HDFS在實時計算checkpoint場景的實踐經驗,希望能為讀者提供一些參考。

2

Flink Checkpoint 介紹

2.1 Flink裡的Checkpoint是什麼?

Checkpoint:簡單的說,在某一時刻,将 Flink 任務本地機器中存儲在狀态後端的狀态去同步到遠端檔案存儲系統(比如 HDFS)的過程就叫 Checkpoint。

狀态後端:做狀态資料持久化的工具就叫做狀态後端。比如你在 Flink 中見到的 RocksDB、FileSystem 的概念就是指狀态後端,再引申一下,也可以了解為:應用中有一份狀态資料,把這份狀态資料存儲到 MySQL 中,這個 MySQL 就能叫做狀态後端。

2.2 Checkpoint解決什麼問題?

其實在實時計算中的狀态的功能主要展現在任務可以做到失敗重新開機後沒有資料品質、時效問題。

實時任務一般都是 7x24 小時 Long run 的,挂了之後,就會有以下兩個問題,首先給一個實際場景:一個消費上遊 Kafka,使用 Set 去重計算 DAU 的實時任務。

  • 資料品質問題:當這個實時任務挂了之後恢複,Set空了,這時候任務再繼續從上次失敗的 Offset 消費 Kafka 産出資料,則産出的資料就是錯誤資料了
  • 資料時效問題:一個實時任務,産出的名額是有時效性(主要是時延)要求的。你可以從今天 0 點開始重新消費,但是你回溯資料也是需要時間的。舉例:中午 12 點挂了,實時任務重新回溯 12 個小時的資料能在 1 分鐘之内完成嘛?大多數場景下是不能的!一般都要回溯幾個小時,這就是實時場景中的資料時效問題。
  • 而 Flink的Checkpoint就是把 Set 定期的存儲到遠端 HDFS 上,當任務挂了,我們的任務還可以從 HDFS 上面把這個資料給讀回來,接着從最新的一個 Kafka Offset 繼續計算就可以,這樣即沒有資料品質問題,也沒有資料時效性問題。
  • 2.3 Checkpoint的運作流程?
  1. JM 定時排程 Checkpoint 的觸發,接受到 JM 做 Checkpoint 的請求後,開始做本地 Checkpoint,暫停處理新流入的資料,将新資料緩存起來。
  2. 将任務的本地狀态資料,複制到一個遠端的持久化存儲(HDFS)空間上。
  3. 繼續處理新流入的資料,包括剛才緩存起來的資料。
一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐

3

自建HDFS引入

3.1 為什麼用HDFS?

Flink 做為一個成熟的流計算引擎,對外宣稱可以實作 Exactly Once。為了實作業務上的 Exactly Once,Flink 肯定不能丢資料,也就是狀态資料必須保障高可靠性,而HDFS作為是一個分布式檔案系統,具備高容錯率、高吞吐量等特性,是業界使用最廣泛的開源分布式檔案系統,針對大狀态的Checkpoint任務非常契合,帶寬易擴充且成本低廉。

HDFS主要有如下幾項特點:

  • 和本地檔案系統一樣的目錄樹視圖
  • Append Only 的寫入(不支援随機寫)
  • 順序和随機讀
  • 超大資料規模
  • 易擴充,容錯率高

3.2 得物自建HDFS架構

架構層面是典型的主從結構,架構見下圖,核心思想是将檔案按照固定大小進行分片存儲,

  • 主節點:稱為 NameNode,主要存放諸如目錄樹、檔案分片資訊、分片存放位置等中繼資料資訊
  • 從節點:稱為 DataNode,主要用來存分片資料

比如使用者發出了一個1GB的檔案寫請求給HDFS用戶端,HDFS用戶端會根據配置(預設是128MB),對這個檔案進行切分,HDFS用戶端會切分成8個Block,然後詢問NameNode應該将這些切分好的Block往哪幾台DataNode上寫,此後client端和NameNode配置設定的多個DataNode構成pipeline管道,開始以packet為機關向Datanode寫資料。

一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐

4

自建HDFS落地實踐

4.1 叢集規劃

早期使用OSS的主要瓶頸在于帶寬,為了比對将大狀态的任務從OSS遷移到Hdfs帶寬需求,支撐寫入流量100Gib+/s,對比OSS的帶寬成本,結合到成本與帶寬瓶頸考慮,内部大資料d2s.5xlarge機型做了一次性能壓測,單節點吞吐能達到12Gib/s,按100Gib/s預估,算上Buffer,3副本叢集需要xx台機器,滿足現在的帶寬及寫入吞吐需求,最終選擇d2s.5xlarge類型Ecs機器,對應執行個體詳情如下:

一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐
一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐

4.2 穩定性保障建設

4.2.1 Hdfs元件名額采集

為了確定HDFS叢集的穩定和可靠性,支撐線上實時Flink任務Checkpoint,監控告警建設是必不可少的,我們通過統一的采集程式Hadoop Exporter将叢集裡各元件的JMX資訊換為次元模型,将下述為扁平化的事實名額Jmx資料,轉換為次元結構,比如針對NameNode、DataNode,可以直接将名額使用預定義次元,例如:cluster、instance等次元,并存儲到Prometheus能夠識别的名額資料,存儲為一個二維字典結構,例如: _hadoop_namenode_metrics[名額分類(通常是MBean的名稱)][名額名稱]

4.2.2 名額采集架構

           

結合目前叢集的規模,我們通過集中是Pull的方式采集架構,隻需要啟動時指定叢集Namenode及Jn的Jmx的url資訊,就能采集叢集的所有元件的名額資訊,這樣當有叢集擴充或變更時,會自動采集上報到apm裡,友善運維,具體采集架構如下圖:

一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐

4.2.3 監控與告警

監控:基于已采集彙報上的名額資料,目前配置了Namenode、Datanode元件核心名額監控大盤,包括HDFS節點健康狀态、HDFS服務健康狀态、資料塊健康狀态、節點的寫入吞吐量等名額。

一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐
一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐

告警:目前監控資料已完成接入公司天眼監控平台,我們将影響hdfs服務可用性的名額統一配置了告警模版,比如叢集總的寫入帶寬、Callqueue隊列、DN存活數量、叢集節點基礎io值班等,可以動态覆寫多叢集,實作定制化告警,更加靈活及友善感覺問題,減少故障止損時長,滿足線上HDFS穩定性保障SLA目标。

4.2.4 叢集快速變更能力

随着Hdfs叢集規模的增加,在日常運維過程中,如何做到快速擴、縮容、節點重新開機及配置變更能力,

保障叢集具備快速止損的能力,我們封裝了一整套HDFS的各元件變更能力,包括節點自動上報到cmdb對應應用、叢集資料節點maintenance模式快速無影響重新開機、日常變配等,并內建到ansible playbook,做到叢集擴容在分鐘級完成。

一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐

4.3 遷移到HDFS攻克難關

4.3.1 DN 心跳彙報于删除共用一把寫鎖問題

現象:自建Flink平台大部分大狀态任務遷移後,自建HDFS叢集節點整體的水位各個ecs的網絡帶寬峰值,出現偶發部分任務因checkpiont 寫入失敗問題,報錯資訊如下:

問題定位過程:

1. 根據用戶端日志的堆棧資訊,檢視Namenode的日志找到對應的檔案、塊,發現了錯誤日志,檔案塊在寫入成功後不能及時上報,塊的狀态一直處于not COMPLETE。

一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐

2. 根據上述寫入流程,懷疑問題出現在IBR階段,檢視Namenode監控名額,Namenode處理塊彙報平均時長<10ms,是以猜測問題出在Datanode端,觀察發現,Datanode偶發心跳彙報間隔>30s(正常3s一次),Datanode IBR和心跳都是BPServiceActor線程處理,很可能是心跳阻塞了IBR。

一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐

3. 我們根據猜測的方向,繼續定位什麼原因導緻心跳阻塞了IBR彙報,于是在每台節點上,部署了腳本(見下圖),根據Datanode的Jmx名額監聽本節點心跳間隔,大于10s時就列印Datanode的Jstack。

一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐

4. 分析多個Jstack代碼(具體内容見下),可以發現BPServiceActor線程被CommandProcessingThread線程阻塞,而CommandProcessingThread線程在調用invalidate()方法,而invalidate()是在調用删除操作。

"BP-1732625734-****-1675758643065 heartbeating to ****:8020" #56 daemon prio=5 os_prio=0 tid=0x00007f8fc6417800 nid=0x77e0 waiting on condition [0x00007f8f933f5000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000720e9d988> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.writeLock(BPOfferService.java:118)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.updateActorStatesFromHeartbeat(BPOfferService.java:570)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:699)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:879)
        at java.lang.Thread.run(Thread.java:748)


   Locked ownable synchronizers:
        - None
        
"Command processor" #54 daemon prio=5 os_prio=0 tid=0x00007f8fc640f800 nid=0x77de runnable [0x00007f8f935f7000]
   java.lang.Thread.State: RUNNABLE
        at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
        at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242)
        at java.io.File.isDirectory(File.java:858)
        at java.io.File.toURI(File.java:741)
        at org.apache.hadoop.hdfs.server.datanode.LocalReplica.getBlockURI(LocalReplica.java:256)
        at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.invalidate(FsDatasetImpl.java:2133)
        at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.invalidate(FsDatasetImpl.java:2099)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.processCommandFromActive(BPOfferService.java:738)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.processCommandFromActor(BPOfferService.java:684)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.processCommand(BPServiceActor.java:1359)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.lambda$enqueue$2(BPServiceActor.java:1405)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread$Lambda$75/2086554487.run(Unknown Source)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.processQueue(BPServiceActor.java:1332)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.run(BPServiceActor.java:1315)


   Locked ownable synchronizers:
        - <0x00000007204cf938> (a java.util.concurrent.locks.ReentrantReadWriteLock$FairSync)
        - <0x0000000720e9d988> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)           

結合堆棧資訊定位到代碼,确實發現processCommandFromActor方法在執行删除(調用invalidate()方法)操作時與心跳彙報updateActorStatesFromHeartbeat方法共用同一把寫鎖。

class BPOfferService {
private final Lock mWriteLock = mReadWriteLock.writeLock();
void writeLock() {
  mWriteLock.lock();
}


void writeUnlock() {
  mWriteLock.unlock();
}


void updateActorStatesFromHeartbeat(
    BPServiceActor actor,
    NNHAStatusHeartbeat nnHaState) {
  writeLock();
  try {
//... 心跳彙報
  } finally {
    writeUnlock();
  }
}
boolean processCommandFromActor(DatanodeCommand cmd,
    BPServiceActor actor) throws IOException {
  assert bpServices.contains(actor);
// ...省略
  writeLock();
  try {
//...執行删除邏輯
  } finally {
    writeUnlock();
  }
}
}           

5. 确認問題:檢視Namenode審計日志發現,叢集持續有大量檔案删除(Flink删除過期Checkpoint meta檔案)操作,修改Datanode端代碼,在調用processCommandFromActive方法超過一定10s後列印調用時長與CommandAction日志。檢視datanode日志發現确實存在删除操作大于30s的情況,由此确認問題就是出現在删除操作耗時過長影響了Datanode的增量塊彙報。

一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐

問題解決方案

找到問題就是出現在BPServiceActor 線程做了太多的事,包含FBR、IBR、心跳彙報,而且心跳彙報和删除共同持有一把寫鎖,那解決方案一個就把這兩把鎖進行拆分,一個就是将IBR邏輯單獨獨立出來,不受心跳彙報影響。

而社群3.4.0版本已經将IBR從BPServiceActor 線程獨立出來了,所有我們最終将HDFS-16016 patch 合并到自建Hdfs3.3.3版本中,IBR不會被invalidate()阻塞,問題得到根治!

一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐

5

總結與規劃

總結:Oss的流量已從早期137Gib/s降低到30Gib/s左右(下圖一),自建Hdfs叢集峰值流量達到120Gb/s(下圖二),且平穩運作

一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐
一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐

整個項目已完成全部大狀态任務從Oss遷移到自建Hdfs,目前Hdfs叢集規模xx台,成本x w/月,原OSS帶寬費用阿裡報價1x w/月,相比節省xx w/月。

未來規劃:對于全量 checkpoint 來說,TM 将每個 Checkpoint 内部的資料都寫到同一個檔案,而對于 RocksDBStateBackend 的增量 Checkpoint 來說,則會将每個 sst 檔案寫到一個分布式系統的檔案内,當作業量很大,且作業的并發很大時,則會對底層 HDFS 形成非常大的壓力,

針對上面的問題我們未來考慮引入小檔案合并方案降低 HDFS 的壓力,包括 RPC 壓力以及 NameNode 記憶體的壓力。

作者:希賢

來源:微信公衆号:得物技術

出處:https://mp.weixin.qq.com/s/eEMXptWvhNDOptYgcSOi2Q

繼續閱讀