上一篇主要讲的是使用树莓派与移远BC35G模组的初始环境设置及调试而这一篇将要讲 Lettuce
IOT框架中的lettuce-Sea设备端代码解析。
lettuce-Sea设备端python代码讲解
上一节课我们虽然让树莓派与移远BC35G模组连接起来了,但是他们之间的通信还需要一个“驱动”来衔接。这个“驱动”要有以下特点:
- 能不间断的接收平台下发的指令。
- 能自动初始化NB-IoT的环境。
- 能对指令的结果进行解析,失败自动重试
- 能定时上报心跳
- 能集成硬件的驱动,以便于更好的控制硬件
- 能完美退出
这就是我们lettuce-Sea设备端所提供的能力。
先提供lettuce-Sea的源代码
https://github.com/lipuqi/lettuce-Sea
首先我们要在树莓派上搭建一个可以运行lettuce-Sea的python环境
所需环境清单
- 可运行python的环境,我安装的是3.7.3的版本,现在最新版本是3.7.4
- RPi.GPIO模块,这是树莓派使用python对GPIO编程的模块请自行百度安装。我是用pip安装的。
- pyserial模块,这是python的串口调试模块,也是必备的。我也是用pip安装的。
- 如果你心情好安装个git也没问题,毕竟来回拖拽也怪麻烦的。
接下来我们来讲一下lettuce-Sea的实现原理
第四篇我已经讲过了,AT指令有4个类型
常用的就是查询指令,执行指令(有参数),执行指令(无参数)
查询指令一般都是先返回结果,再返回OK
例AT+CGATT? -> + CGATT:1 -> OK
执行指令(有参数)一般是直接返回一个OK
例AT+CGATT=1 -> OK
执行指令(无参数)则跟查询指令相同
例AT+CSQ -> + CSQ:23,99 -> OK
还有就是接收数据
例 +NNMI:2,0001
我们写代码就是要将这种规律写成可以运行的程序。
lettuce-Sea是如何实现的呢?
首先lettuce-Sea是有4个线程的
主线程:主要是自动执行一些操作,例如开始运行的初始化,和子线程的开始等等。
监听线程:主要是在开启串口以后,就不断监听端口。看有没有新数据上报,如果有就进行解析分类,主要分两种。一种是主动执行的指令返回的值,例如查询指令,执行指令这些。
还有一种是被动接收的数据,比如平台下发一条指令给设备。设备就是从这个线程接收的,然后放到执行序列里。在lettuce-Sea中,执行序列就是一个列表类型的类属性。一会看代码就明白了。
执行线程:就是不断读取执行序列,看有没有数据,如果有数据,经设备调度分发给相应的设备驱动去执行。
上报心跳:这个就好理解了,就是定时向平台上报数据,告诉平台我还活着。
代码目录
我把AT指令,设备,和基础操作分为了不同的区域。
其中AT指令中ATBase是所有指令的父类,其他都是子类。其中NNMI是没有继承ATBase的,因为这个指令主要用于存放执行队列。
设备中Drive是调度类,Gpio是封装GPIO模块的类。led是led设备驱动,pi里有初始化方法,退出方法和心跳方法。
基础操作中SerialPort是封装pyserial模块的类,ReceiveMsg负责接收消息,ATBugUtil是主程序。
代码主要就是两个流程:一个是自动执行指令,一个是被动接收指令
先讲一下自动执行指令的生命周期
主程序发送一条指令给串口,这时主程序会阻塞等待结果(status属性为结果标识位,0为执行中,1为执行成功,2为失败)
ATBase.py
class ATBase:
def __init__(self, serialPort, receiveMsg):
self.serialPort = serialPort # 串口基础操作模块
self.receiveMsg = receiveMsg # 串口数据接收模块
self.at_name = "" # AT指令名称
self.at_error_result = None # AT指令返回信息校验(错误结果)
self.at_suc_result = None # AT指令返回信息校验(正确结果)
self.status = 0 # 执行结果(0未执行1执行成功2执行失败)
self.at_result_pattern = None # 匹配返回结果的正则类型
self.error = re.compile('ERROR') # 普通消息失败
self.ok = re.compile('OK') # 普通消息成功
self.result = None # 执行结果
self.retry = 3 # 重试次数
# 发送at基础方法
def send_at(self, data=None, at_type=None):
retry = self.retry
self.serialPort.write_data(self.at_name, data, at_type) # 调用串口基础操作-写入
self.receiveMsg.atObj = self # 将本类基本信息注入串口数据接收以便更新执行情况
self.status = 0 # 复位执行结果
time.sleep(1)
# 执行写入以后,主程序阻塞等待执行结果
while True:
if self.retry == 0:
print("执行失败,不再重试")
self.retry = retry
self.off_compile_result() # 自动复位
return 2
if self.status == 1:
print("执行成功")
self.retry = retry
self.off_compile_result() # 自动复位
return 1
elif self.status == 2:
print("执行失败,重试" + str(4 - self.retry))
self.retry -= 1
time.sleep(2)
return self.send_at(data, at_type) # 执行失败后递归调用
串口处理后,将结果返回对监听线程接收。监听线程判断此执行的指令是否需要校验(比如带返回值的可能就需要校验,直接返回OK/ERROR的就不需要)。无论是否校验都会返回一个结果标识位status属性,主程序获取到属性以后,如果成功则继续往下执行,失败则重试,我设定为3次重试。
ReceiveMsg.py
class ReceiveMsg:
def __init__(self, serialPort, nnmi, qlwevtind):
super(ReceiveMsg, self).__init__()
self.nnmi = nnmi # 接收数据模块
self.qlwevtind = qlwevtind # 接收平台状态模块
self.serialPort = serialPort # 串口基础操作模块
self.atObj = None # 发送AT指令的模块
self.quit_sys = 0 # 退出(此参数3个线程同步)
self.is_pause = 0 # 暂停(此参数3个线程同步)
# 处理消息的线程方法
def receive_data(self):
while self.quit_sys == 0:
while self.is_pause == 1:
pass
result = self.serialPort.read_data() # 接收数据
if result:
# 先匹配是否为上报数据和平台上报状态,不是则匹配是否为发送指令的回值
if self.nnmi.at_result_pattern.search(result):
at_result = result.split(":")
print("接到数据")
print(at_result[1])
self.nnmi.add_order(at_result[1])
elif self.qlwevtind.at_result_pattern.search(result):
at_result = result.split(":")
print("接到平台状态")
print(at_result[1])
self.qlwevtind.oc_analysis_msg(at_result[1])
elif self.atObj:
if self.atObj.compile_result(result):
self.atObj = None
else:
print("未设置匹配项数据")
print(result)
被动接收指令的生命周期
华为OC平台下发指令给通信模组,通信模组将消息以串口发给程序。程序的监听线程接收到消息后,添加到执行队列中(NNMI类)。
NNMI.py
class NNMI:
def __init__(self):
self.at_name = "NNMI"
self.at_result_pattern = re.compile(self.at_name)
self.wait_list = [] # 待处理信息列表
# 添加待处理信息到列表
def add_order(self, order):
if order in self.wait_list:
return
else:
self.wait_list.append(order)
# 从列表删除待处理信息
def del_order(self, order):
while order in self.wait_list:
self.wait_list.remove(order)
执行线程检测到执行队列有数据,读取数据后将数据从队列中删除。
Drive.py
class Drive:
def __init__(self, nnmi, receiveMsg, led, pi):
super(Drive, self).__init__()
self.nnmi = nnmi # 接收数据模块
self.receiveMsg = receiveMsg # 串口数据接收模块
self.led = led # 灯
self.pi = pi # 树莓派
# 循环处查询列表内任务线程方法
def order_monitor(self):
while self.receiveMsg.quit_sys == 0:
time.sleep(2)
while self.receiveMsg.is_pause == 1:
pass
if self.nnmi.wait_list:
for order in self.nnmi.wait_list:
self.nnmi.del_order(order)
self.analysis_msg(order)
执行线程解析消息,并将消息调度给对应的设备
Drive.py
# 处理任务调度硬件模块
def analysis_msg(self, data):
# 分割接收消息
msg_result = data.split(",")
msg_len = msg_result[0]
msg_data = msg_result[1]
if msg_len == "4":
message_id = int(msg_data[:2])
mid = msg_data[2:6]
value = int(msg_data[6:])
if message_id == 1:
self.led.set_mid(mid)
self.led.led_on_off(value)
elif message_id == 3:
self.led.set_mid(mid)
self.led.query_status()
elif message_id == 6:
self.pi.set_mid(mid)
self.pi.execute_quit()
else:
print("无解析类型")
else:
print("命令下发长度不符合")
设备接收到消息后进行相关处理。处理成功以后,发送响应给华为OC平台。
Led.py
class Led:
def __init__(self, ioObj, nmgs):
super(Led, self).__init__()
self.status = 0 # 当前灯的状态
self.ioObj = ioObj # GPIO设备基础操作模块
self.nmgs = nmgs # 发送消息模块
self.io_id = 4 # 当前设备使用的针脚编号
self.ioObj.setup_out_io(self.io_id) # 初始化IO口
self.mid = None # 命令下发的响应码
# 处理灯的开关命令
def led_on_off(self, data):
suc = "02" + self.mid + "00" + '%02x' % data # 成功的返回信息
fail = "02" + self.mid + "01" + '%02x' % data # 失败的返回信息
if data != 0 and data != 1:
print("开关灯只接受0/1")
self.nmgs.execute_at(fail)
else:
# 如果命令下发状态与当前灯状态一致则直接发送成功
if data == self.status:
self.nmgs.execute_at(suc)
else:
execute_result = self.ioObj.execute_output(self.io_id, data)
# 如果执行成功就响应成功
if execute_result == 1:
self.status = data
self.nmgs.execute_at(suc)
else:
self.nmgs.execute_at(fail)
下面解析部分代码
初始化:
监听线程和执行线程启动后再执行初始化,进入初始化时会先进入快速初始化,如果快速初始化失败,则会进入默认初始化。默认初始化执行重试3次仍然失败,则进入退出流程。成功则启动心跳线程。
Pi.py
# 系统初始化
def sys_init(self, index=0):
# 如果重试3次皆失败,判定初始化失败
if index == 3:
return 2
print("开始初始化")
self.cfun.execute_at("1") # 开启射频
self.csq.execute_at() # 查询信号
time.sleep(5)
cg_res = self.cgatt.integration_at("1") # 入网
# 查看入网是否成功
if cg_res == 2:
print("初始化遇到问题,开始默认初始化")
self.nconfig.execute_at("AUTOCONNECT,FALSE") # 手动入网模式
self.nrb.execute_at() # 重启
index += 1
self.sys_init(index) # 递归
print("结束初始化")
return 1
校验结果:
校验结果分三种
- 普通校验,如果发生没有校验项时则会进入普通校验,也就是校验OK/ERROR,OK为成功,ERROR为失败。
- 校验正确值,就是如果结果与设定值相同为执行成功,不相同为失败。这里会调用父类的校验正确值开关方法,将设定值入参。
- 校验错误值,就是如果结果与设定值相同为执行失败,不相同为成功。这里会调用父类的校验错误值开关方法,将设定值入参。
ATBugUtil.py
# 校验返回结果
def compile_result(self, result):
# 有关键字正则表示需要结果校验,没有则只判断成功/失败
if self.at_result_pattern:
if self.at_result_pattern.search(result):
at_result = result.split(":")
if self.at_error_result:
self.status = self.compile_error_result(at_result[1])
print(result)
elif self.at_suc_result:
self.status = self.compile_suc_result(at_result[1])
print(result)
else:
if self.ok.search(result):
self.status = 1
print(result)
return True
elif self.error.search(result):
self.status = 2
print(result)
return True
return False # 返回 True/False是因为表示指令执行成功
# 开启错误校验结果
def on_compile_error_result(self, error):
self.at_result_pattern = re.compile(self.at_name)
self.at_error_result = error
# 校验错误结果
def compile_error_result(self, data):
self.result = data
if data != self.at_error_result:
return 1
return 2
# 开启正确校验结果
def on_compile_suc_result(self, suc):
self.at_result_pattern = re.compile(self.at_name)
self.at_suc_result = suc
# 校验正确结果
def compile_suc_result(self, data):
self.result = data
if data == self.at_suc_result:
return 1
return 2
在发送指令时,会将指令本身发给监听线程类,监听线程接收到数据以后会读取发给他的指令本身里的校验值。并根据不同的校验值执行不同的校验方法。
核心点就是这些的,其他的还需要开发者自己去专研。