天天看點

阿裡雲物聯網平台之利用雲平台流轉如何實作同一款産品下任意倆個裝置的通信?原理Step By Step測試結果

原理

實作上述需求,其核心就是任意倆個字,而解決這種問題的手段一般就是“通配”。雖然是通配,但是每一條消息實際上還是有個“明确”的目的地的

(1)規則引擎配置:

源端:配置通配符 "+"
這一條保證消息源可以是任意的           
目的端:配置通配符"${TargetDevice}"
這一條保證目的地是任意的,而真正的目的地可以根據TargetDevice變化而變化。
           

(2)裝置端

源裝置上報TargetDevice字段 ,表明該消息去向

Step By Step

前提:測試相關的環境準備好,兩個程式(裝置)(A作為源,B作為目的地),物聯網平台上topic啊,物模型啊之類的都定義好。

1、配置規則引擎

(1)編寫SQL語句

這裡選擇的是自定義topic消息,也可以選其他的。(注意topic需要有釋出權限)

SELECT TargetDevice FROM "/a16*JpRCl/+/user/update"

(2)添加操作(配置目的地)

這裡選下發到另一個自定義topic(注意,測試裝置需要成功訂閱這個topic)

阿裡雲物聯網平台之利用雲平台流轉如何實作同一款産品下任意倆個裝置的通信?原理Step By Step測試結果
/a16*pRCl/${TargetDevice}/user/get

(3)啟動規則

阿裡雲物聯網平台之利用雲平台流轉如何實作同一款産品下任意倆個裝置的通信?原理Step By Step測試結果

2、裝置端開發

源裝置A:上報消息(消息中一定要包含TargetDevice字段)

這裡展現TargetDevice必須要有,你們自己測試的時候可以再加些其他字段

阿裡雲物聯網平台之利用雲平台流轉如何實作同一款産品下任意倆個裝置的通信?原理Step By Step測試結果

源碼:

AliyunIoTSignUtil:

package com.alibaba.taro;

import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;
import java.util.Map;

/**
 * AliyunIoTSignUtil
 */

public class AliyunIoTSignUtil {
    public static String sign(Map<String, String> params, String deviceSecret, String signMethod) {
        //将參數Key按字典順序排序
        String[] sortedKeys = params.keySet().toArray(new String[] {});
        Arrays.sort(sortedKeys);

        //生成規範化請求字元串
        StringBuilder canonicalizedQueryString = new StringBuilder();
        for (String key : sortedKeys) {
            if ("sign".equalsIgnoreCase(key)) {
                continue;
            }
            canonicalizedQueryString.append(key).append(params.get(key));
        }

        try {
            String key = deviceSecret;
            return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * HMACSHA1加密
     *
     */
    public static String encryptHMAC(String signMethod,String content, String key) throws Exception {
        SecretKey secretKey = new SecretKeySpec(key.getBytes("utf-8"), signMethod);
        Mac mac = Mac.getInstance(secretKey.getAlgorithm());
        mac.init(secretKey);
        byte[] data = mac.doFinal(content.getBytes("utf-8"));
        return bytesToHexString(data);
    }

    public static final String bytesToHexString(byte[] bArray) {

        StringBuffer sb = new StringBuffer(bArray.length);
        String sTemp;
        for (int i = 0; i < bArray.length; i++) {
            sTemp = Integer.toHexString(0xFF & bArray[i]);
            if (sTemp.length() < 2) {
                sb.append(0);
            }
            sb.append(sTemp.toUpperCase());
        }
        return sb.toString();
    }
}
           

裝置A:

package com.alibaba;

import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.Map;


public class CustomTopicMessageDemo {

    public static String productKey = "a16***pRCl";
    public static String deviceName = "IoTDeviceDemo1";
    public static String deviceSecret = "a3b15a11*****c4952";
    public static String regionId = "cn-shanghai";

    // 自定義topic,在産品Topic清單位置定義
    private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/update";
    private static String subTopic = "/"+productKey + "/" + deviceName+"/user/update";

    private static MqttClient mqttClient;
    public static void main(String [] args){

        initAliyunIoTClient();
//        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
//                new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
//        scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
        // 彙報屬性
        String payloadJson = "{\"TargetDevice\":\"IoTDeviceDemo2\"}";
        postDeviceProperties(payloadJson);

        try {
            mqttClient.subscribe(subTopic); // 訂閱Topic
        } catch (MqttException e) {
            System.out.println("error:" + e.getMessage());
            e.printStackTrace();
        }

        // 設定訂閱監聽
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                System.out.println("connection Lost");

            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                String payload =  new String(mqttMessage.getPayload());
                System.out.println(" 接收消息:");
                System.out.println("Topic : " + s);
                System.out.println(payload); //列印輸出消息payLoad             
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });

    }

