天天看點

timer的優化故事

原作者:淘傑

前幾天nodejs釋出了新版本4.0,其中涉及到一個更新比較多的子產品,那就是下面要介紹的timer子產品。

timers: Improved timer performance from porting the 0.12 implementation, plus minor fixes (Jeremiah Senkpiel)#2540, (Julien Gilli)nodejs/node-v0.x-archive#8751nodejs/node-v0.x-archive#8905

之前也對timer子產品有過比較多的研究,斷斷續續的看過這個子產品在github上的一些改動,于是借着這次機會整理一下自己對timer子產品的了解,和小夥伴們一起分享timer子產品的優化過程。

使用場景

也許你在使用nodejs開發項目時并沒有使用到timer子產品,諸如setTimeout以及setInterval和setImediate方法等等。但如果你開發的是web項目,那麼你的項目中一定涉及到了timer子產品。

細心的同學在平時的http接口開發調試中可能會注意到,每個http的request header裡都有一個

Connection:keep-alive

辨別,這是http/1.1開始引入的,表示用戶端需要和服務端一直保持着tcp連接配接。當然了,這個連接配接不能就這麼一直保持着,是以一般都會有一個逾時時間,超過這個時間用戶端還沒有發送新的http請求,那麼伺服器就需要自動斷開進而繼續為其他用戶端提供服務。

nodejs提供的http伺服器便是采用timer子產品來滿足這種請求,每一個新的連接配接到來構造出一個socket對象,便會調用socket.setTimeout設定一個定時器用于逾時後自動斷開連接配接。

設計

在nodejs開發的web項目中,timer子產品的使用頻率是非常高的,每一個新的連接配接到來都會設定它的逾時時間,而且每個連接配接的逾時時間都一樣,在http server中預設是2 60 1000ms。nodejs使用c++包裹的Timer對象來實作定時器功能,下面的代碼示例了使用Timer對象來實作一個非常簡單的定時器。

const Timer = process.binding('timer_wrap').Timer;
const kOnTimeout = Timer.kOnTimeout | 0;

var mySetTimeout = function (fn, ms) {
    var timer  = new Timer();
    timer.start(ms, 0);
    timer[kOnTimeout] = fn;
    return timer;    
}

var myClearTimeout = function(timer){
    if(timer && timer.close) {
        timer.close();
    }
}

mySetTimeout(function() {
    console.log('timeout!');
},1000);
           

那我們是否就可以用上面實作的mySetTimeout來對每個socket進行逾時操作呢

mySetTimeout(function(){socket.close();},2 * 60 * 1000);           

可以是可以,但是這樣真的好嗎?設想我們做的是一個非常棒的産品,每天好幾百萬上千萬的使用者,高峰期在2 60 1000ms這段時間内會産生非常多的新連接配接,必然會建立非常多的Timer對象,這個開銷還真不小!

nodejs在設計之初就非常非常注重性能,是以像上面這種這麼簡單的方案必然是不能接受的。

實際上在這2分鐘之内,nodejs中的timer子產品隻會建立一個Timer對象,一個Timer對象如何來滿足這麼多連接配接的逾時處理呢?

timer子產品會使用一個連結清單來儲存所有逾時時間相同的對象,每個對象中都會存儲開始時間_idleStart以及逾時時間_idleTimeout。連結清單中第一個加入的對象一定會比後面加入的對象先逾時,當第一個對象逾時完成處理後,重新計算下一個對象是否已經到時或者還有多久到時,之前建立的Timer對象便會再次啟動并設定新的逾時時間,直到當連結清單上所有的對象都已經完成逾時處理,此時便會關閉這個Timer對象。

通過這種巧妙的設計,使得一個Timer對象得到了最大的重用,進而極大的提升了timer子產品的性能。這一場景其實在libev中已早有研究

http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#Be_smart_about_timeouts

實作

上面說到timer子產品通過c++提供的Timer對象,最終生成setTimeout以及setInterval等函數暴露給使用者使用。那Timer對象是如何實作的呢,下面我們就來一探究竟。

一個最底層的timer

熟悉linux網絡程式設計的同學一定聽說過epoll吧,

epoll是什麼?按照man手冊的說法:是為處理大批量句柄而作了改進的poll。當然,這不是2.6核心才有的,它是在2.5.44核心中被引進的(epoll(4) is a new API introduced in Linux kernel 2.5.44),它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。

其中有這麼一個函數

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
           
