1.案例需求
实现自增数字相加的和 1+2+3+4+5+6+........
2.需求分析
Spout来发送数字作为input
使用Bolt来实现求和逻辑
将结果输出到控制台
3.导入Storm的pom依赖
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
</dependency>
4.具体代码实现
package cn.ysjh;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
/*
使用Storm实现求和功能
*/
public class SumStorm {
/*
Spout需要继承Base
*/
public static class DataSourceSpout extends BaseRichSpout{
private SpoutOutputCollector spoutOutputCollector;
/*
初始化方法,只会被调用一次
*/
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector=spoutOutputCollector;
}
int num=0;
/*
会产生数据,在实际生产中肯定是从消息队列中获取数据
这个方法是一个死循环,会一直运行
*/
@Override
public void nextTuple() {
this.spoutOutputCollector.emit(new Values(++num));
System.out.println("数据:"+num);
//防止数据产生太快
Utils.sleep(1000);
}
/*
声明输出字段
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//这里要和上边nextTuple()方法中的Values中的对应
outputFieldsDeclarer.declare(new Fields("number"));
}
}
/*
数据的累计求和Bolt:接收数据并处理
*/
private static class SumBolt extends BaseRichBolt {
/*
初始化方法,会被执行一次
*/
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
}
int sum=0;
/*
获取Spout发送过来的数据
Bolt中获取值可以根据index获取,也可以根据上一个环节中定义的fields名称获取,建议使用后一种方法
*/
@Override
public void execute(Tuple tuple) {
Integer Num = tuple.getIntegerByField("number");
sum+=Num;
System.out.println("Sum:"+sum);
}
/*
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
public static void main(String[] args){
//TopologyBuilder根据Spout和Bolt来构建topology,Storm中任何一个作业都是通过Topology的方式进行提交的,Topology中需要指定Spout和Bolt执行的顺序
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout",new DataSourceSpout());
builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");
//创建本地模式,只需使用LocalCluster类
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("SumStorm",new Config(),builder.createTopology());
}
}
可以看出先实现Spout和Bolt,最后实现Topology进行本地运行
注意:
这里是本地运行模式,不需要Storm集群环境,如果要提交到集群上运行,最后的LocalCluster需要修改为StormSubmitter将Topology提交到集群上运行
5.运行截图