
Kafka Getting-Started Code Examples

Kafka producer

Configuration class

/**
 * Kafka configuration class
 *
 * @author olafwang
 * @since 2020/9/29 2:45 PM
 */
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {

        Properties properties = new Properties();
        // Kafka cluster addresses. Not every broker needs to be listed: the client
        // discovers the full broker set from these bootstrap servers
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
        // Acknowledgement mode: "all" waits for the full set of in-sync replicas
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        // Number of retries on transient send failures
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
        // Backoff between retries, in ms
        properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
        // Maximum size of one batch of messages: 16 KB
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        // How long to wait for a batch to fill before sending: 10 ms
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10");
        // RecordAccumulator buffer size: 32 MB; the sender blocks when the buffer is full
        properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        // Interceptors; multiple classes are comma-separated
        properties.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.xiaolyuh.interceptor.TraceInterceptor");

        Serializer<String> keySerializer = new StringSerializer();
        Serializer<String> valueSerializer = new StringSerializer();

        return new KafkaProducer<>(properties, keySerializer, valueSerializer);
    }

}
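
The KafkaProducer bean above is never closed. Below is a minimal sketch of an explicit shutdown hook; this class is not part of the original article (Spring can usually infer close() for Closeable beans on its own), but an explicit hook makes the final flush visible and bounds how long it may take.

/**
 * Sketch: close the producer when the Spring context shuts down, so that
 * buffered records are flushed before the JVM exits.
 */
@Component
class ProducerLifecycle {

    private final KafkaProducer<String, String> producer;

    ProducerLifecycle(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    @PreDestroy
    public void shutdown() {
        // close() flushes records still sitting in the RecordAccumulator,
        // waiting at most 10 seconds before giving up
        producer.close(Duration.ofSeconds(10));
    }
}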

Sender

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootStudentKafkaApplicationTests {

    @Autowired
    private KafkaProducer<String, String> kafkaProducer;

    @Test
    public void testSyncKafkaSend() throws Exception {
        // Synchronous send test
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test_cluster_topic", "key-" + i, "value-" + i);
            // Synchronous send: block on the returned Future until the broker acks.
            // We could also target a specific partition or attach headers here
            // (see the sketch after this class).
            kafkaProducer.send(producerRecord, new KafkaCallback<>(producerRecord)).get(50, TimeUnit.MINUTES);
        }

        System.out.println("ThreadName::" + Thread.currentThread().getName());
    }

    @Test
    public void testAsyncKafkaSend() {
        // Asynchronous send test
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test_cluster_topic2", "key-" + i, "value-" + i);
            // Asynchronous send: the callback fires once the broker acknowledges.
            // Partition and headers could be set here as well.
            kafkaProducer.send(producerRecord, new KafkaCallback<>(producerRecord));
        }

        System.out.println("ThreadName::" + Thread.currentThread().getName());
        // Remember to flush, otherwise buffered messages may never leave the client
        kafkaProducer.flush();
    }
}
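
As the comments above note, a ProducerRecord can also pin a specific partition and carry headers. Here is a minimal sketch of that variant, written as another test method for the same class; the partition number 0 and the traceId header name are illustrative assumptions, not from the original article.

    @Test
    public void testKafkaSendWithPartitionAndHeader() throws Exception {
        // Pin the record to partition 0 of the topic (assumes that partition exists)
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>("test_cluster_topic", 0, "key-0", "value-0");
        // Attach a header before sending
        producerRecord.headers().add("traceId", UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
        // Block until the broker acknowledges, or fail after 10 seconds
        RecordMetadata metadata = kafkaProducer.send(producerRecord).get(10, TimeUnit.SECONDS);
        System.out.println("sent to partition " + metadata.partition() + " at offset " + metadata.offset());
    }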

/**
 * Async callback, invoked when the producer receives the ack. A non-null
 * Exception means the send failed.
 *
 * @param <K> key type
 * @param <V> value type
 */
class KafkaCallback<K, V> implements Callback {
    private final ProducerRecord<K, V> producerRecord;

    public KafkaCallback(ProducerRecord<K, V> producerRecord) {
        this.producerRecord = producerRecord;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        System.out.println("ThreadName::" + Thread.currentThread().getName());
        if (Objects.isNull(exception)) {
            System.out.println(metadata.partition() + "-" + metadata.offset() + ":::" + producerRecord.key() + "=" + producerRecord.value());
        } else {
            // TODO alert, persist the message and resend it
        }
    }
}

Consumer

Consuming messages from Kafka is a continuous polling process: all the consumer has to do is call poll() repeatedly, and each call returns a batch of messages from the subscribed topics (partitions).

@Component
public class KafkaConsumerDemo {
    // A single polling thread: a KafkaConsumer instance must only ever be used from one thread
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 10,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1));

    @PostConstruct
    public void startConsumer() {
        executor.submit(() -> {
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");

            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
            properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            // Request timeout, in ms
            properties.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
            Deserializer<String> keyDeserializer = new StringDeserializer();
            Deserializer<String> valueDeserializer = new StringDeserializer();
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, keyDeserializer, valueDeserializer);

            consumer.subscribe(Arrays.asList("test_cluster_topic"));

            // Use KafkaConsumer#assignment() to check whether partitions have been
            // assigned; an empty set means the group rebalance has not completed yet
            Set<TopicPartition> assignment = consumer.assignment();
            while (assignment.isEmpty()) {
                // Poll, blocking up to 1 second, to drive the group join
                // (note: any records returned by these warm-up polls are discarded)
                consumer.poll(Duration.ofMillis(1000));
                assignment = consumer.assignment();
            }

            // Partitions have been assigned; start consuming
            while (true) {
                // Poll for records, blocking up to 1000 ms when none are available
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    Header traceHeader = record.headers().lastHeader("traceId");
                    String traceId = traceHeader == null ? "" : new String(traceHeader.value(), StandardCharsets.UTF_8);
                    System.out.printf("traceId = %s, offset = %d, key = %s, value = %s%n", traceId, record.offset(), record.key(), record.value());
                }

                // Commit offsets asynchronously
                consumer.commitAsync((offsets, exception) -> {
                    if (Objects.nonNull(exception)) {
                        // TODO alert, persist the offsets, retry the commit
                    }
                });
            }
        });

    }
}
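
The polling loop above runs forever and is never shut down. Here is a minimal sketch of a clean stop, assuming consumer is promoted from a local variable in the lambda to a volatile field of KafkaConsumerDemo: wakeup() is the one KafkaConsumer method that is safe to call from another thread, and it makes a blocked poll() throw WakeupException.

    private volatile KafkaConsumer<String, String> consumer;

    @PreDestroy
    public void stopConsumer() {
        if (consumer != null) {
            // Thread-safe: interrupts a poll() blocked on the consumer thread
            consumer.wakeup();
        }
        executor.shutdown();
    }

    // The polling thread would then wrap its loop like this:
    //
    //     try {
    //         while (true) { /* poll, process, commitAsync */ }
    //     } catch (WakeupException expected) {
    //         // thrown by poll() after wakeup(); signals shutdown
    //     } finally {
    //         consumer.commitSync(); // best-effort final synchronous commit
    //         consumer.close();
    //     }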

Interceptor

/**
 * Adds a trace ID to every outgoing message.
 *
 * @author olafwang
 */
public class TraceInterceptor implements ProducerInterceptor<String, String> {
    private int errorCounter = 0;
    private int successCounter = 0;

    /**
     * Called first, and only once, to read the configuration.
     */
    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println(JSON.toJSONString(configs));
    }

    /**
     * Runs on the user's main thread, before the message is serialized and its
     * partition is computed. It is best not to modify the topic or partition
     * here, or some odd behavior may follow.
     *
     * @param record the record about to be sent
     * @return the (possibly modified) record
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {

        // Add a traceId header. Appending to the record's existing headers
        // (instead of building a new ProducerRecord) preserves any headers the
        // caller already set.
        record.headers().add("traceId", UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
        return record;
    }

    /**
     * Called after the message has been successfully sent from the
     * RecordAccumulator to the Kafka broker, or when the send fails, and
     * normally before the producer callback fires. It runs on the producer's
     * I/O thread, so keep the logic here light or it will slow down sending.
     *
     * @param metadata metadata of the acknowledged record
     * @param exception null on success, non-null on failure
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (Objects.nonNull(exception)) {
            // TODO alert: the send failed
            errorCounter++;
        } else {
            successCounter++;
        }
    }

    /**
     * Closes the interceptor, mainly to run resource cleanup. Called only once.
     */
    @Override
    public void close() {
        System.out.println("==========close============");
        System.out.println("success::" + successCounter + ", error::" + errorCounter);
    }
}
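
For completeness, here is a minimal sketch (not part of the original article) of the consumer-side counterpart: a ConsumerInterceptor that reads the traceId header back out. It would be registered through ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, mirroring the producer configuration above.

public class TraceConsumerInterceptor implements ConsumerInterceptor<String, String> {

    /**
     * Called just before the records are returned from poll(); logs each traceId.
     */
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            Header header = record.headers().lastHeader("traceId");
            if (header != null) {
                System.out.println("traceId::" + new String(header.value(), StandardCharsets.UTF_8));
            }
        }
        return records;
    }

    /**
     * Called after offsets have been committed.
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}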

Source code