天天看點

Libevent源碼分析-----連接配接監聽器evconnlistener使用evconnlistener:evconnlistener的封裝:釋放evconnlistener:

        轉載請注明出處:http://blog.csdn.net/luotuo44/article/details/38800363

使用evconnlistener:

        基于event和event_base已經可以寫一個CS模型了。但是對于伺服器端來說,仍然需要使用者自行調用socket、bind、listen、accept等步驟。這個過程有點繁瑣,為此在2.0.2-alpha版本的Libevent推出了一些對應的封裝函數。

        使用者隻需初始化struct sockaddr_in結構體變量,然後把它作為參數傳給函數evconnlistener_new_bind即可。該函數會完成上面說到的那4個過程。下面的代碼是一個使用例子。

#include<netinet/in.h>
#include<sys/socket.h>
#include<unistd.h>

#include<stdio.h>
#include<string.h>

#include<event.h>
#include<listener.h>
#include<bufferevent.h>
#include<thread.h>


void listener_cb(evconnlistener *listener, evutil_socket_t fd,
                 struct sockaddr *sock, int socklen, void *arg);

void socket_read_cb(bufferevent *bev, void *arg);
void socket_error_cb(bufferevent *bev, short events, void *arg);

int main()
{
    evthread_use_pthreads();//enable threads

    struct sockaddr_in sin;
    memset(&sin, 0, sizeof(struct sockaddr_in));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(8989);

    event_base *base = event_base_new();
    evconnlistener *listener
            = evconnlistener_new_bind(base, listener_cb, base,
                                      LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE | LEV_OPT_THREADSAFE,
                                      10, (struct sockaddr*)&sin,
                                      sizeof(struct sockaddr_in));

    event_base_dispatch(base);

	 evconnlistener_free(listener);
    event_base_free(base);

    return 0;
}


//有新的用戶端連接配接到伺服器
//當此函數被調用時,libevent已經幫我們accept了這個用戶端。該用戶端的
//檔案描述符為fd
void listener_cb(evconnlistener *listener, evutil_socket_t fd,
                 struct sockaddr *sock, int socklen, void *arg)
{
    event_base *base = (event_base*)arg;

	//下面代碼是為這個fd建立一個bufferevent
    bufferevent *bev =  bufferevent_socket_new(base, fd,
                                               BEV_OPT_CLOSE_ON_FREE);

    bufferevent_setcb(bev, socket_read_cb, NULL, socket_error_cb, NULL);
    bufferevent_enable(bev, EV_READ | EV_PERSIST);
}


void socket_read_cb(bufferevent *bev, void *arg)
{
    char msg[4096];

    size_t len = bufferevent_read(bev, msg, sizeof(msg)-1 );

    msg[len] = '\0';
    printf("server read the data %s\n", msg);

    char reply[] = "I has read your data";
    bufferevent_write(bev, reply, strlen(reply) );
}


void socket_error_cb(bufferevent *bev, short events, void *arg)
{
    if (events & BEV_EVENT_EOF)
        printf("connection closed\n");
    else if (events & BEV_EVENT_ERROR)
        printf("some other error\n");

    //這将自動close套接字和free讀寫緩沖區
    bufferevent_free(bev);
}
           

        上面的代碼是一個伺服器端的例子,用戶端代碼可以使用《Libevent使用例子,從簡單到複雜》博文中的用戶端。這裡就不貼用戶端代碼了。

        從上面代碼可以看到,當伺服器端監聽到一個用戶端的連接配接請求後,就會調用listener_cb這個回調函數。這個回調函數是在evconnlistener_new_bind函數中設定的。現在來看一下這個函數的參數有哪些,下面是其函數原型。

//listener.h檔案
typedef void (*evconnlistener_cb)(struct evconnlistener *, evutil_socket_t, struct sockaddr *, int socklen, void *);

