天天看點

RabbitMQ實戰-消費端ACK、NACK及重回隊列機制(上)Delivery Identifiers: Delivery TagsPositively Acknowledging DeliveriesAcknowledging Multiple Deliveries at OnceNegative Acknowledgement and Requeuing of DeliveriesRabbitMQ ACK 機制的意義1 ACK和NACK

當連接配接失敗時,消息可能還在用戶端和伺服器之間傳輸 - 它們可能處于兩側的解碼或編碼的中間過程,在 TCP 堆棧緩沖區中,或在電線上飛行。

在這種情況下,傳輸中的資訊将無法正常投遞 - 它們需要被重新投遞。Acknowledgements機制讓伺服器和用戶端知道何時需要重新投遞。

根據定義,使用消息代理(如RabbitMQ)的系統是分布式的。由于發送的協定方法(消息)不能保證到達協作方或由其成功處理,是以釋出者和消費者都需要一個投遞和處理确認的機制。

  • 從消費者到 RabbitMQ 的遞送處理确認,在消息協定中稱為

    acknowledgements

  • broker對publishers的确認是一個協定擴充,稱為

    publisher confirms

這兩個功能都啟發于 TCP。它們對于從publish到MQ節點和從MQ節點到消費者的可靠投遞都至關重要。即對于資料安全至關重要,應用程式對資料安全的責任與MQ節點一樣多。

當 RabbitMQ 向 Con 傳遞消息時,它需要知道何時考慮該消息才能成功發送。什麼樣的邏輯是最佳的取決于系統。是以,它主要是應用決定的。在 AMQP 0-9-1 中,當 Con:

  • 使用

    basicConsume

    方法進行注冊
    RabbitMQ實戰-消費端ACK、NACK及重回隊列機制(上)Delivery Identifiers: Delivery TagsPositively Acknowledging DeliveriesAcknowledging Multiple Deliveries at OnceNegative Acknowledgement and Requeuing of DeliveriesRabbitMQ ACK 機制的意義1 ACK和NACK
  • 或使用

    basicGet

    方法按需擷取消息
RabbitMQ實戰-消費端ACK、NACK及重回隊列機制(上)Delivery Identifiers: Delivery TagsPositively Acknowledging DeliveriesAcknowledging Multiple Deliveries at OnceNegative Acknowledgement and Requeuing of DeliveriesRabbitMQ ACK 機制的意義1 ACK和NACK

就會進行。

Consumer Acknowledgement Modes and Data Safety Considerations

當節點向消費者傳遞消息時,它必須決定該消息是否應由消費者考慮處理(或至少接收)。由于多種内容(用戶端連接配接、消費者應用等)可能會失敗,是以此決定是資料安全問題。消息傳遞協定通常提供一個确認機制,允許消費者确認傳遞到他們連接配接到的節點。是否使用該機制由消費者訂閱時決定。根據使用的确認模式,RabbitMQ 可以考慮在消息發出後立即成功傳遞(寫入 TCP 插座)或收到明确(‘手冊’)客戶确認時。手動發送的确認可能是正面的或負面的,并使用以下協定方法之一:

basic.ack

積極地确認

basic.nack

消極地确認。

basicReject

消極地确認,但還有一個limitation

RabbitMQ實戰-消費端ACK、NACK及重回隊列機制(上)Delivery Identifiers: Delivery TagsPositively Acknowledging DeliveriesAcknowledging Multiple Deliveries at OnceNegative Acknowledgement and Requeuing of DeliveriesRabbitMQ ACK 機制的意義1 ACK和NACK

Delivery Identifiers: Delivery Tags

如何确定投遞(确認表明他們各自的投遞)。當一個 Con(訂閱)被注冊,MQ将使用basic.deliver方法發送(推送)消息。該方法帶有delivery tag,該tag可唯一辨別channel上的投遞。是以,Delivery tags作用域在每個 channel 内。交

