这篇博客主要讲述了如何通过OriginBot来看护宝宝,当宝宝的脸不在摄像头的范围之内时,发送消息到钉钉群组,通知家人及时查看。
前言
我在上个月有了宝宝,为了方便照看宝宝,就买了一个带有宝宝看护功能的摄像头,但是产品做的不怎么样,最重要的脸部遮挡功能用不了,后来就退货了。退货后就萌生了自己用OriginBot做一个类似功能的想法,于是就有了这篇博客~
功能流程图(架构图)
具体的流程或者说架构如下:
其实整体也不复杂,OriginBot上有一个MIPI摄像头,然后利用地平线TogetheROS.Bot的人体检测和跟踪来实时检测摄像头中有没有脸部,如果没有脸部,就向后端发送一条数据,然后后端会向钉钉群组发消息告诉家人。钉钉群组里面需要事先创建一个webhook。
下面会分为人体检测、判断有无脸部、后端操作三个部分来记录。
人体检测
这一部分用的是地平线TogetheROS.Bot现成的功能,启动OriginBot之后,在命令行中运行如下命令:
# 配置tros.b环境
source /opt/tros/setup.bash
# 从tros.b的安装路径中拷贝出运行示例需要的配置文件。
cp -r /opt/tros/lib/mono2d_body_detection/config/ .
# 配置MIPI摄像头
export CAM_TYPE=mipi
# 启动launch文件
ros2 launch mono2d_body_detection mono2d_body_detection.launch.py
此时可以通过http://IP:8000查看检测效果,这个模块检测了人体、人头、人脸、人手检测框、检测框类型和目标跟踪ID以及人体关键点等,我只取其中的人脸部分,当然了,以后也可以增加如人体检测等。
上面的命令运行之后,在OriginBot上执行ros2 topic list,应该会有一个topic叫做hobot_mono2d_body_detection, 这个就是我们需要的,我们会订阅这个话题,然后分析其中发送数据来判断有没有脸部
判断摄像头中有没有脸部
按照TogetheROS.Bot的文档说明,hobot_mono2d_body_detection的消息类型是ai_msgs.msg.PerceptionTargets, 具体如下:
# 感知结果
# 消息头
std_msgs/Header header
# 感知结果的处理帧率
# fps val is invalid if fps is less than 0
int16 fps
# 性能统计信息,比如记录每个模型推理的耗时
Perf[] perfs
# 感知目标集合
Target[] targets
# 消失目标集合
Target[] disappeared_targets
其中的disappeared_targets是我们关注的重点,如果face出现在disappeared_targets中,就说明之前是有脸部的,但是现在没有了,这个时候就要向后端发出数据进一步处理。
我为了判断有无脸部,写了一个Node,代码如下:
import rclpy
from rclpy.node import Node
from ai_msgs.msg import PerceptionTargets
from cv_bridge import CvBridge
import time
from api_connection import APIConnection
BabyMonitorMapping = {
# 这里的k-v要根据后端Django中的值来确定
"face": "看不到脸",
"body": "不在摄像头范围内",
}
class FaceDetectionListener(Node):
""" 检测宝宝的脸是不是在摄像头中 """
def __init__(self):
super().__init__("face_detection")
self.bridge = CvBridge()
self.subscription = self.create_subscription(
PerceptionTargets, "hobot_mono2d_body_detection", self.listener_callback, 10
)
self.conn = APIConnection()
self.timer = time.time()
self.counter = 0
def listener_callback(self, msg):
targets = msg.targets
disappeared_targets = msg.disappeared_targets
targets_list = []
disappeared_targets_list = []
if disappeared_targets:
for item in disappeared_targets:
disappeared_targets_list.append(item.rois[0].type)
if targets:
for item in targets:
targets_list.append(item.rois[0].type)
print(f"检测到的对象如下:{targets_list}")
print(f"消失的对象如下:{disappeared_targets_list}")
if disappeared_targets_list:
self.sending_notification(disappeared_targets_list)
def sending_notification(self, disappeared_targets_list):
for item in disappeared_targets_list:
if BabyMonitorMapping.get(item):
event = BabyMonitorMapping.get(item)
if self.counter == 0:
# 这里baby的ID是模拟的,应该去数据库中查
data = {"event": event, "baby": "6b56979a-b2b9-11ee-920d-f12e14f97477"}
self.conn.post_data(item=data, api="api/monitor/face-detection/")
self.counter += 1
self.timer = time.time()
else:
if time.time() - self.timer >= 60.0:
# 60秒不重复发消钉钉消息
data = {"event": event, "baby": "6b56979a-b2b9-11ee-920d-f12e14f97477"}
self.conn.post_data(item=data, api="api/monitor/face-detection/")
self.timer = time.time()
self.counter += 1
def main(args=None):
rclpy.init(args=args)
try:
face_detection_listener = FaceDetectionListener()
rclpy.spin(face_detection_listener)
except KeyboardInterrupt:
print("终止运行")
finally:
face_detection_listener.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
代码整体不难,但还是做几点必要的说明:
1. BabyMonitorMapping
这个字典的作用是把TogetheROS.Bot里面的字段跟后端的字段做一个映射关系,方便后面使用
点击OriginBot家庭助理计划之宝宝看护助手 - 古月居可查看全文