天天看点

Redis单线程不行了,快来割VM/ BIO/ IO多线程的韭菜!(附源码)

作者介绍

Insutanto,一个普通的编程手艺人。

背景

Redis在早期,曾因单线程“闻名”。在Redis的FAQ里有一个提问

《Redis is single threaded. How can I exploit multiple CPU/cores?》

​​https://redis.io/topic​​s/faq,说明了redis使用单线程的原因:

CPU通常并不是Redis的瓶颈,因为Redis通常要么受内存限制,要么受网络限制。比如说,一般在Linux系统上运行的流水线Redis,每秒可以交付一百万个请求,如果你的应用程序主要使用O(N)或O(log(N))命令,几乎不会使用过多的CPU 。

......

不过从Redis 4.0开始,Redis就开始使用更多的线程了。目前使用多线程的场景(Redis 4.0),仅限于在后台删除对象,以及通过Redis modules实现的阻塞命令。在未来的版本中,计划是让Redis越来越线程化。

这不禁让我好奇,Redis一开始是单线程的吗?又是怎么朝多线程演化的呢,又是为什么让Redis越来越线程化呢。在阅读了几篇文章后,我决定自己读一遍相关源代码,了解Redis的多线程演化历史。

Redis 多线程源码分析系列指南:

  • Redis VM线程(Redis 1.3.x - Redis 2.4)
  • Redis BIO线程(Redis 2.4+ 和 Redis 4.0+)
  • Redis 网络IO线程(Redis 6.0+)

Redis VM线程(Redis 1.3.x - Redis 2.4)

实际上Redis很早就用到多线程,我们在 Redis 的 1.3.x (2010年)的源代码中,能看到 Redis VM 相关的多线程代码,这部分代码主要是在 Redis 中实现线程化VM的能力。Redis VM 可以将 Redis 中很少访问的 value 存到磁盘中,也可以将占用内存大的 value 存到磁盘。

Redis VM 的底层是读写磁盘,所以在从磁盘读写 value 时,阻塞VM会产生阻塞主线程,影响所有的客户端,导致所有客户端耗时增加。所以 Redis VM 又提供了线程化VM,可以将读写文件数据的操作,放在IO线程中执行,这样就只影响一个客户端(需要从文件中读出数据的客户端),从而避免像阻塞VM那样,提升所有客户端的耗时。

我们从《Virtual Memory technical specification》https://redis.io/topics/internals-vm 能看到线程化VM的优势。

列举线程化VM设计目标的重要性:

简单的实现,很少条件竞争,简单的锁,VM系统多少与其余Redis代码解耦。

良好的性能,客户端访问内存中的value没有锁了。

能够在I / O线程中,对对象进行解码/编码。

但其实,Redis VM 是一个被弃用的短寿特性。在 Redis 1.3.x 出现 Redis VM 之后,Redis 2.4 是最后支持它的版本。Redis 1.3.x 在 2010年发布,Redis 2.6 在 2012年发布,Redis VM的生命在Redis项目中,只持续了两年。我们现在从《Virtual Memory》https://redis.io/topics/virtual-memory能看到弃用 Redis VM 的原因:

……我们发现使用VM有许多缺点和问题。在未来,我们只想提供有史以来最好的内存数据库(但仍像往常一样在磁盘上持久化),而至少现在,不考虑对大于RAM的数据库的支持。我们未来的工作重点是提供脚本,群集和更好的持久性。

我个人以为,去掉Redis VM的根本原因,可能是定位问题。Redis的准确定位了磁盘备份的内存数据库,去掉VM后的Redis更纯粹,更简单,更容易让用户理解和使用。

下面简单介绍下 Redis VM 的多线程代码。

Redis主线程和IO线程使用任务队列和单个互斥锁进行通信。队列定义和互斥锁定义如下:

/* Global server state structure */
struct redisServer {
...
    list *io_newjobs; /* List of VM I/O jobs yet to be processed */
    list *io_processing; /* List of VM I/O jobs being processed */
    list *io_processed; /* List of VM I/O jobs already processed */
    list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
    pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
    pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
    pthread_attr_t io_threads_attr; /* attributes for threads creation */
...
}      

Redis在需要处理IO任务时(比如使用的内存超过最大内存等情况),Redis通过queueIOJob函数,将一个IO任务(iojob)入队到任务队列(io_newjobs),在queueIOJob中,会根据VM的最大线程数,判断是否需要创建新的IO线程。

void queueIOJob(iojob *j) {
    redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
        (void*)j, j->type, (char*)j->key->ptr);
    listAddNodeTail(server.io_newjobs,j);
    if (server.io_active_threads < server.vm_max_threads)
        spawnIOThread();
}      

创建出的IO线程,主逻辑是IOThreadEntryPoint。IO线程会先从io_newjobs队列中取出一个iojob,然后推入io_processing队列,然后根据iojob中的type来执行对应的任务:

  • 从磁盘读数据到内存
  • 计算需要的page数
  • 将内存swap到磁盘

执行完成后,将iojob推入io_processed队列。最后,IO线程通过UINX管道,向主线程发送一个字节,告诉主线程,有一个新的任务处理完成,需要主线程处理结果。

