天天看點

RabbitMQ消息應答模式、事務、公平轉發

1.消息應答模式(手動、自動)

1.1應答模式

        為了確定消息不會丢失,RabbitMQ支援消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收并且處理完畢了。RabbitMQ就可以删除它了。

        如果一個消費者挂掉卻沒有發送應答,RabbitMQ會了解為這個消息沒有處理完全,然後交給另一個消費者去重新處理。這樣,你就可以确認即使消費者偶爾挂掉也不會丢失任何消息了。 沒有任何消息逾時限制;隻有當消費者挂掉時,RabbitMQ才會重新投遞。即使處理一條消息會花費很長的時間。

        消息應答是預設打開的。我們通過顯示的設定autoAsk=true關閉這種機制。現即自動應答開,一旦我們完成任務,消費者會自動發送應答。通知RabbitMQ消息已被處理,可以從記憶體删除。如果消費者因當機或連結失敗等原因沒有發送ACK(不同于ActiveMQ,在RabbitMQ裡,消息沒有過期的概念),則RabbitMQ會将消息重新發送給其他監聽在隊列的下一個消費者

.1.2自動應答

// 4.設定應答模式,true的時候為自動應答,false為手動應答,需要處理
 channel.basicConsume(QUEUE_EMAIL, true, defaultConsumer);
           

生産者

public class Producer {

    private static String EXCHANGE = "sms_email";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立連接配接
        Connection connection = ConnectionUtils.newConnection();
        // 2.建立通道
        Channel channel = connection.createChannel();
        //3.綁定的交換機 參數1互動機名稱 參數2 exchange類型
        channel.exchangeDeclare(EXCHANGE, "fanout");
        String msg = "測試生産消息" + new Date();
        try {
//            channel.txSelect();
            // 4.生産消息
            channel.basicPublish(EXCHANGE, "", null, msg.getBytes());
//            int i = 1 / 0;
            System.out.println("生産者生産消息:" + msg);
//            channel.txCommit();
        } catch (IOException e) {
            e.printStackTrace();
//            channel.txRollback();
        } finally {
            channel.close();
            connection.close();
        }
    }
}
           

短信消費者

public class SmsConsumer {
    private static String QUEUE_SMS = "sms";

    private static String EXCHANGE = "sms_email";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        System.out.println("短信消費者");
        //1.建立連接配接
        Connection connection = ConnectionUtils.newConnection();
        // 2.建立通道
        final Channel channel = connection.createChannel();
        // 3.聲明隊列
        /**
         * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
         * queue:這沒什麼好說的,隊列名
         *
         * durable:是否持久化,那麼問題來了,這是什麼意思?持久化,指的是隊列持久化到資料庫中。在之前的博文中也說過,如果RabbitMQ服務挂了怎麼辦,隊列丢失了自然是不希望發生的。持久化設定為true的話,即使服務崩潰也不會丢失隊列
         * exclusive:是否排外,what? 這又是什麼呢。設定了排外為true的隊列隻可以在本次的連接配接中被通路,也就是說在目前連接配接建立多少個channel通路都沒有關系,但是如果是一個新的連接配接來通路,對不起,不可以,下面是我嘗試通路了一個排外的queue報的錯。還有一個需要說一下的是,排外的queue在目前連接配接被斷開的時候會自動消失(清除)無論是否設定了持久化
         * autoDelete:這個就很簡單了,是否自動删除。也就是說queue會清理自己。但是是在最後一個connection斷開的時候
         * 設定隊列的其他一些參數
         */
        channel.queueDeclare(QUEUE_SMS, false, false, false, null);
        /**
         * String queue, String exchange, String routingKey
         */
        channel.queueBind(QUEUE_SMS, EXCHANGE, "message");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("短信消費者開始擷取消息.........." );
                int i = 1 / 0;
                // 手動應答消息
                channel.basicAck(envelope.getDeliveryTag(), false);
                System.out.println("短信消費者結束擷取消息:" + msgString);

            }
        };
        // 4.設定應答模式,true的時候為自動應答,false為手動應答,需要處理
        channel.basicConsume(QUEUE_SMS, false, defaultConsumer);
    }
}
           

郵件消費者

public class SmsConsumer {
    private static String QUEUE_SMS = "sms";

