天天看點

【java】RabbitMQ的公平轉發

目前消息轉發機制是平均配置設定,這樣就會出現倆個消費者,奇數的任務很耗時,偶數的任何工作量很小,造成的原因就是近當消息到達隊列進行轉發消息。并不在乎有多少任務消費者并未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。    

為了解決這樣的問題,我們可以使用basicQos方法,傳遞參數為prefetchCount= 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。

換句話說,隻有在消費者空閑的時候會發送下一條資訊。排程分發消息的方式,也就是告訴RabbitMQ每次隻給消費者一條消息,也就是等待消費者處理完畢并自己對剛剛處理的消息進行确認之後,才發送下一條消息,防止消費者太過于忙碌,也防止它太過去清閑。

 通過設定

channel.basicQos(1);           

示例如下:

這裡封裝的Connection連接配接類的代碼就不顯示出來,不會的去看【java】RabbitMQ簡單隊列。

生産者代碼如下:

package com.rabbitmqdemo.Producer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmqdemo.Utils.MQConnectionUtils;

public class Producer {
    // 隊列名稱
    private static final String QUEUE_NAME = "study01";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立一個新的連接配接
        Connection connection = MQConnectionUtils.newConnection();
        // 2.建立通道
        Channel channel = connection.createChannel();
        // 3.建立一個隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);// 保證一次隻分發一次 限制發送給同一個消費者 不得超過一條消息
        for (int i = 1; i <= 50; i++) {
            // 4.建立msg
            String msg = "部落客長得非常帥,比金城武還帥!頂+"+i;
            // 5.生産者發送消息者
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }


        // 關閉通道和連接配接
        channel.close();
        connection.close();
    }
}
           

消費者1代碼如下:

每次消費完一條消息以後,設定休眠時間為1000

package com.rabbitmqdemo.consumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import javax.swing.DefaultBoundedRangeModel;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmqdemo.Utils.MQConnectionUtils;

public class Consumer {
    // 隊列名稱
    private static final String QUEUE_NAME = "study01";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("消費者啟動....01");
        // 1.建立一個新的連接配接
        Connection connection = MQConnectionUtils.newConnection();
        // 2.建立通道
        final Channel channel = connection.createChannel();
        // 3.消費者關聯隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

            // 監聽擷取消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消費者擷取生産者消息:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {

                } finally {
                    // 手動回執消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }

        };
        // 4.設定應答模式 如果為true情況下 表示為自動應答模式 false 表示為手動應答
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
        // 關閉通道和連接配接
//        channel.close();
//        connection.close();
    }
}
           

消費者2代碼如下:

每次消費完一條消息以後,設定休眠時間為500

package com.rabbitmqdemo.consumer;

import com.rabbitmq.client.*;
import com.rabbitmqdemo.Utils.MQConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerSecond {
    // 隊列名稱
    private static final String QUEUE_NAME = "study01";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("消費者啟動....02");
        // 1.建立一個新的連接配接
        Connection connection = MQConnectionUtils.newConnection();
        // 2.建立通道
        final Channel channel = connection.createChannel();
        // 3.消費者關聯隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

            // 監聽擷取消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消費者擷取生産者消息:" + msg);
                try {
                    Thread.sleep(500);
                } catch (Exception e) {

                } finally {
                    // 手動回執消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }

        };
        // 4.設定應答模式 如果為true情況下 表示為自動應答模式 false 表示為手動應答
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
        // 關閉通道和連接配接
//        channel.close();
//        connection.close();
    }
}
           

運作結果如下:

消費者1的消費數量為:

【java】RabbitMQ的公平轉發

消費者2的消費數量為:

【java】RabbitMQ的公平轉發

根據消費結果可以看出,消費者2比消費者1要消費的多。