天天看點

「深入淺出 Yarn 架構與實作」2-4 Yarn 基礎庫 - 狀态機庫

作者:Java網際網路技術棧

當一個服務擁有太多處理邏輯時,會導緻代碼結構異常的混亂,很難分辨一段邏輯是在哪個階段發揮作用的。

這時就可以引入狀态機模型,幫助代碼結構變得清晰。

一、狀态機庫概述

一)簡介

狀态機由一組狀态組成:

【初始狀态 -> 中間狀态 -> 最終狀态】。

在一個狀态機中,每個狀态會接收一組特定的事件,根據事件類型進行處理,并轉換到下一個狀态。當轉換到最終狀态時則退出。

二)狀态轉換方式

狀态間轉換會有下面這三種類型:

「深入淺出 Yarn 架構與實作」2-4 Yarn 基礎庫 - 狀态機庫

三)Yarn 狀态機類

在 Yarn 中提供了一個工廠類 StateMachineFactory 來幫助定義狀态機。如何使用,我們直接寫個 demo。

「深入淺出 Yarn 架構與實作」2-4 Yarn 基礎庫 - 狀态機庫

二、案例 demo

在上一篇文章《Yarn 服務庫和事件庫》案例基礎上進行擴充,增加狀态機庫的内容。如果還不了解服務庫和事件庫的同學,建議先學習下上一篇文章。

案例已上傳至 github,有幫助可以點個 ⭐️

https://github.com/Simon-Ace/hadoop-yarn-study-demo/tree/master/state-demo

一)狀态機實作

狀态機實作,可以直接嵌入到上篇文章中的 AsyncDispatcher使用。

這裡僅給出狀态機JobStateMachine以及各種事件處理的代碼。完整的代碼項目執行,請到 github demo 中檢視。

import com.shuofxz.event.JobEvent;
import com.shuofxz.event.JobEventType;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;

import java.util.EnumSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/*
* 可參考 Yarn 中實作的狀态機對象:
* ResourceManager 中的 RMAppImpl、RMApp- AttemptImpl、RMContainerImpl 和 RMNodeImpl,
* NodeManager 中 的 ApplicationImpl、 ContainerImpl 和 LocalizedResource,
* MRAppMaster 中的 JobImpl、TaskImpl 和 TaskAttemptImpl 等
* */
@SuppressWarnings({"rawtypes", "unchecked"})
public class JobStateMachine implements EventHandler<JobEvent> {
    private final String jobID;
    private EventHandler eventHandler;
    private final Lock writeLock;
    private final Lock readLock;

    // 定義狀态機
    protected static final StateMachineFactory<JobStateMachine, JobStateInternal,
            JobEventType, JobEvent>
            stateMachineFactory = new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>(JobStateInternal.NEW)
            .addTransition(JobStateInternal.NEW, JobStateInternal.INITED, JobEventType.JOB_INIT, new InitTransition())
            .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP, JobEventType.JOB_START, new StartTransition())
            .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING, JobEventType.JOB_SETUP_COMPLETED, new SetupCompletedTransition())
            .addTransition(JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED), JobEventType.JOB_COMPLETED, new JobTasksCompletedTransition())
            .installTopology();

    private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;

    public JobStateMachine(String jobID, EventHandler eventHandler) {
        this.jobID = jobID;

        // 多線程異步處理,state 有可能被同時讀寫,使用讀寫鎖來避免競争
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        this.readLock = readWriteLock.readLock();
        this.writeLock = readWriteLock.writeLock();

        this.eventHandler = eventHandler;
        stateMachine = stateMachineFactory.make(this);
    }

    protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
        return stateMachine;
    }

    public static class InitTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
        @Override
        public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
            System.out.println("Receiving event " + jobEvent);
            // do something...
            // 完成後發送新的 Event —— JOB_START
            jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_START));
        }
    }

    public static class StartTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
        @Override
        public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
            System.out.println("Receiving event " + jobEvent);
            jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_SETUP_COMPLETED));
        }
    }

    public static class SetupCompletedTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
        @Override
        public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
            System.out.println("Receiving event " + jobEvent);
            jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_COMPLETED));
        }
    }

    public static class JobTasksCompletedTransition implements MultipleArcTransition<JobStateMachine, JobEvent, JobStateInternal> {
        @Override
        public JobStateInternal transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
            System.out.println("Receiving event " + jobEvent);

            // 這是多結果狀态部分,是以需要人為制定後續狀态
            // 這裡整個流程結束,設定一下對應的狀态
            boolean flag = true;
            if (flag) {
                return JobStateInternal.SUCCEEDED;
            } else {
                return JobStateInternal.KILLED;
            }
        }
    }

    @Override
    public void handle(JobEvent jobEvent) {
        try {
            // 注意這裡為了避免靜态條件,使用了讀寫鎖
            writeLock.lock();
            JobStateInternal oldState = getInternalState();
            try {
                getStateMachine().doTransition(jobEvent.getType(), jobEvent);
            } catch (InvalidStateTransitionException e) {
                System.out.println("Can't handle this event at current state!");
            }
            if (oldState != getInternalState()) {
                System.out.println("Job Transitioned from " + oldState + " to " + getInternalState());
            }

        } finally {
            writeLock.unlock();
        }
    }

    public JobStateInternal getInternalState() {
        readLock.lock();
        try {
            return getStateMachine().getCurrentState();
        } finally {
            readLock.unlock();
        }
    }

    public enum JobStateInternal {
        NEW,
        SETUP,
        INITED,
        RUNNING,
        SUCCEEDED,
        KILLED
    }
}
           

二)狀态機可視化

hadoop 中提供了狀态機可視化的工具類 VisualizeStateMachine.java,可以拷貝到我們的工程中使用。

根據提示,運作需要三個參數:

Usage: %s <GraphName> <class[,class[,...]]> <OutputFile>%n
           
「深入淺出 Yarn 架構與實作」2-4 Yarn 基礎庫 - 狀态機庫

運作後會在項目根目錄生成圖檔案 jsm.gv。

需要使用 graphviz工具将 gv 檔案轉換成 png 檔案:

# linux 安裝
yum install graphviz

# mac 安裝
brew install graphviz
           

轉換:

dot -Tpng jsm.gv > jsm.png
           

三)如果不用狀态機庫

【思考】

如果不用狀态機,代碼結構會是什麼樣呢?

下面這樣的代碼,如果要增加或修改邏輯可能就是很痛苦的一件事情了。

// 一堆的函數調用

// 一堆的 if 嵌套

// 或者 switch case
           

三、總結

本節對 Yarn 狀态機庫進行了介紹。實際使用時會結合事件庫、服務庫一同使用。

狀态機庫的使用幫助代碼結構更加的清晰,新增狀态處理邏輯隻需要增加一個狀态類别,或者增加一個方法處理對應類型的事件即可。将整個處理邏輯進行了拆分,便于編寫和維護。

繼續閱讀