天天看點

kafka stream及interceptor(四)

一、 Kafka Streams

官網位址:https://kafka.apache.org/30/documentation/streams/

1.1. 概述

1.1.1. Kafka Streams

Kafka Streams是一個用戶端庫,用于建構任務關鍵型實時應用程式和微服務,其中輸入和/或輸出資料存儲在Kafka叢集中。Kafka Streams結合了在用戶端編寫和部署标準Java和Scala應用程式的簡單性以及Kafka伺服器端叢集技術的優勢,使這些應用程式具有高度可擴充性,彈性,容錯性,分布式等等。

1.1.2. Kafka Streams特點

1)功能強大

  • 高擴充性,彈性,容錯

2)輕量級

  • 無需專門的叢集一個庫,而不是架構

3)完全內建

  • 100%的Kafka 0.10.0版本相容,易于內建到現有的應用程式

4)實時性

      • 毫秒級延遲
      • 并非微批處理
      • 視窗允許亂序資料
      • 允許遲到資料

1.1.3. 為什麼要有Kafka Streams

目前已經有非常多的流式處理系統,最知名且應用最多的開源流式處理系統有Spark Streaming和Apache Storm。Apache Storm發展多年,應用廣泛,提供記錄級别的處理能力,目前也支援SQL on Stream。而Spark Streaming基于Apache Spark,可以非常友善與圖計算,SQL處理等內建,功能強大,對于熟悉其它Spark應用開發的使用者而言使用門檻低。另外,目前主流的Hadoop發行版,如Cloudera和Hortonworks,都內建了Apache Storm和Apache Spark,使得部署更容易。既然Apache Spark與Apache Storm擁用如此多的優勢,那為何還需要Kafka Stream呢?主要有如下原因。

第一,Spark和Storm都是流式處理架構,而Kafka Stream提供的是一個基于Kafka的流式處理類庫。架構要求開發者按照特定的方式去開發邏輯部分,供架構調用。開發者很難了解架構的具體運作方式,進而使得調試成本高,并且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發者調用,整個應用的運作方式主要由開發者控制,友善使用和調試。

kafka stream及interceptor(四)

第二,雖然Cloudera與Hortonworks友善了Storm和Spark的部署,但是這些架構的部署仍然相對複雜。而Kafka Stream作為類庫,可以非常友善的嵌入應用程式中,它對應用的打包和部署基本沒有任何要求。

第三,就流式處理系統而言,基本都支援Kafka作為資料源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka子產品。事實上,Kafka基本上是主流的流式處理系統的标準資料源。換言之,大部分流式系統中都已部署了Kafka,此時使用Kafka Stream的成本非常低。

第四,使用Storm或Spark Streaming時,需要為架構本身的程序預留資源,如Storm的supervisor和Spark on YARN的node manager。即使對于應用執行個體而言,架構本身也會占用部分資源,如SparkStreaming需要為shuffle和storage預留記憶體。但是Kafka作為類庫不占用系統資源。

第五,由于Kafka本身提供資料持久化,是以Kafka Stream提供滾動部署和滾動更新以及重新計算的能力。

第六,由于Kafka Consumer Rebalance機制,Kafka Stream可以線上動态調整并行度。

 1.2、stream示範

 stream在大資料中應用場景比較多,下面根據github提供的單詞統計案例來示範下:https://github.com/apache/kafka/blob/2.1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java

 1.2.1、 建立一個輸入topic和一個輸出topic

bin/kafka-topics.sh --create --bootstrap-server 192.168.32.123:9092 --topic streams-plaintext-input --partitions 1 --replication-factor 1      
bin/kafka-topics.sh --create --bootstrap-server 192.168.32.123:9092 --topic streams-wordcount-output --partitions 1 --replication-factor 1 --config cleanup.policy=compact      

1.2.2、運作WordCount程式

 要導入pom包

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.1.1</version>
        </dependency>      
public final class WordCountDemo {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.32.123:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
        // Note: To re-run the demo, you need to use the offset reset tool:
        // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, String> source = builder.stream("streams-plaintext-input");
       //統計次數
        final KTable<String, Long> counts = source
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
                .groupBy((key, value) -> value)
                .count();
         //輸出
        // need to override value serde to Long type
        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}      

啟動生産者:

./bin/kafka-console-producer.sh --bootstrap-server 192.168.32.123:9092 --topic streams-plaintext-input      

啟動消費者:

./bin/kafka-console-consumer.sh --bootstrap-server 192.168.32.123:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer      
kafka stream及interceptor(四)
kafka stream及interceptor(四)

 由上面可知,他可以實時統計出出現的次數

二、Kafka producer攔截器(interceptor)

2.1. 攔截器原理

Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用于實作clients端的定制化控制邏輯。對于producer而言,interceptor使得使用者在消息發送前以及producer回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。同時,producer允許使用者指定多個interceptor按序作用于同一條消息進而形成一個攔截鍊(interceptor chain)。Intercetpor的實作接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:

(1)configure(configs)

