天天看點

Storm案例程式(二)——整合Kafka

Storm案例程式(二)——整合Kafka

文章目錄

  • Storm案例程式(二)——整合Kafka
      • 1.程式流程
      • 2.引入依賴
      • 3.列印logBolt與拓撲
本案例只是 Storm 整合 Kafka,並將 Kafka 中的消息列印在控制台;後續會更新其他案例程式,引入資料庫等。

1.程式流程

流程:Kafka 主題(demo_topic)→ KafkaSpout(kafka_spout)→ LogConsoleBolt(log_bolt)→ 控制台輸出

2.引入依賴

<!--    storm    -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.0</version>
        </dependency>

        <!-- Storm spout built on the Kafka consumer API (provides KafkaSpout / KafkaSpoutConfig) -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.1.0</version>
        </dependency>

        <!--        kafka-->
        <!-- Kafka client library itself (provides ConsumerConfig used by the topology).
             This entry previously repeated storm-kafka-client with a conflicting version.
             NOTE(review): confirm the kafka-clients version matches your broker. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
           

3.列印logBolt與拓撲

/**
 * Bolt that prints the data obtained from Kafka to standard output.
 *
 * <p>For each tuple it reads the {@code "value"} field, prints it, and acks the
 * tuple. If any step throws, the stack trace is printed and the tuple is failed.
 */
public class LogConsoleBolt extends BaseRichBolt {

    /** Collector handed over in {@link #prepare}; used to ack or fail tuples. */
    private OutputCollector outputCollector;

    /** Keeps a reference to the collector; the other arguments are not used. */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        outputCollector = collector;
    }

    /**
     * Prints the tuple's {@code "value"} field and acknowledges the tuple.
     *
     * @param input tuple to process; must carry a string field named {@code "value"}
     */
    public void execute(Tuple input) {
        try {
            final String payload = input.getStringByField("value");
            final String line = "received from kafka : " + payload;
            System.out.println(line);
            // Must ack, otherwise the Kafka messages will be consumed repeatedly.
            outputCollector.ack(input);
        } catch (Exception failure) {
            failure.printStackTrace();
            outputCollector.fail(input);
        }
    }

    /** Declares nothing: this bolt emits no tuples. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // intentionally empty
    }
}

           

拓撲

配置kafka的位址和主題名稱

/**
 * Builds a two-component topology: a Kafka spout ({@code "kafka_spout"}) feeding
 * {@code LogConsoleBolt} ({@code "log_bolt"}) through a shuffle grouping, then
 * submits it either to a cluster or to an in-process {@code LocalCluster}.
 */
public class LogTopology {

    // Kafka bootstrap address and topic name.
    // NOTE(review): KAFKA_ADDR looks like a placeholder; replace it with the real broker host:port.
    private static final String KAFKA_ADDR = "kafka的IP:9092";
    private static final String TOPIC_NAME = "demo_topic";

    /**
     * Entry point.
     *
     * @param args if the first argument equals {@code "cluster"} the topology is submitted
     *             through {@code StormSubmitter}; otherwise it runs in a {@code LocalCluster}
     */
    public static void main(String[] args) {

        final TopologyBuilder builder = new TopologyBuilder();
        // Fix: the spout config previously referenced an undeclared BOOTSTRAP_SERVERS
        // identifier; the broker address constant declared in this class is KAFKA_ADDR.
        builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(KAFKA_ADDR, TOPIC_NAME)), 1);
        builder.setBolt("log_bolt", new LogConsoleBolt()).shuffleGrouping("kafka_spout");


        // An external "cluster" argument means production (cluster) start-up;
        // anything else means a local start-up.
        // NOTE(review): the topology name below carries a trailing space — confirm it is intentional.
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("LogTopology ", new Config(), builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("本地模式");
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LogTopology ",
                    new Config(), builder.createTopology());
        }
    }

    /**
     * Creates the spout configuration for one topic.
     *
     * @param bootstrapServers Kafka bootstrap server list passed to the builder
     * @param topic            topic to subscribe to
     * @return configuration using consumer group {@code "kafkaSpoutTestGroup"}, the retry
     *         service from {@link #getRetryService()}, and a 10 000 ms offset commit period
     */
    private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {
        return KafkaSpoutConfig.builder(bootstrapServers, topic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                .setRetry(getRetryService())
                .setOffsetCommitPeriodMs(10_000)
                .build();
    }

    /**
     * Creates the exponential-backoff retry service used for failed tuples.
     *
     * <p>Arguments, in order: 500 microseconds, 2 milliseconds, {@code Integer.MAX_VALUE},
     * 10 seconds. NOTE(review): per the Storm API these are presumably initial delay,
     * delay period, max retries and max delay — confirm against the Javadoc.
     */
    private static KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
    }
}
           

執行後便可在控制台看到 Storm 消費 Kafka 中主題為 demo_topic 的消息。