
Storm Integration with Kafka

First, the link to the official documentation.

This post is based on the official docs.

Step 1: add the dependencies.

To avoid logging conflicts, we use an exclusion to remove the slf4j-log4j12 binding:

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.9.0.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.12.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>2.12.0</version>
    </dependency>
           

The conflicting SLF4J artifacts also need to be excluded from the storm-core dependency above:

    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>${storm.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>log4j-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
           
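Note that KafkaSpout, SpoutConfig, ZkHosts, and BrokerHosts used below come from Storm's storm-kafka module, which is not in the dependency list above, so the pom presumably also needs that artifact. A sketch, assuming its version should match ${storm.version}:

    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>${storm.version}</version>
    </dependency>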

Next, write a Bolt class:

// Imports assume Storm 1.x packages (org.apache.storm.*)
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class LogProcessBlot extends BaseRichBolt {
    // Keep the collector so the bolt can ack (and later emit) tuples
    OutputCollector collector;

    // Initialization
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // KafkaSpout emits each message as a byte array in a field named "bytes",
        // so we read it with getBinaryByField
        byte[] bytes = input.getBinaryByField("bytes");
        // Turn the byte array into a String
        String s = new String(bytes);
        // Print it for testing
        System.out.println("value : " + s);
        // Ack the tuple so the KafkaSpout does not replay it
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}
           
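declareOutputFields is left empty above because the bolt only prints. If you later want to forward the decoded message to a downstream bolt, a minimal sketch (the field name "line" is my own example, and it needs the org.apache.storm.tuple.Fields and Values imports) would be:

    @Override
    public void execute(Tuple input) {
        String s = new String(input.getBinaryByField("bytes"));
        // Anchor the emitted tuple to the input, then pass the decoded line downstream
        collector.emit(input, new Values(s));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare the single output field so downstream bolts can read it by name
        declarer.declare(new Fields("line"));
    }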

Then write the main method; the Spout can be set up directly inside main:

public static void main(String[] args) {
        // See the official storm-kafka documentation for reference
        // ZooKeeper connection string (host:port)
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        String topic = "project";
        String zkRoot = "/" + topic;
        String id = UUID.randomUUID().toString();
        // Build a SpoutConfig; it takes four arguments:
        // 1. the ZooKeeper hosts
        // 2. the topic name
        // 3. the ZooKeeper root path
        // 4. a consumer id, here just a random UUID
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, id);
        // Create a KafkaSpout and hand it the SpoutConfig
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // Create a TopologyBuilder
        TopologyBuilder builder = new TopologyBuilder();
        // Wire the Spout and the Bolt together
        builder.setSpout("KafkaSpout", kafkaSpout);
        builder.setBolt("LogProcessBlot", new LogProcessBlot()).shuffleGrouping("KafkaSpout");

        // Run in local mode for testing
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("StormKafkaTopology", new Config(), builder.createTopology());
    }
           

Before testing, start ZooKeeper first:

./bin/zkServer.sh start
           

Then start Kafka:

./bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 
           

Create the topic:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic project
           
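To double-check that the topic was created, you can list the topics registered in ZooKeeper (same address as above):

./bin/kafka-topics.sh --list --zookeeper localhost:2181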

Start a console producer:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic project
           

Now we can test: run the main method, produce some data from the producer console, and watch the Storm console output.

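For example, typing hello into the producer console (a made-up input) should make the bolt print something along the lines of:

value : hello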

The data is picked up by Storm, and with that the Storm and Kafka integration is complete.

If you want the spout to start consuming from the current offset, just set the following on the spoutConfig:

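A sketch of that setting, assuming the storm-kafka 1.x KafkaConfig fields ignoreZkOffsets and startOffsetTime:

// Skip offsets already saved in ZooKeeper and start at the current end of the topic
spoutConfig.ignoreZkOffsets = true;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();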