接上篇 storm的安裝,本篇進行storm與kafka進行內建工作。
噴口(spout)是流的來源。 例如,spout可能會讀取卡夫卡主題中的元組并将其作為流發送。 Bolts消耗輸入流,處理并可能發射新的流。 Bolts可以做任何事情,從運作功能,過濾元組,流聚合,流式連接配接,與資料庫互動等等。 Storm拓撲中的每個節點并行執行。 一個拓撲無限期地運作,直到終止它。 Storm會自動重新配置設定任何失敗的任務。 此外,即使機器停機并且資訊丢失,Storm也可以保證不會丢失資料。
有三個主要類将Kafka和Storm結合在一起。 他們如下
BrokerHosts - ZkHosts&StaticHosts
BrokerHosts
是一個接口,
ZkHosts
和
StaticHosts
是它的兩個主要實作。 ZkHosts用于通過在ZooKeeper中維護詳細資訊來動态跟蹤Kafka經紀人,而
StaticHosts
用于手動/靜态設定Kafka經紀人及其詳細資訊。 ZkHosts是通路Kafka經紀人的簡單而快捷的方式。
是以我這裡選取了ZkHosts的方式進行建構內建。
(1)主要流程:(簡單的描述)其實前面從webServer到kafka這條路已經打通了,在這篇文章裡,這篇文章主要是對kafka到storm這條路如何打通進行講解,在下一篇中會再将怎麼持久化到hdfs中的。
ZkHosts的簽名如下 -
public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
其中
brokerZkStr
是ZooKeeper主機,
brokerZkPath
是維護Kafka代理細節的ZooKeeper路徑。本文中我選取了第二種方式,下面是我的配置:
//配置zk連接配接
String zkConnString = "master:2181";
BrokerHosts hosts = new ZkHosts(zkConnString);
//Spout配置 分别是zk主機位置 kafka的topic,然後一個zkRoot ,另外一個就是id
SpoutConfig spoutConfig = new SpoutConfig(hosts, "testtopics", "", "1");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
然後下面就是設定spout和bolt的過程了
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaspout", kafkaSpout).setNumTasks(2);
builder.setBolt("spilt-bolt", new SplitBolt(), 2).shuffleGrouping("kafkaspout").setNumTasks(2);
Config config = new Config();
config.setNumWorkers(2);
config.setDebug(true);
//3、送出任務,分為本地模式、叢集模式,這裡我們設定了送出到叢集中去
StormSubmitter.submitTopologyWithProgressBar("mywordcount",config,builder.createTopology());
(2)pom檔案:這裡花費是時間是最多的,因為版本以及包之間的比對問題,調來調去調了一整天才調通的。(裡面包含了對下一篇hdfs中的操作需要的包)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.storm</groupId>
<artifactId>storms</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>storms</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.0.0</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>1.2.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
(3)啟動測試:
開啟ZK(叢集中都要開) kafka(叢集中都要開) storm(一個nimbus 其餘為supervisor)
啟動一個生産者:
bin/kafka-console-producer.sh --broker-list master:9092 --topic testtopics
啟動Storm jar
storm jar software/storms-0.0.1-SNAPSHOT.jar com.storm.storm2.WCApp
在生産者中輸入:
在/opt/apache-storm-1.1.3/logs/workers-artifacts/mywordcount-18-1563413651/6700/worker.log 中會發現正常輸出,并且沒有報錯就說明成功了:
如果裡面報錯了,檢視錯誤的時候要是一台supervisor上的錯誤看不出什麼東西來的話,那就去另一台supervisor上去看,說不定會發現什麼報錯的原因,然後去解決。這裡簡單列幾個問題和解決方案:
1、You're probably bundling the Storm jars with your topology jar.
删除 /Apache-storm2.0.0/lib下的對應jar包即可
2、當重新部署新的版本時,可能出現Exception in thread "main" org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts ["localhost"]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?、
的錯誤,這時可以用這個方法解決
問題原因:
由于是重新部署的storm,原先舊的storm已在zookeeper中注冊了資訊,隻要進入zookeeper用戶端将storm删除:
./bin/zkCli.sh
ls /
rmr /storm
重新開機storm叢集即可。
3、錯誤集錦(主要是一些jar包缺失導緻的錯誤)
https://www.jianshu.com/p/70c3a7f56386