天天看点

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

物模型价值

物联网元年

关键词:探索、快速

2016年阿里云物联网平台(前称:物联网套件)上线,为客户设备上云提供了通道能力,包括MQTT连接、消息流转等核心功能。

第一批客户大多基于该模式使用物联网平台能力,当时整个行业处于物联网云平台起步期,包括AWS,Azure起步阶段同样只是提供通道能力。

基于通道能力,客户使用物联网平台接入方式详见文档

https://developer.aliyun.com/article/746536

这个阶段的客户大多是硬件厂商,软硬一体开发,尝试物联网转型提升设备价值,对物联网平台的诉求比较简单,希望自己更多参与,对新模式有更多把控力,所以都会采用自定义协议上云。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

物联网繁荣

关键词:生态、扩展、数字化

近两年物联网设备、解决方案如雨后春笋般涌出,不少用户希望赶上物联网这波浪潮。这个阶段的客户不仅仅关注设备连云,也开始关注围绕设备产生的解决方案。因此客户角色从硬件厂商,快速扩展到集成商、软件提供商等。由于大量角色的进入,对软硬开发解耦、易扩展的能力提出了诉求。同时我们也发现第一批使用通道能力的平台客户随着自己业务发展、设备扩展,原来的架构已无法支撑,对物联网平台也提出了新的要求。

举两个典型场景:

  • 老客户升级:某个共享设备提供商,原来仅提供大学校园共享洗衣机服务,利用物联网平台通道能力上云,随着公司业务发展,从共享洗衣机业务扩展到校园淋浴、饮水机、充电桩等多类设备,原来自定义协议和API无法支撑多品类设备,难扩展。需要有一套接入标准和规范,方便快速扩展设备类型。
  • 新生态客户:某个充电桩平台客户,提供充电桩管理平台,作为甲方要求大量桩企(乙方)按照平台规范接入,典型的软硬件分离场景。需要有一套接入标准和规范,方便快速扩展桩企规模。

这一阶段平台在通道能力之上,提供了物模型能力,物模型可以屏蔽底层设备差异,让软件开发者基于平台提供的标准API开发;硬件开发者基于平台提供的标准协议开发;从而达到软硬开发解耦的目的。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

物联网赋能

关键词:场景化、智能

物联网终极目标一定是基于设备采集数据赋能业务,实现数字业务化。例如金融、物流、家居、餐饮、商场、医疗、交通等不同领域通过物联网数字化后,结合数据分析智能化决策、互联互通、场景规则、数字孪生等能力实现纵深领域场景化、智能化。

这一阶段平台在通道能力、物模型能力之上,还进一步提供设备智能运维、数据分析、可视化、数字孪生等高价值服务,帮助客户数字化后产生真正的业务价值。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

基于以上分析,物联网已经过了最初的“元年”阶段,也迈入了“繁荣”阶段,正逐步朝“问物联网要赋能”的阶段演进。物模型是物联网生态化、高扩展、数字化、智能化非常重要的基础,强烈建议客户使用。

物模型接入实践

自定义接入模式

以一个老客户为例,原来仅使用物联网平台通道能力,下图中1~8流程都需要自定义开发,当客户设备类型足够简单时,该模式复杂度通常不会成为客户痛点。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

面临的挑战

随着客户接入设备种类越来越多,面临的扩展性问题也越来越严峻。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

使用物模型后的模式

物模型模式下,设备与云交互协议、云平台设备API都基于物模型标准化了,即使设备不断扩展,客户业务服务器和设备端逻辑都不需要进行调整,保证了扩展性。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

物模型接入流程详细介绍

流程图

以下是客户详细接入流程,主要分为:云端配置、设备开发、服务端开发、设备运行时管理四大部分。平台会提供一些工具,使各部分流程更高效。接下来进行详细介绍。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

本文试图手把手介绍从0到1接入物模型,还会配套介绍一些接入过程中有帮助的平台能力,所以文章篇幅比较长,事实上客户接入流程还是非常简单的,真正开发只需要涉及到图中红色三个模块。

如果您希望快速接入,可以直接关注P0部分,其它部分都可以跳过。

1 云端配置

1.1 创建产品(P0)

1.

登录物联网平台

2.创建产品。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

说明:

• 所属品类:标准品类库提供了一些供参考的模板,选择后可以修改,建议使用。

• 节点类型:根据实际选择即可。

• 数据格式:“ICA标准数据格式(Alink JSON)”表示设备使用标准Alink JSON格式上报数据;“透传/自定义”表示设备可以使用自定义格式,通过Alink特定Topic上报物联网平台,该模式客户需要写脚本进行转换,透传模式在此不做展开,后面单独起文章介绍。

