天天看點

storm與kafka進行內建

接上篇 storm的安裝,本篇進行storm與kafka進行內建工作。

噴口(spout)是流的來源。 例如,spout可能會讀取卡夫卡主題中的元組并将其作為流發送。 Bolts消耗輸入流,處理并可能發射新的流。 Bolts可以做任何事情,從運作功能,過濾元組,流聚合,流式連接配接,與資料庫互動等等。 Storm拓撲中的每個節點并行執行。 一個拓撲無限期地運作,直到終止它。 Storm會自動重新配置設定任何失敗的任務。 此外,即使機器停機并且資訊丢失,Storm也可以保證不會丢失資料。

有三個主要類将Kafka和Storm結合在一起。 他們如下 

BrokerHosts - ZkHosts&StaticHosts

BrokerHosts

是一個接口,

ZkHosts

StaticHosts

是它的兩個主要實作。 ZkHosts用于通過在ZooKeeper中維護詳細資訊來動态跟蹤Kafka經紀人,而

StaticHosts

用于手動/靜态設定Kafka經紀人及其詳細資訊。 ZkHosts是通路Kafka經紀人的簡單而快捷的方式。

是以我這裡選取了ZkHosts的方式進行建構內建。

(1)主要流程:(簡單的描述)其實前面從webServer到kafka這條路已經打通了,在這篇文章裡,這篇文章主要是對kafka到storm這條路如何打通進行講解,在下一篇中會再将怎麼持久化到hdfs中的。

storm與kafka進行內建

ZkHosts的簽名如下 -

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
           

其中

brokerZkStr

是ZooKeeper主機,

brokerZkPath

是維護Kafka代理細節的ZooKeeper路徑。本文中我選取了第二種方式,下面是我的配置:

//配置zk連接配接
        String zkConnString = "master:2181";

        BrokerHosts hosts = new ZkHosts(zkConnString);
        
        //Spout配置 分别是zk主機位置  kafka的topic,然後一個zkRoot ,另外一個就是id
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "testtopics", "", "1");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
           

然後下面就是設定spout和bolt的過程了

TopologyBuilder builder = new TopologyBuilder();
        
        builder.setSpout("kafkaspout", kafkaSpout).setNumTasks(2);
        builder.setBolt("spilt-bolt", new SplitBolt(), 2).shuffleGrouping("kafkaspout").setNumTasks(2);

        Config config = new Config();
        config.setNumWorkers(2);
        config.setDebug(true);
        
        //3、送出任務,分為本地模式、叢集模式,這裡我們設定了送出到叢集中去
        StormSubmitter.submitTopologyWithProgressBar("mywordcount",config,builder.createTopology());
           

(2)pom檔案:這裡花費是時間是最多的,因為版本以及包之間的比對問題,調來調去調了一整天才調通的。(裡面包含了對下一篇hdfs中的操作需要的包)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.storm</groupId>
  <artifactId>storms</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>storms</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 </properties>

 <dependencies>
 
     <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>2.0.0</version>
    <scope>provided</scope>
</dependency>

    
	<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.0.1</version>
    </dependency>
     -->
     
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.1</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
		<dependency>
		    <groupId>org.apache.hadoop</groupId>
		    <artifactId>hadoop-common</artifactId>
		    <version>2.7.3</version>
		</dependency>
		
		<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.3</version>
        </dependency>
        
        
  		<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>

	<dependency>
	    <groupId>org.apache.storm</groupId>
	    <artifactId>storm-hdfs</artifactId>
	    <version>1.2.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>
                
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-hdfs</artifactId>
                </exclusion>
                
            </exclusions>
        </dependency>
    </dependencies>

		<build>
		<plugins>
		<plugin>
		    <groupId>org.apache.maven.plugins</groupId>
		    <artifactId>maven-shade-plugin</artifactId>
		    <version>3.2.0</version>
		    <configuration>
		        <createDependencyReducedPom>true</createDependencyReducedPom>
		    </configuration>
		    <executions>
		        <execution>
		            <phase>package</phase>
		            <goals>
		                <goal>shade</goal>
		            </goals>
		            <configuration>
		                <transformers>
		                    <transformer
		                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
		                    <transformer
		                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
		                        <mainClass></mainClass>
		                    </transformer>
		                </transformers>
		            </configuration>
		        </execution>
		    </executions>
		</plugin>
		</plugins>
		</build>

</project>
           

(3)啟動測試:

開啟ZK(叢集中都要開) kafka(叢集中都要開) storm(一個nimbus 其餘為supervisor)

啟動一個生産者:

bin/kafka-console-producer.sh --broker-list master:9092 --topic testtopics
           

啟動Storm jar

storm jar software/storms-0.0.1-SNAPSHOT.jar com.storm.storm2.WCApp
           

在生産者中輸入:

storm與kafka進行內建

在/opt/apache-storm-1.1.3/logs/workers-artifacts/mywordcount-18-1563413651/6700/worker.log 中會發現正常輸出,并且沒有報錯就說明成功了:

storm與kafka進行內建

如果裡面報錯了,檢視錯誤的時候要是一台supervisor上的錯誤看不出什麼東西來的話,那就去另一台supervisor上去看,說不定會發現什麼報錯的原因,然後去解決。這裡簡單列幾個問題和解決方案:

    1、You're probably bundling the Storm jars with your topology jar.

          删除 /Apache-storm2.0.0/lib下的對應jar包即可

    2、當重新部署新的版本時,可能出現Exception in thread "main" org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts ["localhost"]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?、

    的錯誤,這時可以用這個方法解決

    問題原因:

    由于是重新部署的storm,原先舊的storm已在zookeeper中注冊了資訊,隻要進入zookeeper用戶端将storm删除:

    ./bin/zkCli.sh

    ls / 

    rmr /storm

    重新開機storm叢集即可。

3、錯誤集錦(主要是一些jar包缺失導緻的錯誤)

https://www.jianshu.com/p/70c3a7f56386

繼續閱讀