typedef struct iojob {
    int type;   /* Request type, REDIS_IOJOB_* */
    redisDb *db;/* Redis database */
    robj *key;  /* This I/O request is about swapping this key */
    robj *id;   /* Unique identifier of this job:
                   this is the object to swap for REDIS_IOREQ_*_SWAP, or the
                   vmpointer objct for REDIS_IOREQ_LOAD. */
    robj *val;  /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
                 * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
    off_t page; /* Swap page where to read/write the object */
    off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */
    int canceled; /* True if this command was canceled by blocking side of VM */
    pthread_t thread; /* ID of the thread processing this entry */
} iojob;
#define REDIS_IOJOB_LOAD 0          /* Load from disk to memory */
#define REDIS_IOJOB_PREPARE_SWAP 1  /* Compute needed pages */
#define REDIS_IOJOB_DO_SWAP 2       /* Swap from memory to disk */
void *IOThreadEntryPoint(void *arg) {
    iojob *j;
    listNode *ln;
    REDIS_NOTUSED(arg);
    pthread_detach(pthread_self());
    while(1) {
        /* Get a new job to process */
        lockThreadedIO();
        if (listLength(server.io_newjobs) == 0) {
            /* No new jobs in queue, exit. */
            ...
                        unlockThreadedIO();
            return NULL;
        }
                ln = listFirst(server.io_newjobs);
        j = ln->value;
        listDelNode(server.io_newjobs,ln);
        /* Add the job in the processing queue */
                j->thread = pthread_self();
        listAddNodeTail(server.io_processing,j);
        ln = listLast(server.io_processing); /* We use ln later to remove it */
        unlockThreadedIO();
                ...
        /* Process the Job */
        if (j->type == REDIS_IOJOB_LOAD) {
            vmpointer *vp = (vmpointer*)j->id;
            j->val = vmReadObjectFromSwap(j->page,vp->vtype);
        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
            j->pages = rdbSavedObjectPages(j->val);
        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
            if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
                j->canceled = 1;
        }
        /* Done: insert the job into the processed queue */
        ...
                lockThreadedIO();
        listDelNode(server.io_processing,ln);
        listAddNodeTail(server.io_processed,j);
        unlockThreadedIO();
        /* Signal the main thread there is new stuff to process */
        redisAssert(write(server.io_ready_pipe_write,"x",1) == 1);
    }
    return NULL; /* never reached */
}      

​ ​ ​

总结

因为 Redis VM 特性已经从Redis中删除,相关代码也比较古早,就不展开阐述了。

除了学习到多线程下,Redis 对数据读写的优化,我们在学习源码和Redis的官方博客时,能够明显感受到:

“去掉 Redis VM 的根本原因,可能是定位问题。Redis的准确定位了磁盘备份的内存数据库,去掉VM后的Redis更纯粹,更简单,更容易让用户理解和使用。”

有时候,砍掉性能不好、意义不明的特性代码,就是最好的性能优化吧。

Redis BIO线程(Redis 2.4+ 和 Redis 4.0+)

​ ​ ​

Redis BIO线程(Redis 2.4+)

从系列上一篇我们知道,从一开始,除了“短寿”的VM特性和VM线程,Redis主要还是单线程的。不过,我们在Redis的官方文章里能看到,从 Redis 2.4 (2011年)开始,Redis会使用线程在后台执行一些主要跟磁盘I/O有关的慢速的I/O操作。我们把代码分支切到 Redis 2.4 的分支上,能发现有两个 BIO 线程,协助 Redis 进行AOF文件同步刷盘和文件删除的工作。

  • 怎么找到多线程相关的代码?

根据Redis的配置appendfsync,我们在代码里面找到配置对应的定义。

// config.c
...
    else if (!strcasecmp(c->argv[2]->ptr,"appendfsync")) {
        if (!strcasecmp(o->ptr,"no")) {
            server.appendfsync = APPENDFSYNC_NO;
        } else if (!strcasecmp(o->ptr,"everysec")) {
            server.appendfsync = APPENDFSYNC_EVERYSEC;
        } else if (!strcasecmp(o->ptr,"always")) {
            server.appendfsync = APPENDFSYNC_ALWAYS;
        } else {
            goto badfmt;
        }
    }
...      

通过搜索 APPENDFSYNC_EVERYSEC ,我们找到了 backgroundRewriteDoneHandler: 

// aof.c
void backgroundRewriteDoneHandler(int statloc) {
......
    else if (server.appendfsync == APPENDFSYNC_EVERYSEC)
        aof_background_fsync(newfd);
......
}
在 aof_background_fsync 函数中,发现了后台任务相关函数:
// aof.c
void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}      

搜索关键词 REDIS_BIO_AOF_FSYNC,最后我们找到了BIO模块的头文件(bio.h),包含了BIO相关的接口和常量定义:

// bio.h
/* Exported API */
void bioInit(void);
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3);
unsigned long long bioPendingJobsOfType(int type);
void bioWaitPendingJobsLE(int type, unsigned long long num);
time_t bioOlderJobOfType(int type);
/* Background job opcodes */
#define REDIS_BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define REDIS_BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define REDIS_BIO_NUM_OPS       2      