1.2 物模型建模(P0)

1.模型查看。

已有的模型是继承自创建产品时选择的“充电桩”品类模板。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

2.编辑模型。

通过“编辑草稿”,进行修改和添加,最后需要对物模型“发布上线”。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

• 定义物模型非常重要,物模型通过属性、事件、服务三要素描述了设备所有能力,设备和云交互、客户服务器访问设备通过物模型都可以实现协议标准化。如果客户定义的物模型如果足够通用和专业,阿里可以帮助作为ICA行业标准进行推广。

• 服务的调用方式有:同步调用、异步调用两种模式。客户云端开发调用下行控制API,同步调用和异步调用获取返回结果方式不一样,在后文“3.3”章节详细介绍。

物模型概念介绍

物模型介绍文档请参见

这里

了解物模型概念,能够帮助您更好对设备建模。

1.3 物模型配置

当前默认是物模型强校验模式,即设备上报数据在IoT平台会进行物模型数据规范强校验,如果不符合规范会报错。

另外物模型弱校验、免校验、去重等规则也会在近期陆续开放,后期进行文档补充。

配置之后,会在设备运行时生效。

关联阅读:4.2 物模型扩展规则校验。

1.4 注册三元组(P0)

1.注册设备。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

• 添加设备:测试阶段使用较多,单个添加。

• 批量添加:量产阶段使用,有两种模式,“自动生成”表示设备标识符(deviceName)由平台按照一定的规则随机颁发;“批量上传”支持客户自定义设备标识符(deviceName)。

2.查看设备列表。

可以通过“设备列表”、“批次管理”两种方式查看创建的设备列表。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

通过“批次管理”查看这一批次设备详情,并且支持下载三元组列表。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍
注意:此处设备标识符(deviceName)非常重要,与productKey, deviceSecret一起称为设备的“三元组”,作为设备的唯一身份,大部分情况需要烧录到设备上。

2 设备开发

2.1 使用设备SDK开发(P0)

设备接入SDK文档请参见

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍
根据需要选择合适的语言版本。C SDK 建议使用“4.x”版本。

本文选择 Java SDK进行演示。

环境准备:

https://help.aliyun.com/document_detail/97331.html

物模型开发:

https://help.aliyun.com/document_detail/97333.html

1.开发之前需要先准备如下好两份数据:

  • 设备证书信息(productKey、deviceName、deviceSecret)
    物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍
  • 设备物模型
    物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

为了方便查看物模型详细数据规范,通过导出“物模型TSL”查看详细物模型定义,其中包括物模型属性、事件、服务标识符、参数、数据规范。抽取部分内容,针对以下属性、事件、服务在DEMO中进行开发演示。

"schema":"https://iotx-tsl.oss-ap-southeast-1.aliyuncs.com/schema.json",
    "profile":{
        "productKey":"a1nhbEV****"
    },
    "properties":[
        {
            "identifier":"acOutMeterIty",
            "name":"交流输出电表底值监测属性",
            "accessMode":"rw",
            "required":false,
            "dataType":{
                "type":"int",
                "specs":{
                    "min":"0",
                    "max":"200",
                    "step":"1"
                }
            }
        }
    ],
    "events":[
        {
            "identifier":"post",
            "name":"post",
            "type":"info",
            "required":true,
            "desc":"属性上报",
            "method":"thing.event.property.post",
            "outputData":[
                {
                    "identifier":"acOutMeterIty",
                    "name":"交流输出电表底值监测属性",
                    "dataType":{
                        "type":"int",
                        "specs":{
                            "min":"0",
                            "max":"200",
                            "step":"1"
                        }
                    }
                }
            ]
        },
        {
            "identifier":"startChaResEvt",
            "name":"启动充电结果事件",
            "type":"info",
            "required":false,
            "method":"thing.event.startChaResEvt.post",
            "outputData":[
                {
                    "identifier":"gunNum",
                    "name":"充电枪编号",
                    "dataType":{
                        "type":"int",
                        "specs":{
                            "min":"0",
                            "max":"100",
                            "step":"2"
                        }
                    }
                }
            ]
        }
    ],
    "services":[
        {
            "identifier":"set",
            "name":"set",
            "required":true,
            "callType":"async",
            "desc":"属性设置",
            "method":"thing.service.property.set",
            "inputData":[
                {
                    "identifier":"acOutMeterIty",
                    "name":"交流输出电表底值监测属性",
                    "dataType":{
                        "type":"int",
                        "specs":{
                            "min":"0",
                            "max":"200",
                            "step":"1"
                        }
                    }
                }
            ],
            "outputData":[

            ]
        },
        {
            "identifier":"get",
            "name":"get",
            "required":true,
            "callType":"async",
            "desc":"属性获取",
            "method":"thing.service.property.get",
            "inputData":[
                "acOutMeterIty"
            ],
            "outputData":[
                {
                    "identifier":"acOutMeterIty",
                    "name":"交流输出电表底值监测属性",
                    "dataType":{
                        "type":"int",
                        "specs":{
                            "min":"0",
                            "max":"200",
                            "step":"1"
                        }
                    }
                }
            ]
        },
        {
            "identifier":"startChaResService",
            "name":"开启充电",
            "required":false,
            "callType":"async",
            "method":"thing.service.startChaResService",
            "inputData":[
                {
                    "identifier":"charm",
                    "name":"电量",
                    "dataType":{
                        "type":"int",
                        "specs":{
                            "min":"1",
                            "max":"100",
                            "step":"2"
                        }
                    }
                }
            ],
            "outputData":[
                {
                    "identifier":"realcharm",
                    "name":"realcharm",
                    "dataType":{
                        "type":"int",
                        "specs":{
                            "min":"0",
                            "max":"100",
                            "step":"2"
                        }
                    }
                }
            ]
        }
    ]
}           

