天天看點

Redis源代碼解析:13Redis中的事件驅動機制

         Redis中。處理網絡IO時,採用的是事件驅動機制。但它沒有使用libevent或者libev這種庫,而是自己實作了一個很easy明了的事件驅動庫ae_event,主要代碼隻400行左右。

         沒有選擇libevent或libev的原因大概在于。這些庫為了迎合通用性造成代碼龐大,并且當中的非常多功能,比方監控子程序,複雜的定時器等。這些都不是Redis所須要的。

         Redis中的事件驅動庫僅僅關注網絡IO,以及定時器。該事件庫處理以下兩類事件:

         a:檔案事件(file  event):用于處理Redisserver和client之間的網絡IO。

         b:時間事件(time  eveat):Redis伺服器中的一些操作(比方serverCron函數)須要在給定的時間點運作,而時間事件就是處理這類定時操作的。

         事件驅動庫的代碼主要是在src/ae.c中實作的。

一:檔案事件

         Redis基于Reactor模式開發了自己的網絡事件處理器,也就是檔案事件處理器。

檔案事件處理器使用IO多路複用技術。同一時候監聽多個套接字,并為套接字關聯不同的事件處理函數。

當套接字的可讀或者可寫事件觸發時,就會調用對應的事件處理函數。

         Redis使用的IO多路複用技術主要有:select、epoll、evport和kqueue等。每一個IO多路複用函數庫在Redis源代碼中都相應一個單獨的檔案,比方ae_select.c,ae_epoll.c, ae_kqueue.c等。

         這些多路複用技術,依據不同的作業系統,Redis依照一定的優先級。選擇當中的一種使用。在ae.c中。是這樣實作的:

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif      

         注意這裡是include的.c檔案,是以。使用哪種多路複用技術。是在編譯階段就決定了的。

         檔案事件由結構體aeFileEvent表示,它的定義例如以下:

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;      

         當中mask表示描寫叙述符注冊的事件。能夠是AE_READABLE。AE_WRITABLE或者是AE_READABLE|AE_WRITABLE。

         rfileProc和wfileProc分别表示可讀和可寫事件的回調函數。

         clientData是使用者提供的資料,在調用回調函數時被當做參數。注意。該資料是可讀和可寫事件共用的。

二:時間事件

         Redis的時間事件主要有一次性事件和周期性事件兩種。一次性時間事件僅觸發一次。而周期性事件每隔一段時間就觸發一次。

         時間事件由aeTimeEvent結構體表示,它的定義例如以下:

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;      

         id用于辨別時間事件。id号依照從小到大的順序遞增,新時間事件的id号比舊時間事件的id号要大。

         when_sec和when_ms表示時間事件的下次觸發時間,實際上就是一個Unix時間戳,when_sec記錄它的秒數,when_ms記錄它的毫秒數。是以觸發時間是一個絕對值,而非相對值。

         timeProc是時間事件處理器,也就是時間事件觸發時的回調函數;

         finalizerProc是删除該時間事件時要調用的函數;

         clientData是使用者提供的資料。在調用timeProc和finalizerProc時。作為參數;

         全部的時間事件aeTimeEvent結構被組織成一個連結清單。next指針就運作連結清單中,目前aeTimeEvent結構的後繼結點。

         aeTimeEvent結構連結清單是一個無序連結清單。也就是說它并不依照事件的觸發時間而排序。

每當建立一個新的時間事件aeTimeEvent結構時,該結構就插傳入連結表的頭部。是以。當監控時間事件時,須要周遊整個連結清單。查找全部已到達的時間事件,并調用對應的事件處理器。

         在眼下版本号中,正常模式下的Redisserver僅僅使用serverCron一個時間事件。而在benchmark模式下,server也僅僅使用兩個時間事件。

是以。時間事件連結清單的這樣的設計盡管簡單粗暴,可是也能滿足性能需求。

三:事件循環結構

         在事件驅動的實作中,須要有一個事件循環結構來監控排程全部的事件,比方Libevent庫中的event_base,libev中的ev_loop等。

         在Redis中的事件驅動庫中,事件循環結構是由aeEventLoop結構體實作的。aeEventLoop結構是Redis中事件驅動機制的主要資料結構。它的定義例如以下:

typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;      

         events是aeFileEvent結構的數組,每一個aeFileEvent結構表示一個注冊的檔案事件。events數組以描寫叙述符的值為下标。

         fired是aeFiredEvent結構的數組,aeFiredEvent結構表示一個觸發的檔案事件。