最后,我们找到了 bioInit,发现 Redis 创建了2个 BIO 线程来执行 bioProcessBackgroundJobs 函数,而 bioInit 又是在 server.c 的 main 方法中,通过 initServer 函数来调用:

// bio.c
/* Initialize the background system, spawning the thread. */
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;
    /* Initialization of state vars and objects */
    for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_condvar[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }
    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);
    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
    }
}      
  • BIO多线程的意义

在 backgroundRewriteDoneHandler 函数中,我们会给 BIO 线程增加后台任务,然后让 BIO 线程在后台处理一些工作,为了搞清楚 Redis 使用 BIO 多线程的意义,我们可以先弄清楚这个函数是做什么的。

看注释的描述,这个函数是在后台AOF重写(BGREWRITEAOF)结束时调用,然后我们继续往下看代码,主要是一些写文件的操作,直到我们看到 aof.c 中有一段很详细的注释:

剩下要做的唯一事情就是将临时文件重命名为配置的文件,并切换用于执行AOF写入的文件描述符。我们不希望close(2)或rename(2)调用在删除旧文件时阻塞服务器。有两种可能的方案:

AOF已禁用,这是一次重写。临时文件将重命名为配置的文件。当该文件已经存在时,它将被取消链接(unlink),这可能会阻塞server。

AOF已启用,重写的AOF将立即开始接收写操作。将临时文件重命名为配置文件后,原始AOF文件描述符将关闭。由于这将是对该文件的最后一个引用,因此关闭该文件将导致底层文件被取消链接(unlink),这可能会阻塞server。

为了减轻取消链接(unlink)操作的阻塞效果(由方案1中的rename(2)或方案2中的close(2)引起),我们使用后台线程来解决此问题。首先,通过打开目标文件,使方案1与方案2相同。rename(2)之后的取消链接(unlink)操作将在为其描述符调用close(2)时执行。到那时,保证这条分支原子性的一切都已发生,因此,只要文件描述符再次被释放,我们就不在乎该关闭操作的影响或持续时间。

我们发现了Redis使用BIO线程(REDIS_BIO_CLOSE_FILE)的目的——后台线程删除文件,避免因为删除大文件耗时过长导致主线程阻塞:在AOF重写时,rename(2)或者close(2)文件,可能会导致系统调用执行删除文件的操作,而删除文件的操作是在当前进程执行(内核态),所以如果文件较大,当前进程删除文件的耗时就会比较长。而如果在主线程删除比较大的文件,就会导致主线程被磁盘IO阻塞。

//aof.c
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
 * Handle this. */