    private static String EXCHANGE = "sms_email";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        System.out.println("短信消費者");
        //1.建立連接配接
        Connection connection = ConnectionUtils.newConnection();
        // 2.建立通道
        final Channel channel = connection.createChannel();
        // 3.聲明隊列
        /**
         * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
         * queue:這沒什麼好說的,隊列名
         *
         * durable:是否持久化,那麼問題來了,這是什麼意思?持久化,指的是隊列持久化到資料庫中。在之前的博文中也說過,如果RabbitMQ服務挂了怎麼辦,隊列丢失了自然是不希望發生的。持久化設定為true的話,即使服務崩潰也不會丢失隊列
         * exclusive:是否排外,what? 這又是什麼呢。設定了排外為true的隊列隻可以在本次的連接配接中被通路,也就是說在目前連接配接建立多少個channel通路都沒有關系,但是如果是一個新的連接配接來通路,對不起,不可以,下面是我嘗試通路了一個排外的queue報的錯。還有一個需要說一下的是,排外的queue在目前連接配接被斷開的時候會自動消失(清除)無論是否設定了持久化
         * autoDelete:這個就很簡單了,是否自動删除。也就是說queue會清理自己。但是是在最後一個connection斷開的時候
         * 設定隊列的其他一些參數
         */
        channel.queueDeclare(QUEUE_SMS, false, false, false, null);
        /**
         * String queue, String exchange, String routingKey
         */
        channel.queueBind(QUEUE_SMS, EXCHANGE, "message");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("短信消費者開始擷取消息.........." );
                int i = 1 / 0;
                // 手動應答消息
                channel.basicAck(envelope.getDeliveryTag(), false);
                System.out.println("短信消費者結束擷取消息:" + msgString);

            }
        };
        // 4.設定應答模式,true的時候為自動應答,false為手動應答,需要處理
        channel.basicConsume(QUEUE_SMS, false, defaultConsumer);
    }
}
           

異常狀态:

int i = 1 / 0,我們在郵件和短信消費者代碼裡面加入,在手動或者自動應答之前抛出異常,就會導緻消費都沒被消費,而如果在應答之後抛出異常,隻能用事務來處理,使用的消息隊列是sms和email.

RabbitMQ消息應答模式、事務、公平轉發

看到消息都是出于等待被消費的狀态

正常

注釋掉int i = 1/0,

手動應答(隊列sms):通過debug可以看到,執行完1,消費還沒被确認

RabbitMQ消息應答模式、事務、公平轉發

直到2執行完成後,才被确認

RabbitMQ消息應答模式、事務、公平轉發
RabbitMQ消息應答模式、事務、公平轉發

自動應答(隊列email):執行完成後就被确認了

RabbitMQ消息應答模式、事務、公平轉發

1.消息事務

事務的實作主要是對信道(Channel)的設定,主要的方法有三個:

channel.txSelect()聲明啟動事務模式;
channel.txComment()送出事務;
channel.txRollback()復原事務;
           

生産者復原:消費隊列中看不到任何消息

channel.txSelect();
        // 4.生産消息
        channel.basicPublish(EXCHANGE, "", null, msg.getBytes());
        int i = 1 / 0;
        channel.txCommit();
        System.out.println("生産者生産消息:" + msg);
    } catch (IOException e) {
        e.printStackTrace();
        channel.txRollback();
        }
           

消費者復原:消息沒有被消費,進行了復原

try {
        channel.txSelect();
        System.out.println("短信消費者開始擷取消息.........." );
        // 手動應答消息
        channel.basicAck(envelope.getDeliveryTag(), false);
        int i = 1 / 0;
        channel.txCommit();
        System.out.println("短信消費者結束擷取消息:" + msgString);
    } catch (IOException e) {
        channel.txRollback();
    }
           
RabbitMQ消息應答模式、事務、公平轉發

3.公平轉發

目前消息轉發機制是平均配置設定,這樣就會出現倆個消費者,奇數的任務很耗時,偶數的任何工作量很小,造成的原因就是近當消息到達隊列進行轉發消息。并不在乎有多少任務消費者并未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。 為了解決這樣的問題,我們可以使用basicQos方法,傳遞參數為prefetchCount= 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。 換句話說,隻有在消費者空閑的時候會發送下一條資訊。排程分發消息的方式,也就是告訴RabbitMQ每次隻給消費者處理一條消息,也就是等待消費者處理完畢并自己對剛剛處理的消息進行确認之後,才發送下一條消息,防止消費者太過于忙碌,也防止它太過去清閑。

**通過 設定channel.basicQos(1);** 我們生産了10條消息, 01消費者,休眠時間比較短,消費了9條消息

RabbitMQ消息應答模式、事務、公平轉發

消費者02,休眠時間較長,最後隻消費了1條資訊

RabbitMQ消息應答模式、事務、公平轉發

總結:也就是通過參數設定,我們可以選擇讓空閑的消費者多去消費消息,而忙碌的消費者,則等空閑再去消費消息