天天看點

Apache Flink fault tolerance源碼剖析(三)兩種恢複模式兩種類型的檢查點已完成的檢查點的存儲檢查點編号計數器檢查點恢複服務小結

上一篇文章我們探讨了基于定時任務的周期性檢查點觸發機制以及基于Akka的

actor

模型的消息驅動協同機制。這篇文章我們将探讨Zookeeper在Flink的

Fault Tolerance

所起到的作用。

其實,Flink引入Zookeeper的目的主要是讓

JobManager

實作高可用(leader選舉)。

因為Zookeeper在Flink裡存在多種應用場景,本篇我們還是将重心放在

Fault Tolerance

上,即講解Zookeeper在檢查點的恢複機制上發揮的作用。

如果用一幅圖表示快照機制(檢查點)大緻的流程可見下圖:

Apache Flink fault tolerance源碼剖析(三)兩種恢複模式兩種類型的檢查點已完成的檢查點的存儲檢查點編号計數器檢查點恢複服務小結
跟本文相關的主要有4,5,6三步

兩種恢複模式

因為跟本文切實相關,是以先介紹一下

JobManager

RecoveryMode

(恢複模式)。

RecoveryMode

作為一個枚舉類型,它有兩個枚舉值:

  • STANDALONE
  • ZOOKEEPER

STANDALONE

表示不對

JobManager

的失敗進行恢複。而

ZOOKEEPER

表示

JobManager

将基于Zookeeper實作HA(高可用)。

兩種類型的檢查點

在前面的文章中已經提及過Flink裡的檢查點分為兩種:

PendingCheckpoint

(正在處理的檢查點)和

CompletedCheckpoint

(完成了的檢查點)。

PendingCheckpoint

表示一個檢查點已經被建立,但還沒有得到所有該應答的

task

的應答。一旦所有的

task

都給予應答,那麼它将會被轉化為一個

CompletedCheckpoint

PendingCheckpoint

通過

toCompletedCheckpoint

執行個體方法來将其轉化為已完成了的檢查點。其核心實作如下:

if (notYetAcknowledgedTasks.isEmpty()) {
    CompletedCheckpoint completed =  new CompletedCheckpoint(jobId, checkpointId,checkpointTimestamp, System.currentTimeMillis(), new ArrayList<StateForTask>(collectedStates));
    dispose(null, false);           
    return completed;
}
           

它會檢查還沒有ack該檢查點的

task

集合,如果集合為空(即所有

task

都已應答),則基于目前執行個體的屬性建構一個

CompletedCheckpoint

的執行個體,并最終傳回新建立的執行個體。但在傳回之前,調用了

dispose

進行資源釋放。

這個

dispose

方法是一個私有方法,其内部實作依賴于

releaseState

這個flag,上面的

dispose

調用将其置為

false

,意為不釋放

task

狀态:

if (releaseState) {
    for (StateForTask state : collectedStates) {
        state.discard(userClassLoader);
    }
}
           

但最終,

collectedStates

這個集合總是會被清空:

collectedStates.clear();
notYetAcknowledgedTasks.clear();
           

toCompletedCheckpoint

方法為什麼不釋放

task

的狀态呢,因為它的語義隻是提供轉化操作,其實

collectedStates

這個集合已經在構造

CompletedCheckpoint

時被深拷貝給

CompletedCheckpoint

的執行個體了。而這些

task

的狀态其最終的釋放,将會由

CompletedCheckpoint

discard

方法完成。

PendingCheckpoint

的公共的

discard

方法的實作就會直接釋放收集的狀态集合:

public void discard(ClassLoader userClassLoader) {
    dispose(userClassLoader, true);
}
           

公共的

discard

方法常用于檢查點逾時回收以及當最新的檢查點已經完成時,距離目前時間更久的未完成的檢查點的自動失效。

CompletedCheckpoint

表示一個已經成功完成了得檢查點,當一個檢查點在得到所有要求的

task

的應答之後被認為是一個已完成的檢查點。

已完成的檢查點的存儲

根據

JobManager

的恢複模式,Flink提供了兩種已完成的檢查點的存儲機制的實作:

  • StandaloneCompletedCheckpointStore
  • ZooKeeperCompletedCheckpointStore

他們都實作了接口

CompletedCheckpointStore

,這個接口提供了思個值得關注的方法:

  • recover :用于恢複可通路的檢查點

    CompletedCheckpoint

    的執行個體
  • addCheckpoint :将已完成的檢查點加入到檢查點集合
  • getLatestCheckpoint :獲得最新的檢查點
  • discardAllCheckpoints : 回收所有的已完成的檢查點

針對

RecoveryMode

STANDALONE

提供了

StandaloneCompletedCheckpointStore

。它提供了一個基于JVM堆記憶體的

ArrayDeque

來存放檢查點。

而針對

RecoveryMode

ZOOKEEPER

提供的

ZooKeeperCompletedCheckpointStore

要複雜得多。這也是我們關注的重點。它的實作依賴于兩個存儲機制:

在Zookeeper中的分布式存儲:

private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper;
           

本地JVM記憶體中的存儲:

private final ArrayDeque<Tuple2<StateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;
           

我們先來看恢複方法

recover

