天天看點

RabbitMQ持久化機制、記憶體磁盤控制(四)

一、持久化

如果看到這一篇文章的朋友,都是有經驗的開發人員,對持久化的概念就不用再做過多的解析了,經過前面的幾篇文章,其實不難發現RabbitMQ 的持久化其實就隻分交換器持久化、隊列持久化和消息持久化這三個部分;

  • 定義持久化交換器,通過第三個參數 durable 開啟/關閉持久化
channel.exchangeDeclare(exchangeName, exchangeType, durable)      
  • 定義持久化隊列,通過第二個參數 durable 開啟/關閉持久化
channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);      
  • 發送持久化消息,需要在消息屬性中設定 deliveryMode=2 , 此屬性在 BasicProperties 中,通過 basicPublish 方法的 props 參數傳入。
channel.basicPublish(exchange, routingKey, props, body);      

BasicProperties 對象可以從RabbitMQ 内置的 MessageProperties 類中擷取

MessageProperties.PERSISTENT_TEXT_PLAIN 1      

如果還需要設定其它屬性,可以通過 AMQP.BasicProperties.Builder 去建構一個BasicProperties 對象;這個用法在前兩篇文章中都有展示過

new AMQP.BasicProperties.Builder() .deliveryMode(2) .build()      

二、持久化代碼示範

/**
 * 持久化示例
 */
public class Consumer {
    private static Runnable receive = new Runnable() {
        public void run() {
            // 1、建立連接配接工廠
            ConnectionFactory factory = new ConnectionFactory();
            // 2、設定連接配接屬性
            factory.setHost("192.168.0.1");
            factory.setUsername("admin");
            factory.setPassword("admin");

            Connection connection = null;
            Channel channel = null;
            final String clientName = Thread.currentThread().getName();
            String queueName = "routing_test_queue";

            try {
                // 3、從連接配接工廠擷取連接配接
                connection = factory.newConnection("消費者-" + clientName);

                // 4、從連結中建立通道
                channel = connection.createChannel();

                // 定義一個持久化的,direct類型交換器
                channel.exchangeDeclare("routing_test", "direct", true);
                // 定義一個持久化隊列
                channel.queueDeclare(queueName, true, false, false, null);

                // 将隊列和交換器綁定,第三個參數 routingKey是關鍵,通過此路由鍵決定接收誰的消息
                channel.queueBind(queueName, "routing_test", clientName);

                // 定義消息接收回調對象
                DeliverCallback callback = new DeliverCallback() {
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println(clientName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
                    }
                };
                // 監聽隊列
                channel.basicConsume(queueName, true, callback, new CancelCallback() {
                    public void handle(String consumerTag) throws IOException {
                    }
                });

                System.out.println(clientName + " 開始接收消息");
                System.in.read();

            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 8、關閉通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }

                // 9、關閉連接配接
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(receive, "c1").start();
        new Thread(receive, "c2").start();
    }
}      
public class Producer {

