Spark Streaming and Kafka integration test

Versions: Kafka 2.11, Spark 2.0.2

Test process:
1. Develop a Spark Streaming program that reads from the Kafka queue and processes the data.
2. Start Spark, ZooKeeper, and Kafka.
3. Start the program that writes log4j output to Kafka; first verify it is producing correctly with the Kafka console consumer. (A stand-in test producer sketch appears at the end of this post.)
4. Start the Spark Streaming program and observe the results. Launch command:

spark-submit --class com.itown.bigdata.kafka.KafkaReader /usr/hadoop/jar/sparkApp-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Development process:

1. The Java class
Note: developed by following http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

package com.itown.bigdata.kafka;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

import com.google.common.collect.Lists;

public class KafkaReader {
    static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount")
                .setMaster("local[2]");
        // 10-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                new Duration(10000));

        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        // auto-commit is off; see the offset-commit sketch after this class
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("test", "test2");

        final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
                .createDirectStream(jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        // split each record value into words
        JavaDStream<String> words = stream
                .flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {
                    public Iterator<String> call(ConsumerRecord<String, String> t)
                            throws Exception {
                        System.out.println(">>>" + t.value());
                        return Lists.newArrayList(SPACE.split(t.value())).iterator();
                    }
                });

        // count the occurrences of each word
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        // print the results
        wordCounts.print();

        try {
            jssc.start();
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
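Because enable.auto.commit is set to false above, this program never stores its offsets anywhere, so every restart begins from "latest". The integration guide linked above documents committing offsets back to Kafka yourself via commitAsync. A minimal sketch of that pattern, not part of the original program (it would go inside main() after the stream is created, with the extra imports shown as comments):

// additional imports this sketch needs:
// import org.apache.spark.api.java.JavaRDD;
// import org.apache.spark.api.java.function.VoidFunction;
// import org.apache.spark.streaming.kafka010.CanCommitOffsets;
// import org.apache.spark.streaming.kafka010.HasOffsetRanges;
// import org.apache.spark.streaming.kafka010.OffsetRange;

stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
    public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
        // the offset ranges consumed in this batch
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        // commit them back to Kafka once the batch's output is handled
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    }
});

Note that commitAsync is not transactional: as the guide points out, downstream output must still be idempotent if you want effectively exactly-once results.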
2. Maven pom.xml
Note: 1) the spark-streaming-kafka-0-10 dependency; 2) the assembly plugin packages the dependencies into the jar together with the application.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itown.bigdata</groupId>
    <artifactId>sparkApp</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>sparkApp</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>central</id>
            <name>central</name>
            <url>http://central.maven.org/maven2/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
    </repositories>
</project>
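The log4j-to-Kafka program used in step 3 of the test process is not shown in this post. As a hypothetical stand-in for generating input, a minimal producer (the TestProducer class name and message contents are my own; it assumes the same localhost:9092 broker and the test topic subscribed above) that feeds space-separated lines to the word count could look like this:

package com.itown.bigdata.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // send a few space-separated lines so the word count has input
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("test", "hello spark streaming " + i));
            Thread.sleep(1000);
        }
        producer.close();
    }
}

With this running alongside the submitted job, each 10-second batch should print counts for "hello", "spark", "streaming", and the numeric suffixes.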
