最开始使用storm命令来启动topology, 如下
storm jar storm-starter-0.0.1-snapshot-standalone.jar storm.starter.wordcounttopology
这个storm命令是用python实现的, 看看其中的jar函数, 很简单, 调用exec_storm_class, 其中jvmtype="-client"
而exec_storm_class其实就是拼出一条java执行命令, 然后用os.system(command)去执行, 为何用python写, 简单? 可以直接使用storm命令?
这儿的klass就是topology类, 所以java命令只是调用topology类的main函数
直接看看wordcounttopology例子的main函数都执行什么?
除了定义topology, 最终会调用stormsubmitter.submittopology(args[0], conf, builder.createtopology()), 来提交topology
直接看看submittopology,
1. 配置参数
把命令行参数放在stormconf, 从conf/storm.yaml读取配置参数到conf, 再把stormconf也put到conf, 可见命令行参数的优先级更高
将stormconf转化为json, 因为这个配置是要发送到服务器的
2. submit jar
先判断topologynameexists, 通过thrift client得到现在运行的topology的状况, 并check
然后submit jar, 通过底下三步
client.getclient().beginfileupload();
client.getclient().uploadchunk(uploadlocation, bytebuffer.wrap(tosubmit));
client.getclient().finishfileupload(uploadlocation);
把数据通过rpc发过去, 具体怎么存是nimbus自己的逻辑的事...
3. submit topology
很简单只是简单的调用rpc
client.getclient().submittopologywithopts(name, submittedjar, serconf, topology, opts);