天天看點

【原創】RabbitMQ 之 no_ack 分析

      熟悉 rabbitmq 的人肯定知道  no_ack 屬性是在調用 basic.consume 方法時可以設定的一個重要參數。本文主要針對 no_ack 設定的兩種情況,通過抓包分析的形式講解下實際應用中的異同,并總結一下相關的處理經驗。

============ 我是分隔線 =============

      no_ack 的用途:確定 message 被 consumer “成功”處理了。這裡“成功”的意思是,(在設定了 no_ack=false 的情況下)隻要 consumer 手動應答了 basic.ack ,就算其“成功”處理了。 

情況一:no_ack=true (此時為自動應答) 

      在這種情況下,consumer 會在接收到 basic.deliver + content-header + content-body 之後,立即回複 ack 。而這個 ack 是 tcp 協定中的 ack 。此 ack 的回複不關心 consumer 是否對接收到的資料進行了處理,當然也不關心處理資料所需要的耗時。 

圖1:(producer+consumer) 

【原創】RabbitMQ 之 no_ack 分析

圖2:(consumer)

【原創】RabbitMQ 之 no_ack 分析

圖3:(producer) 

【原創】RabbitMQ 之 no_ack 分析

情況二:no_ack=false (此時為手動應答) 

      在這種情況下,要求 consumer 在處理完接收到的 basic.deliver + content-header + content-body 之後才回複 ack 。而這個 ack 是 amqp 協定中的 basic.ack 。此 ack 的回複是和業務處理相關的,是以具體的回複時間應該要取決于業務處理的耗時。

圖4:(producer+consumer) 

【原創】RabbitMQ 之 no_ack 分析

圖5:(consumer) 

【原創】RabbitMQ 之 no_ack 分析

圖6:(producer) 

【原創】RabbitMQ 之 no_ack 分析

總結: 

basic.ack 發回給 rabbitmq 以告知,可以将相應 message 從 rabbitmq 的消息緩存中移除。

basic.ack 未被 consumer 發回給 rabbitmq 前出現了異常,rabbitmq 發現與該 consumer 對應的連接配接被斷開,之後将該 message 以輪詢方式發送給其他 consumer (假設存在多個 consumer 訂閱同一個 queue)。

在 no_ack=true 的情況下,rabbitmq 認為 message 一旦被 deliver 出去了,就已被确認了,是以會立即将緩存中的 message 删除。是以在 consumer 異常時會導緻消息丢失。

來自 consumer 側的 basic.ack 與 發送給 producer 側的 basic.ack 沒有直接關系。

============ 我是分隔線 ============= 

最後貼上自己改造的、基于 libevent 實作的 rabbitmq-c 的測試列印。 

情況一: 

<a href="http://my.oschina.net/moooofly/blog/143883#">?</a>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

<code>[warn] evsignal_init: socketpair: no error</code>

<code>drive_machine: [conn_init]  --- </code><code>in</code> <code>tcp 3-way connecting!</code>

<code>drive_machine: [conn_connecting]  ---  connection timeout 1</code><code>time</code> <code>on socket(6040)</code>

<code>drive_machine: [conn_connected]  ---  connected on socket(6040)</code>

<code>6040: conn_state change   connected ==&gt; snd_protocol_header</code>

<code>  </code><code>--&gt; send protocol.header!</code>

<code>6040: conn_state change   snd_protocol_header ==&gt; rcv_connection_start_method</code>

<code>  </code><code>&lt;-- recv connection.start method frame!</code>

<code>6040: conn_state change   rcv_connection_start_method ==&gt; snd_connection_start_rsp_method</code>

<code>  </code><code>--&gt; send connection.start-ok method frame!</code>

<code>6040: conn_state change   snd_connection_start_rsp_method ==&gt; rcv_connection_tune_method</code>

<code>  </code><code>&lt;-- recv connection.tune method frame!</code>

<code>6040: conn_state change   rcv_connection_tune_method ==&gt; snd_connection_tune_rsp_method</code>

<code>  </code><code>--&gt; send connection.tune-ok method frame!</code>

<code>6040: conn_state change   snd_connection_tune_rsp_method ==&gt; snd_connection_open_method</code>

