天天看點

RabbitMQ多消費者模式

RabbitMQ多消費者模式

    • 一、依賴包
    • 二、消息發送代碼
    • 三、消費者代碼

一、依賴包

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.26</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.26</version>
        </dependency>
           

二、消息發送代碼

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class Send {
    // 消息隊列名稱
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("testhost");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        try {
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            // 定義隊列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

			// 模拟發送批量消息
            for (int i = 0; i < 50; i++) {
                String message = "Hello World " + i;
                // MessageProperties參數,用來配置消息發送後持久化到消息伺服器上。
                // 不過這并不能強保證消息一定不丢失,因為消息發送到伺服器,到伺服器儲存消息到磁盤有一個時間視窗的。
                // 如果要強保證消息不丢失,需要用到消息發送确認機制,伺服器儲存到磁盤之後才會傳回給消息生産者
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println("send message: [" + message + "]");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
           

三、消費者代碼

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;

public class Recv {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setVirtualHost("testhost");
            factory.setUsername("admin");
            factory.setPassword("admin");

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            // 如果隊列參數durable設定為FALSE,當rabbitmq伺服器當機或者重新開機後,隊列消失,消息也會丢失。
            // 消息生産者和消費者都需要修改為true
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            DeliverCallback callback = ((consumerTag, message) -> {
                System.out.println("recv message:[" + new String(message.getBody(), StandardCharsets.UTF_8));

                try {
                	// 外部參數,多個消費者,可以模拟設定不同的時間,代表消費者的處理能力
                    Thread.sleep(Long.parseLong(args[0]);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                	// 手動确認
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

                }
            });
			// 設定伺服器每次隻推送一條消息到消費者
			// 如果不設定該參數,rabbitmq伺服器會把隊裡的消息輪訓分到每個消費服務上。
			// 設定該參數後,每次隻會分一個消息到消費服務上,待消費服務發送确認後,才會發送下一條。根據消費者不同處理速度,每個消費者消費的消息數量是不同的
            channel.basicQos(1);
            // autoAck參數設定為false,設定非自動确認,當消息伺服器沒有收到手動确認的消息後,會将消息重新發送的其他的consumer上,保證消息不丢失
            channel.basicConsume(QUEUE_NAME, false, callback, consumerTag -> {});
        } catch (Exception e) {

        }
    }
}
           

因為是多消費者模式,需要啟動多個Recv服務,最好指定不同參數。