天天看點

2. SOFAJRaft源碼分析—JRaft的定時任務排程器是怎麼做的?

看完這個實作之後,感覺還是要多看源碼,多研究。其實JRaft的定時任務排程器是基于Netty的時間輪來做的,如果沒有看過Netty的源碼,很可能并不知道時間輪算法,也就很難想到要去使用這麼優秀的定時排程算法了。

對于介紹RepeatedTimer,我拿Node初始化的時候的electionTimer進行講解

this.electionTimer = new RepeatedTimer("JRaft-ElectionTimer", this.options.getElectionTimeoutMs()) {

    @Override
    protected void onTrigger() {
        handleElectionTimeout();
    }

    @Override
    protected int adjustTimeout(final int timeoutMs) {
        //在一定範圍内傳回一個随機的時間戳
        //為了避免同時發起選舉而導緻失敗
        return randomTimeout(timeoutMs);
    }
};
           

構造器

由electionTimer的構造方法可以看出RepeatedTimer需要傳入兩個參數,一個是name,另一個是time

//timer是HashedWheelTimer
private final Timer        timer;
//執行個體是HashedWheelTimeout
private Timeout            timeout;

public RepeatedTimer(String name, int timeoutMs) {
    //name代表RepeatedTimer執行個體的種類,timeoutMs是逾時時間
    this(name, timeoutMs, new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048));
}

public RepeatedTimer(String name, int timeoutMs, Timer timer) {
    super();
    this.name = name;
    this.timeoutMs = timeoutMs;
    this.stopped = true;
    this.timer = Requires.requireNonNull(timer, "timer");
}
           

在構造器中會根據傳進來的值初始化一個name和一個timeoutMs,然後執行個體化一個timer,RepeatedTimer的run方法是由timer進行回調。在RepeatedTimer中會持有兩個對象,一個是timer,一個是timeout

啟動RepeatedTimer

對于一個RepeatedTimer執行個體,我們可以通過start方法來啟動它:

public void start() {
    //加鎖,隻能一個線程調用這個方法
    this.lock.lock();
    try {
        //destroyed預設是false
        if (this.destroyed) {
            return;
        }
        //stopped在構造器中初始化為ture
        if (!this.stopped) {
            return;
        }
        //啟動完一次後下次就無法再次往下繼續
        this.stopped = false;
        //running預設為false
        if (this.running) {
            return;
        }
        this.running = true;
        schedule();
    } finally {
        this.lock.unlock();
    }
}
           

在調用start方法進行啟動後會進行一系列的校驗和指派,從上面的指派以及加鎖的情況來看,這個是隻能被調用一次的。然後會調用到schedule方法中

private void schedule() {
    if(this.timeout != null) {
        this.timeout.cancel();
    }
    final TimerTask timerTask = timeout -> {
        try {
            RepeatedTimer.this.run();
        } catch (final Throwable t) {
            LOG.error("Run timer task failed, taskName={}.", RepeatedTimer.this.name, t);
        }
    };
    this.timeout = this.timer.newTimeout(timerTask, adjustTimeout(this.timeoutMs), TimeUnit.MILLISECONDS);
}
           

如果timeout不為空,那麼會調用HashedWheelTimeout的cancel方法。然後封裝一個TimerTask執行個體,當執行TimerTask的run方法的時候會調用RepeatedTimer執行個體的run方法。然後傳入到timer中,TimerTask的run方法由timer進行調用,并将傳回值指派給timeout。

如果timer調用了TimerTask的run方法,那麼便會回調到RepeatedTimer的run方法中:

RepeatedTimer#run

public void run() {
    //加鎖
    this.lock.lock();
    try {
        //表示RepeatedTimer已經被調用過
        this.invoking = true;
    } finally {
        this.lock.unlock();
    }
    try {
        //然後會調用RepeatedTimer執行個體實作的方法
        onTrigger();
    } catch (final Throwable t) {
        LOG.error("Run timer failed.", t);
    }
    boolean invokeDestroyed = false;
    this.lock.lock();
    try {
        this.invoking = false;
        //如果調用了stop方法,那麼将不會繼續調用schedule方法
        if (this.stopped) {
            this.running = false;
            invokeDestroyed = this.destroyed;
        } else {
            this.timeout = null;
            schedule();
        }
    } finally {
        this.lock.unlock();
    }
    if (invokeDestroyed) {
        onDestroy();
    }
}

protected void onDestroy() {
    // NO-OP
}

           

這個run方法會由timer進行回調,如果沒有調用stop或destroy方法的話,那麼調用完onTrigger方法後會繼續調用schedule,然後一次次循環調用RepeatedTimer的run方法。

如果調用了destroy方法,在這裡會有一個onDestroy的方法,可以由實作類override複寫執行一個鈎子。

