天天看点

Memcached源码分析之基于Libevent的网络模型

memcached是一款非常普及的服务器端缓存软件,memcached主要是基于Libevent库进行开发的。Memcached分析

1.  网络模型流程分析

Memcached主要是基于Libevent的事件库来实现网络线程模型的。我们先需要下载memcached的源码包,上面我们已经给出了源码包下载地址。

Memcached的网络线程模型主要涉及两个主要文件:memcached.c 和thread.c文件。

我们这边主要分析tcp的模型。memcached也支持udp。

流程

1. memcached首先在主线程中会创建main_base,memcached的主线程的主要工作就是监听和接收listen和accpet新进入的连接。

2. 当memcached启动的时候会初始化N个worker thread工作线程,每个工作线程都会有自己的LIBEVENT_THREAD数据结构来存储线程的信息(线程基本信息、线程队列、pipe信息)。worker thread工作线程和main thread主线程之间主要通过pipe来进行通信。

3. 当用户有连接进来的时候,main thread主线程会通过求余的方式选择一个worker thread工作线程。

4. main thread会将当前用户的连接信息放入一个CQ_ITEM,并且将CQ_ITEM放入这个线程的conn_queue处理队列,然后主线程会通过写入pipe的方式来通知worker thread工作线程。

5. 当工作线程得到主线程main thread的通知后,就会去自己的conn_queue队列中取得一条连接信息进行处理,创建libevent的socket读写事件。

6. 工作线程会监听用户的socket,如果用户有消息传递过来,则会进行消息解析和处理,返回相应的结果。

流程图

Memcached源码分析之基于Libevent的网络模型

数据结构:

1. CQ_ITEM:主要用于存储用户socket连接的基本信息。

主线程会将用户的socket连接信息封装成CQ_ITEM,并放入工作线程的处理队列中。工作线程得到主线程的pipe通知后,就会将队列中的ITEM取出来,创建libevent的socket读事件。

/* An item in the connection queue. */  
typedef struct conn_queue_item CQ_ITEM;  
struct conn_queue_item {  
    int               sfd;   //socket的fd  
    enum conn_states  init_state; //事件类型  
    int               event_flags; //libevent的flags  
    int               read_buffer_size; //读取的buffer的size  
    enum network_transport     transport;   
    CQ_ITEM          *next; //下一个item的地址  
};      

2. CQ:每个线程的处理队列结构。

/* A connection queue. */  
typedef struct conn_queue CQ;  
struct conn_queue {  
    CQ_ITEM *head;  
    CQ_ITEM *tail;  
    pthread_mutex_t lock;  
};      

3. LIBEVENT_THREAD:每个工作线程的数据结构。

每一个工作线程都有有这么一个自己的数据结构,主要存储线程信息、处理队列、pipe信息等。

typedef struct {  
    pthread_t thread_id;        /* unique ID of this thread */  
    struct event_base *base;    /* libevent handle this thread uses */  
    struct event notify_event;  /* listen event for notify pipe */  
    int notify_receive_fd;      /* receiving end of notify pipe */  
    int notify_send_fd;         /* sending end of notify pipe */  
    struct thread_stats stats;  /* Stats generated by this thread */  
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */  
    cache_t *suffix_cache;      /* suffix cache */  
    uint8_t item_lock_type;     /* use fine-grained or global item lock */  
} LIBEVENT_THREAD;      

2. main启动入口

我们需要找到memcached.c中的main()方法。下面的代码中只列出了我们需要的重要部分。