擷取配置資訊和初始化資料時調用。

(2)onSend(ProducerRecord):

該方法封裝進KafkaProducer.send方法中,即它運作在使用者主線程中。Producer確定在消息被序列化以及計算分區前調用該方法。使用者可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區,否則會影響目标分區的計算

(3)onAcknowledgement(RecordMetadata, Exception):

該方法會在消息被應答或消息發送失敗時調用,并且通常都是在producer回調邏輯觸發之前。onAcknowledgement運作在producer的IO線程中,是以不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發送效率

(4)close:

關閉interceptor,主要用于執行一些資源清理工作如前所述,interceptor可能被運作在多個線程中,是以在具體實作時使用者需要自行確定線程安全。另外倘若指定了多個interceptor,則producer将按照指定順序調用它們,并僅僅是捕獲每個interceptor可能抛出的異常記錄到錯誤日志中而非在向上傳遞。這在使用過程中要特别留意。

2.2. 攔截器案例

實作一個簡單的雙interceptor組成的攔截鍊。第一個interceptor會在消息發送前将時間戳資訊加到消息value的最前部;第二個interceptor會在消息發送後更新成功發送消息數或失敗發送消息數。

kafka stream及interceptor(四)

 1.增加時間戳,按前面說的實作ProducerInterceptor方法

//增加時間戳
public class TimeInterceptor implements ProducerInterceptor<String, String> {
    //擷取配置資訊和初始化資料調用
    @Override
    public void configure(Map<String, ?> configs) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // 建立一個新的record,把時間戳寫入消息體的最前部
        return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
                System.currentTimeMillis() + "," + record.value().toString());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }
}      

2.統計發送消息成功和發送失敗消息數,并在producer關閉時列印這兩個計數器 

public class CounterInterceptor implements ProducerInterceptor<String, String> {
    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public void configure(Map<String, ?> configs) {

    }


    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
         
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // 統計成功和失敗的次數
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        // 儲存結果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}      

3.producer主程式

public class InterceptorProducer {

    public static void main(String[] args) throws Exception {
        // 1 設定配置資訊
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.32.122:9092");
        // 預設為1;當為all時候值為-1,表示所有的都需要同步(一緻性最高相對性能也會有所降低)
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2 建構攔截鍊
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.study.kafka.interceptor.TimeInterceptor");
        interceptors.add("com.study.kafka.interceptor.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        String topic = "test";
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 3 發送消息
        for (int i = 0; i < 10; i++) {

            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
            producer.send(record);
            // message0 , -> 123129374927,message0
            // 成功:
            // 失敗:
        }

        // 4 一定要關閉producer,這樣才會調用interceptor的close方法
        producer.close();
    }
}      
kafka stream及interceptor(四)
kafka stream及interceptor(四)

三、 Kafka 自定義分區器 

在調用Kafka的Producer API時,如果沒有指定分區器,那麼資料将會根據預設分區器的算法均分到各個分區。然而實際的生産環境中,可能Kafka的分區數不止一個(官方建議:Kafka的分區數量應該是Broker數量的整數倍!),是以這時需要我們自定義分區器。

3.1. 預設分區器DefaultPartitioner

預設分區器:

  • org.apache.kafka.clients.producer.internals.DefaultPartitioner

預設分區器擷取分區:

  • 如果消息的 key 為 null,此時 producer 會使用預設的 partitioner 分區器将消息随機分布到 topic 的可用 partition 中。如果 key 不為 null,并且使用了預設的分區器,kafka 會使用自己的 hash 算法對 key 取 hash 值,使用hash 值與 partition 數量取模,進而确定發送到哪個分區。注意:此時 key 相同的消息會發送到相同的分區(隻要 partition 的數量不變化)。

ProducerRecord