2.开发代码。

如下示例中只需要将三元组,和属性、事件、服务参数替换成您的设备信息。其它代码可以直接运行。

关于免订阅能力介绍:

有些设备最资源比较敏感,为了避免初始化订阅大量Alink协议中系统Topic带来的性能开销,平台提供了免订阅能力,即平台帮设备进行Topic订阅。

SDK只有3.1.0及以后版本支持免订阅能力,并且默认打开该能力。

如果3.1.0及以后版本SDK您希望取消免订阅,依旧按需订阅Topic,可以设置SDK配置项关闭该能力,在make.settings中设置“FEATURE_MQTT_AUTO_SUBSCRIBE=n”。

public class Demo {

    public static void main(String[] args) throws Exception {

        String pk = "a1nhbEVCP**";
        String dn = "7mBP6Dd6IT27Rt***";
        String ds = "*****";

        /**
         * 连接 & 认证
         */
        LinkKitInitParams params = new LinkKitInitParams();

        // 设置 Mqtt 初始化参数
        IoTMqttClientConfig config = new IoTMqttClientConfig();
        config.productKey = pk;
        config.deviceName = dn;
        config.deviceSecret = ds;
        config.receiveOfflineMsg = false;
        params.mqttClientConfig = config;

        // 设置初始化三元组信息,用户传入
        DeviceInfo deviceInfo = new DeviceInfo();
        deviceInfo.productKey = pk;
        deviceInfo.deviceName = dn;
        deviceInfo.deviceSecret = ds;

        params.deviceInfo = deviceInfo;

        LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
            public void onError(AError aError) {
                System.out.println("===============FAILURE===============");
                ALog.e(TAG, "Init Error error=" + aError);
                System.out.println("===============FAILURE===============");
            }

            public void onInitDone(InitResult initResult) {
                System.out.println("===============SUCCESS===============");
                ALog.i(TAG, "onInitDone result=" + initResult);
                System.out.println("===============SUCCESS===============");
            }

        });

        //此处sleep 5S,由于上面init是异步流程
        Thread.sleep(5000);

        /**
         * 物模型开发
         */

        /**
         * 上报属性
         */
        Map<String, ValueWrapper> properties = new HashMap<>();

        // key为物模型中属性标识符"acOutMeterIty",value需要遵循属性值规范:int类型,取值范围在0~200之间;
        properties.put("acOutMeterIty", new ValueWrapper(10));

        LinkKit.getInstance().getDeviceThing().thingPropertyPost(properties, new IPublishResourceListener() {

            @Override
            public void onSuccess(String s, Object o) {
                System.out.println("=====thingPropertyPost success=======");
                System.out.println(s);
                System.out.println(JSON.toJSONString(o));
            }

            @Override
            public void onError(String s, AError aError) {
                System.out.println("=====thingPropertyPost failure=======");
            }
        });

        // 上报属性之后,云端会返回响应结果,此处是监听云端返回的属性reply
        LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {

            @Override
            public void onNotify(String s, String s1, AMessage aMessage) {
                System.out.println("===PROPERTY REPLY===");
                System.out.println("TOPIC:" + s1);
                System.out.println("Payload:" + JSON.toJSONString(aMessage));
            }

            @Override
            public boolean shouldHandle(String s, String s1) {
                return false;
            }

            @Override
            public void onConnectStateChange(String s, ConnectState connectState) {
            }
        });

        /**
         * 上报事件
         */
        HashMap<String, ValueWrapper> eventMap = new HashMap<>();

        // key为物模型中事件参数的标识符"gunNum", value为事件参数值需要遵循数值规范:int类型,取值范围0~100之间;
        eventMap.put("gunNum", new ValueWrapper.IntValueWrapper(50));

        OutputParams eventOutput = new OutputParams(eventMap);

        // 参数identity为"startChaResEvt"属于物模型事件标识符。
        LinkKit.getInstance().getDeviceThing().thingEventPost("startChaResEvt", eventOutput, new IPublishResourceListener() {
            public void onSuccess(String resId, Object o) {
                System.out.println("=====thingEventPost success=======");
                System.out.println(resId);
                System.out.println(JSON.toJSONString(o));
            }

            public void onError(String resId, AError aError) {
                System.out.println("=====thingEventPost failure=======");
            }
        });

        /**
         * 监听并执行下行服务
         */
        // 获取设备支持的所有服务
        LinkKit.getInstance().getDeviceThing().getServices();

        // 用户可以根据实际情况注册自己需要的服务的监听器
        List<Service> srviceList = LinkKit.getInstance().getDeviceThing().getServices();

        for (int i = 0; srviceList != null && i < srviceList.size(); i++) {
            Service service = srviceList.get(i);

            LinkKit.getInstance().getDeviceThing().setServiceHandler(service.getIdentifier(), new ITResRequestHandler() {

                public void onProcess(String identify, Object result, ITResResponseCallback itResResponseCallback) {

                    System.out.println("onProcess() called with: s = [" + identify + "], o = [" + result + "], itResResponseCallback = [" + itResResponseCallback + "]");
                    System.out.println("收到云端异步服务调用 " + identify);
                    try {
                        /**
                         * 设置属性(property)的模式
                         */
                        // "set"为设置属性默认的标识符
                        if ("set".equals(identify)) {
                            // TODO 用户需要设置真实设备的的属性
                            /**
                             * 向云端同步设置好的属性值
                             */
                            Map<String, ValueWrapper> desiredProperty = (Map<String, ValueWrapper>) ((InputParams) result).getData();

                            LinkKit.getInstance().getDeviceThing().thingPropertyPost(desiredProperty, new IPublishResourceListener() {

                                @Override
                                public void onSuccess(String s, Object o) {
                                    if (result instanceof InputParams) {
                                        Map<String, ValueWrapper> data = (Map<String, ValueWrapper>) ((InputParams) result).getData();
                                        //                        data.get()
                                        ALog.d(TAG, "收到异步下行数据 " + data);
                                        // 响应云端 接收数据成功
                                        itResResponseCallback.onComplete(identify, null, null);
                                    } else {
                                        itResResponseCallback.onComplete(identify, null, null);
                                    }
                                }

                                @Override
                                public void onError(String s, AError aError) {
                                    AError error = new AError();
                                    error.setCode(100);
                                    error.setMsg("setPropertyFailed.");
                                    itResResponseCallback.onComplete(identify, new ErrorInfo(error), null);
                                }
                            });

                            /**
                             * 服务(service)的模式
                             */
                            // "startChaResService"为服务的标识符
                        } else if ("startChaResService".equals(identify)) {

                            Map<String, ValueWrapper> inputParams = (Map<String, ValueWrapper>) ((InputParams) result).getData();
                            // TODO 根据服务入参inputParams执行设备逻辑,比如启动充电
                            // 充电完成后,向云端返回输出参数
                            OutputParams outputParams = new OutputParams();
                            // key为"charm"属于物模型中"startChaResService"服务出参标识符,value为出参值遵循数据规范:int类型,数据范围1~100之间;
                            outputParams.put("charm", new ValueWrapper.IntValueWrapper(20));

                            itResResponseCallback.onComplete(identify, null, outputParams);

                        } else {
                            // 根据不同的服务做不同的处理,跟具体的服务有关系
                            OutputParams outputParams = new OutputParams();
                            // 根据特定服务,按照服务规范返回服务的出参。
                            itResResponseCallback.onComplete(identify, null, outputParams);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        ALog.d(TAG, "云端返回数据格式异常");
                    }
                }
                public void onSuccess(Object o, OutputParams outputParams) {
                    ALog.d(TAG, "onSuccess() called with: o = [" + o + "], outputParams = [" + outputParams + "]");
                    ALog.d(TAG, "注册服务成功");
                }
                public void onFail(Object o, ErrorInfo errorInfo) {
                    ALog.d(TAG, "onFail() called with: o = [" + o + "], errorInfo = [" + errorInfo + "]");
                    ALog.d(TAG, "注册服务失败");
                }
            });
        }
    }
}           

