1 概念
消費者完成一個任務可能需要一段時間,如果其中一個消費者處理一個長的任務并僅隻完成了部分突然它挂掉了,會發生什麼情況。 RabbitMQ 一旦向消費者傳遞了一條消息,便立即将該消息标記為删除。在這種情況下,突然有個消費者挂掉了,我們将丢失正在處理的消息。以及後續發送給該消費這的消息,因為它無法接收到。
為了保證消息在發送過程中不丢失, rabbitmq 引入消息應答機制,消息應答就是:消費者在接收到消息并且處理該消息之後,告訴 rabbitmq 它已經處理了, rabbitmq 可以把該消息删除了。
2 自動應答
消息發送後立即被認為已經傳送成功,這種模式需要在高吞吐量和資料傳輸安全性方面做權衡,因為這種模式如果消息在接收到之前,消費者那邊出現連接配接或者 channel 關閉,那麼消息就丢失了,當然另一方面這種模式消費者那邊可以傳遞過載的消息, 沒有對傳遞的消息數量進行限制,當然這樣有可能使得消費者這邊由于接收太多還來不及處理的消息,導緻這些消息的積壓,最終使得記憶體耗盡,最終這些消費者線程被作業系統殺死, 是以這種模式僅适用在消費者可以高效并以某種速率能夠處理這些消息的情況下使用。
3 消息應答的方法
-
Channel.basicAck(用于肯定确認)
RabbitMQ 已知道該消息并且成功的處理消息,可以将其丢棄了
- Channel.basicNack(用于否定确認)
-
Channel.basicReject(用于否定确認)
與 Channel.basicNack 相比少一個參數,不處理該消息了直接拒絕,可以将其丢棄了
4 Multiple 的解釋
手動應答的好處是可以批量應答并且減少網絡擁堵
channel.basicAck(deliveryTag,true)
multiple 的 true 和 false 代表不同意思
- true 代表批量應答 channel 上未應答的消息
比如說 channel 上有傳送 tag 的消息 5,6,7,8 目前 tag 是 8 那麼此時 5-8 的這些還未應答的消息都會被确認收到消息應答
- false 同上面相比
隻會應答 tag=8 的消息 5,6,7 這三個消息依然不會被确認收到消息應答
5 消息自動重新入隊
如果消費者由于某些原因失去連接配接(其通道已關閉,連接配接已關閉或 TCP 連接配接丢失), 導緻消息未發送 ACK 确認, RabbitMQ 将了解到消息未完全處理,并将對其重新排隊。如果此時其他消費者可以處理,它将很快将其重新分發給另一個消費者。這樣,即使某個消費者偶爾死亡,也可以確定不會丢失任何消息。
6 消息手動應答代碼
預設消息采用的是自動應答,是以我們要想實作消息消費過程中不丢失,需要把自動應答改為手動應答,消費者在上面代碼的基礎上增加下面畫紅色部分代碼。
消息生産者
public class Task02 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
Scanner sc = new Scanner(System.in);
System.out.println("請輸入資訊");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("生産者發出消息" + message);
}
}
}
}
消費者 01:
public class Work03 {
private static final String ACK_QUEUE_NAME="ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1 等待接收消息處理時間較短");
//消息消費的時候如何處理消息
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String message= new String(delivery.getBody());
SleepUtils.sleep(1);
System.out.println("接收到消息:"+message);
/**
* 1.消息标記 tag
* 2.是否批量應答未應答消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//采用手動應答
boolean autoAck=false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消費者取消消費接口回調邏輯");
});
}
}
消費者 02:
public class Work04 {
private static final String ACK_QUEUE_NAME="ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2 等待接收消息處理時間較長");
//消息消費的時候如何處理消息
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String message= new String(delivery.getBody());
SleepUtils.sleep(30);
System.out.println("接收到消息:"+message);
/**
* 1.消息标記 tag
* 2.是否批量應答未應答消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//采用手動應答
boolean autoAck=false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消費者取消消費接口回調邏輯");
});
}
}
睡眠工具類:
public class SleepUtils {
public static void sleep(int second){
try {
Thread.sleep(1000*second);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
7 手動應答效果示範
正常情況下消息發送方發送兩個消息 C1 和 C2 分别接收到消息并進行處理