void backgroundRewriteDoneHandler(int statloc) {
    int exitcode = WEXITSTATUS(statloc);
    int bysignal = WIFSIGNALED(statloc);
    if (!bysignal && exitcode == 0) {
        int newfd, oldfd;
        int nwritten;
        char tmpfile[256];
        long long now = ustime();
                ...
        /* Flush the differences accumulated by the parent to the
         * rewritten AOF. */
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
            (int)server.bgrewritechildpid);
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        if (newfd == -1) {
            redisLog(REDIS_WARNING,
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
            goto cleanup;
        }
        nwritten = write(newfd,server.bgrewritebuf,sdslen(server.bgrewritebuf));
        if (nwritten != (signed)sdslen(server.bgrewritebuf)) {
            if (nwritten == -1) {
                redisLog(REDIS_WARNING,
                    "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            } else {
                redisLog(REDIS_WARNING,
                    "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            }
            close(newfd);
            goto cleanup;
        }
        redisLog(REDIS_NOTICE,
            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten);
        /* The only remaining thing to do is to rename the temporary file to
         * the configured file and switch the file descriptor used to do AOF
         * writes. We don't want close(2) or rename(2) calls to block the
         * server on old file deletion.
         *
         * There are two possible scenarios:
         *
         * 1) AOF is DISABLED and this was a one time rewrite. The temporary
         * file will be renamed to the configured file. When this file already
         * exists, it will be unlinked, which may block the server.
         *
         * 2) AOF is ENABLED and the rewritten AOF will immediately start
         * receiving writes. After the temporary file is renamed to the
         * configured file, the original AOF file descriptor will be closed.
         * Since this will be the last reference to that file, closing it
         * causes the underlying file to be unlinked, which may block the
         * server.
         *
         * To mitigate the blocking effect of the unlink operation (either
         * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
         * use a background thread to take care of this. First, we
         * make scenario 1 identical to scenario 2 by opening the target file
         * when it exists. The unlink operation after the rename(2) will then
         * be executed upon calling close(2) for its descriptor. Everything to
         * guarantee atomicity for this switch has already happened by then, so
         * we don't care what the outcome or duration of that close operation
         * is, as long as the file descriptor is released again. */
        if (server.appendfd == -1) {
            /* AOF disabled */
             /* Don't care if this fails: oldfd will be -1 and we handle that.
              * One notable case of -1 return is if the old file does
              * not exist. */
             oldfd = open(server.appendfilename,O_RDONLY|O_NONBLOCK);
        } else {
            /* AOF enabled */
            oldfd = -1; /* We'll set this to the current AOF filedes later. */
        }
        /* Rename the temporary file. This will not unlink the target file if
         * it exists, because we reference it with "oldfd". */
        if (rename(tmpfile,server.appendfilename) == -1) {
            redisLog(REDIS_WARNING,
                "Error trying to rename the temporary AOF: %s", strerror(errno));
            close(newfd);
            if (oldfd != -1) close(oldfd);
            goto cleanup;
        }
        if (server.appendfd == -1) {
            /* AOF disabled, we don't need to set the AOF file descriptor
             * to this new file, so we can close it. */
            close(newfd);
        } else {
            /* AOF enabled, replace the old fd with the new one. */
            oldfd = server.appendfd;
            server.appendfd = newfd;
            if (server.appendfsync == APPENDFSYNC_ALWAYS)
                aof_fsync(newfd);
            else if (server.appendfsync == APPENDFSYNC_EVERYSEC)
                aof_background_fsync(newfd);
            server.appendseldb = -1; /* Make sure SELECT is re-issued */
            aofUpdateCurrentSize();
            server.auto_aofrewrite_base_size = server.appendonly_current_size;
            /* Clear regular AOF buffer since its contents was just written to
             * the new AOF from the background rewrite buffer. */
            sdsfree(server.aofbuf);
            server.aofbuf = sdsempty();
        }
        redisLog(REDIS_NOTICE, "Background AOF rewrite successful");
        /* Asynchronously close the overwritten AOF. */
        if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
        redisLog(REDIS_VERBOSE,
            "Background AOF rewrite signal handler took %lldus", ustime()-now);
    } else if (!bysignal && exitcode != 0) {
        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated with error");
    } else {
        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated by signal %d",
            WTERMSIG(statloc));
    }
cleanup:
    sdsfree(server.bgrewritebuf);
    server.bgrewritebuf = sdsempty();
    aofRemoveTempFile(server.bgrewritechildpid);
    server.bgrewritechildpid = -1;
}      

我们回到 backgroundRewriteDoneHandler 函数中调用的 aof_background_fsync 函数,在这个函数里,我们发现了另一个BIO线程(REDIS_BIO_AOF_FSYNC)的任务创建代码:

void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}      

阅读 bioCreateBackgroundJob 函数的代码,我们发现 Redis 在写对应Job类型的任务队列时加了互斥锁(mutex),写完队列后通过释放条件变量和互斥锁,用来激活等待条件变量的 BIO线程,让 BIO线程继续执行任务队列的任务,这样保证队列在多线程下的数据一致性(还增加了对应 BIO类型的IO等待计数,暂时我们用不上),而 Redis BIO 线程就是从 BIO 的任务队列不断取任务的:

// bio.c
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));
    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    pthread_cond_signal(&bio_condvar[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}      

接着我们回到 BIO 线程的主函数 bioProcessBackgroundJobs,我们验证了 BIO 线程执行逻辑,BIO线程通过等待互斥锁和条件变量来判断是否继续读取队列。如前面的注释所说,在执行 REDIS_BIO_CLOSE_FILE 类型的任务时,调用的是 close(fd) 函数。继续阅读代码,发现在执行 REDIS_BIO_AOF_FSYNC 类型的任务时,调用的是函数 aof_fsync:

// bio.c
void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    pthread_detach(pthread_self());
    pthread_mutex_lock(&bio_mutex[type]);
    while(1) {
        listNode *ln;
        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);
        /* Process the job accordingly to its type. */
        if (type == REDIS_BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == REDIS_BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);
        } else {
            redisPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);
        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;
    }
}




我们继续看 aof_fsync 的函数定义,发现 aof_fsync 其实就是 fdatasync 和 fsync :




/* Define aof_fsync to fdatasync() in Linux and fsync() for all the rest */
#ifdef __linux__
#define aof_fsync fdatasync
#else
#define aof_fsync fsync
#endif      

熟悉 Redis 的朋友知道,这是 Redis 2.4 中 BIO线程关于 Redis AOF 持久性的设计:

使用AOF Redis更加持久;

你有不同的fsync策略:完全不fsync,每秒fsync,每个查询fsync。使用fsync的默认策略,每秒的写入性能当然很好(fsync是使用后台线程执行的,并且当没有fsync执行时,主线程将尽力执行写入操作),但是你会损失一秒钟的写入数据。——《Redis Persistence》https://redis.io/topics/persistence

AOF advantages

而为什么fsync需要使用 BIO线程在后台执行,其实就很简单了。因为 Redis 需要保证数据的持久化,数据写入文件时,其实只是写到缓冲区,只有数据刷入磁盘,才能保证数据不会丢失,而 fsync将缓冲区刷入磁盘是一个同步IO操作。所以,在主线程执行缓冲区刷盘的操作,虽然能更好的保证数据的持久化,但是却会阻塞主线程。