• 上报属性成功,云端会返回REPLY,有以下日志说明设备到云,云到设备的链路全部走通。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

• 设备收到属性设置指令,在完成物理设备属性修改后,建议将最新属性同步上报云端。

2.2 不使用SDK开发

1.协议准备。

“2.1 使用设备SDK开发”介绍了使用阿里云提供的SDK进行设备开发,当然您也可以选择不使用SDK,完全基于Alink协议(设备和云交互协议)开发。

Alink协议文档:

https://help.aliyun.com/document_detail/90459.html

重点关注物模型协议部分:

https://help.aliyun.com/document_detail/89301.html

。里面包含了物模型相关所有Topic介绍(物模型Topic列表在控制台也可以查看,如下图)。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

文档详细介绍了设备端如何向云端上报“属性”、“事件”,如何订阅云端向下发送的“服务”指令。

Topic和Payload都基于客户定义的物模型进行标准化和规范化,从而使得客户设备与云交互方式不会随着设备类型变化而改变,满足扩展性要求。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

2.环境准备。

根据自己选型选择合适的MQTT客户端,本文选择eclipse paho。

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.1.1</version>//可以选择您需要的版本
</dependency>           

3.开发。

物模型复用“2.1 使用设备SDK开发”中“开发前准备”给出的。

如果不使用SDK开发,可以通过设备端在MQTT的连接报文中的clientId部分, 新增_ss=1表示开启自动订阅, 建连成功后服务端会自动订阅上以下表格中的topic, 若传递 _ss=0 或者不传递该字段, 则不会发生服务端自动订阅动作。

4.上报属性。

String productKey = "a1nhbEV****";
String deviceName = "7mBP6Dd6IT2*****";
String deviceSecret = "****";

// MQTT连接
MqttTestClient client;
client = new MqttTestClient(productKey, deviceName, deviceSecret);

client.connect();

String setTopic = "/thing/event/property/post";
String setTopicReply = "/thing/event/property/post_reply";

// 上报属性,云端会返回REPLY,进行订阅。(为了节省端侧订阅开销,可以开通免订阅)
// 此处client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client subscribe
client.sysTopic(setTopicReply).subscribe();

// 封装Alink协议系统参数
Map<String, Object> payload = new HashMap<String, Object>();
Map<String, Object> params = new HashMap<String, Object>();
payload.put("id", 11);//id需要保证设备端一段时间内唯一
payload.put("params", params);
payload.put("method", "thing.event.property.post");

// 组装属性payload
String propKey = "acOutMeterIty";
int statusValue = 30;
Map<String, Object> proValue = new HashMap<>();
proValue.put("value", statusValue);
proValue.put("time", System.currentTimeMillis());
params.put(propKey, proValue);

// 上报(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client publish消息)
client.sysTopic(setTopic).publish(JSON.toJSONString(payload));

// 打印云端返回的Reply(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client监听订阅消息)
client.sysTopic(setTopicReply).readTopic(10000);

client.disconnect();           

日志打印的设备请求和响应。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

5.上报事件。

String productKey = "a1nhbEV****";
String deviceName = "7mBP6Dd6IT27*****";
String deviceSecret = "***";

// MQTT连接
MqttTestClient client;
client = new MqttTestClient(productKey, deviceName, deviceSecret);

client.connect();

// topic中为"startChaResEvt"属于物模型事件标识符。
String setTopic = "/thing/event/startChaResEvt/post";
String setTopicReply = "/thing/event/startChaResEvt/post_reply";

// 报事件,云端会返回REPLY,进行订阅。(为了节省端侧订阅开销,可以开通免订阅)
client.sysTopic(setTopicReply).subscribe();

// 封装Alink协议系统参数
Map<String, Object> payload = new HashMap<String, Object>();
Map<String, Object> params = new HashMap<String, Object>();
payload.put("id", 11);//id需要保证设备端一段时间内唯一
payload.put("params", params);
payload.put("method", "thing.event.startChaResEvt.post");

// 组装属性payload
Map<String, Object> dataValue = new HashMap<>();
// key为物模型中事件参数的标识符"gunNum", value为事件参数值需要遵循数值规范:int类型,取值范围0~100之间;
dataValue.put("gunNum", 59);

params.put("value", dataValue);
params.put("time", System.currentTimeMillis());

