天天看點

MQTT擷取離線消息小議

作者:俏巴

概述

微消息隊列MQ for IoT在處理離線消息時,為了簡化離線消息擷取機制,微消息隊列系統在用戶端成功建立連接配接并通過權限校驗後,會自動加載離線消息并下發到用戶端,但是實際在使用過程中會出現消費端啟動後遲遲無法擷取離線消息的問題,本文主要介紹延遲消息的發送與接收環節需要注意的問題。

協定相關

注意在使用SDK進行離線消息的發送過程中需要特别注意QoS和cleanSession兩個參數。
  • QoS 指代消息傳輸的服務品質(主要針對發送端)
取值 1 2 3
意義 最多分發一次 僅分發一次
  • cleanSession 建立 TCP 連接配接後是否關心之前狀态(主要針對接收端)
true false
用戶端再次上線時,将不再關心之前所有的訂閱關系以及離線消息 用戶端再次上線時,還需要處理之前的離線消息,而之前的訂閱關系也會持續生效

為了處理的友善,對于處理離線消息的情況,建議不論是發送端還是接收端,參數都設定為:

QoS = 1

cleanSession = false           

Java示例代碼

Send Code

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.IOException;
import java.util.Date;

import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1;

public class MQTTSendMsg1 {

    public static void main(String[] args) throws IOException {

        final String broker ="tcp://******.mqtt.aliyuncs.com:1883";
        final String acessKey ="******";
        final String secretKey ="******";
        final String topic ="******";
        final String clientId ="GID_******@@@ClientID_device1";
        String sign;
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            final MqttConnectOptions connOpts = new MqttConnectOptions();
            System.out.println("Connecting to broker: " + broker);
            sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
            connOpts.setUserName(acessKey);
            connOpts.setServerURIs(new String[] { broker });
            connOpts.setPassword(sign.toCharArray());
            connOpts.setCleanSession(false);
            connOpts.setKeepAliveInterval(90);
            connOpts.setAutomaticReconnect(true);
            connOpts.setMqttVersion(MQTT_VERSION_3_1_1);
            sampleClient.setCallback(new MqttCallbackExtended() {
                public void connectComplete(boolean reconnect, String serverURI) {
                    System.out.println("connect success");
                    //連接配接成功,需要上傳用戶端所有的訂閱關系
                }
                public void connectionLost(Throwable throwable) {
                    System.out.println("mqtt connection lost");
                }
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
                }
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
                }
            });
            sampleClient.connect(connOpts);
            for (int i = 0; i < 5; i++) {
                try {
                    String scontent = new Date()+"MQTT Test body" + i;
                    //此處消息體隻需要傳入 byte 數組即可,對于其他類型的消息,請自行完成二進制資料的轉換
                    final MqttMessage message = new MqttMessage(scontent.getBytes());
                    message.setQos(1);//設定離線消息的情況
                    System.out.println(i+" pushed at "+new Date()+" "+ scontent);
                    sampleClient.publish(topic+"/notice/", message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception me) {
            me.printStackTrace();
        }
    }
}           

Receive Code

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MQTTRecvMsg {
        public static void main(String[] args) {

            final String broker ="tcp://******.mqtt.aliyuncs.com:1883";
            final String acessKey ="******";
            final String secretKey ="******";
            final String topic ="******";
            final String clientId ="GID_******@@@ClientID_device2";
            String sign;
            MemoryPersistence persistence = new MemoryPersistence();
            try {
                final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
                final MqttConnectOptions connOpts = new MqttConnectOptions();
                System.out.println("Connecting to broker: " + broker);

                sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
                final String[] topicFilters=new String[]{topic+"/notice/"};
                final int[]qos={1};
                connOpts.setUserName(acessKey);
                connOpts.setServerURIs(new String[] { broker });
                connOpts.setPassword(sign.toCharArray());
                connOpts.setCleanSession(false);//設定确定是否繼續接受離線消息
                connOpts.setKeepAliveInterval(90);
                connOpts.setAutomaticReconnect(true);
                final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>());
                sampleClient.setCallback(new MqttCallbackExtended() {
                    public void connectComplete(boolean reconnect, String serverURI) {
                        System.out.println("connect success");
                        //連接配接成功,需要上傳用戶端所有的訂閱關系
                        executorService.submit(new Runnable()
                        {
                            public void run()
                            {
                                try
                                {
                                    sampleClient.subscribe(topicFilters, qos);
                                } catch(Exception me)
                                {
                                    me.printStackTrace();
                                }
                            }
                        });
                    }
                    public void connectionLost(Throwable throwable) {
                        System.out.println("mqtt connection lost");
                    }
                    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                        System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
                    }
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                        System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
                    }
                });
                //用戶端每次上線都必須上傳自己所有涉及的訂閱關系,否則可能會導緻消息接收延遲
                sampleClient.connect(connOpts);
                //每個用戶端最多允許存在30個訂閱關系,超出限制可能會丢棄導緻收不到部分消息
                sampleClient.subscribe(topicFilters,qos);
                Thread.sleep(Integer.MAX_VALUE);
            } catch (Exception me) {
                me.printStackTrace();
            }
        }
}
           

特别注意:

離線消息生成需要一定的時間,因為推送的消息需要等待用戶端的 ack 逾時才會被判成離線消息,是以擷取離線消息一般也需要訂閱端等待一定的時間。

參考連結

微消息隊列名詞解釋 MQTT 擷取離線消息