轉載請聲明出處哦~,本篇文章釋出于luozhiyun的部落格:https://www.luozhiyun.com/archives/444
最近在工作中有一個需求,簡單來說就是在短時間内會建立上百萬個定時任務,建立的時候會将對應的金額相加,防止超售,需要過半個小時再去核對資料,如果資料對不上就需要将加上的金額再減回去。
這個需求如果用Go内置的Timer來做的話性能比較低下,因為Timer是使用最小堆來實作的,建立和删除的時間複雜度都為 O(log n)。如果使用時間輪的話則是O(1)性能會好很多。
對于時間輪來說,我以前寫過一篇java版的時間輪算法分析:https://www.luozhiyun.com/archives/59,這次來看看Go語言的時間輪實作,順便大家有興趣的也可以對比一下兩者的差別,以及我寫文章的水準和一年多前有沒有提升,哈哈哈。
時間輪的運用其實是非常的廣泛的,在 Netty、Akka、Quartz、ZooKeeper、Kafka 等元件中都存在時間輪的蹤影。下面用Go實作的時間輪是以Kafka的代碼為原型來實作的,完整代碼:https://github.com/devYun/timingwheel。
介紹
簡單時間輪
在時間輪中存儲任務的是一個環形隊列,底層采用數組實作,數組中的每個元素可以存放一個定時任務清單。定時任務清單是一個環形的雙向連結清單,連結清單中的每一項表示的都是定時任務項,其中封裝了真正的定時任務。
時間輪由多個時間格組成,每個時間格代表目前時間輪的基本時間跨度(tickMs)。時間輪的時間格個數是固定的,可用 wheelSize 來表示,那麼整個時間輪的總體時間跨度(interval)可以通過公式 tickMs×wheelSize 計算得出。
時間輪還有一個表盤指針(currentTime),用來表示時間輪目前所處的時間,currentTime 是 tickMs 的整數倍。currentTime指向的地方是表示到期的時間格,表示需要處理的時間格所對應的連結清單中的所有任務。
如下圖是一個tickMs為1s,wheelSize等于10的時間輪,每一格裡面放的是一個定時任務連結清單,連結清單裡面存有真正的任務項:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIn5GcuQzNxgDO5EzNz0iNzADN0UDM3EzMxIDMxIDMy0SOxEDNwITMvwlMwEjMwIzLclTMxQDMyEzLcd2bsJ2Lc12bj5ycn9Gbi52YuAjMwIzZtl2Lc9CX6MHc0RHaiojIsJye.png)
初始情況下表盤指針 currentTime 指向時間格0,若時間輪的 tickMs 為 1ms 且 wheelSize 等于10,那麼interval則等于10s。如下圖此時有一個定時為2s的任務插進來會存放到時間格為2的任務連結清單中,用紅色标記。随着時間的不斷推移,指針 currentTime 不斷向前推進,如果過了2s,那麼 currentTime 會指向時間格2的位置,會将此時間格的任務連結清單擷取出來處理。
如果目前的指針 currentTime 指向的是2,此時如果插入一個9s的任務進來,那麼新來的任務會服用原來的時間格連結清單,會存放到時間格1中
這裡所講的時間輪都是簡單時間輪,隻有一層,總體時間範圍在 currentTime 和 currentTime+interval 之間。如果現在有一個15s的定時任務是需要重新開啟一個時間輪,設定一個時間跨度至少為15s的時間輪才夠用。但是這樣擴充是沒有底線的,如果需要一個1萬秒的時間輪,那麼就需要一個這麼大的數組去存放,不僅占用很大的記憶體空間,而且也會因為需要周遊這麼大的數組進而拉低效率。
是以引入了層級時間輪的概念。
層級時間輪
如圖是一個兩層的時間輪,第二層時間輪也是由10個時間格組成,每個時間格的跨度是10s。第二層的時間輪的 tickMs 為第一層時間輪的 interval,即10s。每一層時間輪的 wheelSize 是固定的,都是10,那麼第二層的時間輪的總體時間跨度 interval 為100s。
圖中展示了每個時間格對應的過期時間範圍, 我們可以清晰地看到, 第二層時間輪的第0個時間格的過期時間範圍是 [0,9]。也就是說, 第二層時間輪的一個時間格就可以表示第一層時間輪的所有(10個)時間格;
如果向該時間輪中添加一個15s的任務,那麼當第一層時間輪容納不下時,進入第二層時間輪,并插入到過期時間為[10,19]的時間格中。
随着時間的流逝,當原本15s的任務還剩下5s的時候,這裡就有一個時間輪降級的操作,此時第一層時間輪的總體時間跨度已足夠,此任務被添加到第一層時間輪到期時間為5的時間格中,之後再經曆5s後,此任務真正到期,最終執行相應的到期操作。
代碼實作
因為我們這個Go語言版本的時間輪代碼是仿照Kafka寫的,是以在具體實作時間輪 TimingWheel 時還有一些小細節:
- 時間輪的時間格中每個連結清單會有一個root節點用于簡化邊界條件。它是一個附加的連結清單節點,該節點作為第一個節點,它的值域中并不存儲任何東西,隻是為了操作的友善而引入的;
- 除了第一層時間輪,其餘高層時間輪的起始時間(startMs)都設定為建立此層時間輪時前面第一輪的 currentTime。每一層的 currentTime 都必須是 tickMs 的整數倍,如果不滿足則會将 currentTime 修剪為 tickMs 的整數倍。修剪方法為:currentTime = startMs - (startMs % tickMs);
- Kafka 中的定時器隻需持有 TimingWheel 的第一層時間輪的引用,并不會直接持有其他高層的時間輪,但每一層時間輪都會有一個引用(overflowWheel)指向更高一層的應用;
- Kafka 中的定時器使用了 DelayQueue 來協助推進時間輪。在操作中會将每個使用到的時間格中每個連結清單都加入 DelayQueue,DelayQueue 會根據時間輪對應的過期時間 expiration 來排序,最短 expiration 的任務會被排在 DelayQueue 的隊頭,通過單獨線程來擷取 DelayQueue 中到期的任務;
結構體
type TimingWheel struct {
// 時間跨度,機關是毫秒
tick int64 // in milliseconds
// 時間輪個數
wheelSize int64
// 總跨度
interval int64 // in milliseconds
// 目前指針指向時間
currentTime int64 // in milliseconds
// 時間格清單
buckets []*bucket
// 延遲隊列
queue *delayqueue.DelayQueue
// 上級的時間輪引用
overflowWheel unsafe.Pointer // type: *TimingWheel
exitC chan struct{}
waitGroup waitGroupWrapper
}
tick、wheelSize、interval、currentTime都比較好了解,buckets字段代表的是時間格清單,queue是一個延遲隊列,所有的任務都是通過延遲隊列來進行觸發,overflowWheel是上層時間輪的引用。
type bucket struct {
// 任務的過期時間
expiration int64
mu sync.Mutex
// 相同過期時間的任務隊列
timers *list.List
}
bucket裡面實際上封裝的是時間格裡面的任務隊列,裡面放入的是相同過期時間的任務,到期後會将隊列timers拿出來進行處理。這裡有個有意思的地方是由于會有多個線程并發的通路bucket,是以需要用到原子類來擷取int64位的值,為了保證32位系統上面讀取64位資料的一緻性,需要進行64位對齊。具體的可以看這篇:https://www.luozhiyun.com/archives/429,講的是對記憶體對齊的思考。
type Timer struct {
// 到期時間
expiration int64 // in milliseconds
// 要被執行的具體任務
task func()
// Timer所在bucket的指針
b unsafe.Pointer // type: *bucket
// bucket清單中對應的元素
element *list.Element
}
Timer是時間輪的最小執行單元,是定時任務的封裝,到期後會調用task來執行任務。
初始化時間輪
例如現在初始化一個tick是1s,wheelSize是10的時間輪:
func main() {
tw := timingwheel.NewTimingWheel(time.Second, 10)
tw.Start()
}
func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel {
// 将傳入的tick轉化成毫秒
tickMs := int64(tick / time.Millisecond)
// 如果小于零,那麼panic
if tickMs <= 0 {
panic(errors.New("tick must be greater than or equal to 1ms"))
}
// 設定開始時間
startMs := timeToMs(time.Now().UTC())
// 初始化TimingWheel
return newTimingWheel(
tickMs,
wheelSize,
startMs,
delayqueue.New(int(wheelSize)),
)
}
func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel {
// 初始化buckets的大小
buckets := make([]*bucket, wheelSize)
for i := range buckets {
buckets[i] = newBucket()
}
// 執行個體化TimingWheel
return &TimingWheel{
tick: tickMs,
wheelSize: wheelSize,
// currentTime必須是tickMs的倍數,是以這裡使用truncate進行修剪
currentTime: truncate(startMs, tickMs),
interval: tickMs * wheelSize,
buckets: buckets,
queue: queue,
exitC: make(chan struct{}),
}
}
初始化十分簡單,大家可以看看上面的代碼注釋即可。
啟動時間輪
下面我們看看start方法:
func (tw *TimingWheel) Start() {
// Poll會執行一個無限循環,将到期的元素放入到queue的C管道中
tw.waitGroup.Wrap(func() {
tw.queue.Poll(tw.exitC, func() int64 {
return timeToMs(time.Now().UTC())
})
})
// 開啟無限循環擷取queue中C的資料
tw.waitGroup.Wrap(func() {
for {
select {
// 從隊列裡面出來的資料都是到期的bucket
case elem := <-tw.queue.C:
b := elem.(*bucket)
// 時間輪會将目前時間 currentTime 往前移動到 bucket的到期時間
tw.advanceClock(b.Expiration())
// 取出bucket隊列的資料,并調用addOrRun方法執行
b.Flush(tw.addOrRun)
case <-tw.exitC:
return
}
}
})
}
這裡使用了util封裝的一個Wrap方法,這個方法會起一個goroutines異步執行傳入的函數,具體的可以到我上面給出的連結去看源碼。
Start方法會啟動兩個goroutines。第一個goroutines用來調用延遲隊列的queue的Poll方法,這個方法會一直循環擷取隊列裡面的資料,然後将到期的資料放入到queue的C管道中;第二個goroutines會無限循環擷取queue中C的資料,如果C中有資料表示已經到期,那麼會先調用advanceClock方法将目前時間 currentTime 往前移動到 bucket的到期時間,然後再調用Flush方法取出bucket中的隊列,并調用addOrRun方法執行。
func (tw *TimingWheel) advanceClock(expiration int64) {
currentTime := atomic.LoadInt64(&tw.currentTime)
// 過期時間大于等于(目前時間+tick)
if expiration >= currentTime+tw.tick {
// 将currentTime設定為expiration,進而推進currentTime
currentTime = truncate(expiration, tw.tick)
atomic.StoreInt64(&tw.currentTime, currentTime)
// Try to advance the clock of the overflow wheel if present
// 如果有上層時間輪,那麼遞歸調用上層時間輪的引用
overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
if overflowWheel != nil {
(*TimingWheel)(overflowWheel).advanceClock(currentTime)
}
}
}
advanceClock方法會根據到期時間來從新設定currentTime,進而推進時間輪前進。
func (b *bucket) Flush(reinsert func(*Timer)) {
var ts []*Timer
b.mu.Lock()
// 循環擷取bucket隊列節點
for e := b.timers.Front(); e != nil; {
next := e.Next()
t := e.Value.(*Timer)
// 将頭節點移除bucket隊列
b.remove(t)
ts = append(ts, t)
e = next
}
b.mu.Unlock()
b.SetExpiration(-1) // TODO: Improve the coordination with b.Add()
for _, t := range ts {
reinsert(t)
}
}
Flush方法會根據bucket裡面timers清單進行周遊插入到ts數組中,然後調用reinsert方法,這裡是調用的addOrRun方法。
func (tw *TimingWheel) addOrRun(t *Timer) {
// 如果已經過期,那麼直接執行
if !tw.add(t) {
// 異步執行定時任務
go t.task()
}
}
addOrRun會調用add方法檢查傳入的定時任務Timer是否已經到期,如果到期那麼異步調用task方法直接執行。add方法我們下面會接着分析。
整個start執行流程如圖:
- start方法回啟動一個goroutines調用poll來處理DelayQueue中到期的資料,并将資料放入到管道C中;
- start方法啟動第二個goroutines方法會循環擷取DelayQueue中管道C的資料,管道C中實際上存放的是一個bucket,然後周遊bucket的timers清單,如果任務已經到期,那麼異步執行,沒有到期則重新放入到DelayQueue中。
add task
func main() {
tw := timingwheel.NewTimingWheel(time.Second, 10)
tw.Start()
// 添加任務
tw.AfterFunc(time.Second*15, func() {
fmt.Println("The timer fires")
exitC <- time.Now().UTC()
})
}
我們通過AfterFunc方法添加一個15s的定時任務,如果到期了,那麼執行傳入的函數。
func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer {
t := &Timer{
expiration: timeToMs(time.Now().UTC().Add(d)),
task: f,
}
tw.addOrRun(t)
return t
}
AfterFunc方法回根據傳入的任務到期時間,以及到期需要執行的函數封裝成Timer,調用addOrRun方法。addOrRun方法我們上面已經看過了,會根據到期時間來決定是否需要執行定時任務。
下面我們來看一下add方法:
func (tw *TimingWheel) add(t *Timer) bool {
currentTime := atomic.LoadInt64(&tw.currentTime)
// 已經過期
if t.expiration < currentTime+tw.tick {
// Already expired
return false
// 到期時間在第一層環内
} else if t.expiration < currentTime+tw.interval {
// Put it into its own bucket
// 擷取時間輪的位置
virtualID := t.expiration / tw.tick
b := tw.buckets[virtualID%tw.wheelSize]
// 将任務放入到bucket隊列中
b.Add(t)
// 如果是相同的時間,那麼傳回false,防止被多次插入到隊列中
if b.SetExpiration(virtualID * tw.tick) {
// 将該bucket加入到延遲隊列中
tw.queue.Offer(b, b.Expiration())
}
return true
} else {
// Out of the interval. Put it into the overflow wheel
// 如果放入的到期時間超過第一層時間輪,那麼放到上一層中去
overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
if overflowWheel == nil {
atomic.CompareAndSwapPointer(
&tw.overflowWheel,
nil,
// 需要注意的是,這裡tick變成了interval
unsafe.Pointer(newTimingWheel(
tw.interval,
tw.wheelSize,
currentTime,
tw.queue,
)),
)
overflowWheel = atomic.LoadPointer(&tw.overflowWheel)
}
// 往上遞歸
return (*TimingWheel)(overflowWheel).add(t)
}
}
add方法根據到期時間來分成了三部分,第一部分是小于目前時間+tick,表示已經到期,那麼傳回false執行任務即可;
第二部分的判斷會根據expiration是否小于時間輪的跨度,如果小于的話表示該定時任務可以放入到目前時間輪中,通過取模找到buckets對應的時間格并放入到bucket隊列中,SetExpiration方法會根據傳入的參數來判斷是否已經執行過延遲隊列的Offer方法,防止重複插入;
第三部分表示該定時任務的時間跨度超過了目前時間輪,需要更新到上一層的時間輪中。需要注意的是,上一層的時間輪的tick是目前時間輪的interval,延遲隊列還是同一個,然後設定為指針overflowWheel,并調用add方法往上層遞歸。
到這裡時間輪已經講完了,不過還有需要注意的地方,我們在用上面的時間輪實作中,使用了DelayQueue加環形隊列的方式實作了時間輪。對定時任務項的插入和删除操作而言,TimingWheel時間複雜度為 O(1),在DelayQueue中的隊列使用的是優先隊列,時間複雜度是O(log n),但是由于buckets清單實際上是非常小的,是以并不會影響性能。
Reference
https://github.com/RussellLuo/timingwheel
https://zhuanlan.zhihu.com/p/121483218
https://github.com/apache/kafka/tree/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/core/src/main/scala/kafka/utils/timer