HashedWheelTimer的基本介紹

2. SOFAJRaft源碼分析—JRaft的定時任務排程器是怎麼做的?

HashedWheelTimer通過一定的hash規則将不同timeout的定時任務劃分到HashedWheelBucket進行管理,而HashedWheelBucket利用雙向連結清單結構維護了某一時刻需要執行的定時任務清單

Wheel

時間輪,是一個HashedWheelBucket數組,數組數量越多,定時任務管理的時間精度越精确。tick每走一格都會将對應的wheel數組裡面的bucket拿出來進行排程。

Worker

Worker繼承自Runnable,HashedWheelTimer必須通過Worker線程操作HashedWheelTimer中的定時任務。Worker是整個HashedWheelTimer的執行流程管理者,控制了定時任務配置設定、全局deadline時間計算、管理未執行的定時任務、時鐘計算、未執行定時任務回收處理。

HashedWheelTimeout

是HashedWheelTimer的執行機關,維護了其所屬的HashedWheelTimer和HashedWheelBucket的引用、需要執行的任務邏輯、目前輪次以及目前任務的逾時時間(不變)等,可以認為是自定義任務的一層Wrapper。

HashedWheelBucket

HashedWheelBucket維護了hash到其内的所有HashedWheelTimeout結構,是一個雙向隊列。

HashedWheelTimer的構造器

在初始化RepeatedTimer執行個體的時候會執行個體化一個HashedWheelTimer:

new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048)
           

然後調用HashedWheelTimer的構造器:

private final HashedWheelBucket[]  wheel;
private final int  mask;
private final long  tickDuration;
private final Worker  worker    = new Worker();
private final Thread   workerThread;
private final long  maxPendingTimeouts;
private static final int  INSTANCE_COUNT_LIMIT   = 256;
private static final AtomicInteger instanceCounter        = new AtomicInteger();
private static final AtomicBoolean warnedTooManyInstances = new AtomicBoolean();


public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
		tickDuration
    this(threadFactory, tickDuration, unit, ticksPerWheel, -1);
}

public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
                        long maxPendingTimeouts) {

    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    //unit = MILLISECONDS
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    if (tickDuration <= 0) {
        throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
    }
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }

    // Normalize ticksPerWheel to power of two and initialize the wheel.
    // 建立一個HashedWheelBucket數組
    // 建立時間輪基本的資料結構,一個數組。長度為不小于ticksPerWheel的最小2的n次方
    wheel = createWheel(ticksPerWheel);
    // 這是一個标示符,用來快速計算任務應該呆的格子。
    // 我們知道,給定一個deadline的定時任務,其應該呆的格子=deadline%wheel.length.但是%操作是個相對耗時的操作,是以使用一種變通的位運算代替:
    // 因為一圈的長度為2的n次方,mask = 2^n-1後低位将全部是1,然後deadline&mast == deadline%wheel.length
    // java中的HashMap在進行hash之後,進行index的hash尋址尋址的算法也是和這個一樣的
    mask = wheel.length - 1;

    // Convert tickDuration to nanos.
    //tickDuration傳入是1的話,這裡會轉換成1000000
    this.tickDuration = unit.toNanos(tickDuration);

    // Prevent overflow.
    // 校驗是否存在溢出。即指針轉動的時間間隔不能太長而導緻tickDuration*wheel.length>Long.MAX_VALUE
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
            "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE
                                                                                        / wheel.length));
    }
    //将worker包裝成thread
    workerThread = threadFactory.newThread(worker);
    //maxPendingTimeouts = -1
    this.maxPendingTimeouts = maxPendingTimeouts;

    //如果HashedWheelTimer執行個體太多,那麼就會列印一個error日志
    if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT
        && warnedTooManyInstances.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}
           

這個構造器裡面主要做一些初始化的工作。

  1. 初始化一個wheel資料,我們這裡初始化的數組長度為2048.
  2. 初始化mask,用來計算槽位的下标,類似于hashmap的槽位的算法,因為wheel的長度已經是一個2的n次方,是以2^n-1後低位将全部是1,用&可以快速的定位槽位,比%耗時更低
  3. 初始化tickDuration,這裡會将傳入的tickDuration轉化成納秒,那麼這裡是1000,000
  4. 校驗整個時間輪走完的時間不能過長
  5. 包裝worker線程
  6. 因為HashedWheelTimer是一個很消耗資源的一個結構,是以校驗HashedWheelTimer執行個體不能太多,如果太多會列印error日志

啟動timer

時間輪算法中并不需要手動的去調用start方法來啟動,而是在添加節點的時候會啟動時間輪。

