本文属于介绍 NWPC 消息平台 系列文章。
本文介绍如何在基于 ecFlow 构建的数值预报业务系统中发送 NWPC 消息平台的 产品事件消息。
介绍
数值预报业务系统产品制作一般分为三个步骤:
- 监视输出:检测模式积分任务是否输出计算结果,使用单一任务循环检查文件是否存在,一般运行在 HPC 的登录节点
- 生成产品:根据模式输出结果制作产品,一般运行在 HPC 的串行和并行计算节点中
- 分发产品:文件拷贝或 FTP 传输,需要与 HPC 外部通讯,一般运行在 HPC 的串行节点或登录节点
数值预报业务系统产品制作流程
NWPC 消息平台目前使用两种产品事件消息,对应于上述不同步骤:
- 面向 NMC 监控平台的表示 GRIB 2 产品已生成的消息,对应步骤 2
- 面向 NWPC 消息系统的表示 GRIB 2 产品已上传到二级存储的消息,对应步骤 3
数值预报业务系统根据事件消息的应用场景,使用不同方式发送以上两种产品事件消息。
方案
本节介绍将事件消息发送集成到数值预报业务系统中的两种方式。
脚本集成
将事件消息发送命令加入到已有任务脚本中,是最简单的集成方式,无需对现有系统的结构进行修改。
GRIB 2 产品生成消息目前使用这种方式。该消息用于统计产品生成的时间,需要在产品生成后尽快发送。最直观的方法就是将消息发送命令放到产品制作任务脚本的最后。但实践中我们往往将产品生成与分发拆分为两个相关联的任务:产品制作任务只负责产品生成,不与系统外部进行交互;产品分发任务由产品制作任务驱动,与外部进行通讯。产品制作任务可能运行在无法与外界进行通讯的计算节点上。所以 NWPC 消息平台将产品生成消息发送命令放到分发任务脚本的开头,既符合将产品生成与分发相分离的要求,也能在第一时间发送消息,不受后续分发操作的影响。
这种方式存在一定的弊端。
(1) 如果产品分发任务执行失败并重新运行,对于同一个产品文件,会产生多条消息记录,需要后端应用对重复消息进行额外处理。
(2) 业务系统为了保证分发任务执行稳定,会限制同时运行的任务数。分发任务可能会因作业数限制而延迟启动,影响产品生成消息按时发送。另外,某些极端情况下,上传任务持续处于运行状态,后续任务无法运行,导致产品生成消息无法发送。
消息发送任务
为产品事件创建单独的发送任务可以解决消息重复发送的问题。
GRIB 2 产品完成上传二级存储的消息目前使用这种方式。该消息计划用于驱动部署在气象大数据云平台加工流水线上的产品制作任务,使后续系统无需重复监视二级存储中文件的到达情况。产品上传任务一般运行在 HPC 的串行计算节点或其它专用传输节点上,受队列节点数限制。而消息上传任务运行时间通常在 2 秒以内,资源消耗较少,所以可以直接在 ecFlow 服务运行节点(即 HPC 登录节点)上运行,不占用计算节点,不会因作业排队而带来额外的时间延迟。
这种方式也存在一定的弊端。
(1) 额外创建任务需要重新设计现有 ecFlow 系统,不方便进行集成。
(2) 如果为发送消息增加大量任务,可能会给整个 ecFlow 系统带来额外负担,包括处理依赖关系、生成作业脚本等,同时也不利于系统维护。
关键技术实现
消息命令行工具
NWPC 的数值预报业务系统任务脚本都是 shell 脚本,在 ecFlow 系统中发送产品事件消息最合适的方式就是使用命令行程序。
NWPC 消息平台分别为 NMC 和 NWPC 两套消息系统开发命令行程序,实现在 CMA-PI 上向消息中间件 Kafka 和 RabbitMQ 发送事件消息。
nmc-message-client
nmc_monitor_client send
命令使用 kafka-go 库连接 NMC 监控平台的 Kafka 服务,发送产品生成消息,支持多个 Kafka 节点。
https://github.com/segmentio/kafka-go
产品生成消息中的各个参数由命令参数提供:
-
:系统名称source
-
:消息类型type
-
:事件状态,只发送完成状态 (0)status
-
:文件名file-name
-
:文件绝对路径absolute-data-name
-
:起报时次start-time
-
:预报时效forecast-time
为了保证消息发送异常时不影响后面的产品上传任务,增加
--ignore-error
选项忽略命令执行过程中的所有错误。
下面是 GRAPES GFS 后处理系统中发送产品生成消息的命令调用。
/g1/u/nwp_pd/nmc_message_client/bin/nmc_monitor_client send \ --target "host1:9092,host2:9092,host3:9092" \ --source nwpc_grapes_gfs \ --type prod_grib \ --status 0 \ --file-name gmf.gra.${init_time}${forecast_time}.grb2 \ --absolute-data-name ${run_dir}/output/grib2_orig/gmf.gra.${init_time}${forecast_time}.grb2 \ --start-time ${init_time} \ --forecast-time ${forecast_time} \ --ignore-error \ --debug
nwpc-message-client
nwpc_message_client production
命令使用 amqp 库向 NWPC 消息系统的 RabbitMQ 服务发送产品完成上传消息。
https://github.com/streadway/amqp
产品完成上传消息中的各个参数由命令参数提供:
-
:系统名system
-
:产品流production-stream
-
:产品类型production-type
-
:产品名称production-name
-
:事件名event
-
:起报时次start-time
-
:预报时效forecast-time
下面是 GRAPES GFS 后处理系统中发送产品完成上传二级存储消息的命令调用。
/path/to/nwpc_message_client production \ --system grapes_gfs_gmf \ --production-stream oper \ --production-type grib2 \ --production-name orig \ --event storage \ --status complete \ --start-time ${init_time} \ --forecast-time "${forecast_hour}h" \ --rabbitmq-server ${NWPC_MESSAGE_CLIENT_RABBITMQ_ADDRESS} \ --broker-address ${NWPC_MESSAGE_CLIENT_BROKER_ADDRESSS}
脚本集成
在上传任务脚本中集成消息发送命令,为了保证消息发送与上传任务互不影响,将发送命令放到脚本开头,设置超时时间,并忽略发送命令的错误。使用与产品上传一样的标识变量控制是否发送消息,避免在测试期间发送无效消息。
if [ ${FLAG_UPLOAD_ORIG} == ".true." ]; then
/g1/u/nwp_pd/nmc_message_client/bin/nmc_monitor_client send \ # ...skip commands...
fi
if [ ${FLAG_UPLOAD_ORIG} == ".true." ]; then
# do some ftp
fi
消息发送任务
单独的消息发送任务由其他任务(比如产品上传)触发,在 ecFlow 服务所在的节点上直接运行,不占用计算资源。
GRAPES GFS 产品后处理系统中使用单独的任务发送产品上传消息
同样也使用标识变量控制消息发送,并忽略发送消息时发生的任何错误。
if [ ${FLAG_STORAGE_ARCHIVE_GRIB2} == ".true." ]; then
echo "Send message to nwpc message broker and ignore any errors..."
set +e
source ${NWPC_MESSAGE_CLIENT_CONFIG_SCRIPT}
${NWPC_MESSAGE_CLIENT_BIN} production \ # ...skip commands...
set -e
fi
应用
目前已在 GRAPES 确定性模式系统中应用产品事件消息。其中 GRAPES GFS、GRAPES MESO 10KM、GRAPES 3KM 发送两种消息,GRAPES TYM 仅发送产品上传消息。
GRAPES 模式业务系统中产品事件消息发送的两种方式
参考
NWPC 消息平台项目
nwpc-oper/nwpc-message-client
https://github.com/nwpc-oper/nwpc-message-client
nwpc-oper/nmc-message-client
https://github.com/nwpc-oper/nmc-message-client
nwpc-oper/nwpc-message-tool
https://github.com/nwpc-oper/nmc-message-tool
产品事件消息
《NWPC消息平台:产品事件消息》
《适用于NMC监控平台的数值预报产品消息》
命名行工具
《使用Cobra构建命令行程序》
《动态解析命令行参数》
《使用kafka-go连接Kafka》
题图由 renategranade0 在 Pixabay 上发布。