int main (int argc, char **argv) {  
    //...省去一部分代码  
    /* initialize main thread libevent instance */  
    //初始化一个event_base  
    main_base = event_init();  
  
    /* initialize other stuff */  
    stats_init();  
    assoc_init(settings.hashpower_init);  
    conn_init();  
    slabs_init(settings.maxbytes, settings.factor, preallocate);  
  
    /* 
     * ignore SIGPIPE signals; we can use errno == EPIPE if we 
     * need that information 
     */  
    if (sigignore(SIGPIPE) == -1) {  
        perror("failed to ignore SIGPIPE; sigaction");  
        exit(EX_OSERR);  
    }  
    /* start up worker threads if MT mode */  
    //这边非常重要,这个方法主要用来创建工作线程,默认会创建8个工作线程  
    thread_init(settings.num_threads, main_base);  
  
    if (start_assoc_maintenance_thread() == -1) {  
        exit(EXIT_FAILURE);  
    }  
  
    if (settings.slab_reassign &&  
        start_slab_maintenance_thread() == -1) {  
        exit(EXIT_FAILURE);  
    }  
  
    /* Run regardless of initializing it later */  
    init_lru_crawler();  
  
    /* initialise clock event */  
    clock_handler(0, 0, 0);  
  
    /* create unix mode sockets after dropping privileges */  
    if (settings.socketpath != NULL) {  
        errno = 0;  
        if (server_socket_unix(settings.socketpath,settings.access)) {  
            vperror("failed to listen on UNIX socket: %s", settings.socketpath);  
            exit(EX_OSERR);  
        }  
    }  
  
    /* create the listening socket, bind it, and init */  
    if (settings.socketpath == NULL) {  
        const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");  
        char temp_portnumber_filename[PATH_MAX];  
        FILE *portnumber_file = NULL;  
  
        if (portnumber_filename != NULL) {  
            snprintf(temp_portnumber_filename,  
                     sizeof(temp_portnumber_filename),  
                     "%s.lck", portnumber_filename);  
  
            portnumber_file = fopen(temp_portnumber_filename, "a");  
            if (portnumber_file == NULL) {  
                fprintf(stderr, "Failed to open \"%s\": %s\n",  
                        temp_portnumber_filename, strerror(errno));  
            }  
        }  
  
        errno = 0;  
        //这边的server_sockets方法主要是socket的bind、listen、accept等操作  
        //主线程主要用于接收客户端的socket连接,并且将连接交给工作线程接管。  
        if (settings.port && server_sockets(settings.port, tcp_transport,  
                                           portnumber_file)) {  
            vperror("failed to listen on TCP port %d", settings.port);  
            exit(EX_OSERR);  
        }  
    }  
    /* enter the event loop */  
    //这边开始进行主线程的事件循环  
    if (event_base_loop(main_base, 0) != 0) {  
        retval = EXIT_FAILURE;  
    }  
 //...省去一部分代码  
}
           

主线程中主要是通过thread_init方法去创建N个工作线程:

thread_init(settings.num_threads, main_base);      

通过server_sockets方法去创建socket server:

errno = 0;  
if (settings.port && server_sockets(settings.port, tcp_transport,  
                                   portnumber_file)) {  
    vperror("failed to listen on TCP port %d", settings.port);  
    exit(EX_OSERR);  
}      

3. worker thread工作线程源码分析

我们在thread.c文件中找到thread_init这个方法:

void thread_init(int nthreads, struct event_base *main_base) {  
    //...省了一部分代码  
    //这边通过循环的方式创建nthreads个线程  
    //nthreads应该是可以设置的  
    for (i = 0; i < nthreads; i++) {  
        int fds[2];  
        //这边会创建pipe,主要用于主线程和工作线程之间的通信  
        if (pipe(fds)) {  
            perror("Can't create notify pipe");  
            exit(1);  
        }  
        //threads是工作线程的基本结构:LIBEVENT_THREAD  
        //将pipe接收端和写入端都放到工作线程的结构体中  
        threads[i].notify_receive_fd = fds[0]; //接收端  
        threads[i].notify_send_fd = fds[1]; //写入端  
  
        //这个方法非常重要,主要是创建每个线程自己的libevent的event_base  
        setup_thread(&threads[i]);  
        /* Reserve three fds for the libevent base, and two for the pipe */  
        stats.reserved_fds += 5;  
    }  
  
    /* Create threads after we've done all the libevent setup. */  
    //这里是循环创建线程  
    //线程创建的回调函数是worker_libevent  
    for (i = 0; i < nthreads; i++) {  
        create_worker(worker_libevent, &threads[i]);  
    }  
  
    /* Wait for all the threads to set themselves up before returning. */  
    pthread_mutex_lock(&init_lock);  
    wait_for_thread_registration(nthreads);  
    pthread_mutex_unlock(&init_lock);  
}
           