struct evconnlistener *evconnlistener_new_bind(struct event_base *base,
    evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
    const struct sockaddr *sa, int socklen);
           

        第一個參數是很熟悉的event_base,無論怎麼樣都是離不開event_base這個發動機的。

        第二個參數是一個函數指針,該函數指針的格式如代碼所示。當有新的用戶端請求連接配接時,該函數就會調用。要注意的是:當這個回調函數被調用時,Libevent已經幫我們accept了這個用戶端。是以,該回調函數有一個參數是檔案描述符fd。我們直接使用這個fd即可。真是友善。這個參數是可以為NULL的,此時使用者并不能接收到用戶端。當使用者調用evconnlistener_set_cb函數設定回調函數後,就可以了。

        第三個參數是傳給回調函數的使用者參數,作用就像event_new函數的最後一個參數。

        參數flags是一些标志值,有下面這些:

  • LEV_OPT_LEAVE_SOCKETS_BLOCKING:預設情況下,當連接配接監聽器接收到新的用戶端socket連接配接後,會把該socket設定為非阻塞的。如果設定該選項,那麼就把之用戶端socket保留為阻塞的
  • LEV_OPT_CLOSE_ON_FREE:當連接配接監聽器釋放時,會自動關閉底層的socket
  • LEV_OPT_CLOSE_ON_EXEC:為底層的socket設定close-on-exec标志
  • LEV_OPT_REUSEABLE: 在某些平台,預設情況下當一個監聽socket被關閉時,其他socket不能馬上綁定到同一個端口,要等一會兒才行。設定該标志後,Libevent會把該socket設定成reuseable。這樣,關閉該socket後,其他socket就能馬上使用同一個端口
  • LEV_OPT_THREADSAFE:為連接配接監聽器配置設定鎖。這樣可以確定線程安全

        參數backlog是系統調用listen的第二個參數。最後兩個參數就不多說了。

evconnlistener的封裝:

        接下來看一下Libevent是怎麼封裝evconnlistener的。

用到的結構體:

//listener.c檔案
struct evconnlistener_ops {//一系列的工作函數
	int (*enable)(struct evconnlistener *);
	int (*disable)(struct evconnlistener *);
	void (*destroy)(struct evconnlistener *);
	void (*shutdown)(struct evconnlistener *);
	evutil_socket_t (*getfd)(struct evconnlistener *);
	struct event_base *(*getbase)(struct evconnlistener *);
};

struct evconnlistener {
	const struct evconnlistener_ops *ops;//操作函數
	void *lock; //鎖變量,用于線程安全
	evconnlistener_cb cb;//使用者的回調函數
	evconnlistener_errorcb errorcb;//發生錯誤時的回調函數
	void *user_data;//回調函數的參數
	unsigned flags;//屬性标志
	short refcnt;//引用計數
	unsigned enabled : 1;//位域為1.即隻需一個比特位來存儲這個成員
};

struct evconnlistener_event {
	struct evconnlistener base;
	struct event listener; //内部event,插入到event_base
};
           

        在evconnlistener_event結構體有一個event結構體。可以想象,在實作時必然是将伺服器端的socket fd指派給struct event 類型變量listener的fd成員。然後将listener加入到event_base,這樣就完成了自動監聽工作。這也回歸到之前學過的内容。

        下面看一下具體是怎麼實作的。

初始化伺服器socket:

//listener.c檔案
struct evconnlistener *
evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb,
    void *ptr, unsigned flags, int backlog, const struct sockaddr *sa,
    int socklen)
{
	struct evconnlistener *listener;
	evutil_socket_t fd;
	int on = 1;
	int family = sa ? sa->sa_family : AF_UNSPEC;

	//監聽個數不能為0
	if (backlog == 0)
		return NULL;

	fd = socket(family, SOCK_STREAM, 0);
	if (fd == -1)
		return NULL;

	//LEV_OPT_LEAVE_SOCKETS_BLOCKING選項是應用于accept到的用戶端socket
	//是以對于伺服器端的socket,直接将之設定為非阻塞的
	if (evutil_make_socket_nonblocking(fd) < 0) {
		evutil_closesocket(fd);
		return NULL;
	}

	if (flags & LEV_OPT_CLOSE_ON_EXEC) {
		if (evutil_make_socket_closeonexec(fd) < 0) {
			evutil_closesocket(fd);
			return NULL;
		}
	}

	if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&on, sizeof(on))<0) {
		evutil_closesocket(fd);
		return NULL;
	}
	if (flags & LEV_OPT_REUSEABLE) {
		if (evutil_make_listen_socket_reuseable(fd) < 0) {
			evutil_closesocket(fd);
			return NULL;
		}
	}

	if (sa) {
		if (bind(fd, sa, socklen)<0) {//綁定
			evutil_closesocket(fd);
			return NULL;
		}
	}

	listener = evconnlistener_new(base, cb, ptr, flags, backlog, fd);
	if (!listener) {
		evutil_closesocket(fd);
		return NULL;
	}

	return listener;
}
           

        evconnlistener_new_bind函數申請一個socket,然後對之進行一些有關非阻塞、重用、保持連接配接的處理、綁定到特定的IP和端口。最後把業務邏輯交給evconnlistener_new處理。