最后,为了减少阻塞,Redis 使用 BIO线程处理 fsync。但其实这并不意味着 Redis 不再受 fsync 的影响,实际上如果 fsync 过于缓慢(数据2S以上未刷盘),Redis主线程会不计代价的阻塞执行文件写入(Redis persistence demystified http://oldblog.antirez.com/m/p.php?i=251  #appendfsync everysec)。

​ ​

Redis BIO线程(Redis 4.0+)

从 Redis 4.0 (2017年)开始,又增加了一个新的BIO线程,我们在 bio.h 中发现了新的定义——BIO_LAZY_FREE,这个线程主要用来协助 Redis 异步释放内存。在antirez的《Lazy Redis is better Redis》http://antirez.com/news/93中,我们能了解到为什么要将释放内存放在异步线程中:

(渐进式回收内存)这是一个很好的技巧,效果很好。但是,我们还是必须在一个线程中执行此操作,这仍然让我感到很难过。当有很多逻辑需要处理,并且lazy free也非常频繁时,ops(每秒的操作数)会减少到正常值的65%左右。​

释放不同线程中的对象会更简单:如果有一个线程正忙于仅执行释放操作,则释放应该总是比在数据集中添加新值快。

当然,主线程和lazy free线程之间在调用内存分配器上也存在一些竞争,但是Redis只会花一小部分时间在内存分配上,而将更多的时间花在I/O,命令分派,缓存未命中等等。

对这个特性背景感兴趣的朋友还可以看看这个issue: Lazy free of keys and databases #1748  github.com/redis/re...ues/1748

// bio.h
/* Background job opcodes */
#define BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE     2 /* Deferred objects freeing. */
#define BIO_NUM_OPS       3      

我们回头看,发现在原来的基础上,增加了 BIO_LAZY_FREE 的部分。lazy free 的任务有三种:

  • 释放对象
  • 释放 Redis Database
  • 释放 跳表(skip list)
// bio.c
void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;
    /* Check that the type is within the right interval. */
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }
    /* Make the thread killable at any time, so that bioKillThreads()
     * can work reliably. */
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
    while(1) {
        listNode *ln;
        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);
        /* Process the job accordingly to its type. */
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            /* What we free changes depending on what arguments are set:
             * arg1 -> free the object at pointer.
             * arg2 & arg3 -> free two dictionaries (a Redis DB).
             * only arg3 -> free the skiplist. */
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);
        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        pthread_cond_broadcast(&bio_step_cond[type]);
        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;
    }
}      

其中释放对象的主要逻辑在 decrRefCount 中:

// lazyfree.c
/* Release objects from the lazyfree thread. It's just decrRefCount()
 * updating the count of objects to release. */
void lazyfreeFreeObjectFromBioThread(robj *o) {
    decrRefCount(o);
    atomicDecr(lazyfree_objects,1);
}      

按照不同的数据类型,执行不同的内存释放逻辑:

// object.c
void decrRefCount(robj *o) {
    if (o->refcount == 1) {
        switch(o->type) {
        case OBJ_STRING: freeStringObject(o); break;
        case OBJ_LIST: freeListObject(o); break;
        case OBJ_SET: freeSetObject(o); break;
        case OBJ_ZSET: freeZsetObject(o); break;
        case OBJ_HASH: freeHashObject(o); break;
        case OBJ_MODULE: freeModuleObject(o); break;
        default: serverPanic("Unknown object type"); break;
        }
        zfree(o);
    } else {
        if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
        if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
    }
}      

​ ​ ​

扩展

其他的相关内容就不一一说明了,这里有一个扩展内容,算是 Redis 开发背后的故事。

我参考学习了文章《Lazy Redis is better Redis》http://antirez.com/news/93,发现其实 antirez 在设计 lazy free 时还是比较纠结的。因为 lazy free 的特性涉及到了 Redis 本身的内部特性 —— 共享对象 (sharing objects),lazy free 特性的推进受到了共享对象的影响。这里只说说结论,最后为了实现 lazy free 的特性,antirez 去掉了共享对象的特性。直到现在 (Redis 6.0),共享对象仅在少部分地方出现,我们追踪代码的话,可以发现 robj 结构体的 refcount 目前大部分情况下等于 1。当然还有少部分情况,比如 server.c 中初始化创建整型数字的共享字符串,又或者手动增加计数来降低内存对象的回收速度等等。这就是为什么 Redis 明明去掉了共享对象的设计,但是我们还能看到 refcount 相关的代码,这大概就是历史遗留原因吧(手动狗头)。

// server.c
#define OBJ_SHARED_REFCOUNT INT_MAX
typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
                            * LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time). */
    int refcount;
    void *ptr;
} robj;
// server.c
void createSharedObjects(void) {
......
    for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
        shared.integers[j] =
            makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
        shared.integers[j]->encoding = OBJ_ENCODING_INT;
    }
......
}      

Redis 网络IO线程(Redis 6.0+)

从2020年正式发布的 Redis 6.0 开始开始,Redis增加了与客户端IO读写线程,减轻主线程与客户端的网络IO负担。而实际上,这个设想在2015年开发 lazy free 特性的时候就已经出现了。《Lazy Redis is better Redis》http://antirez.com/news/93 #Not just lazy freeing :