setup_thread方法:

/* 
 * Set up a thread's information. 
 */  
static void setup_thread(LIBEVENT_THREAD *me) {  
    //创建一个event_base  
    //根据libevent的使用文档,我们可以知道一般情况下每个独立的线程都应该有自己独立的event_base  
    me->base = event_init();  
    if (! me->base) {  
        fprintf(stderr, "Can't allocate event base\n");  
        exit(1);  
    }  
  
    /* Listen for notifications from other threads */  
    //这边非常重要,这边主要创建pipe的读事件EV_READ的监听  
    //当pipe中有写入事件的时候,libevent就会回调thread_libevent_process方法  
    event_set(&me->notify_event, me->notify_receive_fd,  
              EV_READ | EV_PERSIST, thread_libevent_process, me);  
    event_base_set(me->base, &me->notify_event);  
    //添加事件操作  
    if (event_add(&me->notify_event, 0) == -1) {  
        fprintf(stderr, "Can't monitor libevent notify pipe\n");  
        exit(1);  
    }  
    //初始化一个工作队列  
    me->new_conn_queue = malloc(sizeof(struct conn_queue));  
    if (me->new_conn_queue == NULL) {  
        perror("Failed to allocate memory for connection queue");  
        exit(EXIT_FAILURE);  
    }  
    cq_init(me->new_conn_queue);  
  
    //初始化线程锁  
    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {  
        perror("Failed to initialize mutex");  
        exit(EXIT_FAILURE);  
    }  
  
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),  
                                    NULL, NULL);  
    if (me->suffix_cache == NULL) {  
        fprintf(stderr, "Failed to create suffix cache\n");  
        exit(EXIT_FAILURE);  
    }  
}
           

create_worker方法:

/* 
 * Creates a worker thread. 
 */  
//这个方法是真正的创建工作线程  
static void create_worker(void *(*func)(void *), void *arg) {  
    pthread_t       thread;  
    pthread_attr_t  attr;  
    int             ret;  
  
    pthread_attr_init(&attr);  
    //这边真正的创建线程  
    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {  
        fprintf(stderr, "Can't create thread: %s\n",  
                strerror(ret));  
        exit(1);  
    }  
}
           

worker_libevent方法:

/* 
 * Worker thread: main event loop 
 */  
static void *worker_libevent(void *arg) {  
    LIBEVENT_THREAD *me = arg;  
  
    /* Any per-thread setup can happen here; thread_init() will block until 
     * all threads have finished initializing. 
     */  
  
    /* set an indexable thread-specific memory item for the lock type. 
     * this could be unnecessary if we pass the conn *c struct through 
     * all item_lock calls... 
     */  
    me->item_lock_type = ITEM_LOCK_GRANULAR;  
    pthread_setspecific(item_lock_type_key, &me->item_lock_type);  
  
    register_thread_initialized();  
    //这个方法主要是开启事件的循环  
    //每个线程中都会有自己独立的event_base和事件的循环机制  
    //memcache的每个工作线程都会独立处理自己接管的连接  
    event_base_loop(me->base, 0);  
    return NULL;  
}
           

thread_libevent_process方法:

static void thread_libevent_process(int fd, short which, void *arg) {  
    LIBEVENT_THREAD *me = arg;  
    CQ_ITEM *item;  
    char buf[1];  
  
    //回调函数中回去读取pipe中的信息  
    //主线程中如果有新的连接,会向其中一个线程的pipe中写入1  
    //这边读取pipe中的数据,如果为1,则说明从pipe中获取的数据是正确的  
    if (read(fd, buf, 1) != 1)  
        if (settings.verbose > 0)  
            fprintf(stderr, "Can't read from libevent pipe\n");  
  
    switch (buf[0]) {  
    case 'c':  
    //从工作线程的队列中获取一个CQ_ITEM连接信息  
    item = cq_pop(me->new_conn_queue);  
    //如果item不为空,则需要进行连接的接管  
    if (NULL != item) {  
        //conn_new这个方法非常重要,主要是创建socket的读写等监听事件。  
        //init_state 为初始化的类型,主要在drive_machine中通过这个状态类判断处理类型  
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,  
                           item->read_buffer_size, item->transport, me->base);  
        if (c == NULL) {  
            if (IS_UDP(item->transport)) {  
                fprintf(stderr, "Can't listen for events on UDP socket\n");  
                exit(1);  
            } else {  
                if (settings.verbose > 0) {  
                    fprintf(stderr, "Can't listen for events on fd %d\n",  
                        item->sfd);  
                }  
                close(item->sfd);  
            }  
        } else {  
            c->thread = me;  
        }  
        cqi_free(item);  
    }  
        break;  
    /* we were told to flip the lock type and report in */  
    case 'l':  
    me->item_lock_type = ITEM_LOCK_GRANULAR;  
    register_thread_initialized();  
        break;  
    case 'g':  
    me->item_lock_type = ITEM_LOCK_GLOBAL;  
    register_thread_initialized();  
        break;  
    }  
}
           

conn_new方法(主要看两行):

//我们发现这个方法中又在创建event了,这边实际上是监听socket的读写等事件  
//主线程主要是监听用户的socket连接事件;工作线程主要监听socket的读写事件  
//当用户socket的连接有数据传递过来的时候,就会调用event_handler这个回调函数  
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);  
event_base_set(base, &c->event);   
c->ev_flags = event_flags;  
//将事件添加到libevent的loop循环中  
if (event_add(&c->event, 0) == -1) {  
    perror("event_add");  
    return NULL;  
}
           

event_handler方法:

void event_handler(const int fd, const short which, void *arg) {  
    conn *c;  
    //组装conn结构  
    c = (conn *)arg;  
    assert(c != NULL);  
  
    c->which = which;  
  
    /* sanity */  
    if (fd != c->sfd) {  
        if (settings.verbose > 0)  
            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");  
        conn_close(c);  
        return;  
    }  
    //最终转交给了drive_machine这个方法  
    //memcache的大部分的网络事件都是由drive_machine这个方法来处理的  
    //drive_machine这个方法主要通过c->state这个事件的类型来处理不同类型的事件     
    drive_machine(c);  
  
    /* wait for next event */  
    return;  
} 
           

然后继续看最重要的,也是核心的处理事件的方法drive_machine(状态机),监听socket连接、监听socket的读写、断开连接等操作都是在drive_machine这个方法中实现的。而这些操作都是通过c->state这个状态来判断不同的操作类型。

