目前消息轉發機制是平均配置設定,這樣就會出現倆個消費者,奇數的任務很耗時,偶數的任何工作量很小,造成的原因就是近當消息到達隊列進行轉發消息。并不在乎有多少任務消費者并未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。
為了解決這樣的問題,我們可以使用basicQos方法,傳遞參數為prefetchCount= 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。
換句話說,隻有在消費者空閑的時候會發送下一條資訊。排程分發消息的方式,也就是告訴RabbitMQ每次隻給消費者一條消息,也就是等待消費者處理完畢并自己對剛剛處理的消息進行确認之後,才發送下一條消息,防止消費者太過于忙碌,也防止它太過去清閑。
通過設定
channel.basicQos(1);
示例如下:
這裡封裝的Connection連接配接類的代碼就不顯示出來,不會的去看【java】RabbitMQ簡單隊列。
生産者代碼如下:
package com.rabbitmqdemo.Producer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmqdemo.Utils.MQConnectionUtils;
public class Producer {
// 隊列名稱
private static final String QUEUE_NAME = "study01";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立一個新的連接配接
Connection connection = MQConnectionUtils.newConnection();
// 2.建立通道
Channel channel = connection.createChannel();
// 3.建立一個隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);// 保證一次隻分發一次 限制發送給同一個消費者 不得超過一條消息
for (int i = 1; i <= 50; i++) {
// 4.建立msg
String msg = "部落客長得非常帥,比金城武還帥!頂+"+i;
// 5.生産者發送消息者
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
// 關閉通道和連接配接
channel.close();
connection.close();
}
}
消費者1代碼如下:
每次消費完一條消息以後,設定休眠時間為1000
package com.rabbitmqdemo.consumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import javax.swing.DefaultBoundedRangeModel;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmqdemo.Utils.MQConnectionUtils;
public class Consumer {
// 隊列名稱
private static final String QUEUE_NAME = "study01";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("消費者啟動....01");
// 1.建立一個新的連接配接
Connection connection = MQConnectionUtils.newConnection();
// 2.建立通道
final Channel channel = connection.createChannel();
// 3.消費者關聯隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
// 監聽擷取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消費者擷取生産者消息:" + msg);
try {
Thread.sleep(1000);
} catch (Exception e) {
} finally {
// 手動回執消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 4.設定應答模式 如果為true情況下 表示為自動應答模式 false 表示為手動應答
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
// 關閉通道和連接配接
// channel.close();
// connection.close();
}
}
消費者2代碼如下:
每次消費完一條消息以後,設定休眠時間為500
package com.rabbitmqdemo.consumer;
import com.rabbitmq.client.*;
import com.rabbitmqdemo.Utils.MQConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerSecond {
// 隊列名稱
private static final String QUEUE_NAME = "study01";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("消費者啟動....02");
// 1.建立一個新的連接配接
Connection connection = MQConnectionUtils.newConnection();
// 2.建立通道
final Channel channel = connection.createChannel();
// 3.消費者關聯隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
// 監聽擷取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消費者擷取生産者消息:" + msg);
try {
Thread.sleep(500);
} catch (Exception e) {
} finally {
// 手動回執消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 4.設定應答模式 如果為true情況下 表示為自動應答模式 false 表示為手動應答
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
// 關閉通道和連接配接
// channel.close();
// connection.close();
}
}
運作結果如下:
消費者1的消費數量為:
消費者2的消費數量為:
根據消費結果可以看出,消費者2比消費者1要消費的多。