Memcached Analysis
memcached is a widely used server-side caching system, built primarily on top of the Libevent event library.
1. Network threading model and flow analysis
Memcached implements its network threading model on top of the Libevent event library. We first need the memcached source package; the download location was given earlier.
The network threading model mainly involves two files: memcached.c and thread.c.
We will focus on the TCP model here; memcached also supports UDP.
Flow
1. At startup, memcached creates main_base in the main thread. The main thread's job is to listen() for and accept() new incoming connections.
2. At startup, memcached also initializes N worker threads. Each worker thread has its own LIBEVENT_THREAD structure holding its state (basic thread info, its connection queue, and its notify pipe). Worker threads and the main thread communicate mainly through this pipe.
3. When a client connects, the main thread picks a worker thread using simple modulo (round-robin) arithmetic.
4. The main thread packs the connection information into a CQ_ITEM, pushes the CQ_ITEM onto that worker's conn_queue, and then notifies the worker thread by writing a byte into its pipe.
5. When the worker thread receives the main thread's notification, it pops one connection off its own conn_queue and registers libevent read/write events for that socket.
6. The worker thread then monitors the client socket; when the client sends a request, the worker parses it, processes it, and returns the result.
A minimal standalone sketch of this hand-off pattern is shown after the flow diagram below.
Flow diagram (figure omitted)
Data structures:
1. CQ_ITEM: holds the basic information of one client socket connection.
The main thread wraps the client's connection information into a CQ_ITEM and puts it on a worker thread's queue. After the worker thread is notified through the pipe, it takes the item off the queue and registers a libevent read event for the socket.
/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
int sfd; //fd of the accepted socket
enum conn_states init_state; //initial connection state (drive_machine switches on this)
int event_flags; //libevent event flags (EV_READ etc.)
int read_buffer_size; //initial size of the read buffer
enum network_transport transport; //tcp or udp
CQ_ITEM *next; //next item in the queue
};
2. CQ: each worker thread's connection queue.
/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
CQ_ITEM *head;
CQ_ITEM *tail;
pthread_mutex_t lock;
};
3. LIBEVENT_THREAD: the per-worker-thread structure.
Every worker thread has one of these; it mainly stores the thread's libevent base, its connection queue, and the notify pipe descriptors.
typedef struct {
pthread_t thread_id; /* unique ID of this thread */
struct event_base *base; /* libevent handle this thread uses */
struct event notify_event; /* listen event for notify pipe */
int notify_receive_fd; /* receiving end of notify pipe */
int notify_send_fd; /* sending end of notify pipe */
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
uint8_t item_lock_type; /* use fine-grained or global item lock */
} LIBEVENT_THREAD;
2. The main() entry point
Find the main() function in memcached.c. The code below shows only the parts that matter for the threading model.
int main (int argc, char **argv) {
//... some code omitted
/* initialize main thread libevent instance */
//initialize the main thread's event_base
main_base = event_init();
/* initialize other stuff */
stats_init();
assoc_init(settings.hashpower_init);
conn_init();
slabs_init(settings.maxbytes, settings.factor, preallocate);
/*
* ignore SIGPIPE signals; we can use errno == EPIPE if we
* need that information
*/
if (sigignore(SIGPIPE) == -1) {
perror("failed to ignore SIGPIPE; sigaction");
exit(EX_OSERR);
}
/* start up worker threads if MT mode */
//important: this call creates the worker threads (settings.num_threads of them, set with -t; the default is 4)
thread_init(settings.num_threads, main_base);
if (start_assoc_maintenance_thread() == -1) {
exit(EXIT_FAILURE);
}
if (settings.slab_reassign &&
start_slab_maintenance_thread() == -1) {
exit(EXIT_FAILURE);
}
/* Run regardless of initializing it later */
init_lru_crawler();
/* initialise clock event */
clock_handler(0, 0, 0);
/* create unix mode sockets after dropping privileges */
if (settings.socketpath != NULL) {
errno = 0;
if (server_socket_unix(settings.socketpath,settings.access)) {
vperror("failed to listen on UNIX socket: %s", settings.socketpath);
exit(EX_OSERR);
}
}
/* create the listening socket, bind it, and init */
if (settings.socketpath == NULL) {
const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
char temp_portnumber_filename[PATH_MAX];
FILE *portnumber_file = NULL;
if (portnumber_filename != NULL) {
snprintf(temp_portnumber_filename,
sizeof(temp_portnumber_filename),
"%s.lck", portnumber_filename);
portnumber_file = fopen(temp_portnumber_filename, "a");
if (portnumber_file == NULL) {
fprintf(stderr, "Failed to open \"%s\": %s\n",
temp_portnumber_filename, strerror(errno));
}
}
errno = 0;
//server_sockets creates the listening socket: socket(), bind(), listen()
//the main thread accepts client connections and hands them off to the worker threads
if (settings.port && server_sockets(settings.port, tcp_transport,
portnumber_file)) {
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}
}
/* enter the event loop */
//start the main thread's event loop
if (event_base_loop(main_base, 0) != 0) {
retval = EXIT_FAILURE;
}
//... some code omitted
}
The main thread creates the N worker threads through thread_init:
thread_init(settings.num_threads, main_base);
and creates the listening server socket through server_sockets:
errno = 0;
if (settings.port && server_sockets(settings.port, tcp_transport,
portnumber_file)) {
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}
3. Worker thread source analysis
Find the thread_init function in thread.c:
void thread_init(int nthreads, struct event_base *main_base) {
//... some code omitted
//create nthreads worker threads in a loop
//nthreads is settings.num_threads, configurable with the -t option
for (i = 0; i < nthreads; i++) {
int fds[2];
//create the notify pipe used for main-thread to worker-thread communication
if (pipe(fds)) {
perror("Can't create notify pipe");
exit(1);
}
//threads[] is the array of LIBEVENT_THREAD worker structures
//store both ends of the pipe in the worker's structure
threads[i].notify_receive_fd = fds[0]; //read end
threads[i].notify_send_fd = fds[1]; //write end
//important: setup_thread creates each thread's own libevent event_base and registers the notify event
setup_thread(&threads[i]);
/* Reserve three fds for the libevent base, and two for the pipe */
stats.reserved_fds += 5;
}
/* Create threads after we've done all the libevent setup. */
//create the actual threads in a loop
//each thread's start routine is worker_libevent
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);
}
/* Wait for all the threads to set themselves up before returning. */
pthread_mutex_lock(&init_lock);
wait_for_thread_registration(nthreads);
pthread_mutex_unlock(&init_lock);
}
The setup_thread function:
/*
* Set up a thread's information.
*/
static void setup_thread(LIBEVENT_THREAD *me) {
//create an event_base for this thread
//per the libevent documentation, each independent thread should normally have its own event_base
me->base = event_init();
if (! me->base) {
fprintf(stderr, "Can't allocate event base\n");
exit(1);
}
/* Listen for notifications from other threads */
//important: register a persistent EV_READ event on the read end of the notify pipe
//when the main thread writes to the pipe, libevent calls back into thread_libevent_process
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
//add the event to the base
if (event_add(&me->notify_event, 0) == -1) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");
exit(1);
}
//allocate and initialize this thread's connection queue
me->new_conn_queue = malloc(sizeof(struct conn_queue));
if (me->new_conn_queue == NULL) {
perror("Failed to allocate memory for connection queue");
exit(EXIT_FAILURE);
}
cq_init(me->new_conn_queue);
//initialize this thread's stats mutex
if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
perror("Failed to initialize mutex");
exit(EXIT_FAILURE);
}
me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
NULL, NULL);
if (me->suffix_cache == NULL) {
fprintf(stderr, "Failed to create suffix cache\n");
exit(EXIT_FAILURE);
}
}
The create_worker function:
/*
* Creates a worker thread.
*/
//this is where the worker threads are actually created
static void create_worker(void *(*func)(void *), void *arg) {
pthread_t thread;
pthread_attr_t attr;
int ret;
pthread_attr_init(&attr);
//create the thread
if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
fprintf(stderr, "Can't create thread: %s\n",
strerror(ret));
exit(1);
}
}
The worker_libevent function:
/*
* Worker thread: main event loop
*/
static void *worker_libevent(void *arg) {
LIBEVENT_THREAD *me = arg;
/* Any per-thread setup can happen here; thread_init() will block until
* all threads have finished initializing.
*/
/* set an indexable thread-specific memory item for the lock type.
* this could be unnecessary if we pass the conn *c struct through
* all item_lock calls...
*/
me->item_lock_type = ITEM_LOCK_GRANULAR;
pthread_setspecific(item_lock_type_key, &me->item_lock_type);
register_thread_initialized();
//start this thread's event loop
//each worker thread has its own event_base and runs its own loop
//so every worker handles the connections it has taken over independently
event_base_loop(me->base, 0);
return NULL;
}
The thread_libevent_process function:
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
char buf[1];
//the callback reads one byte from the notify pipe
//when the main thread dispatches a new connection it writes a single byte ('c') into the worker's pipe
//read() returning 1 means the notification byte was received correctly
if (read(fd, buf, 1) != 1)
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");
switch (buf[0]) {
case 'c':
//pop one CQ_ITEM (one new connection) off this thread's queue
item = cq_pop(me->new_conn_queue);
//if an item was popped, take over the connection
if (NULL != item) {
//conn_new is the important call: it registers the read/write events for the client socket
//init_state is the initial connection state; drive_machine uses this state to decide how to handle the connection
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport, me->base);
if (c == NULL) {
if (IS_UDP(item->transport)) {
fprintf(stderr, "Can't listen for events on UDP socket\n");
exit(1);
} else {
if (settings.verbose > 0) {
fprintf(stderr, "Can't listen for events on fd %d\n",
item->sfd);
}
close(item->sfd);
}
} else {
c->thread = me;
}
cqi_free(item);
}
break;
/* we were told to flip the lock type and report in */
case 'l':
me->item_lock_type = ITEM_LOCK_GRANULAR;
register_thread_initialized();
break;
case 'g':
me->item_lock_type = ITEM_LOCK_GLOBAL;
register_thread_initialized();
break;
}
}
The conn_new function (the two key lines):
//here another event is registered; this time it is the read/write event for the client socket
//the main thread listens for new connection events, while the worker threads listen for read/write events on client sockets
//when data arrives on the client socket, libevent calls back into event_handler
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
//add the event to the libevent loop
if (event_add(&c->event, 0) == -1) {
perror("event_add");
return NULL;
}
The event_handler function:
void event_handler(const int fd, const short which, void *arg) {
conn *c;
//recover the conn structure from the callback argument
c = (conn *)arg;
assert(c != NULL);
c->which = which;
/* sanity */
if (fd != c->sfd) {
if (settings.verbose > 0)
fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
conn_close(c);
return;
}
//finally hand everything over to drive_machine
//most of memcached's network events are handled by drive_machine
//drive_machine dispatches on c->state to decide how to handle each event
drive_machine(c);
/* wait for next event */
return;
}
Now let's look at the most important, core event-processing function: drive_machine (the state machine). Accepting connections, reading from and writing to sockets, and closing connections are all implemented inside drive_machine, and the operation to perform is selected by the c->state field.
static void drive_machine(conn *c) {
//....................
assert(c != NULL);
while (!stop) {
//dispatch on the connection state
switch(c->state) {
//handle new TCP connections; only the main thread's listening connection is ever in the conn_listening state
case conn_listening:
addrlen = sizeof(addr);
#ifdef HAVE_ACCEPT4
if (use_accept4) {
sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
} else {
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
}
#else
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
//......................
if (settings.maxconns_fast &&
stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
str = "ERROR Too many open connections\r\n";
res = write(sfd, str, strlen(str));
close(sfd);
STATS_LOCK();
stats.rejected_conns++;
STATS_UNLOCK();
} else {
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, tcp_transport);
}
stop = true;
break;
//waiting for a readable socket
case conn_waiting:
//.........
break;
//read event
//when a client sends data, the worker thread's read event fires and ends up here
case conn_read:
res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
switch (res) {
case READ_NO_DATA_RECEIVED:
conn_set_state(c, conn_waiting);
break;
case READ_DATA_RECEIVED:
conn_set_state(c, conn_parse_cmd);
break;
case READ_ERROR:
conn_set_state(c, conn_closing);
break;
case READ_MEMORY_ERROR: /* Failed to allocate more memory */
/* State already set by try_read_network */
break;
}
break;
case conn_parse_cmd :
if (try_read_command(c) == 0) {
/* wee need more data! */
conn_set_state(c, conn_waiting);
}
break;
case conn_new_cmd:
/* Only process nreqs at a time to avoid starving other
connections */
--nreqs;
if (nreqs >= 0) {
reset_cmd_handler(c);
} else {
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.conn_yields++;
pthread_mutex_unlock(&c->thread->stats.mutex);
if (c->rbytes > 0) {
/* We have already read in data into the input buffer,
so libevent will most likely not signal read events
on the socket (unless more data is available. As a
hack we should just put in a request to write data,
because that should be possible ;-)
*/
if (!update_event(c, EV_WRITE | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
conn_set_state(c, conn_closing);
break;
}
}
stop = true;
}
break;
case conn_nread:
//....................
break;
case conn_swallow:
//....................
break;
case conn_write:
//.................
break;
//close the connection
case conn_closing:
if (IS_UDP(c->transport))
conn_cleanup(c);
else
conn_close(c);
stop = true;
break;
case conn_closed:
/* This only happens if dormando is an idiot. */
abort();
break;
case conn_max_state:
assert(false);
break;
}
}
return;
}
4. Main thread source analysis
The main thread's server socket is created by server_sockets, which in turn calls server_socket. Let's look at server_socket:
/**
* Create a socket and bind it to a specific port number
* @param interface the interface to bind to
* @param port the port number to bind to
* @param transport the transport protocol (TCP / UDP)
* @param portnumber_file A filepointer to write the port numbers to
* when they are successfully added to the list of ports we
* listen on.
*/
static int server_socket(const char *interface,
int port,
enum network_transport transport,
FILE *portnumber_file) {
//create a new connection object (and register its libevent event)
//the worker threads call the same conn_new function; the difference is that here the state is hard-coded to conn_listening
//conn_listening makes drive_machine execute its conn_listening branch for this socket
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
transport, main_base))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
listen_conn_add->next = listen_conn;
listen_conn = listen_conn_add;
}
The conn_new function:
//here another event is registered; this time it is the read/write event for the socket
//the main thread listens for new connection events, while the worker threads listen for read/write events on client sockets
//when data arrives on the client socket, libevent calls back into event_handler
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
//add the event to the libevent loop
if (event_add(&c->event, 0) == -1) {
perror("event_add");
return NULL;
}
Now follow event_handler into drive_machine. As noted above, server_socket hard-codes the state to conn_listening, so let's look closely at the conn_listening branch of drive_machine.
case conn_listening:
addrlen = sizeof(addr);
#ifdef HAVE_ACCEPT4
//the accept() calls below accept the client's socket connection
if (use_accept4) {
sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
} else {
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
}
#else
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
if (sfd == -1) {
if (use_accept4 && errno == ENOSYS) {
use_accept4 = 0;
continue;
}
perror(use_accept4 ? "accept4()" : "accept()");
if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* these are transient, so don't log anything */
stop = true;
} else if (errno == EMFILE) {
if (settings.verbose > 0)
fprintf(stderr, "Too many open connections\n");
accept_new_conns(false);
stop = true;
} else {
perror("accept()");
stop = true;
}
break;
}
if (!use_accept4) {
if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
perror("setting O_NONBLOCK");
close(sfd);
break;
}
}
if (settings.maxconns_fast &&
stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
str = "ERROR Too many open connections\r\n";
res = write(sfd, str, strlen(str));
close(sfd);
STATS_LOCK();
stats.rejected_conns++;
STATS_UNLOCK();
} else {
//once a client connection has been accepted, this dispatch function is called
//it hands the connection off to one of the worker threads, which then takes over all subsequent reads and writes
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, tcp_transport);
}
stop = true;
break;
The dispatch_conn_new function:
/*
* Dispatches a new connection to another thread. This is only ever called
* from the main thread, either during initialization (for UDP) or because
* of an incoming connection.
*/
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
//for every new connection a CQ_ITEM is allocated to hold the connection's basic information
CQ_ITEM *item = cqi_new();
char buf[1];
//if the allocation fails, close the connection
if (item == NULL) {
close(sfd);
/* given that malloc failed this may also fail, but let's try */
fprintf(stderr, "Failed to allocate memory for connection object\n");
return ;
}
//important: pick the worker thread that will take over this connection, using simple modulo arithmetic
//last_thread remembers the thread used for the previous connection, so successive connections are handed out round-robin, keeping the load balanced across workers
int tid = (last_thread + 1) % settings.num_threads;
//get that worker thread's LIBEVENT_THREAD structure
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;
item->sfd = sfd;
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
//push the CQ_ITEM onto the worker thread's queue
cq_push(thread->new_conn_queue, item);
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
buf[0] = 'c';
//write one byte ('c') into the worker thread's notify pipe
//the worker's libevent loop sees the pipe become readable, pops an item off thread->new_conn_queue, and takes over the connection
if (write(thread->notify_send_fd, buf, 1) != 1) {
perror("Writing to thread notify pipe");
}
}