既然聚合数据类型的值是完全不共享的,并且客户端输出缓冲区也不包含共享对象,有很多地方可以利用这一点。例如,最终有可能在 Redis 中实现线程化I/O,以便由不同的线程为不同的客户端提供服务。这意味着我们仅在访问数据库时才具有全局锁定,但是客户端读取/写入系统调用,甚至解析客户端发送的指令数据,都可以在不同的线程中进行。这是一种类似 memcached 的设计,我期待去实现和测试。

而且,有可能实现对某一线程中的聚合数据类型执行某些慢速操作,只会导致“几个”键被“阻塞”,而所有其他客户端都可以继续工作。这可以通过与我们当前使用阻塞操作(请参阅blocking.c)非常相似的方式来实现,此外还可以使用哈希表来存储当前正在使用哪些键以及它使用的客户端。因此,如果客户要求使用SMEMBERS之类的东西,就能够仅锁定键,处理创建输出缓冲区的请求,然后再次释放键。如果某个键被阻塞了,则尝试访问同一键的客户端都将被阻塞。

所有这些都需要进行更大幅度的内部修改,但是最重要的是,我们的禁忌要少一些。我们可以用更少的缓存丢失和更少内存占用的聚合数据类型,来弥补对象复制的时间,我们现在可以畅想无共享设计的线程化 Redis ,这是唯一可以轻松战胜我们单线程架构的设计。过去,如果为了实现并发访问,在数据结构和对象中增加一系列互斥锁,始终会被视为一个坏主意。但现在幸运的是,有方法可以两全其美。我们可现在以仍然像过去那样,从主线程继续执行所有快速的操作。而要在性能方面有所收获,需要增加一些复杂性作为代价。

上述是 antirez 在《Lazy Redis is better Redis》的 Not just lazy freeing 部分所分享的内容,理解这个,我们就能知道为何 Redis 要实现 IO 线程化了:

  • IO单线程时,某些键的阻塞操作会阻塞整个线程,而使用多线程,可以实现只有访问相同键的客户端被阻塞。
  • 去掉了共享对象,让IO线程化更加简单,不再需要向数据结构和对象中增加一系列的互斥锁来实现多线程,从而保留了Redis单线程的“传统艺能”。(PS:去掉共享对象,会增加内存的复制,但是也可以带来内存上更紧凑的数据类型,也因为内存上更加连续带来更少的缓存丢失。)

接下来,我们从 redis server.c 中的main()函数开始,看看IO线程是怎么运行的。

​ ​ ​

IO线程的创建

通过 pthread_create 搜索到 initThreadedIO() 函数,然后整理下IO线程的创建过程:

无论是否哨兵模式,Redis都会执行InitServerLast:

int main(int argc, char **argv) {
    struct timeval tv;
    int j;
    server.supervised = redisIsSupervised(server.supervised_mode);
    int background = server.daemonize && !server.supervised;
    if (background) daemonize();
    ......some log......
    readOOMScoreAdj();
    initServer();
    if (background || server.pidfile) createPidFile();
    redisSetProcTitle(argv[0]);
    redisAsciiArt();
    checkTcpBacklogSettings();
    if (!server.sentinel_mode) {
        moduleLoadFromQueue();
        ACLLoadUsersAtStartup();
        InitServerLast();
        loadDataFromDisk();
        ......
    } else {
        InitServerLast();
        sentinelIsRunning();
        ......
    }
    ......
    redisSetCpuAffinity(server.server_cpulist);
    setOOMScoreAdj(-1);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}      

initServer()中,Redis会初始化相关的任务队列,而在InitServerLast中,才会初始化网络IO相关的线程资源,因为Redis的网络IO多线程是可以配置的。Redis实现了网络IO多线程,但是网络IO的逻辑,既可以在ThreadedIO线程执行,也可以在主线程执行,给用户提供了选择:

void initServer(void) {
    ......
    /* Initialization after setting defaults from the config system. */
    server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
    server.hz = server.config_hz;
    server.pid = getpid();
    server.in_fork_child = CHILD_TYPE_NONE;
    server.main_thread_id = pthread_self();
    server.current_client = NULL; // 当前正在执行命令的客户端
    server.errors = raxNew();
    server.fixed_time_expire = 0;
    server.clients = listCreate(); // 活跃的客户端列表
    server.clients_index = raxNew(); // 按照 client_id 索引的活跃的客户端字典
    server.clients_to_close = listCreate(); // 需要异步关闭的客户端列表
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.clients_pending_write = listCreate(); // 等待写或者安装handler的客户端列表
    server.clients_pending_read = listCreate(); // 等待读socket缓冲区的客户端列表
    server.clients_timeout_table = raxNew();
    server.replication_allowed = 1;
    server.slaveseldb = -1; /* Force to emit the first SELECT command. */
    server.unblocked_clients = listCreate(); // 下一个循环之前,要取消阻塞的客户端列表
    server.ready_keys = listCreate();
    server.clients_waiting_acks = listCreate();
    server.get_ack_from_slaves = 0;
    server.client_pause_type = 0;
    server.paused_clients = listCreate();
    server.events_processed_while_blocked = 0;
    server.system_memory_size = zmalloc_get_memory_size();
    server.blocked_last_cron = 0;
    server.blocking_op_nesting = 0;
    ......
}
在 InitServerLast()中 ,除了 initThreadedIO (Redis网络IO线程),我们还能看到bioInit(background I/O 初始化),两个模块使用了不同的资源:
/* Some steps in server initialization need to be done last (after modules
 * are loaded).
 * Specifically, creation of threads due to a race bug in ld.so, in which
 * Thread Local Storage initialization collides with dlopen call.
 * see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */
void InitServerLast() {
    bioInit();
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}      

接下来我们来看看 Redis 源码的 networking.c 文件:io_threads 线程池,io_threads_mutex 互斥锁,io_threads_pending IO线程客户端等待数,io_threads_list 每个IO线程的客户端列表。

/* ==========================================================================
 * Threaded I/O
 * ========================================================================== */
#define IO_THREADS_MAX_NUM 128
#define IO_THREADS_OP_READ 0
#define IO_THREADS_OP_WRITE 1
pthread_t io_threads[IO_THREADS_MAX_NUM];
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
int io_threads_op;      /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
/* This is the list of clients each thread will serve when threaded I/O is
 * used. We spawn io_threads_num-1 threads, since one is the main thread
 * itself. */
list *io_threads_list[IO_THREADS_MAX_NUM];      

然后就是创建线程的initThreadedIO 函数。初始化的时候IO线程处于未激活状态,等待后续激活,如果 Redis 配置的 io_threads_num 为 1,代表IO使用主线程单线程处理,如果线程数配置超过最大值 IO_THREADS_MAX_NUM (128) 则异常退出,最后,创建的线程都将被锁上直到被唤醒:

/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */
    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }
    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */
        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}      

​ ​ ​

IO线程的工作流程

Redis 在启动时,初始化函数 initServer 将 beforeSleep 和 afterSleep 注册为事件循环休眠前和休眠后的handler :

void initServer(void) {
......
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
......
    /* Register before and after sleep handlers (note this needs to be done
     * before loading persistence since it is used by processEventsWhileBlocked. */
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
......
}      

事件循环执行 beforeSleep 时,会调用handleClientsWithPendingReadsUsingThreads 和handleClientsWithPendingWritesUsingThreads,分别是IO读写任务的分配逻辑。特殊情况下,在AOF和RDB数据恢复(从文件读取数据到内存)的时候,Redis会通过processEventsWhileBlocked调用 beforeSleep,这个时候,只会执行handleClientsWithPendingReadsUsingThreads ,这个时候IO写是同步的:

/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors.
 *
 * Note: This function is (currently) called from two functions:
 * 1. aeMain - The main server loop
 * 2. processEventsWhileBlocked - Process clients during RDB/AOF load
 *
 * If it was called from processEventsWhileBlocked we don't want
 * to perform all actions (For example, we don't want to expire
 * keys), but we do need to perform some actions.
 *
 * The most important is freeClientsInAsyncFreeQueue but we also
 * call some other low-risk functions. */
void beforeSleep(struct aeEventLoop *eventLoop) {
......
    /* Just call a subset of vital functions in case we are re-entering
     * the event loop from processEventsWhileBlocked(). Note that in this
     * case we keep track of the number of events we are processing, since
     * processEventsWhileBlocked() wants to stop ASAP if there are no longer
     * events to handle. */
    if (ProcessingEventsWhileBlocked) {
        uint64_t processed = 0;
        processed += handleClientsWithPendingReadsUsingThreads();
        processed += tlsProcessPendingData();
        processed += handleClientsWithPendingWrites();
        processed += freeClientsInAsyncFreeQueue();
        server.events_processed_while_blocked += processed;
        return;
    }
......
    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();
......
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
    /* Close clients that need to be closed asynchronous */
    freeClientsInAsyncFreeQueue();
......
    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    if (moduleCount()) moduleReleaseGIL();
    /* Do NOT add anything below moduleReleaseGIL !!! */
}      

在handleClientsWithPendingReadsUsingThreads函数中,Redis会执行IO读的任务分配逻辑,当Redis配置了IO线程的读取和解析(io_threads_do_reads),可读的handler会将普通的客户端放到客户端队列中处理,而不是同步处理。这个函数将队列分配给IO线程处理,累积读取buffer中的数据:

  • IO线程在初始化时未激活,Redis配置了用IO线程读取和解析数据(io_threads_do_reads),才会继续执行;
  • 读取待处理的客户端列表 clients_pending_read,将任务按照取模平均分配到不同线程的任务队列io_threads_list[target_id];
  • 通过setIOPendingCount给对应的IO线程设置条件变量,激活IO线程;
  • 依然在主线程处理一些客户端请求;
  • 如果客户端等待写入,并且响应的buffer还有待写数据,或有待发送给客户端的响应对象,则给客户端的连接安装写handler;