結構中包括了描寫叙述符,以及其上已經觸發的事件。該數組不是以描寫叙述符的值為下标。而是依次儲存全部觸發的檔案事件。當處理事件時,輪訓fired數組中的每一個元素。然後依次處理。

         setsize表示eventLoop->events和eventLoop->fired數組的大小。是以。setsize- 1就表示所能處理的最大的描寫叙述符的值。

         lastTime:為了處理時間事件而記錄的Unix時間戳。主要為了在系統時間被調整時可以盡快的處理時間事件;

         timeEventHead:時間事件aeTimeEvent結構組成的連結清單的頭指針。

         timeEventNextId:下個時間事件的ID,該ID依次遞增,是以目前時間事件的最大ID為timeEventNextId-1;

         stop:是否停止事件監控;

         maxfd:目前處理的最大的描寫叙述符的值,主要是在select中使用。

         beforesleep:每次監控事件觸發之前。須要調用的函數。

         apidata表示詳細的底層多路複用所使用的資料結構,比方對于select來說。該結構中儲存了讀寫描寫叙述符數組;對于epoll來說。該結構中儲存了epoll描寫叙述符,以及epoll_event結構數組;

四:監控排程時間事件

         監控排程時間事件是由函數processTimeEvents實作的,它的代碼例如以下:

static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);

    /* If the system clock is moved to the future, and then set back to the
     * right value, time events may be delayed in a random way. Often this
     * means that scheduled operations will not be performed soon enough.
     *
     * Here we try to detect system clock skews, and force all the time
     * events to be processed ASAP when this happens: the idea is that
     * processing events earlier is less dangerous than delaying them
     * indefinitely, and practice suggests it is. */
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    eventLoop->lastTime = now;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
        long now_sec, now_ms;
        long long id;

        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            /* After an event is processed our time event list may
             * no longer be the same, so we restart from head.
             * Still we make sure to don't process events registered
             * by event handlers itself in order to don't loop forever.
             * To do so we saved the max ID we want to handle.
             *
             * FUTURE OPTIMIZATIONS:
             * Note that this is NOT great algorithmically. Redis uses
             * a single time event so it's not a problem but the right
             * way to do this is to add the new elements on head, and
             * to flag deleted elements in a special way for later
             * deletion (putting references to the nodes to delete into
             * another linked list). */
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                aeDeleteTimeEvent(eventLoop, id);
            }
            te = eventLoop->timeEventHead;
        } else {
            te = te->next;
        }
    }
    return processed;
}      

         首先推斷系統時間是否被調整了。将目前時間now,與上次記錄的時間戳eventLoop->lastTime相比較。假設now小于eventLoop->lastTime。說明系統時間被調整到過去了,比方由201603312030調整到了201603312000了。這樣的情況下,直接将全部事件的觸發時間的秒數清0,這意味着全部的時間事件都會馬上觸發。

之是以這麼做。是由于提前處理比延後處理的危急性要小;

         然後更新eventLoop->lastTime為now;

         接下來,先記錄目前的maxId。之是以這麼做,是由于有時間事件觸發後。要又一次回到連結清單頭結點開始處理。而在時間事件的觸發回調函數中。有可能注冊了新的時間事件,成為新的連結清單頭結點,這就可能導緻會無限處理下去。為了防止這樣的情況發生。記錄目前的maxId,僅僅處理目前的時間事件;

         輪訓連結清單eventLoop->timeEventHead,針對當中的每個事件節點te,假設te的id大于maxId。說明該事件,是在之前已經觸發的時間事件的回調函數中注冊的。不處理這種事件,直接處理下一個;

         然後得到目前時間,推斷目前時間是否已經超過了te的觸發時間,若是。說明該事件須要觸發,調用觸發回調函數te->timeProc,該函數的傳回值為retval;

         假設retval是AE_NOMORE,說明觸發的時間事件是一次性事件,直接從連結清單中删除;否則。說明該事件是周期性事件。将其觸發時間更改為目前時間加上retval;

         事件觸發後,連結清單已經被改動了,要又一次回到連結清單頭結點開始處理。由于Redis中僅僅有一個時間事件,是以採用了這樣的簡單粗暴的算法。更好的處理方式是處理完目前事件後。标記該節點須要删除(比方在還有一個連結清單中儲存該節點的指針),然後接着處理下一個節點,全部節點處理完之後,将标記為删除的節點統一删除就可以。

         最後傳回觸發的事件總數。

五:監控排程全部事件

         監控排程全部事件是由函數aeProcessEvents實作的,它的代碼例如以下:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

        /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}      

         依據flags處理不同的事件:

         假設flags為0。則該函數直接傳回。

         假設flags中設定了AE_ALL_EVENTS,則處理全部的檔案事件和時間事件;

         假設flags中設定了AE_FILE_EVENTS。則處理全部的檔案事件。

         假設flags中設定了AE_TIME_EVENTS,則處理全部的時間事件;

         假設flags中設定了AE_DONT_WAIT,則調用多路複用函數時。不會堵塞等

