學習 mysql proxy 0.8.3 的源碼後可知,其全部事件處理線程均對全局 socketpair 的讀端進行了監聽,以實作通知管道的功能:threads->event_notify_fds[0] 。
<a href="http://my.oschina.net/moooofly/blog/121588#">?</a>
1
2
3
4
5
6
7
8
9
10
11
12
13
<code>int</code> <code>chassis_event_threads_init_thread(chassis_event_threads_t *threads, chassis_event_thread_t *event_thread, chassis *chas) {</code>
<code> </code><code>event_thread->event_base = event_base_new();</code>
<code> </code><code>...</code>
<code> </code><code>// 設定目前線程監聽 fd 為 socketpair 的讀端 fd</code>
<code> </code><code>event_thread->notify_fd = dup(threads->event_notify_fds[0]);</code>
<code> </code><code>event_set(&(event_thread->notify_fd_event), event_thread->notify_fd, ev_read | ev_persist, chassis_event_handle, event_thread);</code>
<code> </code><code>event_base_set(event_thread->event_base, &(event_thread->notify_fd_event));</code>
<code> </code><code>event_add(&(event_thread->notify_fd_event), null);</code>
<code> </code><code>return</code> <code>0;</code>
<code>}</code>
該 socketpair 是在主線程初始化過程中建立的:
14
15
16
17
18
19
20
<code>chassis_event_threads_t *chassis_event_threads_new() {</code>
<code> </code><code>threads = g_new0(chassis_event_threads_t, 1);</code>
<code> </code><code>/* create the ping-fds</code>
<code> </code><code>*</code>
<code> </code><code>* the event-thread write a byte to the ping-pipe to trigger a fd-event when</code>
<code> </code><code>* something is available in the event-async-queues</code>
<code> </code><code>*/</code>
<code> </code><code>// 建立 socketpair</code>
<code> </code><code>if</code> <code>(0 != evutil_socketpair(af_unix, sock_stream, 0, threads->event_notify_fds)) {</code>
<code> </code><code>...</code>
<code> </code><code>}</code>
<code> </code><code>/* make both ends non-blocking */</code>
<code> </code><code>evutil_make_socket_nonblocking(threads->event_notify_fds[0]);</code>
<code> </code><code>evutil_make_socket_nonblocking(threads->event_notify_fds[1]);</code>
<code> </code><code>return</code> <code>threads;</code>
其中 evutil_socketpair 實作如下(取自 libevent 1.4.13):
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
<code>int</code>
<code>evutil_socketpair(</code><code>int</code> <code>family,</code><code>int</code> <code>type,</code><code>int</code> <code>protocol,</code><code>int</code> <code>fd[2])</code>
<code>{</code>
<code>#ifndef win32</code>
<code> </code><code>return</code> <code>socketpair(family, type, protocol, fd);</code>
<code>#else</code>
<code> </code><code>/* this code is originally from tor. used with permission. */</code>
<code> </code><code>/* this socketpair does not work when localhost is down. so</code>
<code> </code><code>* it's really not the same thing at all. but it's close enough</code>
<code> </code><code>* for now, and really, when localhost is down sometimes, we</code>
<code> </code><code>* have other problems too.</code>
<code> </code><code>int</code> <code>listener = -1;</code>
<code> </code><code>int</code> <code>connector = -1;</code>
<code> </code><code>int</code> <code>acceptor = -1;</code>
<code> </code><code>struct</code> <code>sockaddr_in listen_addr;</code>
<code> </code><code>struct</code> <code>sockaddr_in connect_addr;</code>
<code> </code><code>int</code> <code>size;</code>
<code> </code><code>int</code> <code>saved_errno = -1;</code>
<code> </code><code>if</code> <code>(protocol</code>
<code>#ifdef af_unix</code>
<code> </code><code>|| family != af_unix</code>
<code>#endif</code>
<code> </code><code>) {</code>
<code> </code><code>evutil_set_socket_error(wsaeafnosupport);</code>
<code> </code><code>return</code> <code>-1;</code>
<code> </code><code>if</code> <code>(!fd) {</code>
<code> </code><code>evutil_set_socket_error(wsaeinval);</code>
<code> </code><code>// 建立作為listener 的socket</code>
<code> </code><code>listener = socket(af_inet, type, 0);</code>
<code> </code><code>if</code> <code>(listener < 0)</code>
<code> </code><code>memset</code><code>(&listen_addr, 0,</code><code>sizeof</code><code>(listen_addr));</code>
<code> </code><code>listen_addr.sin_family = af_inet;</code>
<code> </code><code>listen_addr.sin_addr.s_addr = htonl(inaddr_loopback);</code>
<code> </code><code>listen_addr.sin_port = 0; </code><code>/* kernel chooses port. */</code>
<code> </code><code>// 進行綁定,核心會配置設定port</code>
<code> </code><code>if</code> <code>(bind(listener, (</code><code>struct</code> <code>sockaddr *) &listen_addr,</code><code>sizeof</code> <code>(listen_addr)) == -1)</code>
<code> </code><code>goto</code> <code>tidy_up_and_fail;</code>
<code> </code><code>// 宣告開始監聽連接配接請求</code>
<code> </code><code>if</code> <code>(listen(listener, 1) == -1)</code>
<code> </code><code>// 建立作為connector 的socket</code>
<code> </code><code>connector = socket(af_inet, type, 0);</code>
<code> </code><code>if</code> <code>(connector < 0)</code>
<code> </code><code>/* we want to find out the port number to connect to. */</code>
<code> </code><code>size =</code><code>sizeof</code><code>(connect_addr);</code>
<code> </code><code>// 擷取bind 後核心為listener 配置設定的port ( ip 為inaddr_loopback )</code>
<code> </code><code>if</code> <code>(getsockname(listener, (</code><code>struct</code> <code>sockaddr *) &connect_addr, &size) == -1)</code>
<code> </code><code>if</code> <code>(size !=</code><code>sizeof</code> <code>(connect_addr))</code>
<code> </code><code>goto</code> <code>abort_tidy_up_and_fail;</code>
<code> </code><code>// 從connector 向listener 發起連接配接,connect_addr 為連接配接目的位址</code>
<code> </code><code>if</code> <code>(connect(connector, (</code><code>struct</code> <code>sockaddr *) &connect_addr,</code><code>sizeof</code><code>(connect_addr)) == -1)</code>
<code> </code><code>size =</code><code>sizeof</code><code>(listen_addr);</code>
<code> </code><code>// 在套接字listener 上accept ,函數傳回後listen_addr 中為對端位址</code>
<code> </code><code>acceptor = accept(listener, (</code><code>struct</code> <code>sockaddr *) &listen_addr, &size);</code>
<code> </code><code>if</code> <code>(acceptor < 0)</code>
<code> </code><code>if</code> <code>(size !=</code><code>sizeof</code><code>(listen_addr))</code>
<code> </code><code>// 關閉listener</code>
<code> </code><code>evutil_closesocket(listener);</code>
<code> </code><code>/* now check we are talking to ourself by matching port and host on the</code>
<code> </code><code>two sockets. */</code>
<code> </code><code>// 擷取connect 後核心為connector 配置設定的位址資訊-- 自動綁定功能</code>
<code> </code><code>if</code> <code>(getsockname(connector, (</code><code>struct</code> <code>sockaddr *) &connect_addr, &size) == -1)</code>
<code> </code><code>// 将從兩側分别獲得的位址位址進行比較</code>
<code> </code><code>if</code> <code>(size !=</code><code>sizeof</code> <code>(connect_addr)</code>
<code> </code><code>|| listen_addr.sin_family != connect_addr.sin_family</code>
<code> </code><code>|| listen_addr.sin_addr.s_addr != connect_addr.sin_addr.s_addr</code>
<code> </code><code>|| listen_addr.sin_port != connect_addr.sin_port)</code>
<code> </code><code>fd[0] = connector;</code>
<code> </code><code>fd[1] = acceptor;</code>
<code> </code><code>abort_tidy_up_and_fail:</code>
<code> </code><code>saved_errno = wsaeconnaborted;</code>
<code> </code><code>tidy_up_and_fail:</code>
<code> </code><code>if</code> <code>(saved_errno < 0)</code>
<code> </code><code>saved_errno = wsagetlasterror();</code>
<code> </code><code>if</code> <code>(listener != -1)</code>
<code> </code><code>evutil_closesocket(listener);</code>
<code> </code><code>if</code> <code>(connector != -1)</code>
<code> </code><code>evutil_closesocket(connector);</code>
<code> </code><code>if</code> <code>(acceptor != -1)</code>
<code> </code><code>evutil_closesocket(acceptor);</code>
<code> </code><code>evutil_set_socket_error(saved_errno);</code>
<code> </code><code>return</code> <code>-1;</code>
從上述實作中可以看出,在非 win32 平台,直接就可以使用現成的 api 函數建立 socketpair ;在 win32 平台上,是通過建立兩個本地 socket 互相連接配接建立的 socketpair 。
實作上述功能的另外一種方式是,使用 pipe 。用法很簡單,摘抄代碼如下(摘自 memcached-1.4.14):
<code>void</code> <code>thread_init(</code><code>int</code> <code>nthreads,</code><code>struct</code> <code>event_base *main_base) {</code>
<code>...</code>
<code> </code><code>// nthreads 為建立的工作線程數</code>
<code> </code><code>for</code> <code>(i = 0; i < nthreads; i++) {</code>
<code> </code><code>int</code> <code>fds[2];</code>
<code> </code><code>if</code> <code>(pipe(fds)) { </code><code>// 使用pipe 作為工作線程擷取任務的通道</code>
<code> </code><code>perror</code><code>(</code><code>"can't create notify pipe"</code><code>);</code>
<code> </code><code>exit</code><code>(1);</code>
<code> </code><code>}</code>
<code> </code><code>threads[i].notify_receive_fd = fds[0]; </code><code>// 讀端</code>
<code> </code><code>threads[i].notify_send_fd = fds[1]; </code><code>// 寫端</code>
<code> </code><code>// 設定用于每個工作線程的libevent 相關資訊并建立cq 結構</code>
<code> </code><code>setup_thread(&threads[i]);</code>
<code> </code><code>...</code>
<code> </code><code>/* create threads after we've done all the libevent setup. */</code>
<code> </code><code>// 建立工作線程</code>
<code> </code><code>create_worker(worker_libevent, &threads[i]);</code>
至于用哪種更好,大家自己思考~~
====== 更新 2013-11-11 ======
最近寫 modb 代碼時,想要利用上面的線程間通信機制,是以使用了相對簡單的 pipe 實作方案,但在 windows 下調試時總會遇到 “unknown error 10038” 錯誤。查閱相關文檔後發現,結論是 windows 下不能将 pipe 和 select 一起使用,因為會認為 pipe 不是一個合法的 socket 句柄,然後 linux 下是沒有這個問題的。
解決方案:
通過 socket 模拟 pipe 的實作;
使用上面的 socketpair 實作;
<code>int</code> <code>pipe(</code><code>int</code> <code>fildes[2])</code>
<code> </code><code>int</code> <code>tcp1, tcp2;</code>
<code> </code><code>sockaddr_in name;</code>
<code> </code><code>memset</code><code>(&name, 0,</code><code>sizeof</code><code>(name));</code>
<code> </code><code>name.sin_family = af_inet;</code>
<code> </code><code>name.sin_addr.s_addr = htonl(inaddr_loopback);</code>
<code> </code><code>int</code> <code>namelen =</code><code>sizeof</code><code>(name);</code>
<code> </code><code>tcp1 = tcp2 = -1;</code>
<code> </code><code>int</code> <code>tcp = socket(af_inet, sock_stream, 0);</code>
<code> </code><code>if</code> <code>(tcp == -1){</code>
<code> </code><code>goto</code> <code>clean;</code>
<code> </code><code>if</code> <code>(bind(tcp, (sockaddr*)&name, namelen) == -1){</code>
<code> </code><code>if</code> <code>(listen(tcp, 5) == -1){</code>
<code> </code><code>if</code> <code>(getsockname(tcp, (sockaddr*)&name, &namelen) == -1){</code>
<code> </code><code>tcp1 = socket(af_inet, sock_stream, 0);</code>
<code> </code><code>if</code> <code>(tcp1 == -1){</code>
<code> </code><code>if</code> <code>(-1 == connect(tcp1, (sockaddr*)&name, namelen)){</code>
<code> </code><code>tcp2 = accept(tcp, (sockaddr*)&name, &namelen);</code>
<code> </code><code>if</code> <code>(tcp2 == -1){</code>
<code> </code><code>if</code> <code>(closesocket(tcp) == -1){</code>
<code> </code><code>fildes[0] = tcp1;</code>
<code> </code><code>fildes[1] = tcp2;</code>
<code>clean:</code>
<code> </code><code>if</code> <code>(tcp != -1){</code>
<code> </code><code>closesocket(tcp);</code>
<code> </code><code>if</code> <code>(tcp2 != -1){</code>
<code> </code><code>closesocket(tcp2);</code>
<code> </code><code>if</code> <code>(tcp1 != -1){</code>
<code> </code><code>closesocket(tcp1);</code>
原文作者指出有如下缺點:
效率低下(是否所有其他實作方式都比基于 socket 的方式高效?)
占用了兩個 tcp 端口(pipe 不會占用端口)
accept 的傳回值未必就是 tcp1 連接配接過來的(多線程或者别的程序在幹預), 是以最好通過發送資料進行确認(這個比較嚴重,在有多個連接配接同時進入的時候确實無法保證目前連接配接時正确的)
由于不是匿名的, 是以可以在 netstat 裡面看到(看到又怎樣?)
優點隻有一個, 可以使用 select 調用。
将該 pipe 實作和上面的 socketpair 的實作進行對比,發現兩者根本就是同一個東東,并且 pipe 的實作沒有 libevent 中 socketpair 實作寫的好。是以 pipe 實作的作者指出的那些缺點,本人持保留意見。看客自己斟酌。
補充:由于上面的 socketpair 是基于 inaddr_loopback 的,是以如果 lo 必須處于 up 狀态才行。