1.消息應答模式(手動、自動)
1.1應答模式
為了確定消息不會丢失,RabbitMQ支援消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收并且處理完畢了。RabbitMQ就可以删除它了。
如果一個消費者挂掉卻沒有發送應答,RabbitMQ會了解為這個消息沒有處理完全,然後交給另一個消費者去重新處理。這樣,你就可以确認即使消費者偶爾挂掉也不會丢失任何消息了。 沒有任何消息逾時限制;隻有當消費者挂掉時,RabbitMQ才會重新投遞。即使處理一條消息會花費很長的時間。
消息應答是預設打開的。我們通過顯示的設定autoAsk=true關閉這種機制。現即自動應答開,一旦我們完成任務,消費者會自動發送應答。通知RabbitMQ消息已被處理,可以從記憶體删除。如果消費者因當機或連結失敗等原因沒有發送ACK(不同于ActiveMQ,在RabbitMQ裡,消息沒有過期的概念),則RabbitMQ會将消息重新發送給其他監聽在隊列的下一個消費者
.1.2自動應答
// 4.設定應答模式,true的時候為自動應答,false為手動應答,需要處理
channel.basicConsume(QUEUE_EMAIL, true, defaultConsumer);
生産者
public class Producer {
private static String EXCHANGE = "sms_email";
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立連接配接
Connection connection = ConnectionUtils.newConnection();
// 2.建立通道
Channel channel = connection.createChannel();
//3.綁定的交換機 參數1互動機名稱 參數2 exchange類型
channel.exchangeDeclare(EXCHANGE, "fanout");
String msg = "測試生産消息" + new Date();
try {
// channel.txSelect();
// 4.生産消息
channel.basicPublish(EXCHANGE, "", null, msg.getBytes());
// int i = 1 / 0;
System.out.println("生産者生産消息:" + msg);
// channel.txCommit();
} catch (IOException e) {
e.printStackTrace();
// channel.txRollback();
} finally {
channel.close();
connection.close();
}
}
}
短信消費者
public class SmsConsumer {
private static String QUEUE_SMS = "sms";
private static String EXCHANGE = "sms_email";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
System.out.println("短信消費者");
//1.建立連接配接
Connection connection = ConnectionUtils.newConnection();
// 2.建立通道
final Channel channel = connection.createChannel();
// 3.聲明隊列
/**
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* queue:這沒什麼好說的,隊列名
*
* durable:是否持久化,那麼問題來了,這是什麼意思?持久化,指的是隊列持久化到資料庫中。在之前的博文中也說過,如果RabbitMQ服務挂了怎麼辦,隊列丢失了自然是不希望發生的。持久化設定為true的話,即使服務崩潰也不會丢失隊列
* exclusive:是否排外,what? 這又是什麼呢。設定了排外為true的隊列隻可以在本次的連接配接中被通路,也就是說在目前連接配接建立多少個channel通路都沒有關系,但是如果是一個新的連接配接來通路,對不起,不可以,下面是我嘗試通路了一個排外的queue報的錯。還有一個需要說一下的是,排外的queue在目前連接配接被斷開的時候會自動消失(清除)無論是否設定了持久化
* autoDelete:這個就很簡單了,是否自動删除。也就是說queue會清理自己。但是是在最後一個connection斷開的時候
* 設定隊列的其他一些參數
*/
channel.queueDeclare(QUEUE_SMS, false, false, false, null);
/**
* String queue, String exchange, String routingKey
*/
channel.queueBind(QUEUE_SMS, EXCHANGE, "message");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("短信消費者開始擷取消息.........." );
int i = 1 / 0;
// 手動應答消息
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println("短信消費者結束擷取消息:" + msgString);
}
};
// 4.設定應答模式,true的時候為自動應答,false為手動應答,需要處理
channel.basicConsume(QUEUE_SMS, false, defaultConsumer);
}
}
郵件消費者
public class SmsConsumer {
private static String QUEUE_SMS = "sms";
private static String EXCHANGE = "sms_email";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
System.out.println("短信消費者");
//1.建立連接配接
Connection connection = ConnectionUtils.newConnection();
// 2.建立通道
final Channel channel = connection.createChannel();
// 3.聲明隊列
/**
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* queue:這沒什麼好說的,隊列名
*
* durable:是否持久化,那麼問題來了,這是什麼意思?持久化,指的是隊列持久化到資料庫中。在之前的博文中也說過,如果RabbitMQ服務挂了怎麼辦,隊列丢失了自然是不希望發生的。持久化設定為true的話,即使服務崩潰也不會丢失隊列
* exclusive:是否排外,what? 這又是什麼呢。設定了排外為true的隊列隻可以在本次的連接配接中被通路,也就是說在目前連接配接建立多少個channel通路都沒有關系,但是如果是一個新的連接配接來通路,對不起,不可以,下面是我嘗試通路了一個排外的queue報的錯。還有一個需要說一下的是,排外的queue在目前連接配接被斷開的時候會自動消失(清除)無論是否設定了持久化
* autoDelete:這個就很簡單了,是否自動删除。也就是說queue會清理自己。但是是在最後一個connection斷開的時候
* 設定隊列的其他一些參數
*/
channel.queueDeclare(QUEUE_SMS, false, false, false, null);
/**
* String queue, String exchange, String routingKey
*/
channel.queueBind(QUEUE_SMS, EXCHANGE, "message");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("短信消費者開始擷取消息.........." );
int i = 1 / 0;
// 手動應答消息
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println("短信消費者結束擷取消息:" + msgString);
}
};
// 4.設定應答模式,true的時候為自動應答,false為手動應答,需要處理
channel.basicConsume(QUEUE_SMS, false, defaultConsumer);
}
}
異常狀态:
int i = 1 / 0,我們在郵件和短信消費者代碼裡面加入,在手動或者自動應答之前抛出異常,就會導緻消費都沒被消費,而如果在應答之後抛出異常,隻能用事務來處理,使用的消息隊列是sms和email.
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLz0keOd3Z61UNNpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL1kDNzUjMwEDM3ATNwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
看到消息都是出于等待被消費的狀态
正常
注釋掉int i = 1/0,
手動應答(隊列sms):通過debug可以看到,執行完1,消費還沒被确認
直到2執行完成後,才被确認
自動應答(隊列email):執行完成後就被确認了
1.消息事務
事務的實作主要是對信道(Channel)的設定,主要的方法有三個:
channel.txSelect()聲明啟動事務模式;
channel.txComment()送出事務;
channel.txRollback()復原事務;
生産者復原:消費隊列中看不到任何消息
channel.txSelect();
// 4.生産消息
channel.basicPublish(EXCHANGE, "", null, msg.getBytes());
int i = 1 / 0;
channel.txCommit();
System.out.println("生産者生産消息:" + msg);
} catch (IOException e) {
e.printStackTrace();
channel.txRollback();
}
消費者復原:消息沒有被消費,進行了復原
try {
channel.txSelect();
System.out.println("短信消費者開始擷取消息.........." );
// 手動應答消息
channel.basicAck(envelope.getDeliveryTag(), false);
int i = 1 / 0;
channel.txCommit();
System.out.println("短信消費者結束擷取消息:" + msgString);
} catch (IOException e) {
channel.txRollback();
}
3.公平轉發
目前消息轉發機制是平均配置設定,這樣就會出現倆個消費者,奇數的任務很耗時,偶數的任何工作量很小,造成的原因就是近當消息到達隊列進行轉發消息。并不在乎有多少任務消費者并未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。 為了解決這樣的問題,我們可以使用basicQos方法,傳遞參數為prefetchCount= 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。 換句話說,隻有在消費者空閑的時候會發送下一條資訊。排程分發消息的方式,也就是告訴RabbitMQ每次隻給消費者處理一條消息,也就是等待消費者處理完畢并自己對剛剛處理的消息進行确認之後,才發送下一條消息,防止消費者太過于忙碌,也防止它太過去清閑。
**通過 設定channel.basicQos(1);** 我們生産了10條消息, 01消費者,休眠時間比較短,消費了9條消息
消費者02,休眠時間較長,最後隻消費了1條資訊
總結:也就是通過參數設定,我們可以選擇讓空閑的消費者多去消費消息,而忙碌的消費者,則等空閑再去消費消息