//listener.c檔案
static const struct evconnlistener_ops evconnlistener_event_ops = {
	event_listener_enable,
	event_listener_disable,
	event_listener_destroy,
	NULL, /* shutdown */
	event_listener_getfd,
	event_listener_getbase
};


struct evconnlistener *
evconnlistener_new(struct event_base *base,
    evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
    evutil_socket_t fd)
{
	struct evconnlistener_event *lev;

	if (backlog > 0) {
		if (listen(fd, backlog) < 0)
			return NULL;
	} else if (backlog < 0) {
		if (listen(fd, 128) < 0)
			return NULL;
	}

	lev = mm_calloc(1, sizeof(struct evconnlistener_event));
	if (!lev)
		return NULL;

	//指派
	lev->base.ops = &evconnlistener_event_ops;
	lev->base.cb = cb;
	lev->base.user_data = ptr;
	lev->base.flags = flags;
	lev->base.refcnt = 1;

	if (flags & LEV_OPT_THREADSAFE) {//線程安全就需要配置設定鎖
		EVTHREAD_ALLOC_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	}

	//在多路IO複用函數中,新用戶端的連接配接請求也被當作讀事件
	event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST,
	    listener_read_cb, lev);

	//會調用event_add,把event加入到event_base中
	evconnlistener_enable(&lev->base);

	return &lev->base;
}

int
evconnlistener_enable(struct evconnlistener *lev)
{
	int r;
	LOCK(lev);
	lev->enabled = 1;
	if (lev->cb)
		r = lev->ops->enable(lev);//實際上是調用下面的event_listener_enable函數
	else
		r = 0;
	UNLOCK(lev);
	return r;
}

static int
event_listener_enable(struct evconnlistener *lev)
{
	struct evconnlistener_event *lev_e =
	    EVUTIL_UPCAST(lev, struct evconnlistener_event, base);

	//加入到event_base,完成監聽工作。
	return event_add(&lev_e->listener, NULL);
}
           

        幾個函數的一路調用,思路還是挺清晰的。就是申請一個socket,進行一些處理,然後用之指派給event。最後把之add到event_base中。event_base會對新用戶端的請求連接配接進行監聽。

        在evconnlistener_enable函數裡面,如果使用者沒有設定回調函數,那麼就不會調用event_listener_enable。也就是說并不會add到event_base中。

        event_listener_enable函數裡面的宏EVUTIL_UPCAST可以根據結構體成員變量的位址推算出結構體的起始位址。有關這個宏,可以檢視”結構體偏移量”。

處理用戶端的連接配接請求:

        現在來看一下event的回調函數listener_read_cb。

//listener.c檔案
static void
listener_read_cb(evutil_socket_t fd, short what, void *p)
{
	struct evconnlistener *lev = p;
	int err;
	evconnlistener_cb cb;
	evconnlistener_errorcb errorcb;
	void *user_data;
	LOCK(lev);
	while (1) { //可能有多個用戶端同時請求連接配接
		struct sockaddr_storage ss;
#ifdef WIN32
		int socklen = sizeof(ss);
#else
		socklen_t socklen = sizeof(ss);
#endif
		evutil_socket_t new_fd = accept(fd, (struct sockaddr*)&ss, &socklen);
		if (new_fd < 0)
			break;
		if (socklen == 0) {
			/* This can happen with some older linux kernels in
			 * response to nmap. */
			evutil_closesocket(new_fd);
			continue;
		}

		if (!(lev->flags & LEV_OPT_LEAVE_SOCKETS_BLOCKING))
			evutil_make_socket_nonblocking(new_fd);

		//使用者還沒設定連接配接監聽器的回調函數
		if (lev->cb == NULL) {
			UNLOCK(lev);
			return;
		}

		//由于refcnt被初始化為1.這裡有++了,是以一般情況下并不會進入下面的
		//if判斷裡面。但如果程在下面UNLOCK之後,第二個線調用evconnlistener_free
		//釋放這個evconnlistener時,就有可能使得refcnt為1了。即進入那個判斷體裡
		//執行listener_decref_and_unlock。在下面會讨論這個問題。
		++lev->refcnt;
		cb = lev->cb;
		user_data = lev->user_data;
		UNLOCK(lev);
		cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen,
		    user_data);//調用使用者設定的回調函數,讓使用者處理這個fd
		LOCK(lev);
		if (lev->refcnt == 1) {
			int freed = listener_decref_and_unlock(lev);
			EVUTIL_ASSERT(freed);
			return;
		}
		--lev->refcnt;
	}
	
	err = evutil_socket_geterror(fd);
	if (EVUTIL_ERR_ACCEPT_RETRIABLE(err)) {//還可以accept
		UNLOCK(lev);
		return;
	}

	//當有錯誤發生時才會運作到這裡
	if (lev->errorcb != NULL) {
		++lev->refcnt;
		errorcb = lev->errorcb;
		user_data = lev->user_data;
		UNLOCK(lev);
		errorcb(lev, user_data);//調用使用者設定的錯誤回調函數
		LOCK(lev);
		listener_decref_and_unlock(lev);
	}
}
           

        這個函數所做的工作也比較簡單,就是accept用戶端,然後調用使用者設定的回調函數。是以,使用者回調函數的參數fd是一個已經連接配接好了的socket。

        上面函數說到了錯誤回調函數,可以通過下面的函數設定連接配接監聽器的錯誤監聽函數。