// 上报(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client publish消息)
client.sysTopic(setTopic).publish(JSON.toJSONString(payload));

// 打印云端返回的Reply(client进行了封装,您根据自己的业务进行封装即可,也可以直接使用MQTT Client监听订阅消息)
client.sysTopic(setTopicReply).readTopic(10000);

client.disconnect();           

6.服务调用。

此处为一段伪代码。可以在MQTT建连的时候通过callback监听云端下发的控制指令或消息。

前提:已经对下行的TOPIC进行订阅过,免订阅能力参考上面介绍。

mqttClient = new MqttClient(url, clientId, persistence);
final MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setMqttVersion(4);
connOpts.setAutomaticReconnect(true);
connOpts.setCleanSession(false);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(65);
LogUtil.log(clientId + "进行连接, 目的地: " + url);

// 此处订阅云端下发的消息
mqttClient.setCallback(new MqttCallback() {
    @Override
    public void connectionLost(Throwable cause) {
        LogUtil.log("connection lost, cause:" + cause);
        cause.printStackTrace();
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        TopicChannel topicChannel = getTopic(topic);
        LogUtil.log("receive message, channel:" + topicChannel
                    + ",topic:" + topic
                    + ", payload:" + new String(message.getPayload(), "UTF-8") + "");
        topicChannel.put(message);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        //如果是qos 0消息 token.resp是没有回复的
        LogUtil.log("sent, " + ((token == null || token.getResponse() == null) ? "null"
                                : token.getResponse().getKey()));
    }
});

mqttClient.connect(connOpts);           

重点说明:

• 所有被订阅的下行Topic都会被监听到。物模型相关的主要包括:属性上报Reply、属性下行设置、服务下行控制。

• 设置设备属性(

https://help.aliyun.com/document_detail/89301.html#title-wmh-y2e-18r

),默认异步方式返回结果。

• 订阅的Topic为Alink协议标准Topic:“/sys/{productKey}/{deviceName}/thing/service/property/set”

• 服务控制(

https://help.aliyun.com/document_detail/89301.html#title-3pt-nfy-jys

),同异步方式取决于物模型中service配置的调用模式。

• 服务异步方式订阅的Topic为Alink协议标准Topic:“/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier}”

• 服务同步方式订阅的Topic需要遵循RRPC Topic模式:详见文档

https://help.aliyun.com/document_detail/90568.html
注意:仅设备侧需要感知RRPC特殊TOPIC,设备上云后,数据流转、开放API面向的还是Alink协议编程。

2.3 在线调试

设备开发后之后,如何快速模拟业务服务器给设备下发指令,调试设备能力?IoT平台提供了“在线调试”的功能,可以模拟设备或模拟应用端到端调试。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

此处使用“在线调试”里面“调试真实设备”能力。通过控制台下发设备控制指令,分两类:1)属性设置;2)服务调用。

1.服务调用调试。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍
物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

云端下发后,可以到设备端查看控制Log是否打印,以判断指令达到端侧。

从图中可见设备收到startChaResService服务,同时向云端返回了输出参数。

2.属性设置调试。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

• “获取”:暂不支持到设备,只能从云端获取设备最新属性。

• “设置”:指令直接到设备端,设备修改本地属性之后,上报云端最新属性;到设备上的设置指令为"set"。

• “设置期望值”:如果设备在线,会直接下发设备,如果设备离线,指令在云端进行缓存,待上线后下发设备端,下发之后,设备修改本地属性之后,同样上报云端最新属性;到设备上的设置指令同样为"set"。如果您希望使用物模型期望值能力,可点击查看最佳实践。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

从图中可见设备收到set指令,返回了服务响应,同时向云端上报了最新属性。

说明:服务结果还可以通过“2.4 查看物模型数据”章节中获取。

2.4 查看物模型数据

DEMO运行之后,可以看到设备已经“在线”状态。

“运行状态”展示设备上报的属性值;

“事件管理”展示设备上报的事件;

“服务调用”展示云端下发设备的控制服务;

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

上报属性结构化展示。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

上报事件,包括事件参数展示。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

属性设置、服务调用两类服务的云端下发入参、设备响应出参都有展示,如上证明设备收到云端指令,并且正常返回响应。

2.5 查看日志服务

设备在运行过程,可能会出现一些异常,比如连接失败、认证失败、数据异常等等,为了便于排查,可以查看日志服务。举例设备上报数据可能会不符合物模型规范,比如事件参数"gunNum"对应值的数据范围为0~100之间,而真实上报了50000。日志服务会展示设备错误详情。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍
物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

