
Spark Streaming: manually saving Kafka offsets to ZooKeeper (Java implementation)

Contents

    • Preface
    • pom dependencies
    • Demo

Preface

There are examples online of managing Kafka offsets manually, but most of them target Kafka 0.8 and are written in Scala; examples for kafka-0.10 are rare or incomplete. The 0.10 client is backward compatible with earlier versions, and the new API is noticeably cleaner and easier to use. Since I could not find anything ready to use, I took some time to go through the official API docs and put together a fairly complete Java test case, which I hope will be useful.

pom dependencies

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

Demo

Without further ado, here is the code. There are few comments, but the naming is largely self-explanatory.

import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;

import java.util.*;

public class WordCount {
    private static String groupId = "group1";

    // offsets restored from ZooKeeper, keyed by topic partition
    private static Map<TopicPartition, Long> offsets = new HashMap<>();
    // topics to subscribe to
    private static Collection<String> topics = Arrays.asList("mytest-topic", "dc01");

    private static String zkServerUrls = "dc-sit-225:2181";
    // in a real application, store values with a readable serializer; otherwise zkCli.sh
    // shows garbled data and related tools cannot parse it (see the sketch after this class)
    private static ZkClient zkClient = new ZkClient(zkServerUrls);
    // root path for the consumer group's offsets; kafka-manager reads this path
    private static String rootPath = "/consumers/" + groupId + "/offsets";

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        initOffset(zkClient);
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        // topics, Kafka parameters, and the starting offsets restored from ZooKeeper
                        ConsumerStrategies.<String, String>Subscribe(topics, initKafkaParams(), offsets)
                );
        JavaDStream<String> words = stream.flatMap(x -> updateOffset(x));
        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
        wordCounts.print();
        jssc.start();              // Start the computation
        jssc.awaitTermination();   // Wait for the computation to terminate
    }

    public static void initOffset(ZkClient zkClient){
        if (!zkClient.exists(rootPath)){
            for (String topic : topics){
                String path = rootPath + "/" + topic ;
                zkClient.createPersistent(path,true);
            }
        }else {
            List<String> topicSet = zkClient.getChildren(rootPath);
            for (String topic : topicSet){
                String topicPath = rootPath + "/" + topic;
                List<String> partitionSet = zkClient.getChildren(topicPath);
                for (String partition : partitionSet){
                    Long offset = zkClient.readData(topicPath + "/" + partition);
                    TopicPartition topicPartition = new TopicPartition(topic, Integer.valueOf(partition));
                    // guard against partition nodes that exist but hold no data yet
                    if (offset == null){
                        offset = 0L;
                    }
                    // shift the stored offset forward by one, otherwise the last saved record is consumed again
                    if (offset != 0){
                        offset += 1;
                    }
                    offsets.put(topicPartition, offset);
                }
            }
        }
    }

    public static Map<String, Object> initKafkaParams(){
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "dc-sit-225:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", groupId);
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false);
        return kafkaParams;
    }

    public static Iterator<String> updateOffset(ConsumerRecord<String, String> consumerRecord){
        String path = rootPath + "/" + consumerRecord.topic() + "/" + consumerRecord.partition();
        if (!zkClient.exists(path)){
            zkClient.createPersistent(path, true);
        }
        // record the offset of the last record processed for this partition
        zkClient.writeData(path, consumerRecord.offset());
        return Arrays.asList(consumerRecord.value().split(" ")).iterator();
    }
}
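
The comment on the zkClient field matters in practice: ZkClient's default serializer stores Java-serialized bytes, so zkCli.sh and tools such as kafka-manager cannot read the offsets. Below is a minimal sketch of a string-based serializer that could be plugged in; the class name ZkStringOffsetSerializer and the timeout values used when constructing ZkClient are illustrative, not part of the original code.

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

import java.nio.charset.StandardCharsets;

// Stores every value as a plain UTF-8 string so zkCli.sh and kafka-manager can read it.
public class ZkStringOffsetSerializer implements ZkSerializer {
    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}

With this in place the field would be created as new ZkClient(zkServerUrls, 30000, 30000, new ZkStringOffsetSerializer()) (the two timeouts are placeholders). Note that readData then returns a String, so initOffset would need Long.valueOf(...) when restoring offsets, and a stored value can be checked with, for example, get /consumers/group1/offsets/mytest-topic/0 in zkCli.sh.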
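
Writing to ZooKeeper inside flatMap hits ZooKeeper once per record. A common variant, sketched below against the same rootPath and zkClient fields, saves the last offset of each partition once per batch by asking the direct stream for its offset ranges (HasOffsetRanges and OffsetRange come from the kafka010 package already imported above); this is only a sketch of the alternative, not part of the original demo.

// Inside main(), called on the stream returned by createDirectStream:
stream.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    for (OffsetRange range : offsetRanges) {
        if (range.untilOffset() == range.fromOffset()) {
            continue; // nothing consumed from this partition in this batch
        }
        String path = rootPath + "/" + range.topic() + "/" + range.partition();
        if (!zkClient.exists(path)) {
            zkClient.createPersistent(path, true);
        }
        // untilOffset() is exclusive, so the last processed offset is untilOffset() - 1,
        // which matches what updateOffset stores per record
        zkClient.writeData(path, range.untilOffset() - 1);
    }
});

The cast to HasOffsetRanges only succeeds on the RDDs produced directly by createDirectStream, so this foreachRDD has to be called on stream itself, before any shuffling transformation.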
           