我們在RepeatedTimer的schedule方法裡會調用newTimeout向時間輪中添加一個任務。

HashedWheelTimer#newTimeout

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
                                             + ") is greater than or equal to maximum allowed pending "
                                             + "timeouts (" + maxPendingTimeouts + ")");
    }
    // 如果時間輪沒有啟動,則啟動
    start();

    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    //在delay為正數的情況下,deadline是不可能為負數
    //如果為負數,那麼說明超過了long的最大值
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    // 這裡定時任務不是直接加到對應的格子中,而是先加入到一個隊列裡,然後等到下一個tick的時候,
    // 會從隊列裡取出最多100000個任務加入到指定的格子中
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    //Worker會去處理timeouts隊列裡面的資料
    timeouts.add(timeout);
    return timeout;
}
           

在這個方法中,在校驗之後會調用start方法啟動時間輪,然後設定deadline,這個時間等于時間輪啟動的時間點+延遲的的時間;

然後建立一個HashedWheelTimeout執行個體,會直接加入到timeouts隊列中去,timeouts對列會在worker的run方法裡面取出來放入到wheel中進行處理。

然後我們來看看start方法:

HashedWheelTimer#start

private static final AtomicIntegerFieldUpdater<HashedWheelTimer> workerStateUpdater     = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class,"workerState");

private volatile int  workerState; 
//不需要你主動調用,當有任務添加進來的的時候他就會跑
public void start() {
    //workerState一開始的時候是0(WORKER_STATE_INIT),然後才會設定為1(WORKER_STATE_STARTED)
    switch (workerStateUpdater.get(this)) {
        case WORKER_STATE_INIT:
            //使用cas來擷取啟動排程的權力,隻有競争到的線程允許來進行執行個體啟動
            if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                //如果成功設定了workerState,那麼就調用workerThread線程
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    // 等待worker線程初始化時間輪的啟動時間
    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
            //這裡使用countDownLauch來確定排程的線程已經被啟動
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}
           

由這裡我們可以看出,啟動時間輪是不需要手動去調用的,而是在有任務的時候會自動運作,防止在沒有任務的時候空轉浪費資源。

在start方法裡面會使用AtomicIntegerFieldUpdater的方式來更新workerState這個變量,如果沒有啟動過那麼直接在cas成功之後調用start方法啟動workerThread線程。

如果workerThread還沒運作,那麼會在while循環中等待,直到workerThread運作為止才會往下運作。

開始時間輪轉

時間輪的運轉是在Worker的run方法中進行的:

Worker#run

private final Set<Timeout> unprocessedTimeouts = new HashSet<>();
private long               tick;
public void run() {
    // Initialize the startTime.
    startTime = System.nanoTime();
    if (startTime == 0) {
        // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
        startTime = 1;
    }

    //HashedWheelTimer的start方法會繼續往下運作
    // Notify the other threads waiting for the initialization at start().
    startTimeInitialized.countDown();

    do {
        //傳回的是目前的nanoTime- startTime
        //也就是傳回的是 每 tick 一次的時間間隔
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            //算出時間輪的槽位
            int idx = (int) (tick & mask);
            //移除cancelledTimeouts中的bucket
            // 從bucket中移除timeout
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            // 将newTimeout()方法中加入到待處理定時任務隊列中的任務加入到指定的格子中
            transferTimeoutsToBuckets();
            bucket.expireTimeouts(deadline);
            tick++;
        }
    //    校驗如果workerState是started狀态,那麼就一直循環
    } while (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // Fill the unprocessedTimeouts so we can return them from stop() method.
    for (HashedWheelBucket bucket : wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (;;) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        //如果有沒有被處理的timeout,那麼加入到unprocessedTimeouts對列中
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    //處理被取消的任務
    processCancelledTasks();
}
           
  1. 這個方法首先會設定一個時間輪的開始時間startTime,然後調用startTimeInitialized的countDown讓被阻塞的線程往下運作
  2. 調用waitForNextTick等待到下次tick的到來,并傳回當次的tick時間-startTime
  3. 通過&的方式擷取時間輪的槽位
  4. 移除掉被取消的task
  5. 将timeouts中的任務轉移到對應的wheel槽位中,如果槽位中不止一個bucket,那麼串成一個連結清單
  6. 執行格子中的到期任務
  7. 周遊整個wheel,将過期的bucket放入到unprocessedTimeouts隊列中
  8. 将timeouts中過期的bucket放入到unprocessedTimeouts隊列中

上面所有的過期但未被處理的bucket會在調用stop方法的時候傳回unprocessedTimeouts隊列中的資料。是以unprocessedTimeouts中的資料隻是做一個記錄,并不會再次被執行。

時間輪的所有處理過程都在do-while循環中被處理,我們下面一個個分析

處理被取消的任務

Worker#processCancelledTasks

private void processCancelledTasks() {
    for (;;) {
        HashedWheelTimeout timeout = cancelledTimeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        try {
            timeout.remove();
        } catch (Throwable t) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("An exception was thrown while process a cancellation task", t);
            }
        }
    }
}
           

