天天看點

物聯網平台實用技巧:通過服務端訂閱(HTTP/2)擷取裝置狀态原理訂閱接收

産品推薦:阿裡雲物聯網開發者工具( IoT Studio ), 立刻免費體驗吧!

原理

在物聯網平台控制台,設定服務端訂閱裝置狀态變化通知消息後,物聯網平台會将該産品下的裝置上下線消息推送到您的服務端。服務端通過HTTP/2 SDK接收裝置狀态變化消息。

流程圖

物聯網平台實用技巧:通過服務端訂閱(HTTP/2)擷取裝置狀态原理訂閱接收

說明 步驟1.1和2.1物聯網平台判斷是否已配控制台已配置服務端訂閱裝置狀态變化通知。

裝置狀态變化通知消息的資料格式

"status":"online|offline",
    "productKey":"al123455****",
    "deviceName":"deviceName1234",
    "time":"2018-08-31 15:32:28.205",
    "utcTime":"2018-08-31T07:32:28.205Z",
    "lastTime":"2018-08-31 15:32:28.195",
    "utcLastTime":"2018-08-31T07:32:28.195Z",
    "clientIp":"123.123.***.***"
}           

訂閱

在物聯網平台控制台,設定HTTP/2服務端訂閱裝置狀态變化通知。

1、登入物聯網平台控制台。

2、左側導航欄選擇裝置管理 > 産品。

3、在産品清單中,搜尋到要配置服務端訂閱的産品,并單擊該産品對應的檢視按鈕。

4、在産品詳情頁,單擊服務端訂閱 > 設定。

5、選擇推送的消息類型為裝置狀态變化通知,單擊儲存

物聯網平台實用技巧:通過服務端訂閱(HTTP/2)擷取裝置狀态原理訂閱接收

接收

服務端通過HTTP/2 SDK接收裝置狀态消息。本示例以配置Java HTTP/2 SDK為例。

說明

僅支援JDK8。

如果同一個阿裡雲賬号資訊用于啟動了多個HTTP/2 SDK,物聯網平台會将裝置狀态消息随機推送到其中一個用戶端。

在Maven項目中,添加以下pom依賴,安裝阿裡雲IoT SDK。

<groupId>com.aliyun.openservices</groupId>
  <artifactId>iot-client-message</artifactId>
  <version>1.1.3</version>
</dependency>
<dependency>
  <groupId>com.aliyun</groupId>
  <artifactId>aliyun-java-sdk-core</artifactId>
  <version>3.7.1</version>
</dependency>           

Config.*參數值中,需傳入您的阿裡雲賬号AccessKey資訊和裝置資訊。

private static String accessKeyID = "Config.accessKey";
  // 您的阿裡雲賬号AccesseKey Secret
  private static String accessKeySecret = "Config.accessKeySecret";
  // 您的阿裡雲賬号ID
  private static String uid = "Config.uid";
  // regionId
  private static String regionId = "cn-shanghai";
  // endPoint: https://${uid}.iot-as-http2.${region}.aliyuncs.com
  private static String endPoint = "https://" + uid + ".iot-as-http2." + regionId + ".aliyuncs.com";           

完整示例代碼如下:

* Copyright © 2019 Alibaba. All rights reserved.
 */
package com.aliyun.iot.demo.checkstatus;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.iot.api.Profile;
import com.aliyun.openservices.iot.api.message.MessageClientFactory;
import com.aliyun.openservices.iot.api.message.api.MessageClient;
import com.aliyun.openservices.iot.api.message.callback.MessageCallback;
import com.aliyun.openservices.iot.api.message.entity.MessageToken;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class GetDeviceStatusByH2 {

  // ===================需要使用者填寫的參數開始===========================
  // 修改Config.*的參數為您的賬号資訊
  // 各項資訊的擷取方式請參考 https://help.aliyun.com/document_detail/89227.html
  // 使用者賬号AccessKey
  private static String accessKeyID = "Config.accessKey";
  // 使用者賬号AccesseKeySecret
  private static String accessKeySecret = "Config.accessKeySecret";
  // 使用者uid
  private static String uid = "Config.uid";
  // regionId
  private static String regionId = "cn-shanghai";
  // endPoint: https://${uid}.iot-as-http2.${region}.aliyuncs.com
  private static String endPoint = "https://" + uid + ".iot-as-http2." + regionId + ".aliyuncs.com";
  // ===================需要使用者填寫的參數結束===========================

  private static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
      Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("http2-downlink-handle-%d").build(),
      new ThreadPoolExecutor.AbortPolicy());

  /**
   * 1、設定服務端訂閱
   * 2、啟動本程式
   * 3、啟動您的裝置,使裝置線上
   * 4、關閉您的裝置,使裝置離線
   * 5、檢視本程式列印的日志
   * 
   * @param args
   */
  public static void main(String[] args) {

    // 連接配接配置
    Profile profile = Profile.getAccessKeyProfile(endPoint, regionId, accessKeyID, accessKeySecret);

    // 構造用戶端
    MessageClient client = MessageClientFactory.messageClient(profile);

    // 消息回調處理
    MessageCallback messageCallback = new MessageCallback() {
      @Override
      public Action consume(MessageToken messageToken) {
        // 傳回消費成功,業務另起線程處理,以免堵塞回調
        executorService.submit(() -> handleDownLinkMessage(messageToken));
        // 收到消息應該盡快return commitSuccess
        return MessageCallback.Action.CommitSuccess;
      }
    };

    // 本地過濾。物聯網平台會将已訂閱的全部裝置消息推送下來,這裡通過比對Topic,過濾出要處理的消息
    // 這裡隻處理Topic為 /as/mqtt/status 開頭的消息,其他消息也會收到但不處理
    // Topic及其對應的消息格式請參考 https://help.aliyun.com/document_detail/73736.html
    client.setMessageListener("/as/mqtt/status/#", messageCallback);

    // 通用回調,setMessageListener中未比對的消息在這裡處理
    MessageCallback messageCallbackCommon = new MessageCallback() {

      @Override
      public Action consume(MessageToken messageToken) {
        // 如果有業務處理,建議另起線程處理
        return MessageCallback.Action.CommitSuccess;
      }
    };

    // 資料接收
    client.connect(messageCallbackCommon);
  }

  private static void handleDownLinkMessage(MessageToken messageToken) {
    // 整個消息體内容是JSON
    String message = new String(messageToken.getMessage().getPayload());
    // 擷取其中的status字段,就是裝置線上狀态
    JSONObject json = (JSONObject) JSON.parse(message);
    String deviceName = json.getString("deviceName");
    String status = json.getString("status");
    System.out.println("消息原始内容: " + message);
    System.out.println(deviceName + " 線上狀态: " + status);
    // 其他自定義實作
  }
}           

服務端接收訂閱消息的Java HTTP/2 SDK配置詳細說明,請參見開發指南(Java)。