redis的main函數在redis.c檔案的最後(3933行)。一個Redis伺服器從啟動到能夠接受用戶端的指令請求,需要經過一系列的初始化和設定過程,比如初始化伺服器狀态,接受使用者指定的伺服器配置,建立相應的資料結構和網絡連接配接等等。通過分析main函數我們可以了解這些流程。下面我們就簡要分析一下Redis伺服器啟動流程。
int main(int argc, char **argv) {
struct timeval tv;
/* We need to initialize our libraries, and the server configuration. */
// 初始化庫
#ifdef INIT_SETPROCTITLE_REPLACEMENT
spt_init(argc, argv);
#endif
setlocale(LC_COLLATE,"");
zmalloc_enable_thread_safeness();
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);
dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
// 檢查伺服器是否以 Sentinel 模式啟動
server.sentinel_mode = checkForSentinelMode(argc,argv);
// 初始化伺服器
initServerConfig();
/* We need to init sentinel right now as parsing the configuration file
* in sentinel mode will have the effect of populating the sentinel
* data structures with master nodes to monitor. */
// 如果伺服器以 Sentinel 模式啟動,那麼進行 Sentinel 功能相關的初始化
// 并為要監視的主伺服器建立一些相應的資料結構
if (server.sentinel_mode) {
initSentinelConfig();
initSentinel();
}
// 檢查使用者是否指定了配置檔案,或者配置選項
if (argc >= 2) {
int j = 1; /* First option to parse in argv[] */
sds options = sdsempty();
char *configfile = NULL;
/* Handle special options --help and --version */
// 處理特殊選項 -h 、-v 和 --test-memory
if (strcmp(argv[1], "-v") == 0 ||
strcmp(argv[1], "--version") == 0) version();
if (strcmp(argv[1], "--help") == 0 ||
strcmp(argv[1], "-h") == 0) usage();
if (strcmp(argv[1], "--test-memory") == 0) {
if (argc == 3) {
memtest(atoi(argv[2]),50);
exit(0);
} else {
fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
exit(1);
}
}
/* First argument is the config file name? */
// 如果第一個參數(argv[1])不是以 "--" 開頭
// 那麼它應該是一個配置檔案
if (argv[j][0] != '-' || argv[j][1] != '-')
configfile = argv[j++];
/* All the other options are parsed and conceptually appended to the
* configuration file. For instance --port 6380 will generate the
* string "port 6380\n" to be parsed after the actual file name
* is parsed, if any. */
// 對使用者給定的其餘選項進行分析,并将分析所得的字元串追加稍後載入的配置檔案的内容之後
// 比如 --port 6380 會被分析為 "port 6380\n"
while(j != argc) {
if (argv[j][0] == '-' && argv[j][1] == '-') {
/* Option name */
if (sdslen(options)) options = sdscat(options,"\n");
options = sdscat(options,argv[j]+2);
options = sdscat(options," ");
} else {
/* Option argument */
options = sdscatrepr(options,argv[j],strlen(argv[j]));
options = sdscat(options," ");
}
j++;
}
if (configfile) server.configfile = getAbsolutePath(configfile);
// 重置儲存條件
resetServerSaveParams();
// 載入配置檔案, options 是前面分析出的給定選項
loadServerConfig(configfile,options);
sdsfree(options);
// 擷取配置檔案的絕對路徑
if (configfile) server.configfile = getAbsolutePath(configfile);
} else {
redisLog(REDIS_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
}
// 将伺服器設定為守護程序
if (server.daemonize) daemonize();
// 建立并初始化伺服器資料結構
initServer();
// 如果伺服器是守護程序,那麼建立 PID 檔案
if (server.daemonize) createPidFile();
// 為伺服器程序設定名字
redisSetProcTitle(argv[0]);
// 列印 ASCII LOGO
redisAsciiArt();
// 如果伺服器不是運作在 SENTINEL 模式,那麼執行以下代碼
if (!server.sentinel_mode) {
/* Things not needed when running in Sentinel mode. */
// 列印問候語
redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
#ifdef __linux__
// 列印記憶體警告
linuxOvercommitMemoryWarning();
#endif
// 從 AOF 檔案或者 RDB 檔案中載入資料
loadDataFromDisk();
// 啟動叢集?
if (server.cluster_enabled) {
if (verifyClusterConfigWithData() == REDIS_ERR) {
redisLog(REDIS_WARNING,
"You can't have keys in a DB different than DB 0 when in "
"Cluster mode. Exiting.");
exit(1);
}
}
// 列印 TCP 端口
if (server.ipfd_count > 0)
redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
// 列印本地套接字端口
if (server.sofd > 0)
redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
} else {
sentinelIsRunning();
}
/* Warning the user about suspicious maxmemory setting. */
// 檢查不正常的 maxmemory 配置
if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
redisLog(REDIS_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
}
// 運作事件處理器,一直到伺服器關閉為止
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
// 伺服器關閉,停止事件循環
aeDeleteEventLoop(server.el);
return 0;
}
初始化
初始化庫:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5CN4kTO4ATMxM2Y2IzY0YGZyYzXwMjNwATM2IzLcdDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
void spt_init(int argc,char *argv[])
定義在setproctitle.c中的第148行(這個源程式檔案很多工程是庫檔案,Linux/Darwin setproctitle)。通過這個宏定義可以看出是考慮移植性的,檢查是否可以使用setproctitle函數,BSD系統本身支援該函數,對于Linux和osx我們提供相應的實作版本。這段定義在config.h中。
程式的環境由一組格式為“名字=值”的字元串組成。程式可以通過environ變量直接通路這個字元串數組。詳細可參考Linux程式設計第4章Linux環境第122頁。
void spt_init(int argc, char *argv[]) {
char **envp = environ;
char *base, *end, *nul, *tmp;
int i, error;
if (!(base = argv[0]))
return;
nul = &base[strlen(base)];
end = nul + 1;
for (i = 0; i < argc || (i >= argc && argv[i]); i++) {
if (!argv[i] || argv[i] < end)
continue;
end = argv[i] + strlen(argv[i]) + 1;
}
for (i = 0; envp[i]; i++) {
if (envp[i] < end)
continue;
end = envp[i] + strlen(envp[i]) + 1;
}
if (!(SPT.arg0 = strdup(argv[0])))
goto syerr;
#if __GLIBC__
if (!(tmp = strdup(program_invocation_name)))
goto syerr;
program_invocation_name = tmp;
if (!(tmp = strdup(program_invocation_short_name)))
goto syerr;
program_invocation_short_name = tmp;
#elif __APPLE__
if (!(tmp = strdup(getprogname())))
goto syerr;
setprogname(tmp);
#endif
if ((error = spt_copyenv(envp)))
goto error;
if ((error = spt_copyargs(argc, argv)))
goto error;
SPT.nul = nul;
SPT.base = base;
SPT.end = end;
return;
syerr:
error = errno;
error:
SPT.error = error;
} /* spt_init() */
char *setlocale(int category, const char *locale) 用于設定或讀取地域化資訊。
zmalloc_enable_thread_safeness(); //使能zmalloc線程安全模式
zmalloc_set_oom_handler(redisOutOfMemoryHandler); //設定記憶體溢出句柄
srand(time(NULL)^getpid()); 設定srand函數的種子
設定字典哈希函數種子
初始化伺服器
第一步檢查伺服器是否以Sentinel模式啟動,checkForSentinelMode函數來檢查指令行上設定的sentinel模式,最後設定server.sentinel_mode标志。
初始化伺服器是建立一個struct redisServer類型的執行個體變量server作為伺服器的狀态,并為結構中的各個屬性設定預設值。初始化server變量的工作由redis.c/initServerConfig函數完成。redis.c/initServerConfig函數設定主要工作:
- 設定伺服器的運作ID
getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE);
- 設定預設配置檔案路徑為NULL
- 設定伺服器的預設運作頻率REDIS_DEFAULT_HZ
- 設定伺服器的運作架構32位還是64位
- 設定預設伺服器端口号、tcp_baklog等相關項
- 設定伺服器log檔案路徑及syslog配置
- 設定伺服器的預設RDB持久化和AOF持久化條件及一系列伺服器設定
- 初始化LRU時間時鐘
- 初始化并設定儲存條件
- 初始化複制相關的狀态
- 初始化 PSYNC 指令所使用的 backlog
- 設定用戶端的輸出緩沖區限制
- 初始化浮點常量
- 初始化指令表,在這裡初始化是因為接下來讀取 .conf 檔案時可能會用到這些指令
- 初始化慢查詢日志
- 初始化調試項
可以看到clientBufferLimitsConfig的成員表示的是用戶端緩存區的硬限制、軟限制及軟限制時限。
指令表是一個字典,字典的鍵是一個個指令名字,比如set、get、del等等;而字典的值則是一個個redisCommand結構,每個redisCommand結構記錄了一個Redis指令的實作資訊。從初始化中可以看出,為server的指令表配置設定了兩個字典,一個是受到rename配置選項作用的指令表,一個是無rename配置指令作用的指令表。
該字典使用的哈希函數,鍵比較函數以及鍵銷毀函數
其中最重要的是指令的名字、實作函數以及參數個數。實作函數的傳回值的宏定義為
typedef void redisCommandProc(redisClient *c)
populateCommandTable();
用于根據redis.c檔案頂部的指令清單,建立指令表
/* Populates the Redis Command Table starting from the hard coded list
* we have on top of redis.c file.
*
* 根據 redis.c 檔案頂部的指令清單,建立指令表
*/
void populateCommandTable(void) {
int j;
// 指令的數量
int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
for (j = 0; j < numcommands; j++) {
// 指定指令
struct redisCommand *c = redisCommandTable+j;
// 取出字元串 FLAG
char *f = c->sflags;
int retval1, retval2;
// 根據字元串 FLAG 生成實際 FLAG
while(*f != '\0') {
switch(*f) {
case 'w': c->flags |= REDIS_CMD_WRITE; break;
case 'r': c->flags |= REDIS_CMD_READONLY; break;
case 'm': c->flags |= REDIS_CMD_DENYOOM; break;
case 'a': c->flags |= REDIS_CMD_ADMIN; break;
case 'p': c->flags |= REDIS_CMD_PUBSUB; break;
case 's': c->flags |= REDIS_CMD_NOSCRIPT; break;
case 'R': c->flags |= REDIS_CMD_RANDOM; break;
case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
case 'l': c->flags |= REDIS_CMD_LOADING; break;
case 't': c->flags |= REDIS_CMD_STALE; break;
case 'M': c->flags |= REDIS_CMD_SKIP_MONITOR; break;
case 'k': c->flags |= REDIS_CMD_ASKING; break;
default: redisPanic("Unsupported command flag"); break;
}
f++;
}
// 将指令關聯到指令表
retval1 = dictAdd(server.commands, sdsnew(c->name), c);
/* Populate an additional dictionary that will be unaffected
* by rename-command statements in redis.conf.
*
* 将指令也關聯到原始指令表
*
* 原始指令表不會受 redis.conf 中指令改名的影響
*/
retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
redisAssert(retval1 == DICT_OK && retval2 == DICT_OK);
}
}
從redisCommandTable數組取出資料,寫入指令字典,根據sflags的屬性辨別分析得出二進制辨別,并将其置入flags中。通過dictAdd将指令關聯到指令表和原始指令表。
初始化常用指令的快捷連結
載入配置選項
伺服器在用initServerConfig函數初始化完server變量之後,就會開始載入使用者給定的配置參數和配置檔案,并根據使用者設定的配置,對server變量相關屬性的值進行修改。如果使用者沒有為屬性的相應選項設定新的值,那麼伺服器就沿用之前initServerConfig函數為屬性設定的預設值。
- 檢查使用者是否指定了配置檔案,或者配置選項
- 對使用者給定的其餘選項進行分析,并将分析所得的字元串追加稍後載入的配置檔案的内容之後
- 将伺服器設定為守護程序,如果伺服器是守護程序,那麼建立 PID 檔案,為伺服器程序設定名字
初始化伺服器資料結構
在之前執行initServerConfig函數初始化server狀态時,程式隻建立了指令表一個資料結構,不過除了指令表之外,伺服器狀态還包含其他資料結構,比如:
- server.clients連結清單,這個連結清單記錄了所有與伺服器相連的用戶端的狀态結構,連結清單的每個節點都包含了一個redisClient結構執行個體
- server.ds數組,數組中包含了伺服器的所有資料庫
- 用于儲存頻道訂閱資訊的server.pubsub_channels字典,以及用于儲存模式訂閱資訊的server.pusub_patterns連結清單
- 用于執行Lua腳本的Lua環境server.lua
-
用于儲存慢查詢日志的server.slowlog屬性
當初始化伺服器進行到這一步,伺服器将調用initServer函數,為以上提到的資料結構配置設定記憶體,并在有需要時,為這些資料結構設定或者關聯初始值。
- 分析initServer函數
- 為伺服器設定程序信号處理器:SIGHUP、SIGPIPE設定為SIG_IGN忽略。setupSignalHandlers是通過sigaction函數為信号設定信号處理函數。
- 這個信号處理器負責在伺服器接到SIGTERM信号時,打開伺服器狀态的shutdown_asap辨別。每次serverCron函數運作時,程式都會對伺服器狀态的shutdown_asap屬性進行檢查,并根據屬性的值決定是否關閉伺服器。
- 設定syslog
- 初始化并建立資料結構
- 建立共享對象:這些對象包含Redis伺服器經常用到的一些值,比如包含OK回複的字元串對象,包含ERR回複的字元串對象,包含整數1到10000的字元串對象等等,伺服器通過重用這些共享對象來避免反複建立相同的對象。
- 初始化事件處理器狀态
/*
* 初始化事件處理器狀态
*/
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
// 建立事件狀态結構
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
// 初始化檔案事件結構和已就緒檔案事件結構數組
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
// 設定數組大小
eventLoop->setsize = setsize;
// 初始化執行最近一次執行時間
eventLoop->lastTime = time(NULL);
// 初始化時間事件結構
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
// 初始化監聽事件
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
// 傳回事件循環
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
- 打開TCP監聽端口,用于等待用戶端的指令請求,打開UNIX本地端口
- 這裡建立時間事件serverCron
- 這裡建立TCP連接配接關聯連接配接應答處理器用于接收并應答用戶端的connect()調用
- 建立并初始化數組結構
- 建立PUBSUB相關結構
- 為serverCron函數建立時間事件,等待伺服器正式運作時執行serverCron函數
- 如果AOF持久化功能已經打開,那麼打開現有的AOF檔案,如果AOF檔案不存在,那麼建立并打開一個新的AOF檔案,為AOF寫入做好準備。
- 初始化伺服器的背景I/O子產品(bio),為将來的I/O操作做好準備
還原資料庫狀态
在完成了對伺服器狀态server變量的初始化之後,伺服器需要載入RDB檔案或者AOF檔案,并根據檔案記錄的内容來還原伺服器的資料庫狀态。根據伺服器是否用了AOF持久化功能,伺服器載入資料時使用的目标檔案會有所不同:
- 如果伺服器啟用了AOF持久化功能,那麼伺服器使用AOF檔案來還原資料庫狀态。
- 相反地,如果伺服器沒有啟用AOF持久化功能,那麼伺服器使用RDB檔案來還原資料庫狀态。
運作事件處理器
指令請求執行過程
接收用戶端資料和讀取用戶端的查詢緩沖區
readQueryFromClient函數讀取用戶端的查詢緩沖區内容
從查詢緩存重讀内容,建立參數,并執行指令函數會執行到緩存中的所有内容都被處理完為止。當用戶端與伺服器之間的連接配接套接字因為用戶端的寫入而變得可讀時,伺服器将調用指令請求處理器來執行一下操作:
讀取套接字中協定格式的指令請求,并将其儲存到用戶端狀态的輸入緩沖區裡面。
查找指令實作和執行預備操作
這個函數會調用processCommand,對輸入緩沖區中的指令請求進行分析,提取出指令請求中包含的指令參數,以及指令參數的個數,然後分别将參數和參數個數儲存到用戶端的argv屬性和argc屬性裡面。指令執行器要做的第一件事就是根據用戶端狀态的argv[0]參數,在指令表中查找參數所指定的指令,并将找到的指令儲存到用戶端狀态的cmd屬性裡面。
- 特别處理quit指令
- 檢查用戶端狀态的cmd指針是否指向NULL,如果是的話,那麼說明使用者輸入的指令名字找不到相應的指令實作,伺服器不再執行後續步驟,并向用戶端傳回一個錯誤。
- 根據用戶端cmd屬性指向的redisCommand結構的arity屬性,檢查指令請求所給定的參數個數是否正确,當參數個數不正确時,不再執行後續步驟,直接向用戶端傳回一個錯誤。
- 檢查用戶端是否已經通過身份認證,未通過身份驗證的用戶端隻能執行AUTH指令,如果未通過身份驗證的用戶端試圖執行除AUTH指令之外的其他指令,那麼伺服器将向用戶端傳回一個錯誤。
- 如果開啟了叢集模式,那麼在這裡進行轉向操作。不過,如果有以下情況出現,那麼節點不進行轉向:指令的發送者是本節點的主節點,指令沒有key參數。
- 如果伺服器打開了maxmemory功能,那麼在執行指令之前,先檢查伺服器的記憶體占用情況,并在有需要時進行記憶體回收,進而使得接下來的指令可以順利執行。如果記憶體回收失敗,那麼不再執行後續步驟,向用戶端傳回一個錯誤。
- 如果伺服器上一次執行BGSAVE指令時出錯,并且伺服器打開了stop-writes-on-bgsave-error功能,而且伺服器即将要執行的指令是一個寫指令,那麼伺服器将拒絕執行這個指令,并向用戶端傳回一個錯誤。
- 如果這個伺服器是一個隻讀 slave 的話,那麼拒絕執行寫指令。
- 如果用戶端目前正在用SUBSCRIBE指令訂閱頻道,或者正在用PSUBSCRIBE指令訂閱模式,那麼伺服器隻會執行用戶端發來的SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE、PUNSUBSCRIBE四個指令,其他指令都會被伺服器拒絕。
- 如果伺服器正在進行資料載入,那麼用戶端發送的指令必須帶有l辨別(比如INFO、SHUTDOWN、PUBLISH等)才會被伺服器執行,其他指令都會被伺服器拒絕。
- 如果伺服器因為執行Lua腳本而逾時并進入阻塞狀态,那麼伺服器隻會執行用戶端發來的SHUTDOWN nosave指令和SCRIPT KILL指令,其他指令都會被伺服器拒絕。
- 如果用戶端正在執行事務,那麼伺服器隻會執行用戶端發來的EXEC、DISCARD、MULTI、WATCH四個指令,其他指令都會被放進事務隊列中。
/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
* processCommand() execute the command or prepare the
* server for a bulk read from the client.
*
* 這個函數執行時,我們已經讀入了一個完整的指令到用戶端,
* 這個函數負責執行這個指令,
* 或者伺服器準備從用戶端中進行一次讀取。
*
* If 1 is returned the client is still alive and valid and
* other operations can be performed by the caller. Otherwise
* if 0 is returned the client was destroyed (i.e. after QUIT).
*
* 如果這個函數傳回 1 ,那麼表示用戶端在執行指令之後仍然存在,
* 調用者可以繼續執行其他操作。
* 否則,如果這個函數傳回 0 ,那麼表示用戶端已經被銷毀。
*/
int processCommand(redisClient *c) {
/* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in
* a regular command proc. */
// 特别處理 quit 指令
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= REDIS_CLOSE_AFTER_REPLY;
return REDIS_ERR;
}
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
// 查找指令,并進行指令合法性檢查,以及指令參數個數檢查
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
// 沒找到指定的指令
flagTransaction(c);
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
return REDIS_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
// 參數個數錯誤
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return REDIS_OK;
}
/* Check if the user is authenticated */
// 檢查認證資訊
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
flagTransaction(c);
addReply(c,shared.noautherr);
return REDIS_OK;
}
/* If cluster is enabled perform the cluster redirection here.
*
* 如果開啟了叢集模式,那麼在這裡進行轉向操作。
*
* However we don't perform the redirection if:
*
* 不過,如果有以下情況出現,那麼節點不進行轉向:
*
* 1) The sender of this command is our master.
* 指令的發送者是本節點的主節點
*
* 2) The command has no key arguments.
* 指令沒有 key 參數
*/
if (server.cluster_enabled &&
!(c->flags & REDIS_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
{
int hashslot;
// 叢集已下線
if (server.cluster->state != REDIS_CLUSTER_OK) {
flagTransaction(c);
addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
return REDIS_OK;
// 叢集運作正常
} else {
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
// 不能執行多鍵處理指令
if (n == NULL) {
flagTransaction(c);
if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
} else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
/* The request spawns mutliple keys in the same slot,
* but the slot is not "stable" currently as there is
* a migration or import in progress. */
addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
} else {
redisPanic("getNodeByQuery() unknown error.");
}
return REDIS_OK;
// 指令針對的槽和鍵不是本節點處理的,進行轉向
} else if (n != server.cluster->myself) {
flagTransaction(c);
// -<ASK or MOVED> <slot> <ip>:<port>
// 例如 -ASK 10086 127.0.0.1:12345
addReplySds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d\r\n",
(error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot,n->ip,n->port));
return REDIS_OK;
}
// 如果執行到這裡,說明鍵 key 所在的槽由本節點處理
// 或者用戶端執行的是無參數指令
}
}
/* Handle the maxmemory directive.
*
* First we try to free some memory if possible (if there are volatile
* keys in the dataset). If there are not the only thing we can do
* is returning an error. */
// 如果設定了最大記憶體,那麼檢查記憶體是否超過限制,并做相應的操作
if (server.maxmemory) {
// 如果記憶體已超過限制,那麼嘗試通過删除過期鍵來釋放記憶體
int retval = freeMemoryIfNeeded();
// 如果即将要執行的指令可能占用大量記憶體(REDIS_CMD_DENYOOM)
// 并且前面的記憶體釋放失敗的話
// 那麼向用戶端傳回記憶體錯誤
if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
flagTransaction(c);
addReply(c, shared.oomerr);
return REDIS_OK;
}
}
/* Don't accept write commands if there are problems persisting on disk
* and if this is a master instance. */
// 如果這是一個主伺服器,并且這個伺服器之前執行 BGSAVE 時發生了錯誤
// 那麼不執行寫指令
if (((server.stop_writes_on_bgsave_err &&
server.saveparamslen > 0 &&
server.lastbgsave_status == REDIS_ERR) ||
server.aof_last_write_status == REDIS_ERR) &&
server.masterhost == NULL &&
(c->cmd->flags & REDIS_CMD_WRITE ||
c->cmd->proc == pingCommand))
{
flagTransaction(c);
if (server.aof_last_write_status == REDIS_OK)
addReply(c, shared.bgsaveerr);
else
addReplySds(c,
sdscatprintf(sdsempty(),
"-MISCONF Errors writing to the AOF file: %s\r\n",
strerror(server.aof_last_write_errno)));
return REDIS_OK;
}
/* Don't accept write commands if there are not enough good slaves and
* user configured the min-slaves-to-write option. */
// 如果伺服器沒有足夠多的狀态良好伺服器
// 并且 min-slaves-to-write 選項已打開
if (server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & REDIS_CMD_WRITE &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
flagTransaction(c);
addReply(c, shared.noreplicaserr);
return REDIS_OK;
}
/* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master. */
// 如果這個伺服器是一個隻讀 slave 的話,那麼拒絕執行寫指令
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & REDIS_MASTER) &&
c->cmd->flags & REDIS_CMD_WRITE)
{
addReply(c, shared.roslaveerr);
return REDIS_OK;
}
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
// 在訂閱于釋出模式的上下文中,隻能執行訂閱和退訂相關的指令
if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
&&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
return REDIS_OK;
}
/* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
* we are a slave with a broken link with master. */
if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & REDIS_CMD_STALE))
{
flagTransaction(c);
addReply(c, shared.masterdownerr);
return REDIS_OK;
}
/* Loading DB? Return an error if the command has not the
* REDIS_CMD_LOADING flag. */
// 如果伺服器正在載入資料到資料庫,那麼隻執行帶有 REDIS_CMD_LOADING
// 辨別的指令,否則将出錯
if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
addReply(c, shared.loadingerr);
return REDIS_OK;
}
/* Lua script too slow? Only allow a limited number of commands. */
// Lua 腳本逾時,隻允許執行限定的操作,比如 SHUTDOWN 和 SCRIPT KILL
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != replconfCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
flagTransaction(c);
addReply(c, shared.slowscripterr);
return REDIS_OK;
}
/* Exec the command */
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// 在事務上下文中
// 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 指令之外
// 其他所有指令都會被入隊到事務隊列中
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// 執行指令
call(c,REDIS_CALL_FULL);
c->woff = server.master_repl_offset;
// 處理那些解除了阻塞的鍵
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return REDIS_OK;
}
調用指令執行器,執行用戶端指定的指令
call指令會調用redis.c的2441行的
c->cmd->proc(c)
,被調用的指令實作函數會執行指定的操作,并産生相應的指令回複,這些回複會被儲存在用戶端狀态的輸出緩沖區裡面,之後實作函數還會為用戶端的套接字關聯指令回複處理器,這個處理器負責将指令回複傳回給用戶端。當用戶端套接字變為可寫狀态時,伺服器就會執行指令回複處理器,将儲存在用戶端輸出緩沖區中的指令回複發送給用戶端。
執行後續工作
- 如果伺服器開啟了慢查詢日志功能,那麼慢查詢日志子產品會檢查是否需要為剛剛執行完的指令請求添加一條新的慢查詢日志。
- 根據剛剛執行指令所耗費的時長,更新被執行指令的redisCommand結構的milliseconds屬性,并将指令的redisCommand結構的calls計數器的值增一。
- 如果伺服器開啟了AOF持久化功能,那麼AOF持久化子產品會将剛剛執行的指令請求寫入到AOF緩沖區裡面。
- 如果有其他從伺服器正在複制目前這個伺服器,那麼伺服器會将剛剛執行的指令傳播給所有從伺服器。