待事件的觸發。将全部已觸發的事件處理完後馬上傳回。

         眼下在Redis中,調用aeProcessEvents時設定的flags僅僅有AE_ALL_EVENTS和

AE_FILE_EVENTS|AE_DONT_WAIT兩種。

         函數中,首先假設flags中既沒有設定AE_TIME_EVENTS。也沒有設定AE_FILE_EVENTS。則該函數直接傳回0.

         接下來,假設已經注冊過檔案事件,或者須要處理時間事件且不是AE_DONT_WAIT。則須要調用底層多路複用函數aeApiPoll。是以須要計算調用aeApiPoll函數時,最長堵塞時間tvp。該值是由最早要觸發的時間事件(假設有的話)決定的。

         假設須要處理時間事件且不是AE_DONT_WAIT。這樣的情況下,無論有沒有檔案事件,都要堵塞一段時間。堵塞的時間依據shortest得到,shortest是通過調用aeSearchNearestTimer得到的最早要觸發的時間事件。得到shortest後,計算得出其觸發時間距離目前時間的內插補點,該內插補點就是堵塞時間tvp。

         否則。假設注冊過檔案事件,而且flags中設定了AE_DONT_WAIT。則将tvp中的值設定為0。表示全然不堵塞;       

         假設注冊過檔案事件,可是flags中沒有設定AE_DONT_WAIT,則将tvp置為NULL,表示一直堵塞,直到有檔案事件觸發;

         得到最長堵塞時間tvp之後。以tvp為參數調用aeApiPoll等待檔案事件的觸發。該函數由不同的底層多路複用函數實作。終于都傳回觸發的檔案事件總數numevents,并将觸發的事件和描寫叙述符,依次記錄到eventLoop->fired中。

         接下來。依次輪訓eventLoop->fired中的前numevents個元素。調用對應的事件回調函數。注意。假設一個套接字又可讀又可寫的話,那麼server将先處理可讀事件,然後在處理可寫事件。

         觸發的檔案事件是依次處理的,假設某個檔案事件的處理時間過長,就會影響到下一個事件的處理。在事件驅動的實作中,要由使用者保證事件回調函數可以高速傳回,而不堵塞。

         注意。有這樣一種情況。比方描寫叙述符3和4都有事件觸發了,在3的事件回調函數中,調用aeDeleteFileEvent将4的注冊事件删除了。這樣在處理描寫叙述符4時,就不應該再次調用4的回調函數了。

是以,每次調用事件回調函數之前,都推斷該描寫叙述符上的注冊事件是否還有效。并且假設可讀和可寫事件的回調函數同樣的話,僅僅能調用一次該函數。

         處理完檔案事件之後(或者沒有檔案事件。而隻堵塞了tvp的時間),假設flags中設定了AE_TIME_EVENTS。則調用processTimeEvents處理時間事件,因已經堵塞了tvp的時間,是以此時肯定有觸發的時間事件。最後。傳回全部觸發的事件總數。

         由于時間事件在檔案事件之後處理,而且事件之間不會出現搶占,是以時間事件的實際處理時間。一般會比時間事件設定的到達時間稍晚一些。

         再次強調一點:對檔案事件和時間事件的處理都是同步、有序、原子地運作的,server不會中途中斷事件處理,也不會對事件進行搶占。

是以。無論是檔案事件的回調函數,還是時間事件的回調函數。都須要盡可地降低程式的堵塞時間,進而降低造成事件饑餓的可能性。比方,在指令回複回調函數中,将一個指令回複寫入到client套接字時。假設寫人位元組數超過了一個預設常量的話。指令回複函數就會主動用break跳出寫人循環。将餘下的資料留到下次再寫。

另外,時間事件也會将很耗時的持久化操作放到子線程或者子程序運作。

六:事件循環監控

         事件循環監控是由函數aeMain實作的,它的代碼例如以下:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}      

         僅僅要eventLoop->stop不為1。則持續調用aeProcessEvents監控排程全部事件的觸發。正常情況下,在Redisserver中,eventLoop->stop永遠不可能為1。       

         在Redisserver的主函數中,全部初始化工作完畢之後,就會調用該函數。監控全部事件的觸發。

七:樣例:ECHOserver

         以下是使用Redis的事件驅動庫,實作的一個簡單echoserver:

#define SERVER_PORT 9998

typedef struct
{
    char clientaddr[INET_ADDRSTRLEN];
    int port;
    char buf[1024];
}Userbuf;