,恢複的過程首先是從Zookeeper擷取所有的檢查點,這裡為了規避并發修改帶來的失敗,采用了循環重試的機制:

while (true) {
            try {
                initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName();
                break;
            }
            catch (ConcurrentModificationException e) {
                LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
            }
        }
           

在恢複時,将從Zookeeper中讀取最新的檢查點,如果檢查點超過一個,僅僅最新的那個檢查點有效,舊的都會被丢棄。如果存在着網絡分區,多個

JobManager

的執行個體并發對相同的程式實行檢查點,那麼選擇任意一個驗證通過的已完成的檢查點都是沒有問題的。

if (numberOfInitialCheckpoints > ) {
            // Take the last one. This is the latest checkpoints, because path names are strictly
            // increasing (checkpoint ID).
            Tuple2<StateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints
                    .get(numberOfInitialCheckpoints - );

            CompletedCheckpoint latestCheckpoint = latest.f0.getState(userClassLoader);

            checkpointStateHandles.add(latest);

            LOG.info("Initialized with {}. Removing all older checkpoints.", latestCheckpoint);

            for (int i = ; i < numberOfInitialCheckpoints - ; i++) {
                try {
                    removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
                }
                catch (Exception e) {
                    LOG.error("Failed to discard checkpoint", e);
                }
            }
        }
           

discardAllCheckpoints

方法會做四件事:

  • 疊代每個檢查點,将其從Zookeeper中移除
  • discard每個已完成的檢查點
  • discard每個存儲的狀态
  • 将本地集合清空掉

檢查點編号計數器

每個檢查點都有各自的編号,為

Long

類型。根據

JobManager

的恢複模式分别提供了兩種計數器:

  • StandaloneCheckpointIDCounter
  • ZooKeeperCheckpointIDCounter
計數器在這裡被認為是一種服務,它具備

start

stop

方法

StandaloneCheckpointIDCounter

隻是簡單得對

AtomicLong

進行了包裝,因為在這種模式下,

JobManager

幾乎是不可恢複的,是以這麼做就足夠了。

ZooKeeperCheckpointIDCounter

是基于Zookeeper實作的一種分布式原子累加器。具體的做法是每一個計數器,在Zookeeper上建立一個

ZNode

,形如:

/flink/checkpoint-counter/<job-id> 1 [persistent]
....
/flink/checkpoint-counter/<job-id> N [persistent]
           

在Zookeeper中的檢查點編号被要求是升序的,這可以使得我們在

JobManager

失效的情況下,可以擁有一個共享的跨

JobManager

執行個體的計數器。

值得一提的是,這裡使用的Zookeeper的用戶端是

CuratorFramework

,同時還利用了它附帶的

SharedCount

這一

recipes

來作為分布式共享的計數器。

而在累加接口方法

getAndIncrement

的實作上,使用了循環嘗試的機制:

public long getAndIncrement() throws Exception {
        while (true) {
            ConnectionState connState = connStateListener.getLastState();

            if (connState != null) {
                throw new IllegalStateException("Connection state: " + connState);
            }

            VersionedValue<Integer> current = sharedCount.getVersionedValue();

            Integer newCount = current.getValue() + ;

            if (sharedCount.trySetCount(current, newCount)) {
                return current.getValue();
            }
        }
    }
           

另外從

stop

方法的實作來看,如果一個計數器停止,則會再Zookeeper中删除其對應的

ZNode

檢查點恢複服務

所謂的檢查點恢複服務,其實就是聚合了上面的已完成的檢查點存儲以及檢查點編号計數器這兩個功能。因為Flink提供了

STANDALONE

以及

ZOOKEEPER

這兩個恢複模式,是以這裡存在一個基于不同模式建立服務的工廠接口

CheckpointRecoveryFactory

。并針對這兩種恢複模式分别提供了兩個工廠:

StandaloneCheckpointRecoveryFactory

以及

ZooKeeperCheckpointRecoveryFactory

具體的功能聚合展現在這兩個方法上:

/**
     * Creates a {@link CompletedCheckpointStore} instance for a job.
     *
     * @param jobId           Job ID to recover checkpoints for
     * @param userClassLoader User code class loader of the job
     * @return {@link CompletedCheckpointStore} instance for the job
     */
    CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
            throws Exception;

    /**
     * Creates a {@link CheckpointIDCounter} instance for a job.
     *
     * @param jobId Job ID to recover checkpoints for
     * @return {@link CheckpointIDCounter} instance for the job
     */
    CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception;
           

兩個工廠的具體實作并沒有什麼特别的地方。檢查點恢複服務會被

JobManager

使用到。

小結

本篇文章我們主要分析了,Zookeeper在Flink的

Fault Tolerance

機制中發揮的作用。但因為Zookeeper在Flink中得主要用途是實作

JobManager

的高可用,是以裡面的部分内容多少還是跟這一主題有所聯系。

微信掃碼關注公衆号:Apache_Flink
Apache Flink fault tolerance源碼剖析(三)兩種恢複模式兩種類型的檢查點已完成的檢查點的存儲檢查點編号計數器檢查點恢複服務小結
QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)
Apache Flink fault tolerance源碼剖析(三)兩種恢複模式兩種類型的檢查點已完成的檢查點的存儲檢查點編号計數器檢查點恢複服務小結

繼續閱讀