static void drive_machine(conn *c) {  
     //....................  
  
    assert(c != NULL);  
  
    while (!stop) {  
        //这边通过state来处理不同类型的事件  
        switch(c->state) {  
        //这边主要处理tcp连接,只有在主线程的下,才会执行listening监听操作。  
        case conn_listening:  
            addrlen = sizeof(addr);  
#ifdef HAVE_ACCEPT4  
            if (use_accept4) {  
                sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);  
            } else {  
                sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);  
            }  
#else  
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);  
#endif  
            //......................  
  
            if (settings.maxconns_fast &&  
                stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {  
                str = "ERROR Too many open connections\r\n";  
                res = write(sfd, str, strlen(str));  
                close(sfd);  
                STATS_LOCK();  
                stats.rejected_conns++;  
                STATS_UNLOCK();  
            } else {  
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,  
                                     DATA_BUFFER_SIZE, tcp_transport);  
            }  
  
            stop = true;  
            break;  
  
        //连接等待  
        case conn_waiting:  
            //.........  
            break;  
  
        //读取事件  
        //例如有用户提交数据过来的时候,工作线程监听到事件后,最终会调用这块代码  
        case conn_read:  
            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);  
  
            switch (res) {  
            case READ_NO_DATA_RECEIVED:  
                conn_set_state(c, conn_waiting);  
                break;  
            case READ_DATA_RECEIVED:  
                conn_set_state(c, conn_parse_cmd);  
                break;  
            case READ_ERROR:  
                conn_set_state(c, conn_closing);  
                break;  
            case READ_MEMORY_ERROR: /* Failed to allocate more memory */  
                /* State already set by try_read_network */  
                break;  
            }  
            break;  
  
        case conn_parse_cmd :  
            if (try_read_command(c) == 0) {  
                /* wee need more data! */  
                conn_set_state(c, conn_waiting);  
            }  
  
            break;  
  
        case conn_new_cmd:  
            /* Only process nreqs at a time to avoid starving other 
               connections */  
  
            --nreqs;  
            if (nreqs >= 0) {  
                reset_cmd_handler(c);  
            } else {  
                pthread_mutex_lock(&c->thread->stats.mutex);  
                c->thread->stats.conn_yields++;  
                pthread_mutex_unlock(&c->thread->stats.mutex);  
                if (c->rbytes > 0) {  
                    /* We have already read in data into the input buffer, 
                       so libevent will most likely not signal read events 
                       on the socket (unless more data is available. As a 
                       hack we should just put in a request to write data, 
                       because that should be possible ;-) 
                    */  
                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {  
                        if (settings.verbose > 0)  
                            fprintf(stderr, "Couldn't update event\n");  
                        conn_set_state(c, conn_closing);  
                        break;  
                    }  
                }  
                stop = true;  
            }  
            break;  
  
        case conn_nread:  
           //....................  
            break;  
  
        case conn_swallow:  
          //....................  
            break;  
  
        case conn_write:  
           //.................  
            break;  
           
        //连接关闭  
        case conn_closing:  
            if (IS_UDP(c->transport))  
                conn_cleanup(c);  
            else  
                conn_close(c);  
            stop = true;  
            break;  
  
        case conn_closed:  
            /* This only happens if dormando is an idiot. */  
            abort();  
            break;  
  
        case conn_max_state:  
            assert(false);  
            break;  
        }  
    }  
  
    return;  
}
           

4. main thread主线程源码分析

主线程的socket server主要通过server_sockets这个方法创建。而server_sockets中主要调用了server_socket这个方法,我们可以看下server_socket这个方法:

/** 
 * Create a socket and bind it to a specific port number 
 * @param interface the interface to bind to 
 * @param port the port number to bind to 
 * @param transport the transport protocol (TCP / UDP) 
 * @param portnumber_file A filepointer to write the port numbers to 
 *        when they are successfully added to the list of ports we 
 *        listen on. 
 */  
static int server_socket(const char *interface,  
                         int port,  
                         enum network_transport transport,  
                         FILE *portnumber_file) {  
            //创建一个新的事件  
            //我们发现上面的工作线程也是调用这个方法,但是区别是这个方法指定了state的类型为:conn_listening  
            //注意这边有一个conn_listening,这个参数主要是指定调用drive_machine这个方法中的conn_listen代码块。  
            if (!(listen_conn_add = conn_new(sfd, conn_listening,  
                                             EV_READ | EV_PERSIST, 1,  
                                             transport, main_base))) {  
                fprintf(stderr, "failed to create listening connection\n");  
                exit(EXIT_FAILURE);  
            }  
            listen_conn_add->next = listen_conn;  
            listen_conn = listen_conn_add;  
}      

conn_new方法:

//我们发现这个方法中又在创建event了,这边实际上是监听socket的读写等事件  
//主线程主要是监听用户的socket连接事件;工作线程主要监听socket的读写事件  
//当用户socket的连接有数据传递过来的时候,就会调用event_handler这个回调函数  
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);  
event_base_set(base, &c->event);   
c->ev_flags = event_flags;  
//将事件添加到libevent的loop循环中  
if (event_add(&c->event, 0) == -1) {  
    perror("event_add");  
    return NULL;  
}
           

