Table of Contents
- Preface
- POM dependency versions
- Demo
Preface
There are a number of examples online about manually managing Kafka offsets, but most of them target Kafka 0.8 and are written in Scala; the 0.10 Kafka version is rarely covered, and the examples that do exist are usually incomplete. Version 0.10 is backward compatible with the older releases, and the new API is noticeably cleaner and easier to use. Since I could not find anything ready to use, I spent some time with the official API docs and put together a fairly complete Java example, which I hope will be helpful.
POM dependency versions
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
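The demo also talks to ZooKeeper directly through ZkClient (the org.I0Itec.zkclient package), which the Spark artifacts above do not pull in, so one extra dependency is needed. The coordinates below are the usual com.101tec artifact; the version is only an assumption, so use whatever matches your environment.
<!-- ZkClient, used below to read/write offsets in ZooKeeper; the version is an assumption -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>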
Demo
Without further ado, here is the code. Comments are sparse, but the naming largely speaks for itself.
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;
import java.util.*;
public class WordCount {

    private static String groupId = "group1";
    private static Map<TopicPartition, Long> offsets = new HashMap<>();
    private static Collection<String> topics = Arrays.asList("mytest-topic", "dc01");
    private static String zkServerUrls = "dc-sit-225:2181";
    // In a real application the offsets should be written with a readable serializer;
    // the ZkClient default writes Java-serialized data, which shows up garbled in
    // zkCli.sh and cannot be parsed by other tooling
    private static ZkClient zkClient = new ZkClient(zkServerUrls);
    // Root path for this consumer group's offsets; kafka-manager reads this path
    private static String rootPath = "/consumers/" + groupId + "/offsets";

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Restore previously stored offsets from ZooKeeper before creating the stream
        initOffset(zkClient);

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        // topics, kafka params and the starting offsets read from ZooKeeper
                        ConsumerStrategies.<String, String>Subscribe(topics, initKafkaParams(), offsets)
                );

        // Split each record into words, recording its offset in ZooKeeper as a side effect
        JavaDStream<String> words = stream.flatMap(x -> updateOffset(x));

        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
        wordCounts.print();

        jssc.start();            // Start the computation
        jssc.awaitTermination(); // Wait for the computation to terminate
    }

    public static void initOffset(ZkClient zkClient) {
        if (!zkClient.exists(rootPath)) {
            // First run for this group: create an empty node per topic
            for (String topic : topics) {
                String path = rootPath + "/" + topic;
                zkClient.createPersistent(path, true);
            }
        } else {
            // Later runs: read back the offsets written by updateOffset()
            List<String> topicSet = zkClient.getChildren(rootPath);
            for (String topic : topicSet) {
                String topicPath = rootPath + "/" + topic;
                List<String> partitionSet = zkClient.getChildren(topicPath);
                for (String partition : partitionSet) {
                    Long offset = zkClient.readData(topicPath + "/" + partition);
                    if (offset == null) {
                        // Node exists but no offset has been written yet
                        continue;
                    }
                    TopicPartition topicPartition = new TopicPartition(topic, Integer.valueOf(partition));
                    // The stored offset is the last record processed, so start one position
                    // later to avoid consuming that record again
                    if (offset != 0) {
                        offset += 1;
                    }
                    offsets.put(topicPartition, offset);
                }
            }
        }
    }

    public static Map<String, Object> initKafkaParams() {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "dc-sit-225:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", groupId);
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false);
        return kafkaParams;
    }

    public static Iterator<String> updateOffset(ConsumerRecord<String, String> consumerRecord) {
        String path = rootPath + "/" + consumerRecord.topic() + "/" + consumerRecord.partition();
        if (!zkClient.exists(path)) {
            zkClient.createPersistent(path, true);
        }
        // Record the offset of the latest record processed for this partition
        zkClient.writeData(path, consumerRecord.offset());
        return Arrays.asList(consumerRecord.value().split(" ")).iterator();
    }
}
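As the comment on the zkClient field warns, the default ZkClient serializer stores Java-serialized objects, so the offsets written by this demo look like binary garbage in zkCli.sh. Below is a minimal sketch of one way around that, assuming you are free to choose the encoding: plug in a custom ZkSerializer that stores the offset as a UTF-8 string (the OffsetStringSerializer name is just for illustration), and parse the value back with Long.valueOf when restoring offsets.
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: store offsets as plain UTF-8 strings so zkCli.sh can read them
public class OffsetStringSerializer implements ZkSerializer {
    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}

// Usage (replacing the zkClient field above):
// ZkClient zkClient = new ZkClient(zkServerUrls, 30000, 30000, new OffsetStringSerializer());
// readData() then returns a String, so restore the offset with Long.valueOf(...) in initOffset()
With a serializer like this, a get on /consumers/group1/offsets/&lt;topic&gt;/&lt;partition&gt; in zkCli.sh shows a plain number instead of serialized bytes.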