當連接配接失敗時,消息可能還在用戶端和伺服器之間傳輸 - 它們可能處于兩側的解碼或編碼的中間過程,在 TCP 堆棧緩沖區中,或在電線上飛行。
在這種情況下,傳輸中的資訊将無法正常投遞 - 它們需要被重新投遞。Acknowledgements機制讓伺服器和用戶端知道何時需要重新投遞。
根據定義,使用消息代理(如RabbitMQ)的系統是分布式的。由于發送的協定方法(消息)不能保證到達協作方或由其成功處理,是以釋出者和消費者都需要一個投遞和處理确認的機制。
- 從消費者到 RabbitMQ 的遞送處理确認,在消息協定中稱為
acknowledgements
- broker對publishers的确認是一個協定擴充,稱為
publisher confirms
這兩個功能都啟發于 TCP。它們對于從publish到MQ節點和從MQ節點到消費者的可靠投遞都至關重要。即對于資料安全至關重要,應用程式對資料安全的責任與MQ節點一樣多。
當 RabbitMQ 向 Con 傳遞消息時,它需要知道何時考慮該消息才能成功發送。什麼樣的邏輯是最佳的取決于系統。是以,它主要是應用決定的。在 AMQP 0-9-1 中,當 Con:
- 使用
方法進行注冊basicConsume
- 或使用
方法按需擷取消息basicGet
就會進行。
Consumer Acknowledgement Modes and Data Safety Considerations
當節點向消費者傳遞消息時,它必須決定該消息是否應由消費者考慮處理(或至少接收)。由于多種内容(用戶端連接配接、消費者應用等)可能會失敗,是以此決定是資料安全問題。消息傳遞協定通常提供一個确認機制,允許消費者确認傳遞到他們連接配接到的節點。是否使用該機制由消費者訂閱時決定。根據使用的确認模式,RabbitMQ 可以考慮在消息發出後立即成功傳遞(寫入 TCP 插座)或收到明确(‘手冊’)客戶确認時。手動發送的确認可能是正面的或負面的,并使用以下協定方法之一:
basic.ack
basic.ack
積極地确認
basic.nack
消極地确認。
basicReject
消極地确認,但還有一個limitation
Delivery Identifiers: Delivery Tags
如何确定投遞(确認表明他們各自的投遞)。當一個 Con(訂閱)被注冊,MQ将使用basic.deliver方法發送(推送)消息。該方法帶有delivery tag,該tag可唯一辨別channel上的投遞。是以,Delivery tags作用域在每個 channel 内。交
Delivery Tags是單調增長的正整數,并由客戶庫提供。用戶端庫方法,承認傳遞以傳遞标簽作為參數。由于每個通道的遞送标簽範圍很廣,是以必須在接收的同一通道上确認傳遞。在不同的通道上确認将導緻’未知交貨标簽’協定異常并關閉通道。
Positively Acknowledging Deliveries
用于傳遞确認的 API 方法通常暴露為客戶庫中通道上的操作。Java 用戶端使用者将使用channel:
// 假設已有channel執行個體
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge a single delivery, the message will
// be discarded
channel.basicAck(deliveryTag, false);
}
});
Acknowledging Multiple Deliveries at Once
Manual确認模式可批量進行,以減少網絡流量。這是通過将acknowledgement方法的multiple字段設定為true來實作的。
basicReject在曆史上都沒有該字段,這就是為什麼basicNack被MQ引入,作為協定的擴充。
當multiple=true,MQ 将确認所有未完成的delivery tag,并包括确認中指定的tag。
與确認相關的其他所有内容一樣,這個作用域是channel内。比如,假定channel Ch 上有未确認的delivery tag 5、6、7 和 8,當一個delivery tag=8、multiple=true的acknowledgement frame到達該channel時,則從 5 到 8 的所有投遞都将被确認。
若multiple=false,則仍不确認投遞 5、6 和 7。
要确認與MQ Java用戶端的多次投遞,将Channel#basicAck的multiple參數設定為 true。
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge all deliveries up to
// this delivery tag
channel.basicAck(deliveryTag, true);
}
});
Negative Acknowledgement and Requeuing of Deliveries
有時,消費者無法及時處理投遞,但其他執行個體可能能夠處理。這時可能更想讓它重新入隊,讓其他Con接收和處理它。
basicReject和basicNack就是用于實作這種想法的兩個協定方法。這些方法通常用于消極地确認投遞。
此類投遞可被Broker丢棄或重新入隊。此行為由requeue字段控制:
- 當字段設定為true,Broker将用指定的delivery tag重新入隊投遞(或多個投遞)。
這兩個方法通常暴露作為用戶端庫中channel上的操作。Java 用戶端使用者可以調用:
- Channel#basicReject
- Channel#basicNack
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// negatively acknowledge, the message will
// be discarded
channel.basicReject(deliveryTag, false);
}
});
RabbitMQ ACK 機制的意義
ACK機制可以保證Con拉取到了消息,若處理失敗了,則隊列中還有這個消息,仍然可以給Con處理。
ack機制是 Con 告訴 Broker 目前消息是否成功消費,至于 Broker 如何處理 NACK,取決于 Con 是否設定了 requeue:如果 requeue=false, 則NACK 後 Broker 還是會删除消息的。
但一般處理消息失敗都是因為代碼邏輯出bug,即使隊列中後來仍然保留該消息,然後再給Con消費,依舊報錯。
當然,若一台機器當機,消息還有,還可以給另外機器消費,這種情景下 ACK 很有用。
如果不使用 ACK 機制,直接把出錯消息存庫,便于日後查bug或重新執行。 參考 Quartz 定時任務排程,Quartz可以讓失敗的任務重新執行一次,或者不管,或者怎麼怎麼樣,但是 RabbitMQ 好像缺了這一點。
1 ACK和NACK
當設定
autoACK=false
時,就可以使用手工ACK。
其實手工方式包括了手工ACK、手工NACK。
- 手工 ACK 時,會發送給Broker一個應答,代表消息處理成功,Broker就可回送響應給Pro
- NACK 則表示消息處理失敗,如果設定了重回隊列,Broker端就會将沒有成功處理的消息重新發送
使用方式
Con消費時,若由于業務異常,可手工 NACK 記錄日志,然後進行補償
void basicNack(long deliveryTag,
boolean multiple,
boolean requeue)
如果由于伺服器當機等嚴重問題,就需要手工 ACK 保障Con消費成功
void basicAck(long deliveryTag, boolean multiple)