<code>  </code><code>--&gt; send connection.open method frame!</code>

<code>6040: conn_state change   snd_connection_open_method ==&gt; rcv_connection_open_rsp_method</code>

<code>  </code><code>&lt;-- recv connection.open-ok method frame!</code>

<code>6040: conn_state change   rcv_connection_open_rsp_method ==&gt; snd_channel_open_method</code>

<code>  </code><code>--&gt; send channel.open method frame!</code>

<code>6040: conn_state change   snd_channel_open_method ==&gt; rcv_channel_open_rsp_method</code>

<code>  </code><code>&lt;-- recv channel.open-ok method frame!</code>

<code>6040: conn_state change   rcv_channel_open_rsp_method ==&gt; idle</code>

<code>drive_machine: [conn_idle]  ---  [consumer]: queue declaring!</code>

<code>6040: conn_state change   idle ==&gt; snd_queue_declare_method</code>

<code>  </code><code>--&gt; send queue.declare method frame!</code>

<code>6040: conn_state change   snd_queue_declare_method ==&gt; rcv_queue_declare_rsp_method</code>

<code>  </code><code>&lt;-- recv queue.declare-ok method frame!</code>

<code>6040: conn_state change   rcv_queue_declare_rsp_method ==&gt; idle</code>

<code>drive_machine: [conn_idle]  ---  [consumer]: queue binding!</code>

<code>6040: conn_state change   idle ==&gt; snd_queue_bind_method</code>

<code>  </code><code>--&gt; send queue.bind method frame!</code>

<code>6040: conn_state change   snd_queue_bind_method ==&gt; rcv_queue_bind_rsp_method</code>

<code>  </code><code>&lt;-- recv queue.bind-ok method frame!</code>

<code>6040: conn_state change   rcv_queue_bind_rsp_method ==&gt; idle</code>

<code>drive_machine: [conn_idle]  ---  [consumer]: basic qos!</code>

<code>6040: conn_state change   idle ==&gt; snd_basic_qos_method</code>

<code>  </code><code>--&gt; send basic.qos method frame!</code>

<code>6040: conn_state change   snd_basic_qos_method ==&gt; rcv_basic_qos_rsp_method</code>

<code>  </code><code>&lt;-- recv basic.qos-ok method frame!</code>

<code>6040: conn_state change   rcv_basic_qos_rsp_method ==&gt; idle</code>

<code>drive_machine: [conn_idle]  ---  [consumer]: basic consuming!</code>

<code>6040: conn_state change   idle ==&gt; snd_basic_consume_method</code>

<code>  </code><code>--&gt; send basic.consume method frame!</code>

<code>6040: conn_state change   snd_basic_consume_method ==&gt; rcv_basic_consume_rsp_method</code>

<code>  </code><code>&lt;-- recv basic.consume-ok method frame!</code>

<code>6040: conn_state change   rcv_basic_consume_rsp_method ==&gt; idle</code>

<code>drive_machine: [conn_idle]  ---  [consumer]: start waiting to recv!</code>

<code>6040: conn_state change   idle ==&gt; rcv_basic_deliver_method</code>

<code>drive_machine: wait</code><code>for</code> <code>basic.deliver method another 10 seconds!!</code>

<code>  </code><code>&lt;-- recv basic.deliver method frame!</code>

<code>6040: conn_state change   rcv_basic_deliver_method ==&gt; rcv_basic_content_header</code>

<code>  </code><code>&lt;-- recv content.header frame!</code>

<code>6040: conn_state change   rcv_basic_content_header ==&gt; rcv_basic_content_body</code>

<code>  </code><code>&lt;-- recv content.body frame!</code>

<code>content body is [hello world betty].</code>

<code>@@@ cb: body len : [17]    body : [hello world betty]</code>

<code>6040: conn_state change   rcv_basic_content_body ==&gt; idle</code>

情況二: 

72

73

<code>drive_machine: connected on socket(6040)</code>

<code>6040: conn_state change   rcv_basic_content_body ==&gt; snd_basic_ack_method</code>

<code>  </code><code>--&gt; send basic.ack method frame!</code>

<code>6040: conn_state change   snd_basic_ack_method ==&gt; idle</code>

繼續閱讀