First, a link to the official site.
This post follows the official documentation.
Step one: add the dependencies:
<!-- To avoid conflicts, we exclude the log4j binding (slf4j-log4j12) with an exclusion -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
We also need to exclude the conflicting slf4j artifacts from the storm-core dependency above:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
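To verify that no conflicting slf4j binding is still being pulled in, you can inspect Maven's resolved dependency tree (a quick sanity check I'm adding here; the command is standard Maven, not from the original post):

mvn dependency:tree -Dincludes=org.slf4j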
Next, let's write a bolt class:
// Imports assume the Storm 1.x package layout (org.apache.storm.*)
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class LogProcessBlot extends BaseRichBolt {

    // Keep a reference to the collector so execute() can emit and ack later
    private OutputCollector collector;

    // Initialization
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // KafkaSpout emits each message as a byte[] in a field named "bytes",
        // so we read it with getBinaryByField
        byte[] bytes = input.getBinaryByField("bytes");
        // Convert the byte array to a String
        String s = new String(bytes);
        // Print it for testing
        System.out.println("value : " + s);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
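The bolt above only prints. If you later want a downstream bolt to consume the decoded line, a minimal sketch of the two methods (my addition, not in the original; the field name "line" is arbitrary, and Values/Fields come from org.apache.storm.tuple):

@Override
public void execute(Tuple input) {
    String s = new String(input.getBinaryByField("bytes"));
    // Hypothetical extension: pass the decoded line downstream and ack the tuple
    collector.emit(new Values(s));
    collector.ack(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // "line" is an arbitrary field name for this sketch
    declarer.declare(new Fields("line"));
}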
Then write the main method; the spout setup can live directly inside main:
public static void main(String[] args) {
    // See the official docs for this setup
    // ZooKeeper connect string (host:port)
    BrokerHosts hosts = new ZkHosts("localhost:2181");
    String topic = "project";
    String zkRoot = "/" + topic;
    String s = UUID.randomUUID().toString();
    // Build a SpoutConfig with 4 arguments:
    // 1. the ZooKeeper hosts
    // 2. the topic name
    // 3. the ZooKeeper root path
    // 4. a spout id (here a random UUID)
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, s);
    // Create a KafkaSpout and hand it the SpoutConfig
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    // Create a topology builder
    TopologyBuilder builder = new TopologyBuilder();
    // Wire the Spout and the Bolt together
    builder.setSpout("KafkaSpout", kafkaSpout);
    builder.setBolt("LogProcessBlot", new LogProcessBlot()).shuffleGrouping("KafkaSpout");
    // Run in local mode for testing
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("StormKafkaTopology", new Config(), builder.createTopology());
}
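LocalCluster is only for local testing. To run on a real cluster you would submit with StormSubmitter instead; a sketch under the assumption that the topology name arrives as the first program argument:

// Submit to a real cluster instead of running a LocalCluster (sketch)
// StormSubmitter.submitTopology declares checked exceptions
// (AlreadyAliveException, InvalidTopologyException), so main must handle them
StormSubmitter.submitTopology(args[0], new Config(), builder.createTopology());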
Before testing, we need to start ZooKeeper first:
./bin/zkServer.sh start
Then start Kafka:
./bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
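You can confirm both processes are up with jps (QuorumPeerMain is ZooKeeper, Kafka is the broker):

jps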
Create the topic:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic project
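To confirm the topic was created, list the topics registered in ZooKeeper:

./bin/kafka-topics.sh --list --zookeeper localhost:2181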
Start a console producer:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic project
Now we can test: run the main method, produce some messages on the producer side, and watch the console.
![Console output: the message sent from the Kafka producer is printed by the Storm bolt](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLxADNyAzN0kTM1IzMwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
The data is captured by Storm, and our Storm-Kafka integration is complete.
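For example, typing hello storm into the producer console should make the bolt's System.out.println produce a line like:

value : hello storm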
If you want the spout to start consuming from the current (latest) offset instead, just set the following on the SpoutConfig:
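A minimal sketch, assuming the Storm 1.x storm-kafka SpoutConfig fields (verify against your storm-kafka version; in older versions the second flag was called forceFromStart):

// Start from the latest offset rather than the earliest available one
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
// Ignore any offset already committed in ZooKeeper and honor startOffsetTime
spoutConfig.ignoreZkOffsets = true;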