RabbitMQ多消費者模式
-
- 一、依賴包
- 二、消息發送代碼
- 三、消費者代碼
一、依賴包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.26</version>
</dependency>
二、消息發送代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class Send {
// 消息隊列名稱
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("testhost");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 定義隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 模拟發送批量消息
for (int i = 0; i < 50; i++) {
String message = "Hello World " + i;
// MessageProperties參數,用來配置消息發送後持久化到消息伺服器上。
// 不過這并不能強保證消息一定不丢失,因為消息發送到伺服器,到伺服器儲存消息到磁盤有一個時間視窗的。
// 如果要強保證消息不丢失,需要用到消息發送确認機制,伺服器儲存到磁盤之後才會傳回給消息生産者
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("send message: [" + message + "]");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
三、消費者代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class Recv {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("testhost");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 如果隊列參數durable設定為FALSE,當rabbitmq伺服器當機或者重新開機後,隊列消失,消息也會丢失。
// 消息生産者和消費者都需要修改為true
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback callback = ((consumerTag, message) -> {
System.out.println("recv message:[" + new String(message.getBody(), StandardCharsets.UTF_8));
try {
// 外部參數,多個消費者,可以模拟設定不同的時間,代表消費者的處理能力
Thread.sleep(Long.parseLong(args[0]);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 手動确認
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
});
// 設定伺服器每次隻推送一條消息到消費者
// 如果不設定該參數,rabbitmq伺服器會把隊裡的消息輪訓分到每個消費服務上。
// 設定該參數後,每次隻會分一個消息到消費服務上,待消費服務發送确認後,才會發送下一條。根據消費者不同處理速度,每個消費者消費的消息數量是不同的
channel.basicQos(1);
// autoAck參數設定為false,設定非自動确認,當消息伺服器沒有收到手動确認的消息後,會将消息重新發送的其他的consumer上,保證消息不丢失
channel.basicConsume(QUEUE_NAME, false, callback, consumerTag -> {});
} catch (Exception e) {
}
}
}
因為是多消費者模式,需要啟動多個Recv服務,最好指定不同參數。