    /**
     * 初始化 Client 對象
     */
    private static void initAliyunIoTClient() {

        try {
            // 構造連接配接需要的參數
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<String, String>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // cn-shanghai
            //String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:443";
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(false);
        //connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 彙報屬性
     */
    private static void postDeviceProperties(String payloadJson) {

        try {
            //上報資料
            //進階版 物模型-屬性上報payload
            System.out.println("上報屬性值:");
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(pubTopic, message);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * 服務傳回
     */
    private static void postServiceReply(String payloadJson,String relpyTopic) {

        try {
            //上報資料
            //進階版 物模型-屬性上報payload
            System.out.println("服務調用傳回:");
            //String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
            System.out.println("Topic:");
            System.out.println(relpyTopic);
            System.out.println(payloadJson);
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(relpyTopic, message);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

}



           

目的裝置B:

訂閱規則引擎配置的topic

/a16*pRCl/${TargetDevice}/user/get

阿裡雲物聯網平台之利用雲平台流轉如何實作同一款産品下任意倆個裝置的通信?原理Step By Step測試結果

源碼:

B裝置

package com.alibaba;

import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.Map;


public class CustomTopicMessageDemo2 {

    public static String productKey = "a1*****pRCl";
    public static String deviceName = "IoTDeviceDemo2";
    public static String deviceSecret = "0895205*****7e2b4bf2";
    public static String regionId = "cn-shanghai";

    private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/get";
    private static String subTopic = "/"+productKey + "/" + deviceName+"/user/get";

    private static MqttClient mqttClient;

    public static void main(String [] args){

        initAliyunIoTClient();
//        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
//                new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
//        scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
       
         String payloadJson = "{\"tts\":\"ss\"}";

        postDeviceProperties(payloadJson);

        try {
            mqttClient.subscribe(subTopic); // 訂閱Topic
        } catch (MqttException e) {
            System.out.println("error:" + e.getMessage());
            e.printStackTrace();
        }

        // 設定訂閱監聽
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                System.out.println("connection Lost");

            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                String payload =  new String(mqttMessage.getPayload());
                System.out.println(" 接收消息:");
                System.out.println("Topic : " + s);
                System.out.println(payload); //列印輸出消息payLoad           
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });

    }

    /**
     * 初始化 Client 對象
     */
    private static void initAliyunIoTClient() {

        try {
            // 構造連接配接需要的參數
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<String, String>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(false);
        //connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 彙報屬性
     */
    private static void postDeviceProperties(String payloadJson) {

        try {
            //上報資料
            //進階版 物模型-屬性上報payload
            System.out.println("上報屬性值:");
            System.out.println(payloadJson);
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(pubTopic, message);
            System.out.println("=================================================================");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * 服務傳回
     */
    private static void postServiceReply(String payloadJson,String relpyTopic) {

        try {
            //上報資料
            //進階版 物模型-屬性上報payload
            System.out.println("服務調用傳回:");
            //String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
            System.out.println("Topic:");
            System.out.println(relpyTopic);
            System.out.println(payloadJson);
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(relpyTopic, message);
            System.out.println("=================================================================");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

}


           

測試結果

1、運作B程式、再運作A程式

A程式上報消息

阿裡雲物聯網平台之利用雲平台流轉如何實作同一款産品下任意倆個裝置的通信?原理Step By Step測試結果

B程式收到雲平台轉發消息

阿裡雲物聯網平台之利用雲平台流轉如何實作同一款産品下任意倆個裝置的通信?原理Step By Step測試結果

2、控制台日志

2020/12/20 14:16:28.304

裝置A上報消息:

阿裡雲物聯網平台之利用雲平台流轉如何實作同一款産品下任意倆個裝置的通信?原理Step By Step測試結果

2020/12/20 14:16:28.334

平台觸發規則引擎轉發消息

阿裡雲物聯網平台之利用雲平台流轉如何實作同一款産品下任意倆個裝置的通信?原理Step By Step測試結果

2020/12/20 14:16:28.336

裝置B收到轉發的消息

阿裡雲物聯網平台之利用雲平台流轉如何實作同一款産品下任意倆個裝置的通信?原理Step By Step測試結果

總覽:

阿裡雲物聯網平台之利用雲平台流轉如何實作同一款産品下任意倆個裝置的通信?原理Step By Step測試結果