 在具體分析分區器之前,我們先看一下生産者産生的消息記錄的結構。生産者産生的每一個消息均用

ProducerRecord

來表示,其字段如下所示:

public class ProducerRecord<K, V> {
    //該消息需要發往的主題
    private final String topic;
    //該消息需要發往的主題中的某個分區,如果該字段有值,則分區器不起作用,直接發往指定的分區
    //如果該值為null,則利用分區器進行分區的選擇 
    private final Integer partition;
    private final Headers headers;
    //如果partition字段為null,則使用分區器進行分區選擇時會用到該key字段,該值可為空 
    private final K key;
    private final V value;
    private final Long timestamp;      

Partitioner接口

Partitioner

接口中有一個最主要的方法:

/**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);      

這裡重點關注一下最後一個參數

Cluster cluster

,目前隻需要了解cluster代表了Kafka的中繼資料資訊,從該cluster字段能夠擷取到我們需要的資訊,在這裡我們隻關注從cluster能夠根據指定topic,擷取該topic所對應的分區的資訊。

DefaultPartitioner

 我們先看一下生産者在發送消息時選擇分區的邏輯,該邏輯在

KafkaProducer

類的

partition

 方法中:

/**
     * computes partition for given record.
     * if the record has partition returns the value otherwise
     * calls configured partitioner class to compute the partition.
     */
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }      

如上代碼所示:首先判斷

ProducerRecord

中的

partition

字段是否有值,即是否在建立消息記錄的時候直接指定了分區,如果指定了分區,則直接将該消息發送到指定的分區,否則調用分區器的

partition

方法,執行分區政策。如果使用者配置了分區器,則使用使用者指定的分區器,否則使用預設的分區器,即

DefaultPartitioner

,下面我們看一下,該預設實作是如何進行分區選擇的。

public class DefaultPartitioner implements Partitioner {
 
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
 
    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /* 首先通過cluster從中繼資料中擷取topic所有的分區資訊 */
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        //拿到該topic的分區數
        int numPartitions = partitions.size();
        //如果消息記錄中沒有指定key
        if (keyBytes == null) {
            //則擷取一個自增的值
            int nextValue = nextValue(topic);
            //通過cluster拿到所有可用的分區(可用的分區這裡指的是該分區存在首領副本)
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            //如果該topic存在可用的分區
            if (availablePartitions.size() > 0) {
                //那麼将nextValue轉成正數之後對可用分區數進行取餘
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                //然後從可用分區中傳回一個分區
                return availablePartitions.get(part).partition();
            } else { // 如果不存在可用的分區
                //那麼就從所有不可用的分區中通過取餘的方式傳回一個不可用的分區
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else { // 如果消息記錄中指定了key
            // 則使用該key進行hash操作,然後對所有的分區數進行取餘操作,這裡的hash算法采用的是murmur2算法,然後再轉成正數
            //toPositive方法很簡單,直接将給定的參數與0X7FFFFFFF進行邏輯與操作。
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
 
    //nextValue方法可以了解為是在消息記錄中沒有指定key的情況下,需要生成一個數用來代替key的hash值
    //方法就是最開始先生成一個随機數,之後在這個随機數的基礎上每次請求時均進行+1的操作
    private int nextValue(String topic) {
        //每個topic都對應着一個計數
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) { // 如果是第一次,該topic還沒有對應的計數
            //那麼先生成一個随機數
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            //然後将該随機數與topic對應起來存入map中
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                //之後把這個随機數傳回
                counter = currentCounter;
            }
        }
        //一旦存入了随機數之後,後續的請求均在該随機數的基礎上+1之後進行傳回
        return counter.getAndIncrement();
    }      

總結

生産者發送消息時整個分區路由的步驟如下:

  1. 判斷消息中的

    partition

    字段是否有值,有值的話即指定了分區,直接将該消息發送到指定的分區就行。
  2. 如果沒有指定分區,則使用分區器進行分區路由,首先判斷消息中是否指定了

    key

  3. 如果指定了

    key

    ,則使用該

    key

    進行hash操作,并轉為正數,然後對

    topic

    對應的分區數量進行取模操作并傳回一個分區。
  4. 如果沒有指定

    key

    ,則通過先産生随機數,之後在該數上自增的方式産生一個數,并轉為正數之後進行取餘操作。

上述第4點需要注意一下,如果該

topic

有可用分區,則優先配置設定可用分區,如果沒有可用分區,則配置設定一個不可用分區。這與第3點中

key

有值的情況不同,

key

有值時,不區分可用分區和不可用分區,直接取餘之後選擇某個分區進行配置設定。

3.2、自定義分區器

/**
 * 定義Kafka分區器
 */
public class MySamplePartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
    private Random random = new Random();

    //我的分區器定義
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitioners = cluster.partitionsForTopic(topic);
        int numPartitions = partitioners.size();

        /**
         * 由于按key分區,在這裡我們規定:key值不允許為null。
         * 在實際項目中,key為null的消息*,可以發送到同一個分區,或者随機分區。
         */
        int res = 1;
        if (keyBytes == null) {
            System.out.println("value is null");
            res = random.nextInt(numPartitions);
        } else {
//            System.out.println("value is " + value + "\n hashcode is " + value.hashCode());
            res = Math.abs(key.hashCode()) % numPartitions;
        }
        System.out.println("data partitions is " + res);
        return res;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}      
public class TestProducer {
    public static void main(String args[]) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.32.122:9092,192.168.32.123:9092,192.168.32.124:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //org.apache.kafka.clients.producer.internals.DefaultPartitioner
        props.put("partitioner.class", "com.study.kafka.partition.MyPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("market_topic1",
                    "" + i,
                    "message" + i));

            //Thread.sleep(1000L);
        }

        producer.close();
        System.out.println("done...");
    }
}      
public class TestConsumer {
    public static void main(String args[]) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.32.122:9092,192.168.32.123:9092,192.168.32.124:9092");
        props.setProperty("group.id", "test111111111111111");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("auto.commit.interval.ms", "1000");

        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList("market_topic1"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition() + ":" + record.offset());
            }
        }
    }
}      
kafka stream及interceptor(四)

這短短的一生我們最終都會失去,不妨大膽一點,愛一個人,攀一座山,追一個夢