    public static void main(String[] args) {
        // 1、建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 2、設定連接配接屬性
        factory.setHost("192.168.0.1");
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3、從連接配接工廠擷取連接配接
            connection = factory.newConnection("生産者");

            // 4、從連結中建立通道
            channel = connection.createChannel();

            // 定義一個持久化的,direct類型交換器
            channel.exchangeDeclare("routing_test", "direct", true);

            // 記憶體、磁盤預警時用
            System.out.println("按回車繼續");
            System.in.read();

            // 消息内容
            String message = "Hello A";
            // 發送持久化消息到routing_test交換器上
            channel.basicPublish("routing_test", "c1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("消息 " + message + " 已發送!");

            // 消息内容
            message = "Hello B";
            // 發送持久化消息到routing_test交換器上
            channel.basicPublish("routing_test", "c2", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("消息 " + message + " 已發送!");

            // 記憶體、池畔預警時用
            System.out.println("按回車結束");
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 7、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 8、關閉連接配接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}      

二、記憶體告警

預設情況下 set_vm_memory_high_watermark 的值為 0.4,即記憶體門檻值(臨界值)為 0.4,表示當RabbitMQ 使用的記憶體超過 40%時,就會産生記憶體告警并阻塞所有生産者的連接配接。一旦告警被解除(有消息被消費或者從記憶體轉儲到磁盤等情況的發生), 一切都會恢複正常。在出現記憶體告警後,所有的用戶端連接配接都會被阻塞。阻塞分為 blocking 和 blocked 兩種。

  • blocking:表示沒有發送消息的連結。
  • blocked:表示試圖發送消息的連結。

如果出現了記憶體告警,并且機器還有可用記憶體,可以通過指令調整記憶體門檻值,解除告警。

rabbitmqctl set_vm_memory_high_watermark 1 1      

或者

rabbitmqctl set_vm_memory_high_watermark absolute 1GB      

但這種方式隻是臨時調整,RabbitMQ 服務重新開機後,會還原。如果需要永久調整,可以修改配置檔案。但修改配置檔案需要重新開機RabbitMQ 服務才能生效。

修改配置檔案: vim /etc/rabbitmq/rabbitmq.conf

vm_memory_high_watermark.relative = 0.4 1      
vm_memory_high_watermark.absolute = 1GB      

三、模拟記憶體告警

1. 調整記憶體門檻值,模拟出告警,在RabbitMQ 伺服器上修改。 注意:修改之前,先在管理頁面看一下目前使用了多少,調成比目前值小

rabbitmqctl set_vm_memory_high_watermark absolute 50MB      

2.重新整理管理頁面(可能需要重新整理多次),在 Overview -> Nodes 中可以看到Memory變成了紅色,表示此節點記憶體告警了

3. 啟動 Producer 和 Consumer(源碼連結在最下面)

4. 檢視管理界面的 Connections 頁面,可以看到生産者和消費者的連結都處于 blocking 狀态。

5. 在 Producer 的控制台按回車健,再觀察管理界面的 Connections 頁面,會發現生産者的狀态成了 blocked 。

6. 此時雖然在 Producer 控制台看到了發送兩條消息的資訊,但 Consumer 并沒有收到任何消息。并且在管理界面的 Queues 頁面也看到不到隊列的消息數量有變化。

7. 解除記憶體告警後,會發現 Consumer 收到了 Producer 發送的兩條消息。

四、記憶體換頁

  • 在Broker節點的使用記憶體即将達到記憶體門檻值之前,它會嘗試将隊列中的消息存儲到磁盤以釋放記憶體空間,這個動作叫記憶體換頁。
  • 持久化和非持久化的消息都會被轉儲到磁盤中,其中持久化的消息本身就在磁盤中有一份副本,此時會将持久化的消息從記憶體中清除掉。
  • 預設情況下,在記憶體到達記憶體門檻值的 50%時會進行換頁動作。也就是說,在預設的記憶體門檻值為 0.4的情況下,當記憶體超過 0.4 x 0 .5=0.2 時會進行換頁動作。
  • 通過修改配置檔案,調整記憶體換頁分頁門檻值(不能通過指令調整)。
# 此值大于1時,相當于禁用了換頁功能。
 vm_memory_high_watermark_paging_ratio = 0.75      

五、磁盤告警

  • 當磁盤剩餘空間低于磁盤的門檻值時,RabbitMQ 同樣會阻塞生産者,這樣可以避免因非持久化的消息持續換頁而耗盡磁盤空間導緻服務崩潰
  • 預設情況下,磁盤門檻值為50MB,表示當磁盤剩餘空間低于50MB 時會阻塞生産者并停止記憶體中消息的換頁動作
  • 這個門檻值的設定可以減小,但不能完全消除因磁盤耗盡而導緻崩漬的可能性。比如在兩次磁盤空間檢測期間内,磁盤空間從大于50MB被耗盡到0MB
  • 通過指令可以調整磁盤門檻值,臨時生效,重新開機恢複
# disk_limit 為固定大小,機關為MB、GB
 rabbitmqctl set_disk_free_limit <disk_limit>      
# fraction 為相對比值,建議的取值為1.0~2.0之間
 rabbitmqctl set_disk_free_limit mem_relative <fraction>      

其實這些内容在官網上都有說明,有興趣可以直接看官網:https://www.rabbitmq.com/alarms.html

  git源碼:https://gitee.com/TongHuaShuShuoWoDeJieJu/rabbit.git

這短短的一生我們最終都會失去,不妨大膽一點,愛一個人,攀一座山,追一個夢