然后我们跟踪进入event_handler这个方法,并且进入drive_machine这个方法,我们上面说过server_socket这个方法中传递的state参数为默认写死的conn_listening这个状态,所以我们详细看drive_machine中关于conn_listening这块逻辑的代码。 

case conn_listening:  
            addrlen = sizeof(addr);  
#ifdef HAVE_ACCEPT4  
            //我们可以看到下面的代码是accept,接受客户端的socket连接的代码  
            if (use_accept4) {  
                sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);  
            } else {  
                sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);  
            }  
#else  
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);  
#endif  
            if (sfd == -1) {  
                if (use_accept4 && errno == ENOSYS) {  
                    use_accept4 = 0;  
                    continue;  
                }  
                perror(use_accept4 ? "accept4()" : "accept()");  
                if (errno == EAGAIN || errno == EWOULDBLOCK) {  
                    /* these are transient, so don't log anything */  
                    stop = true;  
                } else if (errno == EMFILE) {  
                    if (settings.verbose > 0)  
                        fprintf(stderr, "Too many open connections\n");  
                    accept_new_conns(false);  
                    stop = true;  
                } else {  
                    perror("accept()");  
                    stop = true;  
                }  
                break;  
            }  
            if (!use_accept4) {  
                if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {  
                    perror("setting O_NONBLOCK");  
                    close(sfd);  
                    break;  
                }  
            }  
  
            if (settings.maxconns_fast &&  
                stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {  
                str = "ERROR Too many open connections\r\n";  
                res = write(sfd, str, strlen(str));  
                close(sfd);  
                STATS_LOCK();  
                stats.rejected_conns++;  
                STATS_UNLOCK();  
            } else {  
                //如果客户端用socket连接上来,则会调用这个分发逻辑的函数  
                //这个函数会将连接信息分发到某一个工作线程中,然后工作线程接管具体的读写操作  
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,  
                                     DATA_BUFFER_SIZE, tcp_transport);  
            }  
  
            stop = true;  
            break; 
           

dispatch_conn_new方法:

/* 
 * Dispatches a new connection to another thread. This is only ever called 
 * from the main thread, either during initialization (for UDP) or because 
 * of an incoming connection. 
 */  
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,  
                       int read_buffer_size, enum network_transport transport) {  
    //每个连接连上来的时候,都会申请一块CQ_ITEM的内存块,用于存储连接的基本信息  
    CQ_ITEM *item = cqi_new();  
    char buf[1];  
    //如果item创建失败,则关闭连接  
    if (item == NULL) {  
        close(sfd);  
        /* given that malloc failed this may also fail, but let's try */  
        fprintf(stderr, "Failed to allocate memory for connection object\n");  
        return ;  
    }  
    //这个方法非常重要。主要是通过求余数的方法来得到当前的连接需要哪个线程来接管  
    //而且last_thread会记录每次最后一次使用的工作线程,每次记录之后就可以让工作线程进入一个轮询,保证了每个工作线程处理的连接数的平衡  
    int tid = (last_thread + 1) % settings.num_threads;  
  
    //获取线程的基本结构  
    LIBEVENT_THREAD *thread = threads + tid;  
  
    last_thread = tid;  
  
    item->sfd = sfd;  
    item->init_state = init_state;  
    item->event_flags = event_flags;  
    item->read_buffer_size = read_buffer_size;  
    item->transport = transport;  
    //向工作线程的队列中放入CQ_ITEM  
    cq_push(thread->new_conn_queue, item);  
  
    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);  
    buf[0] = 'c';  
    //向工作线程的pipe中写入1  
    //工作线程监听到pipe中有写入数据,工作线程接收到通知后,就会向thread->new_conn_queue队列中pop出一个item,然后进行连接的接管操作  
    if (write(thread->notify_send_fd, buf, 1) != 1) {  
        perror("Writing to thread notify pipe");  
    }  
}
           

继续阅读