//listener.h檔案
typedef void (*evconnlistener_errorcb)(struct evconnlistener *, void *);

//listener.c檔案
void
evconnlistener_set_error_cb(struct evconnlistener *lev,
    evconnlistener_errorcb errorcb)
{
	LOCK(lev);
	lev->errorcb = errorcb;
	UNLOCK(lev);
}
           

釋放evconnlistener:

        調用evconnlistener_free可以釋放一個evconnlistener。由于evconnlistener擁有一些系統資源,在釋放evconnlistener_free的時候會釋放這些系統資源。

//listener.c檔案
void
evconnlistener_free(struct evconnlistener *lev)
{
	LOCK(lev);
	lev->cb = NULL;
	lev->errorcb = NULL;
	if (lev->ops->shutdown)//這裡的shutdown為NULL
		lev->ops->shutdown(lev);

	//引用次數減一,并解鎖
	listener_decref_and_unlock(lev);
}

static int
listener_decref_and_unlock(struct evconnlistener *listener)
{
	int refcnt = --listener->refcnt;
	if (refcnt == 0) {
		//實際調用event_listener_destroy
		listener->ops->destroy(listener);
		UNLOCK(listener);
		//釋放鎖
		EVTHREAD_FREE_LOCK(listener->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
		mm_free(listener);
		return 1;
	} else {
		UNLOCK(listener);
		return 0;
	}
}

static void
event_listener_destroy(struct evconnlistener *lev)
{
	struct evconnlistener_event *lev_e =
	    EVUTIL_UPCAST(lev, struct evconnlistener_event, base);

	//把event從event_base中删除
	event_del(&lev_e->listener);
	if (lev->flags & LEV_OPT_CLOSE_ON_FREE)//如果使用者設定了這個選項,那麼要關閉socket
		evutil_closesocket(event_get_fd(&lev_e->listener));
}
           

        要注意一點,LEV_OPT_CLOSE_ON_FREE選項關閉的是伺服器端的監聽socket,而非那些連接配接用戶端的socket。

        現在來說一下那個listener_decref_and_unlock。前面注釋說到,在函數listener_read_cb中,一般情況下是不會調用listener_decref_and_unlock,但在多線程的時候可能會調用。這種特殊情況是:當主線程accept到一個新用戶端時,會解鎖,并調用使用者設定的回調函數。此時,引用計數等于2。就在這個時候,第二個線程執行evconnlistener_free函數。該函數會執行listener_decref_and_unlock。明顯主線程還在用這個evconnlistener,肯定不能删除。此時引用計數也等于2也不會删除。但使用者已經調用了evconnlistener_free。Libevent必須要響應。當第二個線程執行完後,主線程搶到CPU,此時引用計數就變成1了,也就進入到if判斷裡面了。在判斷體裡面執行函數listener_decref_and_unlock,并且完成删除工作。

        總得來說,Libevent封裝的這個evconnlistener和一系列操作函數,還是比較簡單的。思路也比較清晰。

參考:

        http://www.wangafu.net/~nickm/libevent-book/Ref8_listener.html

繼續閱讀