PS:創作不易,感謝閱讀,希望對讀者有所幫助,喜歡的話可以點贊、收藏、關注,作者會持續更新 Java 生态圈常見知識。黑色加粗為重點關注内容!
模型一:多個 Consumer,且每一個 Consumer 有自己的 Worker 線程
- 消費者
public class ConsumerThread implements Runnable {
static Logger log = LoggerFactory.getLogger(ConsumerThread.class);
private KafkaConsumer<String, String> consumer;
/**
* 初始化配置
*/
private static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.31.158:9092");
props.put("group.id", "my-consumer-group");
props.put("enable.auto.commit", true);
props.put("auto.commit.interval.ms", 100);
props.put("session.timeout.ms", 10000);
props.put("max.poll.records", 10);
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
return props;
}
@Override
public void run() {
System.out.println("主線程式号:"+Thread.currentThread().getId()+" ");
/* 每次建立的不同consumer,消費固定分區 */
Properties configs = initConfig();
consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Collections.singletonList("mytopic1"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach((ConsumerRecord<String, String> record) -> {
log.info("線程式号:"+Thread.currentThread().getId()+" partition:"+record.partition()+" 收到消息: key ===" + record.key() + " value ====" + record.value() + " topic ==="+ record.topic());
});
}
}
}
- 調用
@RequestMapping("/receiveThread")
public void receiveThread() {
ExecutorService runnableService = Executors.newFixedThreadPool(3);
runnableService.submit(new ConsumerThread());
runnableService.submit(new ConsumerThread());
runnableService.submit(new ConsumerThread());
runnableService.shutdown();
}
- 運作結果:可以看到 3 個線程啟用了 3 個 consumer,分别消費 3 個分區
主線程式号:48
主線程式号:49
主線程式号:50
線程式号:50 partition:2 收到消息: key ===null value ====value33 topic ===mytopic1
線程式号:48 partition:0 收到消息: key ===null value ====value0 topic ===mytopic1
線程式号:49 partition:1 收到消息: key ===null value ====value0 topic ===mytopic1
模型二:一個Consumer(或多個Consumer),且每個Consumer有多個Worker線程
- 消費者
public class ConsumerThread2 implements Runnable {
static Logger log = LoggerFactory.getLogger(ConsumerThread.class);
private KafkaConsumer<String, String> consumer;
private ExecutorService executor;
/**
* 初始化配置
*/
private static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.31.158:9092");
props.put("group.id", "my-consumer-group");
props.put("enable.auto.commit", true);
props.put("auto.commit.interval.ms", 100);
props.put("session.timeout.ms", 10000);
props.put("max.poll.records", 10);
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
return props;
}
@Override
public void run() {
/* 每次建立的不同consumer,消費固定分區 */
Properties configs = initConfig();
consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Collections.singletonList("mytopic1"));
System.out.println("主線程式号:" + Thread.currentThread().getId() + " ");
executor = new ThreadPoolExecutor(3,3,0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(4), new ThreadPoolExecutor.CallerRunsPolicy());
while (true){
//循環不斷拉取100消息,僅消費資料,邏輯處理交給下遊worker線程
ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String,String> item : consumerRecords){
executor.submit(new ConsumerThreadHandler(Thread.currentThread().getId(), item));
}
}
}
}
- 資料處理線程
public class ConsumerThreadHandler implements Runnable {
private ConsumerRecord consumerRecord;
private Long mainThreadId;
public ConsumerThreadHandler(Long mainThreadId, ConsumerRecord consumerRecord) {
this.consumerRecord = consumerRecord;
this.mainThreadId = mainThreadId;
}
@Override
public void run() {
//結合自己的業務處理
System.out.println("主線程:"+mainThreadId+ " 線程式号:"+Thread.currentThread().getId()+" partition:"+consumerRecord.partition()+" 收到消息: key ===" + consumerRecord.key() + " value ====" + consumerRecord.value() + " topic ==="
+ consumerRecord.topic());
}
}
- 調用
@RequestMapping("/receiveThread2")
public void receiveThread2() {
ExecutorService runnableService = Executors.newFixedThreadPool(3);
runnableService.submit(new ConsumerThread2());
runnableService.submit(new ConsumerThread2());
runnableService.submit(new ConsumerThread2());
runnableService.shutdown();
}
- 運運作結果:啟用了 3 個 consumer 作為消費線程,每個消費線程又開啟一個線程池進行多線程處理資料。
主線程式号:48
主線程式号:49
主線程式号:50
主線程:48 線程式号:48 partition:1 收到消息: key ===null value ====value3 topic ===mytopic1
主線程:48 線程式号:55 partition:1 收到消息: key ===null value ====value0 topic ===mytopic1
主線程:48 線程式号:54 partition:1 收到消息: key ===null value ====value0 topic ===mytopic1
主線程:50 線程式号:50 partition:2 收到消息: key ===null value ====value28 topic ===mytopic1
主線程:50 線程式号:62 partition:2 收到消息: key ===null value ====value30 topic ===mytopic1
主線程:49 線程式号:57 partition:0 收到消息: key ===null value ====value17 topic ===mytopic1