void setunblock(int fd) 
{
    int flags;
    if ((flags = fcntl(fd, F_GETFL)) == -1) 
    {
        perror("fcntl(F_GETFL) error");
        return;
    }

    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1) 
    {
        perror("fcntl(F_SETFL) error");
        return;
    }
    return;
}

void acceptfun(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask)
{
    int acceptfd = -1;
    struct sockaddr_in cliaddr;
    socklen_t addrlen = sizeof(cliaddr);

    acceptfd = accept(fd, (struct sockaddr *)&cliaddr, &addrlen);
    if (acceptfd < 0)
    {
        perror("accept error\n");
        return;
    }

    Userbuf *usrbuf = calloc(1, sizeof(Userbuf));
    printf("calloc %p\n", usrbuf);
    inet_ntop(AF_INET, &cliaddr.sin_addr, usrbuf->clientaddr, INET_ADDRSTRLEN),
    usrbuf->port = ntohs(cliaddr.sin_port);
    printf("\naccept from <%s:%d>\n", usrbuf->clientaddr, usrbuf->port);

    setunblock(acceptfd);

    if (aeCreateFileEvent(eventLoop, acceptfd, AE_READABLE, readfun, usrbuf) != AE_OK)
    {
        perror("aeCreateFileEvent error");
        close(acceptfd);
        printf("free %p\n", usrbuf);
        free(usrbuf);
        return;
    }
    return;
}

void readfun(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask)
{
    char readbuf[1024] = {};
    int len = -1;
    Userbuf *usrbuf = (Userbuf *)clientData;

    if ((len = read(fd, readbuf, 1024)) > 0)
    {
        printf("read from <%s:%d>: %s\n", usrbuf->clientaddr, usrbuf->port, readbuf);

        memcpy(usrbuf->buf, readbuf, 1024);
        if (aeCreateFileEvent(eventLoop, fd, AE_WRITABLE, writefun, clientData) != AE_OK)
        {
            printf("aeCreateFileEvent error\n");
            goto END;

        }
        else
            return;
    }
    else if (len == 0)
    {
        printf("close link from %s\n", usrbuf->buf);
        goto END;
    }
    else
    {
        printf("read error from %s\n", usrbuf->buf);
        goto END;
    }

END:
    close(fd);
    aeDeleteFileEvent(eventLoop, fd, AE_READABLE);
    aeDeleteFileEvent(eventLoop, fd, AE_WRITABLE);
    printf("free %p\n", clientData);
    free(clientData);
    return;
}

void writefun(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask)
{
    int len = 0;
    char *buf = ((Userbuf *)clientData)->buf;
    len = strlen(buf);

    printf("write to client: %s\n", buf);
    if(write(fd, buf, len) != len)
    {
        perror("write error");

        close(fd);
        aeDeleteFileEvent(eventLoop, fd, AE_READABLE);
        aeDeleteFileEvent(eventLoop, fd, AE_WRITABLE);

        printf("free %p\n", clientData);
        free(clientData);
    }   
    aeDeleteFileEvent(eventLoop, fd, AE_WRITABLE);
}

int main()
{
    int listenfd;
    aeEventLoop *eventloop = NULL;
    struct sockaddr_in seraddr;

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0)
    {
        perror("socket error");
        return -1;
    }

    seraddr.sin_family = AF_INET;
    seraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    seraddr.sin_port = htons(SERVER_PORT);

    if (bind(listenfd, (struct sockaddr *)&seraddr, sizeof(seraddr)) < 0)
    {
        perror("bind error");
        close(listenfd);
        return -1;
    }

    if (listen(listenfd, 5) < 0)
    {
        perror("listen error");
        close(listenfd);
        return -1;
    }

    eventloop = aeCreateEventLoop(1024);
    if (eventloop == NULL)
    {
        printf("aeCreateEventLoop error\n");
        close(listenfd);
        return -1;
    }

    if (aeCreateFileEvent(eventloop, listenfd, AE_READABLE, acceptfun, NULL) != AE_OK)
    {
        perror("aeCreateFileEvent error");
        close(listenfd);
        aeDeleteEventLoop(eventloop);
        return -1;
    }

    aeMain(eventloop);
    return 0;
}      

         這裡要注意的是,對于同一個acceptfd,調用aeCreateFileEvent函數。分别注冊可讀事件和可寫事件時。其clientData是共享的。

假設在注冊可寫事件時,改動了clientData,則可讀事件的clientData也對應改變,這是由于一個描寫叙述符僅僅有一個aeFileEvent結構。

         client的代碼依據Webbench改寫。詳細代碼見:

https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/tests/hhunittest/test_ae_client.c

         其它有關事件驅動的代碼實作。能夠參考:

https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/src/ae.c

https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/src/ae_epoll.c

https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/src/ae_select.c