可以看到日志内容为“{"Reason":"tsl parse: int value is bigger than max 100 -> gunNum"}”,说明gunNum对应值超过物模型规范最大值100的限制。物模型规范详情到“物模型TSL”查看。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

同时可以通过“日志转储”中“日志报表”进一步查看设备大盘,包括设备上下线次数、设备上线IP区域分布、设备消息量、设备消息量Top列表、物模型错误分布、云端API错误分布等多维度指标。

日志服务介绍文档请参见

3 服务端开发

设备连接到阿里云IoT平台,设备数据会保存在IoT平台时序数据库。同时IoT平台提供两种方式供客户获取设备数据:方式1)通过服务端订阅或者规则引擎实时流转到客户服务器;2)通过开放API供客户调用获取。

3.1 服务端调用API开发(P0)

1.环境准备。

SDK下载文档:

https://help.aliyun.com/document_detail/30581.html

API接口列表:

https://help.aliyun.com/document_detail/69579.html

重点关注物模型使用相关API

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

2.以下示例为设置设备属性API,设备异步返回结果,客户需要通过“数据流转”方式获取。

String accessKey = "***";
String accessSecret = "***";
try {
    DefaultProfile.addEndpoint("cn-shanghai", "cn-shanghai", "Iot", "iot.cn-shanghai.aliyuncs.com");
} catch (Exception e) {
    System.out.println("DefaultProfile exception");
}

IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", accessKey, accessSecret);
DefaultAcsClient defaultAcsClient = new DefaultAcsClient(profile);

SetDevicePropertyRequest setDevicePropertyRequest = new SetDevicePropertyRequest();
// 如果使用实例,此处传入真实实例id;如果公共实例,不需要设置。
//createProductRequest.setIotInstanceId("iothub-test-xxx");
setDevicePropertyRequest.setProductKey(pk);
setDevicePropertyRequest.setDeviceName(dn);

Map<String, Integer> properties = new HashMap<>();
// key为物模型中属性标识符"acOutMeterIty",value需要遵循属性值规范:int类型,取值范围在0~200之间;
properties.put("acOutMeterIty", 98);
setDevicePropertyRequest.setItems(JSON.toJSONString(properties));

SetDevicePropertyResponse response = null;
try {
    response = defaultAcsClient.getAcsResponse(setDevicePropertyRequest);
} catch (Exception e) {
    Log.error("执行失败:e:" + e.getMessage());
}

System.out.println("===============");
System.out.println("setDeviceProperty request : " + JSON.toJSONString(setDevicePropertyRequest));
System.out.println("setDeviceProperty response : " + JSON.toJSONString(response.getData()));
System.out.println("setDeviceProperty requestId : " + response.getRequestId());
System.out.println("===============");           
物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

下行控制如果为异步服务,需要通过订阅数据流转获取设备返回结果,订阅方式和数据结构详见“3.2 数据流转”章节介绍。

关联介绍:“3.2.1 服务端订阅”中“重点说明”。

3.2 数据流转

平台提供两种数据流转方式:方式1)服务端订阅;方式2)规则引擎;

3.2.1服务端订阅(P0)

服务端订阅配置

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

“推送消息类型”选择“设备上报消息”,包括物模型属性上报、事件上报、设备下行指令结果(包括属性设置响应、服务控制响应)等消息。

消息格式详见文档:

https://help.aliyun.com/document_detail/73736.html
物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

服务端订阅DEMO

接入说明:

https://help.aliyun.com/document_detail/143601.html
/**
 * AMQP服务端订阅
*/
//参数说明,请参见AMQP客户端接入说明文档。
String accessKey = "***";
String accessSecret = "***";
String consumerGroupId = "***";
//iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。
String iotInstanceId = "";
long timeStamp = System.currentTimeMillis();
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
String signMethod = "hmacsha1";
//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。
//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
String clientId = "TESTClientID";

//userName组装方法,请参见AMQP客户端接入说明文档。
String userName = clientId + "|authMode=aksign"
    + ",signMethod=" + signMethod
            + ",timestamp=" + timeStamp
            + ",authId=" + accessKey
            + ",iotInstanceId=" + iotInstanceId
            + ",consumerGroupId=" + consumerGroupId
            + "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
String password = doSign(signContent,accessSecret, signMethod);
//接入域名,请参见AMQP客户端接入说明文档。
String connectionUrl = "amqps://${uid}.iot-amqp.${regionId}.aliyuncs.com:5671?amqp.idleTimeout=80000";

Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF",connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");
// Create Connection
Connection connection = cf.createConnection(userName, password);
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// Create Session
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// Create Receiver Link
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
}

