天天看点

Storm-源码分析-Topology Submit-Nimbus

nimbus server, 首先从启动命令开始, 同样是使用storm命令"storm nimbus”来启动 

看下源码, 此处和上面client不同, jvmtype="-server", 最终调用"backtype.storm.daemon.nimbus"的main 

nimbus是用clojure实现的, 但是clojure是基于jvm的, 所以在最终发布的时候会产生nimbus.class, 所以在用户使用的时候完全可以不知道clojure, 看上去所有都是java 

clojure只是用于提高开发效率而已.

1. service-handler

这个macro两个参数, 结合例子, name = service-handler, body就是后面所有的,包括参数和函数体 

定义匿名函数 fn[conf inimbus] (……) 

定义函数defn service-handler [& args], 里面只是简单的调用fn…使用这个macro和直接定义defn service-handler [conf inimbus]几乎没有啥区别 

我有个疑问, 为什么要定义这个无聊的macro, 难道就是为了便于后面的exception处理

在service-handler函数里面最主要就是实现nimbusiface接口(backtype.storm.generated.nimbusiface接口(backtype.storm.generated.nimbusiface, $在class文件里面就是这样写的, 应该是java的命名规则)

2. server

生成server option参数, 使用tnonblockingserversocket, 定义的work thread数目, 使用的protocol和使用的processor 

其中processor, 是server上主要的处理进程, 使用传入的service-handler进行数据处理

最终启动nimbus server.

nimbus server已经启动, 剩下就是处理从client传来的rpc调用, 关键就是nimbus$iface的实现

在下面的实现中总是用到nimbus这个变量, nimbus-data, 用于存放nimbus相关配置和全局的参数

接着重点看下submittopology, 

4个参数, 

^string storm-name, storm名字 

^string uploadedjarlocation, 上传jar的目录  

^string serializedconf, 序列化过的conf信息 

^stormtopology topology, topology对象(thrift对象), 由topologybuilder产生

1. system-topology!

validate topology, 比如使用的comonentid, steamid是否合法 

添加系统所需要的component, 比如acker等, 不过没有用到, 不知道为什么要调用system-topology!

2. 建立topology的本地目录 (这步开始需要lock互斥)

jars and configs are kept on local filesystem because they're too big for zookeeper. the jar and configs are copied into the path {nimbus local dir}/stormdist/{topology id}

Storm-源码分析-Topology Submit-Nimbus

3. 建立zookeeper heartbeats

就是按照下面图示在zookeeper建立topology的心跳目录

Storm-源码分析-Topology Submit-Nimbus

4. start-storm, 产生stormbase

虽然叫做start-storm, 其实做的事情只是把stormbase结构序列化并放到zookeeper上 

这个stormbase和topology对象有什么区别, 

topology对象, topology的静态信息, 包含components的详细信息和之间的拓扑关系, 内容比较多所以存储在磁盘上stormcode.ser 

而stormbase, topology的动态信息, 只记录了launch时间, status, worker数, component的executor数运行态数据, 比较mini, 所以放在zk上

重上面可以看出stormbase是定义的一个record, 包含storm-name, 当前时间戳, topology的初始状态(active或inactive), worker数目, 和executor的数目 

其中计算num-executors, 使用->>, 其实等于(map-val num-start-executors (all-components topology)), map-value就是对(k,v)中的value执行num-start-executors, 而这个函数其实就是去读componentcommon里面的parallelism_hint, 所以num-executors, 描述每个component需要几个executors(线程)

最终调用activate-storm!将storm-base序列化后的数据存到zookeeper的"/storms/id”目录下 

Storm-源码分析-Topology Submit-Nimbus

5. mk-assignments

<a href="http://www.cnblogs.com/fxjwind/archive/2013/06/19/3144246.html">storm-源码分析-topology submit-nimbus-mk-assignments</a>

本文章摘自博客园,原文发布日期:2013-06-19