收集在epoll監控的事件中已經發送的事件。參數events是配置設定好的epoll_event結構體數組,epoll将會把發生的事件指派到events數組中(events不可以是空指針,核心隻負責把資料複制到這個events數組中,不會去幫助我們在使用者态中配置設定記憶體)。maxevents告之核心這個events有多大,這個 maxevents的值不能大于建立epoll_create()時的size,參數timeout是逾時時間(毫秒,0會立即傳回,-1将不确定,也有說法說是永久阻塞)。如果函數調用成功,傳回對應I/O上已準備好的檔案描述符數目,如傳回0表示已逾時。

當我們監聽一個fd上的事件時,可以設定等待事件發生的逾時時間。利用這個特性便可以非常簡單的實作一個定時器功能。

由于我使用的是mac系統,是以就用kqueue來代替epoll(它們之間非常相似,具體的詳細介紹以及使用方法感興趣的可以自行查閱相關資料)

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>

const int MAX_EVENT_COUNT = 5000;

int main() {
  struct timeval t_start,t_end;
  int fd = -1;//構造一個不會有任何事件發生的fd
  int kq = kqueue();
  struct kevent changes[1];
  EV_SET(&changes[0], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
  int timeout = 3500;
  struct kevent events[MAX_EVENT_COUNT];
  struct timespec spec;
  spec.tv_sec = timeout / 1000;
  spec.tv_nsec = (timeout % 1000) * 1000000;
  gettimeofday(&t_start, NULL);
  kevent(kq, NULL, 0, events, MAX_EVENT_COUNT, &spec);
  gettimeofday(&t_end, NULL);
  printf("timeout = %d, run time is %ld\n", timeout,  t_end.tv_sec*1000+t_end.tv_usec/1000 - (t_start.tv_sec*1000+t_start.tv_usec/1000));
  return 0;
}
           

至此我們便利用kqueue實作了一個非常簡單非常底層的定時器。

libuv中的timer

前面講到,在http server中每一個新的連接配接并不會真的就去建立一個Timer對象。同樣,在nodejs底層的定時器中,并不會每次建立一個Timer對象就在kqueue上注冊一個事件等待逾時。優化的思路和nodejs中的timer子產品很相似,隻不過現在不能保證每個定時器的逾時時間都一樣。

定時器有一個非常顯著的特征,逾時時間最短的定時器一定最先觸發,假設我們有很多的定時任務,每個任務的執行時間都不同。當第一個定時器逾時後,便從這些任務中查找出已經到點的任務并執行對應的逾時處理,然後再重新計算餘下任務中最先執行的時間,并根據這個時間再次開啟一個定時器。

對應的算法需求就是每次都需要查找集合中最小的元素,顯然二叉堆中的最小堆(父結點的鍵值總是小于或等于任何一個子節點的鍵值)是最适合不過的一種資料結構了。由于最小的元素總是處于根節點,我們可以以O(1)時間找到最小值。對于插入操作,在最壞的情況下,新插入的節點需要不斷的和它的父節點進行交換,直到它為根節點為止。假設堆的高度為h, 二叉樹最多有2^(h+1) - 1 個 節點. 是以新插入一個節點最多需要log(n+1) -1 次比較,其算法複雜度為O(logn)。

libuv中已經實作了一個最小二叉堆的算法

https://github.com/joyent/libuv/blob/master/src/heap-inl.h

, 下面我們就用這個算法來實作一個支援設定不同逾時時間的定時器。

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>

//https://github.com/joyent/libuv/blob/master/src/heap-inl.h
#include "heap-inl.h"

#define container_of(ptr, type, member) \
  ((type *) ((char *) (ptr) - offsetof(type, member)))

const int MAX_EVENT_COUNT = 5000;

typedef struct {
    struct heap_node* heap_node[3];
    int value;
}node_t;

static int less_than(const struct heap_node* ha, const struct heap_node* hb) {
    const node_t* a;
      const node_t* b;

    a = container_of(ha, const node_t, heap_node);
    b = container_of(hb, const node_t, heap_node);

    if (a->value < b->value)
      return 1;
    return 0;
}


int main() {

  struct heap *heap_p = malloc(sizeof(node_t));
  heap_init(heap_p);

  int a[] = {10,9,8,6,7,3,5,4,2};

  int len = sizeof(a)/sizeof(int);
  for(int i=0;i<len;i++){
    node_t *node_p = malloc(sizeof(node_t));
    node_p->value = a[i]*1000;
    heap_insert(heap_p, (struct heap_node*)node_p->heap_node, less_than);
  }


  int fd = -1;
  int kq = kqueue();

  struct kevent changes[1];
  EV_SET(&changes[0], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);

  struct kevent events[MAX_EVENT_COUNT];
  struct timeval t_start,t_end;

  while(heap_p->nelts) {
      node_t *node_p = container_of(heap_p->min, node_t, heap_node);
      struct timespec spec;
      spec.tv_sec = node_p->value / 1000;
      spec.tv_nsec = (node_p->value % 1000) * 1000000;
      gettimeofday(&t_start, NULL);
      kevent(kq, NULL, 0, events, MAX_EVENT_COUNT, &spec);
      gettimeofday(&t_end, NULL);
      printf("timeout = %d, run time is %ld\n", node_p->value,  t_end.tv_sec*1000+t_end.tv_usec/1000 - (t_start.tv_sec*1000+t_start.tv_usec/1000));
      heap_dequeue(heap_p, less_than);
  }

  printf("timer is over!\n");

  return 0;
}
           

執行gcc timer.c -o timer && ./timer後輸出

timeout = 2000, run time is 2004
timeout = 3000, run time is 3000
timeout = 4000, run time is 4003
timeout = 5000, run time is 5005
timeout = 6000, run time is 6005
timeout = 7000, run time is 7005
timeout = 8000, run time is 8004
timeout = 9000, run time is 9000
timeout = 10000, run time is 10005
timer is over!           

可以看到我們設定的9個定時器都預期執行了,除了有5ms以内的偏差。這就是nodejs中最底層的定時器實作了。

nodejs中的timer

我們再回到nodejs中的timer子產品,為了不影響到nodejs中的event loop,timer子產品專門提供了一些内部的api(timers._unrefActive)給像socket這樣的對象使用。

timer内部會維護一個unrefList連結清單以及一個unrefTimer Timer對象,當有新的逾時任務到來時便會添加到unrefList中,逾時後便從unrefList中取出任務執行。

在最初的設計中,每次執行_unrefActive添加任務時都會維持着unrefList的順序,保證逾時時間最小的處于前面。這樣在定時器逾時後便可以以最快的速度處理逾時任務并設定下一個定時器,但是在添加任務時最壞的情況下需要周遊unrefList連結清單中的所有節點。具體實作可參考

https://github.com/nodejs/node/blob/5abd4ac079b390467360d671a186a061b5aba736/lib/timers.js

很顯然,在web開發中建立連接配接是最頻繁的操作,那麼向unrefList連結清單中添加節點也就非常頻繁了,而且最開始設定的定時器其實最後真正會逾時的非常少,因為中間涉及到io的正常操作時便會取消定時器。是以問題就變成最耗性能的操作非常頻繁,而幾乎不花時間的操作卻很少被執行到。

針對這種情況,如何解決呢?目前在node社群主要有2種方案。

使用不排序的連結清單

主要思路就是将對unrefList連結清單的周遊操作,移到unrefTimeout定時器逾時進行中。這樣每次查找出已經逾時的任務就需要花比較多的時間了O(n),但是插入操作卻變得非常簡單O(1),而插入節點正是最頻繁的操作。

使用二叉堆

原理和libuv中的timer實作一樣,添加和查找一個節點都能達到O(log(n))的複雜度(找出最小節點本身很快,但是删除它需要O(log(n))的複雜度),能夠在二者之間保持一個很好的平衡。

benchamark

這2種方案都有比較詳細benchamark資料, 具體可參考

https://github.com/nodejs/node-v0.x-archive/wiki/Optimizing-_unrefActive

小結

在高并發連接配接到來并且很少有實際的逾時事件發生時unrefList使用沒有排序的連結清單來存儲逾時任務時性能是非常棒的。但是一旦出現很多逾時事件都發生的情況下,對逾時事件的處理會再次變成一個瓶頸。

而使用二叉堆來存儲逾時任務時,當有大量逾時事件發生時性能會比連結清單好很多,沒有逾時事件觸發時性能比連結清單稍差。

可見nodejs在不同的場景中使用的定時器實作也不都一樣。當我們自己在實際的開發時,如果需要使用到定時器功能,不妨好好思考下哪種方案更适合業務場景,能夠最大的提升timer子產品的性能。

參考文檔

繼續閱讀