private static MessageListener messageListener = new MessageListener() {
    @Override
    public void onMessage(Message message) {
        try {
            //1.收到消息之后一定要ACK。
            // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
            // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
            // message.acknowledge();
            //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
            // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
            executorService.submit(() -> processMessage(message));
        } catch (Exception e) {
            logger.error("submit task occurs exception ", e);
        }
    }
};

/**
 * 在这里处理您收到消息后的具体业务逻辑。
*/
private static void processMessage(Message message) {
    try {
        byte[] body = message.getBody(byte[].class);
        String content = new String(body);
        String topic = message.getStringProperty("topic");
        String messageId = message.getStringProperty("messageId");
        System.out.println("AMQP receive message"
                           + ", topic = " + topic
                           + ", messageId = " + messageId
                           + ", content = " + content);
    } catch (Exception e) {
        logger.error("processMessage occurs error ", e);
    }
}

private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
    /**
     * 连接成功建立。
     */
    @Override
    public void onConnectionEstablished(URI remoteURI) {
        logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
    }

    /**
     * 尝试过最大重试次数之后,最终连接失败。
     */
    @Override
    public void onConnectionFailure(Throwable error) {
        logger.error("onConnectionFailure, {}", error.getMessage());
    }

    /**
      * 连接中断。
      */
    @Override
    public void onConnectionInterrupted(URI remoteURI) {
        logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
    }

    /**
     * 连接中断后又自动重连上。
     */
    @Override
    public void onConnectionRestored(URI remoteURI) {
        logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
    }

    @Override
    public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

    @Override
    public void onSessionClosed(Session session, Throwable cause) {}

    @Override
    public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

    @Override
    public void onProducerClosed(MessageProducer producer, Throwable cause) {}
};

/**
 * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
 */
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
    SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
    Mac mac = Mac.getInstance(signMethod);
    mac.init(signingKey);
    byte[] rawHmac = mac.doFinal(toSignString.getBytes());
    return Base64.encodeBase64String(rawHmac);
}           

日志打印出订阅到的流转消息如下,符合预期。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

下行控制如果为异步服务,需要通过订阅数据流转获取设备返回结果。订阅Topic为"/sys/{productKey}/{deviceName}/thing/downlink/reply/message",需要根据"requestId"关联请求和响应。

关联介绍:“3.1 服务端调用API开发”中“重点说明”。
物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

3.2.2 规则引擎数据订阅。

配置SQL

SQL介绍文档

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

调试SQL

Payload数据格式文档

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

可以查看“调试结果”。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

符合配置的SQL结果。

转发数据

可以转发到客户以下多种云产品中,本文选择AMQP作为示例验证。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍
物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

创建完成后,需要到规则列表页“启动”改规则。

订阅数据

服务端订阅代码可以复用上面“3.1”服务端订阅代码。差别就是服务端订阅,订阅的是Topic对应的完整Payload;而规则引擎流转AMQP,在消息流转过程可以对Payload做一些规则过滤或简单计算。

以下日志精简报文是通过规则引擎过滤后获取的数据。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

说明:同一组数据不要同时开通规则引擎和服务端订阅两种订阅模式,避免消息干扰。

4 设备运行时

设备量产之后,到达消费者手上,会开始激活上线进入到设备运行时。由于不属于开发态流程,本章节仅做简单介绍,目的是能让开发者知道开发态的配置在运行态如何产生作用,对设备接上阿里云IoT平台后的流程有个简单的认识。

物模型接入价值与实践物模型价值物模型接入实践物模型接入流程详细介绍

本文通过物模型接入流程,介绍了平台设备连接、物模型规范校验、物模型数据、规则引擎、服务端订阅、开放API六大基础能力。

设备全生命周期过程中,还有不少设备管理能力供客户选择,其中包括设备标签、设备分组、设备检索、OTA、设备运维、设备分发、文件上传、远程配置等,欢迎使用。

4.1 连接

设备连接过程,云端会对设备进行身份认证。

4.2 物模型规范校验

由于目前物模型配置仅提供强校验模式,物模型规范校验主要对设备上报的报文进行Alink协议解析、物模型数据规范校验。平台后续会陆续开放弱校验、免校验、数据去重能力。

关联阅读:1.3 物模型配置

4.3 设备管理能力

4.3.1 设备标签

介绍文档:

https://help.aliyun.com/document_detail/73733.html

4.3.2 设备分组

https://help.aliyun.com/document_detail/90386.html

4.3.3 OTA

https://help.aliyun.com/document_detail/85700.html

4.3.4 设备分发

https://help.aliyun.com/document_detail/143450.html

继续阅读