原理
實作上述需求,其核心就是任意倆個字,而解決這種問題的手段一般就是“通配”。雖然是通配,但是每一條消息實際上還是有個“明确”的目的地的
(1)規則引擎配置:
源端:配置通配符 "+"目的端:配置通配符"${TargetDevice}"這一條保證消息源可以是任意的
這一條保證目的地是任意的,而真正的目的地可以根據TargetDevice變化而變化。
(2)裝置端
源裝置上報TargetDevice字段 ,表明該消息去向
Step By Step
前提:測試相關的環境準備好,兩個程式(裝置)(A作為源,B作為目的地),物聯網平台上topic啊,物模型啊之類的都定義好。
1、配置規則引擎
(1)編寫SQL語句
這裡選擇的是自定義topic消息,也可以選其他的。(注意topic需要有釋出權限)
SELECT TargetDevice FROM "/a16*JpRCl/+/user/update"
(2)添加操作(配置目的地)
這裡選下發到另一個自定義topic(注意,測試裝置需要成功訂閱這個topic)
/a16*pRCl/${TargetDevice}/user/get
(3)啟動規則
2、裝置端開發
源裝置A:上報消息(消息中一定要包含TargetDevice字段)
這裡展現TargetDevice必須要有,你們自己測試的時候可以再加些其他字段
源碼:
AliyunIoTSignUtil:
package com.alibaba.taro;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;
import java.util.Map;
/**
* AliyunIoTSignUtil
*/
public class AliyunIoTSignUtil {
public static String sign(Map<String, String> params, String deviceSecret, String signMethod) {
//将參數Key按字典順序排序
String[] sortedKeys = params.keySet().toArray(new String[] {});
Arrays.sort(sortedKeys);
//生成規範化請求字元串
StringBuilder canonicalizedQueryString = new StringBuilder();
for (String key : sortedKeys) {
if ("sign".equalsIgnoreCase(key)) {
continue;
}
canonicalizedQueryString.append(key).append(params.get(key));
}
try {
String key = deviceSecret;
return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* HMACSHA1加密
*
*/
public static String encryptHMAC(String signMethod,String content, String key) throws Exception {
SecretKey secretKey = new SecretKeySpec(key.getBytes("utf-8"), signMethod);
Mac mac = Mac.getInstance(secretKey.getAlgorithm());
mac.init(secretKey);
byte[] data = mac.doFinal(content.getBytes("utf-8"));
return bytesToHexString(data);
}
public static final String bytesToHexString(byte[] bArray) {
StringBuffer sb = new StringBuffer(bArray.length);
String sTemp;
for (int i = 0; i < bArray.length; i++) {
sTemp = Integer.toHexString(0xFF & bArray[i]);
if (sTemp.length() < 2) {
sb.append(0);
}
sb.append(sTemp.toUpperCase());
}
return sb.toString();
}
}
裝置A:
package com.alibaba;
import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;
import java.util.HashMap;
import java.util.Map;
public class CustomTopicMessageDemo {
public static String productKey = "a16***pRCl";
public static String deviceName = "IoTDeviceDemo1";
public static String deviceSecret = "a3b15a11*****c4952";
public static String regionId = "cn-shanghai";
// 自定義topic,在産品Topic清單位置定義
private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/update";
private static String subTopic = "/"+productKey + "/" + deviceName+"/user/update";
private static MqttClient mqttClient;
public static void main(String [] args){
initAliyunIoTClient();
// ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
// new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
// scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
// 彙報屬性
String payloadJson = "{\"TargetDevice\":\"IoTDeviceDemo2\"}";
postDeviceProperties(payloadJson);
try {
mqttClient.subscribe(subTopic); // 訂閱Topic
} catch (MqttException e) {
System.out.println("error:" + e.getMessage());
e.printStackTrace();
}
// 設定訂閱監聽
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("connection Lost");
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
String payload = new String(mqttMessage.getPayload());
System.out.println(" 接收消息:");
System.out.println("Topic : " + s);
System.out.println(payload); //列印輸出消息payLoad
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
}
/**
* 初始化 Client 對象
*/
private static void initAliyunIoTClient() {
try {
// 構造連接配接需要的參數
String clientId = "java" + System.currentTimeMillis();
Map<String, String> params = new HashMap<String, String>(16);
params.put("productKey", productKey);
params.put("deviceName", deviceName);
params.put("clientId", clientId);
String timestamp = String.valueOf(System.currentTimeMillis());
params.put("timestamp", timestamp);
// cn-shanghai
//String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:443";
String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";
String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
String mqttUsername = deviceName + "&" + productKey;
String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");
connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);
} catch (Exception e) {
System.out.println("initAliyunIoTClient error " + e.getMessage());
}
}
public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {
MemoryPersistence persistence = new MemoryPersistence();
mqttClient = new MqttClient(url, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
// MQTT 3.1.1
connOpts.setMqttVersion(4);
connOpts.setAutomaticReconnect(false);
connOpts.setCleanSession(false);
//connOpts.setCleanSession(true);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(60);
mqttClient.connect(connOpts);
}
/**
* 彙報屬性
*/
private static void postDeviceProperties(String payloadJson) {
try {
//上報資料
//進階版 物模型-屬性上報payload
System.out.println("上報屬性值:");
MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
message.setQos(0);
mqttClient.publish(pubTopic, message);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
/**
* 服務傳回
*/
private static void postServiceReply(String payloadJson,String relpyTopic) {
try {
//上報資料
//進階版 物模型-屬性上報payload
System.out.println("服務調用傳回:");
//String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
System.out.println("Topic:");
System.out.println(relpyTopic);
System.out.println(payloadJson);
MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
message.setQos(0);
mqttClient.publish(relpyTopic, message);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
目的裝置B:
訂閱規則引擎配置的topic
/a16*pRCl/${TargetDevice}/user/get
源碼:
B裝置
package com.alibaba;
import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;
import java.util.HashMap;
import java.util.Map;
public class CustomTopicMessageDemo2 {
public static String productKey = "a1*****pRCl";
public static String deviceName = "IoTDeviceDemo2";
public static String deviceSecret = "0895205*****7e2b4bf2";
public static String regionId = "cn-shanghai";
private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/get";
private static String subTopic = "/"+productKey + "/" + deviceName+"/user/get";
private static MqttClient mqttClient;
public static void main(String [] args){
initAliyunIoTClient();
// ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
// new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
// scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
String payloadJson = "{\"tts\":\"ss\"}";
postDeviceProperties(payloadJson);
try {
mqttClient.subscribe(subTopic); // 訂閱Topic
} catch (MqttException e) {
System.out.println("error:" + e.getMessage());
e.printStackTrace();
}
// 設定訂閱監聽
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("connection Lost");
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
String payload = new String(mqttMessage.getPayload());
System.out.println(" 接收消息:");
System.out.println("Topic : " + s);
System.out.println(payload); //列印輸出消息payLoad
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
}
/**
* 初始化 Client 對象
*/
private static void initAliyunIoTClient() {
try {
// 構造連接配接需要的參數
String clientId = "java" + System.currentTimeMillis();
Map<String, String> params = new HashMap<String, String>(16);
params.put("productKey", productKey);
params.put("deviceName", deviceName);
params.put("clientId", clientId);
String timestamp = String.valueOf(System.currentTimeMillis());
params.put("timestamp", timestamp);
// cn-shanghai
String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";
String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
String mqttUsername = deviceName + "&" + productKey;
String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");
connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);
} catch (Exception e) {
System.out.println("initAliyunIoTClient error " + e.getMessage());
}
}
public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {
MemoryPersistence persistence = new MemoryPersistence();
mqttClient = new MqttClient(url, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
// MQTT 3.1.1
connOpts.setMqttVersion(4);
connOpts.setAutomaticReconnect(false);
connOpts.setCleanSession(false);
//connOpts.setCleanSession(true);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(60);
mqttClient.connect(connOpts);
}
/**
* 彙報屬性
*/
private static void postDeviceProperties(String payloadJson) {
try {
//上報資料
//進階版 物模型-屬性上報payload
System.out.println("上報屬性值:");
System.out.println(payloadJson);
MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
message.setQos(0);
mqttClient.publish(pubTopic, message);
System.out.println("=================================================================");
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
/**
* 服務傳回
*/
private static void postServiceReply(String payloadJson,String relpyTopic) {
try {
//上報資料
//進階版 物模型-屬性上報payload
System.out.println("服務調用傳回:");
//String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
System.out.println("Topic:");
System.out.println(relpyTopic);
System.out.println(payloadJson);
MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
message.setQos(0);
mqttClient.publish(relpyTopic, message);
System.out.println("=================================================================");
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
測試結果
1、運作B程式、再運作A程式
A程式上報消息
B程式收到雲平台轉發消息
2、控制台日志
2020/12/20 14:16:28.304
裝置A上報消息:
2020/12/20 14:16:28.334
平台觸發規則引擎轉發消息
2020/12/20 14:16:28.336
裝置B收到轉發的消息
總覽: