天天看点

SpringBoot集成阿里云Kafka 示例

Step By Step

1、kafka控制台创建公网类型实例

2、创建SpringBoot项目集成阿里云Kafka

3、发送接收测试

一、kafka控制台创建公网类型实例

1.1 Kafka

控制台

创建实例

SpringBoot集成阿里云Kafka 示例

1.2 获取认证参数

SpringBoot集成阿里云Kafka 示例
SpringBoot集成阿里云Kafka 示例

二、创建SpringBoot项目集成阿里云Kafka

2.1 创建Spring Boot(2.5.2)项目

SpringBoot集成阿里云Kafka 示例

2.2 依赖

<properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>           

2.3 Sender.class

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    @Autowired
    private KafkaTemplate<String, String> template;

    public void send(String msg) {
        this.template.sendDefault("my_msg", msg);
        System.out.println("send message:" + msg);
    }
}           

2.4 Receiver.class

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {
    
    @KafkaListener(topics = { "taro_topic" }) // 参数配置要监听的Topic
    public void receiveMessage(ConsumerRecord<String, String> record) {
        System.out.println("Receive Message");
        System.out.println("【*** Message: ***】key = " + record.key() + "、value = " + record.value());
    }
}           

2.5 KafkaController.class

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private Sender sender;

    @PostMapping("/send/{msg}") // 发送消息测试,注意此处为Post
    public String send(@PathVariable("msg") String msg) {
        sender.send(msg);
        return msg;
    }
}           

2.6 application.yml

spring:
  kafka:
    template:
      default-topic: <topic>
    bootstrap-servers: <SSL接入点>
    jaas:
      enabled: true
      loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
      options:
        username: <用户名>
        password: <密码>
    consumer:
      ssl:
        truststoreLocation: file:/kafka.client.truststore.jks
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_SSL
        ssl.endpoint.identification.algorithm:
      group-id: <group>
      max-poll-records: 2
    producer:
      ssl:
        truststoreLocation: file:/kafka.client.truststore.jks
      retries: 3
      acks: 1
      compression-type: lz4
      buffer-memory: 33554432
      batch-size: 51200
      properties:
        send.buffer.bytes: 262144
        sasl.mechanism: PLAIN
        security.protocol: SASL_SSL
        ssl.endpoint.identification.algorithm:           
kafka.client.truststore.jks 下载 地址 ,证书下载后直接放在C盘根目录下。

2.7 项目结构

SpringBoot集成阿里云Kafka 示例

三、发送接收测试

3.1 启动项目,使用PostMan发送Post请求

SpringBoot集成阿里云Kafka 示例

3.2 项目日志

SpringBoot集成阿里云Kafka 示例

3.3 控制台消息监控查看

SpringBoot集成阿里云Kafka 示例

更多参考

SSL接入点PLAIN机制收发消息