/* When threaded I/O is also enabled for the reading + parsing side, the
 * readable handler will just put normal clients into a queue of clients to
 * process (instead of serving them synchronously). This function runs
 * the queue using the I/O threads, and process them in order to accumulate
 * the reads in the buffers, and also parse the first command available
 * rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
    // IO线程在初始化时未激活,Redis配置了用IO线程读取和解析数据(io_threads_do_reads),才会继续执行
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;
    /* Distribute the clients across N different lists. */
    // 读取待处理的客户端列表 clients_pending_read,
    // 将任务按照取模平均分配到不同线程的任务队列io_threads_list[target_id]
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    // 通过setIOPendingCount给对应的IO线程设置条件变量,激活IO线程
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }
    /* Also use the main thread to process a slice of clients. */
    // 依然在主线程处理一些客户端请求
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);
    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }
    /* Run the list of clients again to process the new buffers. */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);
        if (processPendingCommandsAndResetClient(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }
        processInputBuffer(c);
        /* We may have pending replies if a thread readQueryFromClient() produced
         * replies and did not install a write handler (it can't).
         */
        // 如果客户端等待写入,
        // 并且响应的buffer还有待写数据,或有待发送给客户端的响应对象,
        // 则给客户端的连接安装写handler
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }
    /* Update processed count on server */
    server.stat_io_reads_processed += processed;
    return processed;
}      

在 handleClientsWithPendingWritesUsingThreads 中,Redis会执行IO线程的启动,IO线程写任务的分配等逻辑:

  • 如果没有开启多线程,或者等待的客户端数量小于线程数的两倍,则执行同步代码;
  • 如果 IO 线程没有激活,则激活(在initThreadedIO函数创建线程时处于未激活状态);
  • 如果遇到需要关闭的客户端(CLIENT_CLOSE_ASAP),则将其从待处理的客户端列表里删除;
  • 读取待处理的客户端列表 clients_pending_write ,将任务按照取模平均分配到不同线程的任务队列io_threads_list[target_id];
  • 通过setIOPendingCount给对应的IO线程设置条件变量,激活IO线程;
  • 依然在主线程处理一些客户端请求;
  • 如果响应的buffer还有待写数据,或者还有待发送给客户端的响应对象,则给客户端的连接安装写handler;
  • 最后调用freeClientAsync 将待释放的客户端放入clients_to_close队列,等待beforeSleep执行freeClientsInAsyncFreeQueue时实现异步释放客户端;
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */
    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    // 如果没有开启多线程,或者等待的客户端数量小于线程数的两倍,则执行同步代码
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }
    /* Start threads if needed. */
    // 如果 IO 线程没有激活,则激活(在initThreadedIO函数创建线程时处于未激活状态)
    if (!server.io_threads_active) startThreadedIO();
    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        // 如果遇到需要关闭的客户端(CLIENT_CLOSE_ASAP),则将其从待处理的客户端列表里删除
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    // 通过setIOPendingCount给对应的IO线程设置条件变量,激活IO线程
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }
    /* Also use the main thread to process a slice of clients. */
    // 依然在主线程处理一些客户端请求
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);
    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }
    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        /* Install the write handler if there are pending writes in some
         * of the clients. */
        // 如果响应的buffer还有待写数据,或者还有待发送给客户端的响应对象,
        // 则给客户端的连接安装写handler
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            // 将待释放的客户端放入clients_to_close队列,
            // 等待beforeSleep执行freeClientsInAsyncFreeQueue时实现异步释放客户端
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    /* Update processed count on server */
    server.stat_io_writes_processed += processed;
    return processed;
}      

​ ​ ​

IO线程的主逻辑

在 IOThreadMain 函数中,是 Redis IO线程的主逻辑。

我们发现IO线程在创建后,会通过redisSetCpuAffinity函数和server_cpulist参数,来设置线程的CPU的亲和性,合理配置线程的CPU亲和性,能够一定程度上提升性能。

之后,IO线程会根据条件变量 io_threads_pending[id] 判断是否有等待的IO需要处理,然后从 io_threads_list[myid] 中获取分给自己的 client,再根据 io_thread_op 来判断,这个时候需要执行读写IO中的哪一个, readQueryFromClient 还是 writeToClient :

void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];
    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();
    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }
        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
        serverAssert(io_threads_pending[id] != 0);
        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;
        if (tio_debug) printf("[%ld] Done\n", id);
    }
}      

总结

从Redis VM开始,到Redis BIO,再到最后的IO多线程,我们能看到 Redis 正在逐渐的向线程化的方向发展。特别是在实现Lazy Free之后(Redis BIO),antirez似乎尝到了多线程的好处,在保证db操作单线程的情况下,让Redis发挥CPU一部分多核多线程的实力。我们不难发现,Redis 的多线程不过是顺势而为罢了,如果单线程没有瓶颈,就不会产生使用多线程的Redis。再结合现状来看,毕竟时代变了,从多年前的单核服务器,到后来的双核,四核服务器,再到现在动辄八核,十六核的服务器:单线程模型固然简单,代码清晰,但是在摩尔定律失效,多核多线程的时代洪流下,有谁能够拒绝多线程的好处呢?