
Is Single-Threaded Redis Not Enough Anymore? Come Reap the Gains of VM / BIO / IO Multithreading! (Source Code Included)

About the Author

Insutanto, an ordinary programming craftsman.

Background

In its early days, Redis was "famous" for being single-threaded. The Redis FAQ has an entry titled

"Redis is single threaded. How can I exploit multiple CPU/cores?"

(https://redis.io/topics/faq), which explains why Redis uses a single thread:

It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For example, a pipelined Redis running on an average Linux system can deliver a million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

......

However, starting with Redis 4.0, Redis began to use more threads. For now (Redis 4.0) this is limited to deleting objects in the background and to blocking commands implemented via Redis modules. The plan for future releases is to make Redis more and more threaded.

This made me curious: was Redis single-threaded from the very beginning? How did it evolve toward multithreading, and why make Redis more and more threaded? After reading a few articles, I decided to read the relevant source code myself and trace the multithreading history of Redis.

A roadmap for this Redis multithreading source-code series:

  • Redis VM threads (Redis 1.3.x - Redis 2.4)
  • Redis BIO threads (Redis 2.4+ and Redis 4.0+)
  • Redis network IO threads (Redis 6.0+)

Redis VM Threads (Redis 1.3.x - Redis 2.4)

In fact, Redis adopted multithreading quite early. In the Redis 1.3.x source code (2010) we can find multithreaded code related to Redis VM, which implements the threaded VM capability. Redis VM can swap rarely accessed values, or values that take up a lot of memory, out to disk.

Under the hood, Redis VM reads and writes the disk, so with the blocking VM, loading a value from disk blocks the main thread, affects every client, and increases latency across the board. Redis VM therefore also offers a threaded VM, which moves the file reads and writes onto IO threads, so that only one client (the one that needs its value loaded from the file) is affected, instead of every client paying the latency cost as with the blocking VM.

The advantages of the threaded VM are laid out in the "Virtual Memory technical specification" (https://redis.io/topics/internals-vm).

The design goals of the threaded VM, in order of importance:

A simple implementation, with few race conditions and simple locking, and a VM system that is more or less decoupled from the rest of the Redis code.

Good performance, with no locks for clients accessing values in memory.

The ability to decode/encode objects in the I/O threads.

In reality, though, Redis VM was a short-lived feature that has since been removed. Redis VM appeared in Redis 1.3.x, and Redis 2.4 was the last version to support it. Redis 1.3.x was released in 2010 and Redis 2.6 in 2012, so Redis VM lived in the Redis project for only about two years. The reason for dropping it can still be read in "Virtual Memory" (https://redis.io/topics/virtual-memory):

... we found that using VM has several disadvantages and problems. In the future we just want to provide the best in-memory database (but persistent on disk as usual), and at least for now, not to support databases bigger than RAM. Our future efforts are focused on providing scripting, clustering, and better persistence.

Personally, I think the root cause for removing Redis VM was one of positioning. Redis positioned itself precisely as an in-memory database backed by disk; without VM, Redis is purer, simpler, and easier for users to understand and use.

Below is a brief look at the Redis VM multithreading code.

The Redis main thread and the IO threads communicate through job queues and a single mutex. The queue and mutex definitions are as follows:

/* Global server state structure */
struct redisServer {
...
    list *io_newjobs; /* List of VM I/O jobs yet to be processed */
    list *io_processing; /* List of VM I/O jobs being processed */
    list *io_processed; /* List of VM I/O jobs already processed */
    list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
    pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
    pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
    pthread_attr_t io_threads_attr; /* attributes for threads creation */
...
}      

When Redis needs to process an IO job (for example, when memory usage exceeds the configured maximum), it enqueues an IO job (iojob) onto the job queue (io_newjobs) via queueIOJob; based on the maximum number of VM threads, queueIOJob also decides whether a new IO thread needs to be spawned.

void queueIOJob(iojob *j) {
    redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
        (void*)j, j->type, (char*)j->key->ptr);
    listAddNodeTail(server.io_newjobs,j);
    if (server.io_active_threads < server.vm_max_threads)
        spawnIOThread();
}      

The main logic of the spawned IO threads is IOThreadEntryPoint. An IO thread first takes an iojob from the io_newjobs queue, pushes it onto the io_processing queue, and then performs the task indicated by the iojob's type:

  • load data from disk into memory
  • compute the number of swap pages needed
  • swap data from memory to disk

When the task is done, the iojob is pushed onto the io_processed queue. Finally, the IO thread writes a single byte to a UNIX pipe to tell the main thread that a new job has been completed and its result is ready to be processed.

typedef struct iojob {
    int type;   /* Request type, REDIS_IOJOB_* */
    redisDb *db;/* Redis database */
    robj *key;  /* This I/O request is about swapping this key */
    robj *id;   /* Unique identifier of this job:
                   this is the object to swap for REDIS_IOREQ_*_SWAP, or the
                   vmpointer objct for REDIS_IOREQ_LOAD. */
    robj *val;  /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
                 * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
    off_t page; /* Swap page where to read/write the object */
    off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */
    int canceled; /* True if this command was canceled by blocking side of VM */
    pthread_t thread; /* ID of the thread processing this entry */
} iojob;
#define REDIS_IOJOB_LOAD 0          /* Load from disk to memory */
#define REDIS_IOJOB_PREPARE_SWAP 1  /* Compute needed pages */
#define REDIS_IOJOB_DO_SWAP 2       /* Swap from memory to disk */
void *IOThreadEntryPoint(void *arg) {
    iojob *j;
    listNode *ln;
    REDIS_NOTUSED(arg);
    pthread_detach(pthread_self());
    while(1) {
        /* Get a new job to process */
        lockThreadedIO();
        if (listLength(server.io_newjobs) == 0) {
            /* No new jobs in queue, exit. */
            ...
            unlockThreadedIO();
            return NULL;
        }
        ln = listFirst(server.io_newjobs);
        j = ln->value;
        listDelNode(server.io_newjobs,ln);
        /* Add the job in the processing queue */
        j->thread = pthread_self();
        listAddNodeTail(server.io_processing,j);
        ln = listLast(server.io_processing); /* We use ln later to remove it */
        unlockThreadedIO();
        ...
        /* Process the Job */
        if (j->type == REDIS_IOJOB_LOAD) {
            vmpointer *vp = (vmpointer*)j->id;
            j->val = vmReadObjectFromSwap(j->page,vp->vtype);
        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
            j->pages = rdbSavedObjectPages(j->val);
        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
            if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
                j->canceled = 1;
        }
        /* Done: insert the job into the processed queue */
        ...
        lockThreadedIO();
        listDelNode(server.io_processing,ln);
        listAddNodeTail(server.io_processed,j);
        unlockThreadedIO();
        /* Signal the main thread there is new stuff to process */
        redisAssert(write(server.io_ready_pipe_write,"x",1) == 1);
    }
    return NULL; /* never reached */
}      
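
To make that pipe-based notification concrete, here is a small, self-contained C sketch of the same pattern: a worker thread finishes a job, appends it to a mutex-protected "processed" list, and writes one byte to a pipe that the main thread watches. This is only an illustration of the pattern, not Redis source code; all names here are made up for the example.

/* pipe_notify.c - standalone sketch of the threaded-VM completion signal
 * (compile with: cc -pthread pipe_notify.c). Not Redis code. */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static int notify_pipe[2];          /* [0]: read end (main), [1]: write end (worker) */
static pthread_mutex_t processed_mutex = PTHREAD_MUTEX_INITIALIZER;
static int processed_jobs[16];
static int processed_count = 0;

static void *io_thread(void *arg) {
    (void)arg;
    for (int job = 0; job < 3; job++) {
        usleep(10000);              /* pretend to do slow disk I/O */
        pthread_mutex_lock(&processed_mutex);
        processed_jobs[processed_count++] = job;
        pthread_mutex_unlock(&processed_mutex);
        if (write(notify_pipe[1], "x", 1) != 1)   /* one byte per completed job */
            perror("write");
    }
    return NULL;
}

int main(void) {
    pthread_t tid;
    char byte;
    if (pipe(notify_pipe) == -1) return 1;
    pthread_create(&tid, NULL, io_thread, NULL);
    /* "Event loop": each byte on the pipe means one finished job to collect. */
    for (int handled = 0; handled < 3; handled++) {
        if (read(notify_pipe[0], &byte, 1) != 1) return 1;
        pthread_mutex_lock(&processed_mutex);
        int job = processed_jobs[--processed_count];
        pthread_mutex_unlock(&processed_mutex);
        printf("main thread: IO thread completed job %d\n", job);
    }
    pthread_join(tid, NULL);
    return 0;
}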


Summary

Since the Redis VM feature has been removed from Redis and the related code is quite old, I won't go into more detail.

Besides seeing how Redis optimized data reads and writes with multiple threads, studying the source code and the official Redis blog leaves a clear impression:

"The root cause for removing Redis VM was one of positioning. Redis positioned itself precisely as an in-memory database backed by disk; without VM, Redis is purer, simpler, and easier for users to understand and use."

Sometimes, cutting out a feature whose performance is poor and whose purpose is unclear is the best performance optimization.

Redis BIO Threads (Redis 2.4+ and Redis 4.0+)


Redis BIO Threads (Redis 2.4+)

From the previous part of this series we know that, apart from the short-lived VM feature and its VM threads, Redis was essentially single-threaded from the start. However, the official Redis documentation notes that starting with Redis 2.4 (2011), Redis uses threads to perform some slow I/O operations in the background, mostly related to disk I/O. Switching to the Redis 2.4 branch, we find two BIO threads that help Redis with AOF fsync-to-disk and with file deletion.

  • How do we find the thread-related code?

Starting from the Redis configuration option appendfsync, we locate its definition in the code.

// config.c
...
    else if (!strcasecmp(c->argv[2]->ptr,"appendfsync")) {
        if (!strcasecmp(o->ptr,"no")) {
            server.appendfsync = APPENDFSYNC_NO;
        } else if (!strcasecmp(o->ptr,"everysec")) {
            server.appendfsync = APPENDFSYNC_EVERYSEC;
        } else if (!strcasecmp(o->ptr,"always")) {
            server.appendfsync = APPENDFSYNC_ALWAYS;
        } else {
            goto badfmt;
        }
    }
...      
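
For reference, appendfsync takes exactly the three values matched above; in redis.conf it looks like this (the comments are mine):

# redis.conf
appendonly yes          # turn the AOF on
appendfsync everysec    # default: fsync once per second, on a background thread
# appendfsync always    # fsync after every write command (most durable, slowest)
# appendfsync no        # never fsync from Redis; let the OS decide when to flush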

Searching for APPENDFSYNC_EVERYSEC leads us to backgroundRewriteDoneHandler:

// aof.c
void backgroundRewriteDoneHandler(int statloc) {
......
    else if (server.appendfsync == APPENDFSYNC_EVERYSEC)
        aof_background_fsync(newfd);
......
}

In the aof_background_fsync function, we find the call that creates a background job:

// aof.c
void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}      

Searching for the keyword REDIS_BIO_AOF_FSYNC finally leads us to the BIO module's header file (bio.h), which contains the BIO interfaces and constant definitions:

// bio.h
/* Exported API */
void bioInit(void);
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3);
unsigned long long bioPendingJobsOfType(int type);
void bioWaitPendingJobsLE(int type, unsigned long long num);
time_t bioOlderJobOfType(int type);
/* Background job opcodes */
#define REDIS_BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define REDIS_BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define REDIS_BIO_NUM_OPS       2      

Finally, we find bioInit, where Redis creates two BIO threads to run bioProcessBackgroundJobs; bioInit itself is called from the main function in server.c via initServer:

// bio.c
/* Initialize the background system, spawning the thread. */
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;
    /* Initialization of state vars and objects */
    for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_condvar[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }
    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);
    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
    }
}      

  • What is BIO multithreading for?

In backgroundRewriteDoneHandler we hand background jobs to the BIO threads so that some work is done off the main thread. To understand why Redis uses BIO threads, let's first work out what this function does.

According to its comment, the function is called when a background AOF rewrite (BGREWRITEAOF) terminates. Reading on, the code consists mostly of file-writing operations, until we reach a very detailed comment in aof.c:

The only remaining thing to do is to rename the temporary file to the configured file and switch the file descriptor used to perform AOF writes. We don't want close(2) or rename(2) calls to block the server on old file deletion. There are two possible scenarios:

1) AOF is disabled and this was a one-time rewrite. The temporary file will be renamed to the configured file. When that file already exists, it will be unlinked, which may block the server.

2) AOF is enabled and the rewritten AOF will immediately start receiving writes. After the temporary file is renamed to the configured file, the original AOF file descriptor will be closed. Since this will be the last reference to that file, closing it causes the underlying file to be unlinked, which may block the server.

To mitigate the blocking effect of the unlink operation (caused either by rename(2) in scenario 1 or by close(2) in scenario 2), we use a background thread to take care of it. First, we make scenario 1 identical to scenario 2 by opening the target file when it exists. The unlink after the rename(2) will then be executed when close(2) is called on its descriptor. By that point, everything needed to guarantee the atomicity of the switch has already happened, so as long as the file descriptor is released again, we don't care about the outcome or duration of that close operation.

Here we can see the purpose of the REDIS_BIO_CLOSE_FILE BIO thread: deleting files in a background thread, so that deleting a large file does not block the main thread for too long. During an AOF rewrite, a rename(2) or close(2) on a file may cause the system call to delete the underlying file, and that deletion runs in the current process (in kernel mode), so for a large file it takes a long time. If such a file were deleted on the main thread, the main thread would be blocked on disk IO.

//aof.c
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
 * Handle this. */
void backgroundRewriteDoneHandler(int statloc) {
    int exitcode = WEXITSTATUS(statloc);
    int bysignal = WIFSIGNALED(statloc);
    if (!bysignal && exitcode == 0) {
        int newfd, oldfd;
        int nwritten;
        char tmpfile[256];
        long long now = ustime();
                ...
        /* Flush the differences accumulated by the parent to the
         * rewritten AOF. */
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
            (int)server.bgrewritechildpid);
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        if (newfd == -1) {
            redisLog(REDIS_WARNING,
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
            goto cleanup;
        }
        nwritten = write(newfd,server.bgrewritebuf,sdslen(server.bgrewritebuf));
        if (nwritten != (signed)sdslen(server.bgrewritebuf)) {
            if (nwritten == -1) {
                redisLog(REDIS_WARNING,
                    "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            } else {
                redisLog(REDIS_WARNING,
                    "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            }
            close(newfd);
            goto cleanup;
        }
        redisLog(REDIS_NOTICE,
            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten);
        /* The only remaining thing to do is to rename the temporary file to
         * the configured file and switch the file descriptor used to do AOF
         * writes. We don't want close(2) or rename(2) calls to block the
         * server on old file deletion.
         *
         * There are two possible scenarios:
         *
         * 1) AOF is DISABLED and this was a one time rewrite. The temporary
         * file will be renamed to the configured file. When this file already
         * exists, it will be unlinked, which may block the server.
         *
         * 2) AOF is ENABLED and the rewritten AOF will immediately start
         * receiving writes. After the temporary file is renamed to the
         * configured file, the original AOF file descriptor will be closed.
         * Since this will be the last reference to that file, closing it
         * causes the underlying file to be unlinked, which may block the
         * server.
         *
         * To mitigate the blocking effect of the unlink operation (either
         * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
         * use a background thread to take care of this. First, we
         * make scenario 1 identical to scenario 2 by opening the target file
         * when it exists. The unlink operation after the rename(2) will then
         * be executed upon calling close(2) for its descriptor. Everything to
         * guarantee atomicity for this switch has already happened by then, so
         * we don't care what the outcome or duration of that close operation
         * is, as long as the file descriptor is released again. */
        if (server.appendfd == -1) {
            /* AOF disabled */
             /* Don't care if this fails: oldfd will be -1 and we handle that.
              * One notable case of -1 return is if the old file does
              * not exist. */
             oldfd = open(server.appendfilename,O_RDONLY|O_NONBLOCK);
        } else {
            /* AOF enabled */
            oldfd = -1; /* We'll set this to the current AOF filedes later. */
        }
        /* Rename the temporary file. This will not unlink the target file if
         * it exists, because we reference it with "oldfd". */
        if (rename(tmpfile,server.appendfilename) == -1) {
            redisLog(REDIS_WARNING,
                "Error trying to rename the temporary AOF: %s", strerror(errno));
            close(newfd);
            if (oldfd != -1) close(oldfd);
            goto cleanup;
        }
        if (server.appendfd == -1) {
            /* AOF disabled, we don't need to set the AOF file descriptor
             * to this new file, so we can close it. */
            close(newfd);
        } else {
            /* AOF enabled, replace the old fd with the new one. */
            oldfd = server.appendfd;
            server.appendfd = newfd;
            if (server.appendfsync == APPENDFSYNC_ALWAYS)
                aof_fsync(newfd);
            else if (server.appendfsync == APPENDFSYNC_EVERYSEC)
                aof_background_fsync(newfd);
            server.appendseldb = -1; /* Make sure SELECT is re-issued */
            aofUpdateCurrentSize();
            server.auto_aofrewrite_base_size = server.appendonly_current_size;
            /* Clear regular AOF buffer since its contents was just written to
             * the new AOF from the background rewrite buffer. */
            sdsfree(server.aofbuf);
            server.aofbuf = sdsempty();
        }
        redisLog(REDIS_NOTICE, "Background AOF rewrite successful");
        /* Asynchronously close the overwritten AOF. */
        if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
        redisLog(REDIS_VERBOSE,
            "Background AOF rewrite signal handler took %lldus", ustime()-now);
    } else if (!bysignal && exitcode != 0) {
        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated with error");
    } else {
        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated by signal %d",
            WTERMSIG(statloc));
    }
cleanup:
    sdsfree(server.bgrewritebuf);
    server.bgrewritebuf = sdsempty();
    aofRemoveTempFile(server.bgrewritechildpid);
    server.bgrewritechildpid = -1;
}      

Back to the aof_background_fsync function called from backgroundRewriteDoneHandler: here we find the job-creation code for the other BIO thread (REDIS_BIO_AOF_FSYNC):

void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}      

Reading bioCreateBackgroundJob, we see that Redis takes a mutex while appending to the job queue of the corresponding job type, and after appending it signals the condition variable and releases the mutex, waking up the BIO thread that waits on that condition variable so it can carry on processing jobs from the queue. This keeps the queue consistent across threads (it also increments the pending-job counter for that BIO type, which we don't need here). The Redis BIO threads, in turn, simply keep pulling jobs off these BIO job queues:

// bio.c
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));
    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    pthread_cond_signal(&bio_condvar[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}      

Next, back in the BIO threads' main function, bioProcessBackgroundJobs, we can confirm the execution logic: a BIO thread waits on the mutex and condition variable to decide whether to keep reading from its queue. As the earlier comment explained, a REDIS_BIO_CLOSE_FILE job simply calls close(fd). Reading on, a REDIS_BIO_AOF_FSYNC job calls the aof_fsync function:

// bio.c
void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    pthread_detach(pthread_self());
    pthread_mutex_lock(&bio_mutex[type]);
    while(1) {
        listNode *ln;
        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);
        /* Process the job accordingly to its type. */
        if (type == REDIS_BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == REDIS_BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);
        } else {
            redisPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);
        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;
    }
}




Looking at the definition of aof_fsync, we find that it is simply fdatasync on Linux and fsync everywhere else:




/* Define aof_fsync to fdatasync() in Linux and fsync() for all the rest */
#ifdef __linux__
#define aof_fsync fdatasync
#else
#define aof_fsync fsync
#endif      

Those familiar with Redis will recognize this as the Redis 2.4 BIO-thread design for AOF durability:

Using AOF, Redis is much more durable;

You can have different fsync policies: no fsync at all, fsync every second, fsync at every query. With the default policy of fsync every second, write performance is still great (the fsync is performed in a background thread, and the main thread will try hard to perform writes when no fsync is in progress), but you can lose one second of writes. (From the "AOF advantages" section of "Redis Persistence", https://redis.io/topics/persistence)

Why fsync has to run on a BIO thread in the background is then quite simple. Redis needs to guarantee durability: writing data to a file only puts it in a buffer, and the data is only safe from loss once it has been flushed to disk; fsync, which flushes the buffer to disk, is a synchronous IO operation. Running that flush on the main thread would guarantee durability better, but it would block the main thread.

So, to reduce blocking, Redis handles fsync on a BIO thread. This does not mean Redis is no longer affected by fsync, though: if fsync is too slow (data has not been flushed for more than 2 seconds), the Redis main thread will block on the file write regardless of the cost (see "Redis persistence demystified", http://oldblog.antirez.com/m/p.php?i=251, #appendfsync everysec).
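
To make that backpressure concrete, here is a self-contained sketch of the everysec decision described above: postpone the AOF write while a background fsync is still running, but never for more than about 2 seconds, after which the write goes ahead even if it stalls the main thread. The struct fields and function names are illustrative, not the exact symbols from Redis's flushAppendOnlyFile.

/* everysec_sketch.c - simplified model of the "everysec" write postponement. */
#include <stdio.h>
#include <time.h>

struct aof_state {
    int fsync_in_progress;        /* a background AOF fsync job is still pending */
    time_t flush_postponed_start; /* when we first postponed the write, 0 if not postponed */
};

/* Returns 1 if the AOF buffer should be written now, 0 if the write is postponed. */
int should_write_aof(struct aof_state *s, time_t now) {
    if (s->fsync_in_progress) {
        if (s->flush_postponed_start == 0) {
            s->flush_postponed_start = now;   /* start postponing */
            return 0;
        }
        if (now - s->flush_postponed_start < 2) {
            return 0;                         /* still within the ~2s grace period */
        }
        /* fsync has been lagging for >= 2s: write anyway, accepting a possible stall. */
    }
    s->flush_postponed_start = 0;
    return 1;
}

int main(void) {
    struct aof_state s = { .fsync_in_progress = 1, .flush_postponed_start = 0 };
    time_t t0 = 1000;
    printf("t+0s: write? %d\n", should_write_aof(&s, t0));      /* postponed */
    printf("t+1s: write? %d\n", should_write_aof(&s, t0 + 1));  /* still postponed */
    printf("t+3s: write? %d\n", should_write_aof(&s, t0 + 3));  /* forced through */
    return 0;
}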


Redis BIO Threads (Redis 4.0+)

Starting with Redis 4.0 (2017), yet another BIO thread was added. In bio.h we find a new definition, BIO_LAZY_FREE; this thread mainly helps Redis free memory asynchronously. In antirez's "Lazy Redis is better Redis" (http://antirez.com/news/93) we can see why freeing memory was moved to an asynchronous thread:

(Incrementally reclaiming memory) is a nice trick, and it worked well. However, we still had to do it in a single thread, which still made me sad. When there is a lot of logic to handle and lazy freeing is also very frequent, ops (operations per second) drop to roughly 65% of the normal value.

Freeing objects in a different thread would be simpler: a thread that is busy doing nothing but frees should always free faster than new values are added to the dataset.

Of course there is also some contention between the main thread and the lazy-free thread on the memory allocator, but Redis spends only a small fraction of its time on allocation, and far more on I/O, command dispatch, cache misses, and so on.

Readers interested in the background of this feature can also look at the issue "Lazy free of keys and databases #1748" (github.com/redis/re...ues/1748).

// bio.h
/* Background job opcodes */
#define BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE     2 /* Deferred objects freeing. */
#define BIO_NUM_OPS       3      

Looking back at the code, the BIO_LAZY_FREE part has been added on top of what was there before. There are three kinds of lazy-free jobs:

  • freeing an object
  • freeing a Redis database
  • freeing a skip list
// bio.c
void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;
    /* Check that the type is within the right interval. */
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }
    /* Make the thread killable at any time, so that bioKillThreads()
     * can work reliably. */
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
    while(1) {
        listNode *ln;
        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);
        /* Process the job accordingly to its type. */
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            /* What we free changes depending on what arguments are set:
             * arg1 -> free the object at pointer.
             * arg2 & arg3 -> free two dictionaries (a Redis DB).
             * only arg3 -> free the skiplist. */
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);
        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        pthread_cond_broadcast(&bio_step_cond[type]);
        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;
    }
}      

The main logic for freeing an object lives in decrRefCount:

// lazyfree.c
/* Release objects from the lazyfree thread. It's just decrRefCount()
 * updating the count of objects to release. */
void lazyfreeFreeObjectFromBioThread(robj *o) {
    decrRefCount(o);
    atomicDecr(lazyfree_objects,1);
}      

Depending on the data type, a different freeing routine is executed:

// object.c
void decrRefCount(robj *o) {
    if (o->refcount == 1) {
        switch(o->type) {
        case OBJ_STRING: freeStringObject(o); break;
        case OBJ_LIST: freeListObject(o); break;
        case OBJ_SET: freeSetObject(o); break;
        case OBJ_ZSET: freeZsetObject(o); break;
        case OBJ_HASH: freeHashObject(o); break;
        case OBJ_MODULE: freeModuleObject(o); break;
        default: serverPanic("Unknown object type"); break;
        }
        zfree(o);
    } else {
        if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
        if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
    }
}      
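
The other half of the story is how the main thread decides whether to free an object inline or hand it to the BIO_LAZY_FREE thread: freeing is only offloaded when the object is expensive to free and not shared. The sketch below is self-contained and only mirrors the idea; the threshold value and all names are illustrative, not the exact Redis 4.0 code.

/* lazyfree_sketch.c - illustrative sync-vs-async free decision. */
#include <stdio.h>

#define LAZYFREE_THRESHOLD 64   /* Redis uses a small constant of this order */

/* A toy object: "elements" approximates how many allocations freeing it releases. */
typedef struct object { int type; long elements; int refcount; } object;

static long free_effort(const object *o) {
    return o->elements > 0 ? o->elements : 1;
}

/* Returns 1 if the object should be queued to the lazy-free BIO thread. */
static int should_free_async(const object *o) {
    /* Only objects owned exclusively by the keyspace (refcount == 1) and
     * expensive to free are worth offloading; cheap ones are freed inline. */
    return free_effort(o) > LAZYFREE_THRESHOLD && o->refcount == 1;
}

int main(void) {
    object small = { .type = 0, .elements = 3,      .refcount = 1 };
    object big   = { .type = 1, .elements = 100000, .refcount = 1 };
    printf("small set: async free? %d\n", should_free_async(&small)); /* 0: free inline */
    printf("big hash:  async free? %d\n", should_free_async(&big));   /* 1: hand to BIO thread */
    return 0;
}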


Aside

I won't go through all the other related material, but here is one aside, a bit of the story behind Redis development.

While studying "Lazy Redis is better Redis" (http://antirez.com/news/93), I found that antirez actually agonized quite a bit over the lazy free design, because the feature collided with one of Redis's own internals: shared objects (sharing objects). Progress on lazy free was held back by object sharing. To cut to the conclusion: in the end, to implement lazy free, antirez removed the shared-object feature. Even now (Redis 6.0), shared objects appear in only a few places; tracing the code, we find that the refcount of the robj struct is equal to 1 in most cases. A few exceptions remain, such as the shared strings for small integers created at startup in server.c, or manually bumping the count to slow down the reclamation of an in-memory object. That is why, even though Redis removed the shared-object design, we still see refcount-related code; chalk it up to history.

// server.c
#define OBJ_SHARED_REFCOUNT INT_MAX
typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
                            * LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time). */
    int refcount;
    void *ptr;
} robj;
// server.c
void createSharedObjects(void) {
......
    for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
        shared.integers[j] =
            makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
        shared.integers[j]->encoding = OBJ_ENCODING_INT;
    }
......
}      

Redis Network IO Threads (Redis 6.0+)

Starting with Redis 6.0, officially released in 2020, Redis added client IO read/write threads to relieve the main thread of the network IO burden of talking to clients. In fact, this idea had already surfaced in 2015, while the lazy free feature was being developed. From "Lazy Redis is better Redis" (http://antirez.com/news/93), "Not just lazy freeing":

Now that values of aggregate data types are fully unshared, and client output buffers no longer contain shared objects either, there is a lot to exploit. For example, it is finally possible to implement threaded I/O in Redis, so that different clients are served by different threads. This means we would hold a global lock only when accessing the database, while the client read/write syscalls, and even the parsing of the command data sent by the client, can happen in different threads. It is a memcached-like design that I look forward to implementing and testing.

Moreover, it would be possible to perform certain slow operations on aggregate data types in some thread, so that only "a few" keys are "blocked" while all the other clients can keep going. This could be achieved in a way very similar to how we currently handle blocking operations (see blocking.c), plus a hash table recording which keys are currently busy and by which clients. So if a client asks for something like SMEMBERS, we could lock just that key, build the output buffer for the request, and then release the key again. If a key is blocked, clients trying to access that same key would all be blocked.

All of this requires much deeper internal changes, but the bottom line is that we have fewer taboos now. We can make up for the object-copying time with aggregate data types that have fewer cache misses and a smaller memory footprint, and we can now envision a share-nothing, threaded Redis, the only design that can easily beat our single-threaded architecture. In the past, adding a pile of mutexes to data structures and objects just to allow concurrent access would always have been considered a bad idea. Fortunately, there is now a way to get the best of both worlds: we can keep executing all the fast operations from the main thread as before, and pay for the performance gains with some added complexity.

The above is what antirez shared in the "Not just lazy freeing" section of "Lazy Redis is better Redis". With that in mind, we can see why Redis went ahead with threaded IO:

  • With single-threaded IO, a blocking operation on some key stalls the whole thread; with multiple threads, only the clients accessing that same key need to be blocked.
  • Removing shared objects makes threaded IO much simpler: there is no need to sprinkle mutexes over data structures and objects to support multithreading, so Redis keeps its single-threaded "tradition" for data access. (PS: dropping shared objects means more memory copying, but it also allows more compact data types, and the more contiguous memory layout means fewer cache misses.)

Next, starting from the main() function in Redis's server.c, let's see how the IO threads run.


Creating the IO Threads

Searching for pthread_create leads to the initThreadedIO() function; from there we can piece together how the IO threads are created.

Whether or not it is running in sentinel mode, Redis calls InitServerLast:

int main(int argc, char **argv) {
    struct timeval tv;
    int j;
    server.supervised = redisIsSupervised(server.supervised_mode);
    int background = server.daemonize && !server.supervised;
    if (background) daemonize();
    ......some log......
    readOOMScoreAdj();
    initServer();
    if (background || server.pidfile) createPidFile();
    redisSetProcTitle(argv[0]);
    redisAsciiArt();
    checkTcpBacklogSettings();
    if (!server.sentinel_mode) {
        moduleLoadFromQueue();
        ACLLoadUsersAtStartup();
        InitServerLast();
        loadDataFromDisk();
        ......
    } else {
        InitServerLast();
        sentinelIsRunning();
        ......
    }
    ......
    redisSetCpuAffinity(server.server_cpulist);
    setOOMScoreAdj(-1);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}      

In initServer(), Redis initializes the relevant task queues, while the thread resources for network IO are only set up in InitServerLast, because Redis's network IO multithreading is configurable. Redis implements multithreaded network IO, but the network IO logic can run either on the Threaded IO threads or on the main thread, leaving the choice to the user:
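
The choice is exposed through two redis.conf options (the values below are just examples; with io-threads left at 1, everything stays on the main thread):

# redis.conf (Redis 6.0+)
io-threads 4              # 1 (default) = keep all network I/O on the main thread
io-threads-do-reads yes   # also offload reads and parsing, not only writes (default: no)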

void initServer(void) {
    ......
    /* Initialization after setting defaults from the config system. */
    server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
    server.hz = server.config_hz;
    server.pid = getpid();
    server.in_fork_child = CHILD_TYPE_NONE;
    server.main_thread_id = pthread_self();
    server.current_client = NULL; // the client currently executing a command
    server.errors = raxNew();
    server.fixed_time_expire = 0;
    server.clients = listCreate(); // list of active clients
    server.clients_index = raxNew(); // active clients dictionary, indexed by client_id
    server.clients_to_close = listCreate(); // list of clients to close asynchronously
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.clients_pending_write = listCreate(); // list of clients pending a write or a write handler install
    server.clients_pending_read = listCreate(); // list of clients waiting for their socket buffer to be read
    server.clients_timeout_table = raxNew();
    server.replication_allowed = 1;
    server.slaveseldb = -1; /* Force to emit the first SELECT command. */
    server.unblocked_clients = listCreate(); // list of clients to unblock before the next loop iteration
    server.ready_keys = listCreate();
    server.clients_waiting_acks = listCreate();
    server.get_ack_from_slaves = 0;
    server.client_pause_type = 0;
    server.paused_clients = listCreate();
    server.events_processed_while_blocked = 0;
    server.system_memory_size = zmalloc_get_memory_size();
    server.blocked_last_cron = 0;
    server.blocking_op_nesting = 0;
    ......
}

In InitServerLast(), besides initThreadedIO (the Redis network IO threads), we also see bioInit (background I/O initialization); the two modules use separate resources:

/* Some steps in server initialization need to be done last (after modules
 * are loaded).
 * Specifically, creation of threads due to a race bug in ld.so, in which
 * Thread Local Storage initialization collides with dlopen call.
 * see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */
void InitServerLast() {
    bioInit();
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}      

Next, let's look at networking.c in the Redis source: io_threads is the thread pool, io_threads_mutex the per-thread mutexes, io_threads_pending the per-thread count of pending clients, and io_threads_list the list of clients assigned to each IO thread.

/* ==========================================================================
 * Threaded I/O
 * ========================================================================== */
#define IO_THREADS_MAX_NUM 128
#define IO_THREADS_OP_READ 0
#define IO_THREADS_OP_WRITE 1
pthread_t io_threads[IO_THREADS_MAX_NUM];
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
int io_threads_op;      /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
/* This is the list of clients each thread will serve when threaded I/O is
 * used. We spawn io_threads_num-1 threads, since one is the main thread
 * itself. */
list *io_threads_list[IO_THREADS_MAX_NUM];      

Then comes initThreadedIO, the function that creates the threads. The IO threads start out inactive and wait to be activated later. If io_threads_num is configured as 1, IO is handled single-threaded on the main thread; if the configured thread count exceeds the maximum IO_THREADS_MAX_NUM (128), Redis exits with an error. Finally, every thread created here is kept locked until it is woken up:

/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */
    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }
    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */
        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}      


How the IO Threads Work

At startup, the initialization function initServer registers beforeSleep and afterSleep as the handlers to run before and after the event loop sleeps:

void initServer(void) {
......
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
......
    /* Register before and after sleep handlers (note this needs to be done
     * before loading persistence since it is used by processEventsWhileBlocked. */
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
......
}      

When the event loop runs beforeSleep, it calls handleClientsWithPendingReadsUsingThreads and handleClientsWithPendingWritesUsingThreads, which distribute the IO read and write work respectively. As a special case, during AOF and RDB data loading (reading data from files into memory), Redis calls beforeSleep through processEventsWhileBlocked; in that case only handleClientsWithPendingReadsUsingThreads runs, and IO writes are synchronous:

/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors.
 *
 * Note: This function is (currently) called from two functions:
 * 1. aeMain - The main server loop
 * 2. processEventsWhileBlocked - Process clients during RDB/AOF load
 *
 * If it was called from processEventsWhileBlocked we don't want
 * to perform all actions (For example, we don't want to expire
 * keys), but we do need to perform some actions.
 *
 * The most important is freeClientsInAsyncFreeQueue but we also
 * call some other low-risk functions. */
void beforeSleep(struct aeEventLoop *eventLoop) {
......
    /* Just call a subset of vital functions in case we are re-entering
     * the event loop from processEventsWhileBlocked(). Note that in this
     * case we keep track of the number of events we are processing, since
     * processEventsWhileBlocked() wants to stop ASAP if there are no longer
     * events to handle. */
    if (ProcessingEventsWhileBlocked) {
        uint64_t processed = 0;
        processed += handleClientsWithPendingReadsUsingThreads();
        processed += tlsProcessPendingData();
        processed += handleClientsWithPendingWrites();
        processed += freeClientsInAsyncFreeQueue();
        server.events_processed_while_blocked += processed;
        return;
    }
......
    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();
......
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
    /* Close clients that need to be closed asynchronous */
    freeClientsInAsyncFreeQueue();
......
    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    if (moduleCount()) moduleReleaseGIL();
    /* Do NOT add anything below moduleReleaseGIL !!! */
}      

handleClientsWithPendingReadsUsingThreads implements the distribution of IO read work. When Redis is configured to read and parse on the IO threads (io_threads_do_reads), the readable handler puts normal clients into a queue to be processed instead of serving them synchronously. This function runs that queue across the IO threads, which accumulate the reads in the buffers:

  • The IO threads start out inactive; the function only proceeds if Redis is configured to read and parse data on IO threads (io_threads_do_reads);
  • it walks the pending-read client list clients_pending_read and distributes the clients by modulo across the per-thread task queues io_threads_list[target_id];
  • it sets each IO thread's pending counter via setIOPendingCount, which kicks the IO threads into action;
  • the main thread still processes a slice of the clients itself;
  • if a client is not already flagged as pending a write but still has pending replies (data left in the reply buffer, or reply objects waiting to be sent), a write handler is installed on its connection.
/* When threaded I/O is also enabled for the reading + parsing side, the
 * readable handler will just put normal clients into a queue of clients to
 * process (instead of serving them synchronously). This function runs
 * the queue using the I/O threads, and process them in order to accumulate
 * the reads in the buffers, and also parse the first command available
 * rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
    // The IO threads start inactive; only proceed if Redis is configured
    // to read and parse on IO threads (io_threads_do_reads)
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;
    /* Distribute the clients across N different lists. */
    // Walk the pending-read client list clients_pending_read and distribute
    // the clients by modulo across the per-thread task queues io_threads_list[target_id]
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    // Set each IO thread's pending counter via setIOPendingCount to kick off the IO threads
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }
    /* Also use the main thread to process a slice of clients. */
    // The main thread also processes a slice of the clients itself
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);
    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }
    /* Run the list of clients again to process the new buffers. */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);
        if (processPendingCommandsAndResetClient(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }
        processInputBuffer(c);
        /* We may have pending replies if a thread readQueryFromClient() produced
         * replies and did not install a write handler (it can't).
         */
        // If the client is not already flagged as pending a write but still
        // has pending replies (buffer data or reply objects to send),
        // install a write handler on the client's connection
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }
    /* Update processed count on server */
    server.stat_io_reads_processed += processed;
    return processed;
}      
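
A note on the "pending counter" used above: setIOPendingCount and getIOPendingCount are thin wrappers around an atomic per-thread counter (built on Redis's own atomic macros in networking.c); the main thread stores how many clients it queued for a thread, and the IO thread sets it back to 0 when it is done. A self-contained sketch of the same idea using C11 atomics:

/* io_pending_sketch.c - illustrative version of the per-thread pending counters. */
#include <stdatomic.h>
#include <stdio.h>

#define IO_THREADS_MAX_NUM 128

static _Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];

static inline void setIOPendingCount(int i, unsigned long count) {
    atomic_store_explicit(&io_threads_pending[i], count, memory_order_release);
}

static inline unsigned long getIOPendingCount(int i) {
    return atomic_load_explicit(&io_threads_pending[i], memory_order_acquire);
}

int main(void) {
    setIOPendingCount(1, 42);              /* main thread assigns 42 clients to thread 1 */
    printf("pending for thread 1: %lu\n", getIOPendingCount(1));
    setIOPendingCount(1, 0);               /* IO thread reports completion */
    printf("pending for thread 1: %lu\n", getIOPendingCount(1));
    return 0;
}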

In handleClientsWithPendingWritesUsingThreads, Redis starts the IO threads if needed and distributes the IO write work:

  • If multithreading is disabled, or there are too few pending clients (fewer than twice the number of threads), fall back to the synchronous code;
  • if the IO threads are not active, activate them (they are created inactive in initThreadedIO);
  • clients that need to be closed (CLIENT_CLOSE_ASAP) are removed from the pending list;
  • it walks the pending-write client list clients_pending_write and distributes the clients by modulo across the per-thread task queues io_threads_list[target_id];
  • it sets each IO thread's pending counter via setIOPendingCount, which kicks the IO threads into action;
  • the main thread still processes a slice of the clients itself;
  • if a reply buffer still has data to write, or reply objects remain to be sent, a write handler is installed on the client's connection;
  • finally, if installing that write handler fails, freeClientAsync puts the client into the clients_to_close queue, to be freed asynchronously when beforeSleep runs freeClientsInAsyncFreeQueue.
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */
    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    // If threading is disabled, or there are too few pending clients, use the synchronous path
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }
    /* Start threads if needed. */
    // Activate the IO threads if they are not active yet (they are created inactive in initThreadedIO)
    if (!server.io_threads_active) startThreadedIO();
    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        // Clients marked CLIENT_CLOSE_ASAP are removed from the pending-write list
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    // Set each IO thread's pending counter via setIOPendingCount to kick off the IO threads
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }
    /* Also use the main thread to process a slice of clients. */
    // The main thread also processes a slice of the clients itself
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);
    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }
    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        /* Install the write handler if there are pending writes in some
         * of the clients. */
        // If the reply buffer still has data to write, or reply objects remain to be sent,
        // install a write handler on the client's connection
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            // 将待釋放的用戶端放入clients_to_close隊列,
            // 等待beforeSleep執行freeClientsInAsyncFreeQueue時實作異步釋放用戶端
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    /* Update processed count on server */
    server.stat_io_writes_processed += processed;
    return processed;
}      


The IO Threads' Main Logic

The main logic of the Redis IO threads lives in IOThreadMain.

After being created, an IO thread sets its CPU affinity through the redisSetCpuAffinity function and the server_cpulist parameter; configuring CPU affinity sensibly can improve performance to some extent.

After that, the IO thread checks the pending counter io_threads_pending[id] to see whether there is IO waiting to be handled, takes the clients assigned to it from io_threads_list[myid], and then uses io_threads_op to decide which of the two IO operations to perform, readQueryFromClient or writeToClient:

void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];
    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();
    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }
        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
        serverAssert(io_threads_pending[id] != 0);
        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;
        if (tio_debug) printf("[%ld] Done\n", id);
    }
}      
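
The lock/unlock pair in the middle of that loop is also how IO threads are parked and activated: while the pool is inactive, the main thread holds each thread's io_threads_mutex, so an idle IO thread that finds nothing pending blocks on its own mutex until startThreadedIO() (called from the write path above) releases it, and stopThreadedIO() re-acquires the mutexes to park the threads again. Below is a self-contained sketch of just that park/unpark trick, with illustrative names rather than the actual networking.c functions:

/* park_sketch.c - the mutex park/unpark pattern (compile with: cc -pthread park_sketch.c). */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t park_mutex = PTHREAD_MUTEX_INITIALIZER;

static void *io_thread(void *arg) {
    (void)arg;
    /* Parks here while the main thread holds park_mutex (threads inactive). */
    pthread_mutex_lock(&park_mutex);
    pthread_mutex_unlock(&park_mutex);
    printf("IO thread: activated, processing clients\n");
    return NULL;
}

int main(void) {
    pthread_t tid;
    pthread_mutex_lock(&park_mutex);          /* threads start out inactive */
    pthread_create(&tid, NULL, io_thread, NULL);
    sleep(1);                                 /* pretend there is not enough load yet */
    printf("main thread: enough pending writes, activating IO threads\n");
    pthread_mutex_unlock(&park_mutex);        /* the startThreadedIO() moment */
    pthread_join(tid, NULL);
    return 0;
}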

Summary

From Redis VM, to Redis BIO, and finally to multithreaded IO, we can see Redis gradually moving toward threading. Especially after implementing lazy free (Redis BIO), antirez seems to have gotten a taste of what multithreading can offer: keeping database operations single-threaded while letting Redis tap some of the multi-core, multithreaded power of the CPU. It is not hard to see that Redis's multithreading simply goes with the flow; if the single thread had no bottleneck, a multithreaded Redis would never have appeared. And the hardware reality has changed: from the single-core servers of years past, to dual-core and quad-core machines, to today's servers with eight or sixteen cores as a matter of course. The single-threaded model is certainly simple and its code is clear, but with Moore's law fading and the tide running toward multi-core and multithreading, who can refuse what multithreading has to offer?