這個方法相當的簡單,因為在調用HashedWheelTimer的stop方法的時候會将要取消的HashedWheelTimeout執行個體放入到cancelledTimeouts隊列中,是以這裡隻需要循環把隊列中的資料取出來,然後調用HashedWheelTimeout的remove方法将自己在bucket移除就好了

HashedWheelTimeout#remove

void remove() {
    HashedWheelBucket bucket = this.bucket;
    if (bucket != null) {
		  //這裡面涉及到連結清單的引用摘除,十厘清晰易懂,想了解的可以去看看
        bucket.remove(this);
    } else {
        timer.pendingTimeouts.decrementAndGet();
    }
}
           

轉移資料到時間輪中

Worker#transferTimeoutsToBuckets

private void transferTimeoutsToBuckets() {
    // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
    // adds new timeouts in a loop.
    // 每次tick隻處理10w個任務,以免阻塞worker線程
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        //已經被取消了;
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            // Was cancelled in the meantime.
            continue;
        }
        //calculated = tick 次數
        long calculated = timeout.deadline / tickDuration;
        // 計算剩餘的輪數, 隻有 timer 走夠輪數, 并且到達了 task 所在的 slot, task 才會過期
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        //如果任務在timeouts隊列裡面放久了, 以至于已經過了執行時間, 這個時候就使用目前tick, 也就是放到目前bucket, 此方法調用完後就會被執行
        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        //// 算出任務應該插入的 wheel 的 slot, slotIndex = tick 次數 & mask, mask = wheel.length - 1
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        //将timeout加入到bucket連結清單中
        bucket.addTimeout(timeout);
    }
}
           
  1. 每次調用這個方法會處理10w個任務,以免阻塞worker線程
  2. 在校驗之後會用timeout的deadline除以每次tick運作的時間tickDuration得出需要tick多少次才會運作這個timeout的任務
  3. 由于timeout的deadline實際上還包含了worker線程啟動到timeout加入隊列這段時間,是以在算remainingRounds的時候需要減去目前的tick次數
|_____________________|____________
 worker啟動時間  	   	 timeout任務加入時間

           
  1. 最後根據計算出來的ticks來&算出wheel的槽位,加入到bucket連結清單中

執行到期任務

在worker的run方法的do-while循環中,在根據目前的tick拿到wheel中的bucket後會調用expireTimeouts方法來處理這個bucket的到期任務

HashedWheelBucket#expireTimeouts

// 過期并執行格子中的到期任務,tick到該格子的時候,worker線程會調用這個方法,
//根據deadline和remainingRounds判斷任務是否過期
public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // process all timeouts
    //周遊格子中的所有定時任務
    while (timeout != null) {
        // 先儲存next,因為移除後next将被設定為null
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) {
            //從bucket連結清單中移除目前timeout,并傳回連結清單中下一個timeout
            next = remove(timeout);
            //如果timeout的時間小于目前的時間,那麼就調用expire執行task
            if (timeout.deadline <= deadline) {
                timeout.expire();
            } else {
                //不可能發生的情況,就是說round已經為0了,deadline卻>目前槽的deadline
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)",
                        timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            //因為目前的槽位已經過了,說明已經走了一圈了,把輪數減一
            timeout.remainingRounds--;
        }
        //把指針放置到下一個timeout
        timeout = next;
    }
}
           

expireTimeouts方法會根據目前tick到的槽位,然後擷取槽位中的bucket并找到連結清單中到期的timeout并執行

  1. 因為每一次的指針都會指向bucket中的下一個timeout,是以timeout為空時說明整個連結清單已經周遊完畢,是以用while循環做非空校驗
  2. 因為沒一次循環都會把目前的輪數大于零的做減一處理,是以當輪數小于或等于零的時候就需要把目前的timeout移除bucket連結清單
  3. 在校驗deadline之後執行expire方法,這裡會真正進行任務調用

HashedWheelTimeout#task

public void expire() {
    if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
        return;
    }

    try {
        task.run(this);
    } catch (Throwable t) {
        if (LOG.isWarnEnabled()) {
            LOG.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
        }
    }
}
           

這裡這個task就是在schedule方法中建構的timerTask執行個體,調用timerTask的run方法會調用到外層的RepeatedTimer的run方法,進而調用到RepeatedTimer子類實作的onTrigger方法。

到這裡Jraft的定時排程就講完了,感覺還是很有意思的。