天天看點

阿瑟-消息隊列(12):Kafka-多線程生産者和消費者(二)

作者:阿瑟傑克斯

PS:創作不易,感謝閱讀,希望對讀者有所幫助,喜歡的話可以點贊、收藏、關注,作者會持續更新 Java 生态圈常見知識。黑色加粗為重點關注内容!

模型一:多個 Consumer,且每一個 Consumer 有自己的 Worker 線程

  • 消費者
public class ConsumerThread implements Runnable {

    static Logger log = LoggerFactory.getLogger(ConsumerThread.class);

    private KafkaConsumer<String, String> consumer;

    /**
     * 初始化配置
     */
    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.31.158:9092");
        props.put("group.id", "my-consumer-group");
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", 100);
        props.put("session.timeout.ms", 10000);
        props.put("max.poll.records", 10);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return props;
    }

    @Override
    public void run() {
        System.out.println("主線程式号:"+Thread.currentThread().getId()+" ");

        /* 每次建立的不同consumer,消費固定分區 */
        Properties configs = initConfig();
        consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Collections.singletonList("mytopic1"));
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach((ConsumerRecord<String, String> record) -> {

                log.info("線程式号:"+Thread.currentThread().getId()+" partition:"+record.partition()+" 收到消息: key ===" + record.key() + " value ====" + record.value() + " topic ==="+ record.topic());
            });
        }
    }
}           
  • 調用
@RequestMapping("/receiveThread")
public void receiveThread() {
    ExecutorService runnableService = Executors.newFixedThreadPool(3);
    runnableService.submit(new ConsumerThread());
    runnableService.submit(new ConsumerThread());
    runnableService.submit(new ConsumerThread());
    runnableService.shutdown();
}           
  • 運作結果:可以看到 3 個線程啟用了 3 個 consumer,分别消費 3 個分區
主線程式号:48 
主線程式号:49 
主線程式号:50 
線程式号:50 partition:2 收到消息: key ===null value ====value33 topic ===mytopic1
線程式号:48 partition:0 收到消息: key ===null value ====value0 topic ===mytopic1
線程式号:49 partition:1 收到消息: key ===null value ====value0 topic ===mytopic1           

模型二:一個Consumer(或多個Consumer),且每個Consumer有多個Worker線程

  • 消費者
public class ConsumerThread2 implements Runnable {

    static Logger log = LoggerFactory.getLogger(ConsumerThread.class);

    private KafkaConsumer<String, String> consumer;

    private ExecutorService executor;

    /**
     * 初始化配置
     */
    private static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.31.158:9092");
        props.put("group.id", "my-consumer-group");
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", 100);
        props.put("session.timeout.ms", 10000);
        props.put("max.poll.records", 10);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return props;
    }

    @Override
    public void run() {
        /* 每次建立的不同consumer,消費固定分區 */
        Properties configs = initConfig();
        consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Collections.singletonList("mytopic1"));

        System.out.println("主線程式号:" + Thread.currentThread().getId() + " ");
        executor = new ThreadPoolExecutor(3,3,0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(4), new ThreadPoolExecutor.CallerRunsPolicy());
        while (true){
            //循環不斷拉取100消息,僅消費資料,邏輯處理交給下遊worker線程
            ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String,String> item : consumerRecords){
                executor.submit(new ConsumerThreadHandler(Thread.currentThread().getId(), item));
            }
        }
    }
}           
  • 資料處理線程
public class ConsumerThreadHandler implements Runnable {

    private ConsumerRecord consumerRecord;
    private Long mainThreadId;

    public ConsumerThreadHandler(Long mainThreadId, ConsumerRecord consumerRecord) {
        this.consumerRecord = consumerRecord;
        this.mainThreadId = mainThreadId;
    }

    @Override
    public void run() {
        //結合自己的業務處理
        System.out.println("主線程:"+mainThreadId+ " 線程式号:"+Thread.currentThread().getId()+" partition:"+consumerRecord.partition()+" 收到消息: key ===" + consumerRecord.key() + " value ====" + consumerRecord.value() + " topic ==="
                + consumerRecord.topic());
    }
}           
  • 調用
@RequestMapping("/receiveThread2")
public void receiveThread2() {
    ExecutorService runnableService = Executors.newFixedThreadPool(3);
    runnableService.submit(new ConsumerThread2());
    runnableService.submit(new ConsumerThread2());
    runnableService.submit(new ConsumerThread2());
    runnableService.shutdown();
}           
  • 運運作結果:啟用了 3 個 consumer 作為消費線程,每個消費線程又開啟一個線程池進行多線程處理資料。
主線程式号:48
主線程式号:49
主線程式号:50
主線程:48 線程式号:48 partition:1 收到消息: key ===null value ====value3 topic ===mytopic1
主線程:48 線程式号:55 partition:1 收到消息: key ===null value ====value0 topic ===mytopic1
主線程:48 線程式号:54 partition:1 收到消息: key ===null value ====value0 topic ===mytopic1
主線程:50 線程式号:50 partition:2 收到消息: key ===null value ====value28 topic ===mytopic1
主線程:50 線程式号:62 partition:2 收到消息: key ===null value ====value30 topic ===mytopic1
主線程:49 線程式号:57 partition:0 收到消息: key ===null value ====value17 topic ===mytopic1