上一篇文章我們探讨了基于定時任務的周期性檢查點觸發機制以及基于Akka的
actor
模型的消息驅動協同機制。這篇文章我們将探讨Zookeeper在Flink的
Fault Tolerance
所起到的作用。
其實,Flink引入Zookeeper的目的主要是讓
JobManager
實作高可用(leader選舉)。
因為Zookeeper在Flink裡存在多種應用場景,本篇我們還是将重心放在 Fault Tolerance
上,即講解Zookeeper在檢查點的恢複機制上發揮的作用。
如果用一幅圖表示快照機制(檢查點)大緻的流程可見下圖:
跟本文相關的主要有4,5,6三步
兩種恢複模式
因為跟本文切實相關,是以先介紹一下
JobManager
的
RecoveryMode
(恢複模式)。
RecoveryMode
作為一個枚舉類型,它有兩個枚舉值:
- STANDALONE
- ZOOKEEPER
STANDALONE
表示不對
JobManager
的失敗進行恢複。而
ZOOKEEPER
表示
JobManager
将基于Zookeeper實作HA(高可用)。
兩種類型的檢查點
在前面的文章中已經提及過Flink裡的檢查點分為兩種:
PendingCheckpoint
(正在處理的檢查點)和
CompletedCheckpoint
(完成了的檢查點)。
PendingCheckpoint
表示一個檢查點已經被建立,但還沒有得到所有該應答的
task
的應答。一旦所有的
task
都給予應答,那麼它将會被轉化為一個
CompletedCheckpoint
。
PendingCheckpoint
通過
toCompletedCheckpoint
執行個體方法來将其轉化為已完成了的檢查點。其核心實作如下:
if (notYetAcknowledgedTasks.isEmpty()) {
CompletedCheckpoint completed = new CompletedCheckpoint(jobId, checkpointId,checkpointTimestamp, System.currentTimeMillis(), new ArrayList<StateForTask>(collectedStates));
dispose(null, false);
return completed;
}
它會檢查還沒有ack該檢查點的
task
集合,如果集合為空(即所有
task
都已應答),則基于目前執行個體的屬性建構一個
CompletedCheckpoint
的執行個體,并最終傳回新建立的執行個體。但在傳回之前,調用了
dispose
進行資源釋放。
這個
dispose
方法是一個私有方法,其内部實作依賴于
releaseState
這個flag,上面的
dispose
調用将其置為
false
,意為不釋放
task
狀态:
if (releaseState) {
for (StateForTask state : collectedStates) {
state.discard(userClassLoader);
}
}
但最終,
collectedStates
這個集合總是會被清空:
collectedStates.clear();
notYetAcknowledgedTasks.clear();
toCompletedCheckpoint
方法為什麼不釋放
task
的狀态呢,因為它的語義隻是提供轉化操作,其實
collectedStates
這個集合已經在構造
CompletedCheckpoint
時被深拷貝給
CompletedCheckpoint
的執行個體了。而這些
task
的狀态其最終的釋放,将會由
CompletedCheckpoint
的
discard
方法完成。
PendingCheckpoint
的公共的
discard
方法的實作就會直接釋放收集的狀态集合:
public void discard(ClassLoader userClassLoader) {
dispose(userClassLoader, true);
}
公共的
discard
方法常用于檢查點逾時回收以及當最新的檢查點已經完成時,距離目前時間更久的未完成的檢查點的自動失效。
CompletedCheckpoint
表示一個已經成功完成了得檢查點,當一個檢查點在得到所有要求的
task
的應答之後被認為是一個已完成的檢查點。
已完成的檢查點的存儲
根據
JobManager
的恢複模式,Flink提供了兩種已完成的檢查點的存儲機制的實作:
- StandaloneCompletedCheckpointStore
- ZooKeeperCompletedCheckpointStore
他們都實作了接口
CompletedCheckpointStore
,這個接口提供了思個值得關注的方法:
- recover :用于恢複可通路的檢查點
的執行個體CompletedCheckpoint
- addCheckpoint :将已完成的檢查點加入到檢查點集合
- getLatestCheckpoint :獲得最新的檢查點
- discardAllCheckpoints : 回收所有的已完成的檢查點
針對
RecoveryMode
為
STANDALONE
提供了
StandaloneCompletedCheckpointStore
。它提供了一個基于JVM堆記憶體的
ArrayDeque
來存放檢查點。
而針對
RecoveryMode
為
ZOOKEEPER
提供的
ZooKeeperCompletedCheckpointStore
要複雜得多。這也是我們關注的重點。它的實作依賴于兩個存儲機制:
在Zookeeper中的分布式存儲:
private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper;
本地JVM記憶體中的存儲:
private final ArrayDeque<Tuple2<StateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;
我們先來看恢複方法
recover
,恢複的過程首先是從Zookeeper擷取所有的檢查點,這裡為了規避并發修改帶來的失敗,采用了循環重試的機制:
while (true) {
try {
initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName();
break;
}
catch (ConcurrentModificationException e) {
LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
}
}
在恢複時,将從Zookeeper中讀取最新的檢查點,如果檢查點超過一個,僅僅最新的那個檢查點有效,舊的都會被丢棄。如果存在着網絡分區,多個
JobManager
的執行個體并發對相同的程式實行檢查點,那麼選擇任意一個驗證通過的已完成的檢查點都是沒有問題的。
if (numberOfInitialCheckpoints > ) {
// Take the last one. This is the latest checkpoints, because path names are strictly
// increasing (checkpoint ID).
Tuple2<StateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints
.get(numberOfInitialCheckpoints - );
CompletedCheckpoint latestCheckpoint = latest.f0.getState(userClassLoader);
checkpointStateHandles.add(latest);
LOG.info("Initialized with {}. Removing all older checkpoints.", latestCheckpoint);
for (int i = ; i < numberOfInitialCheckpoints - ; i++) {
try {
removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
}
catch (Exception e) {
LOG.error("Failed to discard checkpoint", e);
}
}
}
而
discardAllCheckpoints
方法會做四件事:
- 疊代每個檢查點,将其從Zookeeper中移除
- discard每個已完成的檢查點
- discard每個存儲的狀态
- 将本地集合清空掉
檢查點編号計數器
每個檢查點都有各自的編号,為
Long
類型。根據
JobManager
的恢複模式分别提供了兩種計數器:
- StandaloneCheckpointIDCounter
- ZooKeeperCheckpointIDCounter
計數器在這裡被認為是一種服務,它具備和
start
方法
stop
StandaloneCheckpointIDCounter
隻是簡單得對
AtomicLong
進行了包裝,因為在這種模式下,
JobManager
幾乎是不可恢複的,是以這麼做就足夠了。
ZooKeeperCheckpointIDCounter
是基于Zookeeper實作的一種分布式原子累加器。具體的做法是每一個計數器,在Zookeeper上建立一個
ZNode
,形如:
/flink/checkpoint-counter/<job-id> 1 [persistent]
....
/flink/checkpoint-counter/<job-id> N [persistent]
在Zookeeper中的檢查點編号被要求是升序的,這可以使得我們在
JobManager
失效的情況下,可以擁有一個共享的跨
JobManager
執行個體的計數器。
值得一提的是,這裡使用的Zookeeper的用戶端是
CuratorFramework
,同時還利用了它附帶的
SharedCount
這一
recipes
來作為分布式共享的計數器。
而在累加接口方法
getAndIncrement
的實作上,使用了循環嘗試的機制:
public long getAndIncrement() throws Exception {
while (true) {
ConnectionState connState = connStateListener.getLastState();
if (connState != null) {
throw new IllegalStateException("Connection state: " + connState);
}
VersionedValue<Integer> current = sharedCount.getVersionedValue();
Integer newCount = current.getValue() + ;
if (sharedCount.trySetCount(current, newCount)) {
return current.getValue();
}
}
}
另外從
stop
方法的實作來看,如果一個計數器停止,則會再Zookeeper中删除其對應的
ZNode
。
檢查點恢複服務
所謂的檢查點恢複服務,其實就是聚合了上面的已完成的檢查點存儲以及檢查點編号計數器這兩個功能。因為Flink提供了
STANDALONE
以及
ZOOKEEPER
這兩個恢複模式,是以這裡存在一個基于不同模式建立服務的工廠接口
CheckpointRecoveryFactory
。并針對這兩種恢複模式分别提供了兩個工廠:
StandaloneCheckpointRecoveryFactory
以及
ZooKeeperCheckpointRecoveryFactory
。
具體的功能聚合展現在這兩個方法上:
/**
* Creates a {@link CompletedCheckpointStore} instance for a job.
*
* @param jobId Job ID to recover checkpoints for
* @param userClassLoader User code class loader of the job
* @return {@link CompletedCheckpointStore} instance for the job
*/
CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
throws Exception;
/**
* Creates a {@link CheckpointIDCounter} instance for a job.
*
* @param jobId Job ID to recover checkpoints for
* @return {@link CheckpointIDCounter} instance for the job
*/
CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception;
兩個工廠的具體實作并沒有什麼特别的地方。檢查點恢複服務會被
JobManager
使用到。
小結
本篇文章我們主要分析了,Zookeeper在Flink的
Fault Tolerance
機制中發揮的作用。但因為Zookeeper在Flink中得主要用途是實作
JobManager
的高可用,是以裡面的部分内容多少還是跟這一主題有所聯系。
微信掃碼關注公衆号:Apache_Flink
QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)