Storm Example Program (2): Integrating Kafka
Table of Contents
- 1. Program Flow
- 2. Dependencies
- 3. Log Bolt and Topology
This example only integrates Storm with Kafka and prints the Kafka messages to the console. Follow-up posts will extend it with other examples, such as writing the data to a database.
1. Program Flow
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLwEEVPh3a61UeRpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLyYjM5EDOzcTMzIzMwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
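The flow shown in the figure above can be summarized as:

```
Kafka topic (demo_topic)
        |  messages
        v
KafkaSpout  --tuple("value")-->  LogConsoleBolt  -->  console (System.out)
```

The spout pulls messages from the topic and emits them as tuples; the bolt is a pure sink that logs each tuple's "value" field.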
2. Dependencies
```xml
<!-- storm -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.1.0</version>
</dependency>
<!-- kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
```
3. Log Bolt and Topology
```java
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Prints the data received from Kafka.
 */
public class LogConsoleBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String value = input.getStringByField("value");
            System.out.println("received from kafka : " + value);
            // Must ack, otherwise the Kafka messages will be consumed again
            collector.ack(input);
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a sink and emits nothing downstream
    }
}
```
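The `ack`/`fail` calls in `execute()` are what drive Storm's at-least-once guarantee. With the retry service and the 10-second offset-commit period configured below, the spout side behaves roughly like this sketch:

```
for each tuple t emitted from Kafka:
    if acked(t):
        mark t's offset as eligible for commit (committed in batches, every 10 s here)
    if failed(t) or t timed out:
        re-schedule t according to the exponential-backoff retry policy
```

This is why a bolt that silently drops a tuple without acking it will see the same message redelivered.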
The topology
Configure the Kafka address and topic name:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.topology.TopologyBuilder;

public class LogTopology {

    // Kafka broker address and topic name
    private static final String KAFKA_ADDR = "<kafka-broker-ip>:9092";
    private static final String TOPIC_NAME = "demo_topic";

    public static void main(String[] args) {
        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka_spout",
                new KafkaSpout<>(getKafkaSpoutConfig(KAFKA_ADDR, TOPIC_NAME)), 1);
        builder.setBolt("log_bolt", new LogConsoleBolt()).shuffleGrouping("kafka_spout");

        // Passing "cluster" as an argument submits to a live cluster; otherwise run locally
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("LogTopology", new Config(), builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("local mode");
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LogTopology", new Config(), builder.createTopology());
        }
    }

    private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {
        return KafkaSpoutConfig.builder(bootstrapServers, topic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                .setRetry(getRetryService())
                .setOffsetCommitPeriodMs(10_000)
                .build();
    }

    private static KafkaSpoutRetryService getRetryService() {
        // Exponential backoff: 500 µs initial delay, 2 ms increment, capped at 10 s, unlimited retries
        return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
                TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
    }
}
```
Run the topology and the console will show Storm consuming the messages from the Kafka topic demo_topic.
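To try this end to end you need a running Kafka broker and the demo_topic topic. A minimal sketch using the standard Kafka CLI tools (the installation path, local broker address, and ZooKeeper address are assumptions; the `--zookeeper` flag matches the Kafka 1.x-era tools used here):

```shell
# Create the topic (Kafka 1.x syntax; assumes ZooKeeper at localhost:2181)
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic demo_topic

# Send test messages; each line typed becomes one Kafka message
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo_topic
```

Each line sent through the console producer should then appear in the Storm console, printed by LogConsoleBolt.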