Delivery Tags是單調增長的正整數,并由客戶庫提供。用戶端庫方法,承認傳遞以傳遞标簽作為參數。由于每個通道的遞送标簽範圍很廣,是以必須在接收的同一通道上确認傳遞。在不同的通道上确認将導緻’未知交貨标簽’協定異常并關閉通道。

Positively Acknowledging Deliveries

用于傳遞确認的 API 方法通常暴露為客戶庫中通道上的操作。Java 用戶端使用者将使用channel:

// 假設已有channel執行個體
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // positively acknowledge a single delivery, the message will
             // be discarded
             channel.basicAck(deliveryTag, false);
         }
     });
      

Acknowledging Multiple Deliveries at Once

Manual确認模式可批量進行,以減少網絡流量。這是通過将acknowledgement方法的multiple字段設定為true來實作的。

basicReject在曆史上都沒有該字段,這就是為什麼basicNack被MQ引入,作為協定的擴充。

當multiple=true,MQ 将确認所有未完成的delivery tag,并包括确認中指定的tag。

與确認相關的其他所有内容一樣,這個作用域是channel内。比如,假定channel Ch 上有未确認的delivery tag 5、6、7 和 8,當一個delivery tag=8、multiple=true的acknowledgement frame到達該channel時,則從 5 到 8 的所有投遞都将被确認。

若multiple=false,則仍不确認投遞 5、6 和 7。

要确認與MQ Java用戶端的多次投遞,将Channel#basicAck的multiple參數設定為 true。

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // positively acknowledge all deliveries up to
             // this delivery tag
             channel.basicAck(deliveryTag, true);
         }
     });
      

Negative Acknowledgement and Requeuing of Deliveries

有時,消費者無法及時處理投遞,但其他執行個體可能能夠處理。這時可能更想讓它重新入隊,讓其他Con接收和處理它。

basicReject和basicNack就是用于實作這種想法的兩個協定方法。這些方法通常用于消極地确認投遞。

此類投遞可被Broker丢棄或重新入隊。此行為由requeue字段控制:

  • 當字段設定為true,Broker将用指定的delivery tag重新入隊投遞(或多個投遞)。

這兩個方法通常暴露作為用戶端庫中channel上的操作。Java 用戶端使用者可以調用:

  • Channel#basicReject
  • Channel#basicNack
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // negatively acknowledge, the message will
             // be discarded
             channel.basicReject(deliveryTag, false);
         }
     });
      

RabbitMQ ACK 機制的意義

ACK機制可以保證Con拉取到了消息,若處理失敗了,則隊列中還有這個消息,仍然可以給Con處理。

ack機制是 Con 告訴 Broker 目前消息是否成功消費,至于 Broker 如何處理 NACK,取決于 Con 是否設定了 requeue:如果 requeue=false, 則NACK 後 Broker 還是會删除消息的。

但一般處理消息失敗都是因為代碼邏輯出bug,即使隊列中後來仍然保留該消息,然後再給Con消費,依舊報錯。

當然,若一台機器當機,消息還有,還可以給另外機器消費,這種情景下 ACK 很有用。

如果不使用 ACK 機制,直接把出錯消息存庫,便于日後查bug或重新執行。 參考 Quartz 定時任務排程,Quartz可以讓失敗的任務重新執行一次,或者不管,或者怎麼怎麼樣,但是 RabbitMQ 好像缺了這一點。

1 ACK和NACK

當設定

autoACK=false

 時,就可以使用手工ACK。

其實手工方式包括了手工ACK、手工NACK。

  • 手工 ACK 時,會發送給Broker一個應答,代表消息處理成功,Broker就可回送響應給Pro
  • NACK 則表示消息處理失敗,如果設定了重回隊列,Broker端就會将沒有成功處理的消息重新發送

使用方式

Con消費時,若由于業務異常,可手工 NACK 記錄日志,然後進行補償

void basicNack(long deliveryTag, 
               boolean multiple,
               boolean requeue)      

如果由于伺服器當機等嚴重問題,就需要手工 ACK 保障Con消費成功

void basicAck(long deliveryTag, boolean multiple)