熟悉 rabbitmq 的人肯定知道 no_ack 屬性是在調用 basic.consume 方法時可以設定的一個重要參數。本文主要針對 no_ack 設定的兩種情況,通過抓包分析的形式講解下實際應用中的異同,并總結一下相關的處理經驗。
============ 我是分隔線 =============
no_ack 的用途:確定 message 被 consumer “成功”處理了。這裡“成功”的意思是,(在設定了 no_ack=false 的情況下)隻要 consumer 手動應答了 basic.ack ,就算其“成功”處理了。
情況一:no_ack=true (此時為自動應答)
在這種情況下,consumer 會在接收到 basic.deliver + content-header + content-body 之後,立即回複 ack 。而這個 ack 是 tcp 協定中的 ack 。此 ack 的回複不關心 consumer 是否對接收到的資料進行了處理,當然也不關心處理資料所需要的耗時。
圖1:(producer+consumer)
圖2:(consumer)
圖3:(producer)
情況二:no_ack=false (此時為手動應答)
在這種情況下,要求 consumer 在處理完接收到的 basic.deliver + content-header + content-body 之後才回複 ack 。而這個 ack 是 amqp 協定中的 basic.ack 。此 ack 的回複是和業務處理相關的,是以具體的回複時間應該要取決于業務處理的耗時。
圖4:(producer+consumer)
圖5:(consumer)
圖6:(producer)
總結:
basic.ack 發回給 rabbitmq 以告知,可以将相應 message 從 rabbitmq 的消息緩存中移除。
basic.ack 未被 consumer 發回給 rabbitmq 前出現了異常,rabbitmq 發現與該 consumer 對應的連接配接被斷開,之後将該 message 以輪詢方式發送給其他 consumer (假設存在多個 consumer 訂閱同一個 queue)。
在 no_ack=true 的情況下,rabbitmq 認為 message 一旦被 deliver 出去了,就已被确認了,是以會立即将緩存中的 message 删除。是以在 consumer 異常時會導緻消息丢失。
來自 consumer 側的 basic.ack 與 發送給 producer 側的 basic.ack 沒有直接關系。
============ 我是分隔線 =============
最後貼上自己改造的、基于 libevent 實作的 rabbitmq-c 的測試列印。
情況一:
<a href="http://my.oschina.net/moooofly/blog/143883#">?</a>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
<code>[warn] evsignal_init: socketpair: no error</code>
<code>drive_machine: [conn_init] --- </code><code>in</code> <code>tcp 3-way connecting!</code>
<code>drive_machine: [conn_connecting] --- connection timeout 1</code><code>time</code> <code>on socket(6040)</code>
<code>drive_machine: [conn_connected] --- connected on socket(6040)</code>
<code>6040: conn_state change connected ==> snd_protocol_header</code>
<code> </code><code>--> send protocol.header!</code>
<code>6040: conn_state change snd_protocol_header ==> rcv_connection_start_method</code>
<code> </code><code><-- recv connection.start method frame!</code>
<code>6040: conn_state change rcv_connection_start_method ==> snd_connection_start_rsp_method</code>
<code> </code><code>--> send connection.start-ok method frame!</code>
<code>6040: conn_state change snd_connection_start_rsp_method ==> rcv_connection_tune_method</code>
<code> </code><code><-- recv connection.tune method frame!</code>
<code>6040: conn_state change rcv_connection_tune_method ==> snd_connection_tune_rsp_method</code>
<code> </code><code>--> send connection.tune-ok method frame!</code>
<code>6040: conn_state change snd_connection_tune_rsp_method ==> snd_connection_open_method</code>
<code> </code><code>--> send connection.open method frame!</code>
<code>6040: conn_state change snd_connection_open_method ==> rcv_connection_open_rsp_method</code>
<code> </code><code><-- recv connection.open-ok method frame!</code>
<code>6040: conn_state change rcv_connection_open_rsp_method ==> snd_channel_open_method</code>
<code> </code><code>--> send channel.open method frame!</code>
<code>6040: conn_state change snd_channel_open_method ==> rcv_channel_open_rsp_method</code>
<code> </code><code><-- recv channel.open-ok method frame!</code>
<code>6040: conn_state change rcv_channel_open_rsp_method ==> idle</code>
<code>drive_machine: [conn_idle] --- [consumer]: queue declaring!</code>
<code>6040: conn_state change idle ==> snd_queue_declare_method</code>
<code> </code><code>--> send queue.declare method frame!</code>
<code>6040: conn_state change snd_queue_declare_method ==> rcv_queue_declare_rsp_method</code>
<code> </code><code><-- recv queue.declare-ok method frame!</code>
<code>6040: conn_state change rcv_queue_declare_rsp_method ==> idle</code>
<code>drive_machine: [conn_idle] --- [consumer]: queue binding!</code>
<code>6040: conn_state change idle ==> snd_queue_bind_method</code>
<code> </code><code>--> send queue.bind method frame!</code>
<code>6040: conn_state change snd_queue_bind_method ==> rcv_queue_bind_rsp_method</code>
<code> </code><code><-- recv queue.bind-ok method frame!</code>
<code>6040: conn_state change rcv_queue_bind_rsp_method ==> idle</code>
<code>drive_machine: [conn_idle] --- [consumer]: basic qos!</code>
<code>6040: conn_state change idle ==> snd_basic_qos_method</code>
<code> </code><code>--> send basic.qos method frame!</code>
<code>6040: conn_state change snd_basic_qos_method ==> rcv_basic_qos_rsp_method</code>
<code> </code><code><-- recv basic.qos-ok method frame!</code>
<code>6040: conn_state change rcv_basic_qos_rsp_method ==> idle</code>
<code>drive_machine: [conn_idle] --- [consumer]: basic consuming!</code>
<code>6040: conn_state change idle ==> snd_basic_consume_method</code>
<code> </code><code>--> send basic.consume method frame!</code>
<code>6040: conn_state change snd_basic_consume_method ==> rcv_basic_consume_rsp_method</code>
<code> </code><code><-- recv basic.consume-ok method frame!</code>
<code>6040: conn_state change rcv_basic_consume_rsp_method ==> idle</code>
<code>drive_machine: [conn_idle] --- [consumer]: start waiting to recv!</code>
<code>6040: conn_state change idle ==> rcv_basic_deliver_method</code>
<code>drive_machine: wait</code><code>for</code> <code>basic.deliver method another 10 seconds!!</code>
<code> </code><code><-- recv basic.deliver method frame!</code>
<code>6040: conn_state change rcv_basic_deliver_method ==> rcv_basic_content_header</code>
<code> </code><code><-- recv content.header frame!</code>
<code>6040: conn_state change rcv_basic_content_header ==> rcv_basic_content_body</code>
<code> </code><code><-- recv content.body frame!</code>
<code>content body is [hello world betty].</code>
<code>@@@ cb: body len : [17] body : [hello world betty]</code>
<code>6040: conn_state change rcv_basic_content_body ==> idle</code>
情況二:
72
73
<code>drive_machine: connected on socket(6040)</code>
<code>6040: conn_state change rcv_basic_content_body ==> snd_basic_ack_method</code>
<code> </code><code>--> send basic.ack method frame!</code>
<code>6040: conn_state change snd_basic_ack_method ==> idle</code>