天天看点

spring-cloud-stream整合kafka

1.在项目的pom中引入

<!-- Kafka binder for Spring Cloud Stream. No <version> is declared here;
     presumably it is managed by a parent POM or BOM (e.g. spring-cloud-dependencies) - TODO confirm. -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>           

2.配置消息通道

/**
 * Declares the two message channels used by this demo: one for sending
 * messages and one for subscribing to them.
 */
public interface Demo {

    /** Name of the channel that messages are subscribed from. */
    String DEMO_INPUT = "demo_input";

    /** Name of the channel that messages are sent to. */
    String DEMO_OUTPUT = "demo_output";

    /**
     * Channel for receiving messages.
     *
     * <p>The spelling "recieve" is kept as-is because it is part of the public method name.
     *
     * @return the subscribable channel annotated with {@link #DEMO_INPUT}
     */
    @Input(DEMO_INPUT)
    SubscribableChannel recieveDemoMessage();

    /**
     * Channel for sending messages.
     *
     * @return the message channel annotated with {@link #DEMO_OUTPUT}
     */
    @Output(DEMO_OUTPUT)
    MessageChannel sendDemoMessage();
}
           
3.启用绑定:@EnableBinding会根据传给注解value的接口列表,把其中带有@Input和@Output注解的通道绑定到消息代理(broker)
@EnableBinding(value = {Demo.class})           

4.连接kafka的配置

# Input binding "demo_input" (the value of Demo.DEMO_INPUT): destination "demo", group "demo".
spring.cloud.stream.bindings.demo_input.destination=demo
spring.cloud.stream.bindings.demo_input.group=demo
# Output binding "demo_output" (the value of Demo.DEMO_OUTPUT): same destination "demo" as the input binding.
spring.cloud.stream.bindings.demo_output.destination=demo
# NOTE(review): "group" is normally a consumer-side setting; it likely has no effect on an output binding - confirm.
spring.cloud.stream.bindings.demo_output.group=demo
# Use the Kafka binder by default.
spring.cloud.stream.default-binder=kafka
# Kafka broker address (local broker on port 9092).
spring.kafka.bootstrap-servers=127.0.0.1:9092
# Consumer offsets are auto-committed.
spring.kafka.consumer.enable-auto-commit=true
# Producer keys and values are serialized as raw byte arrays.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer           

5.发送消息

@Resource(name = Demo.DEMO_OUTPUT)
private MessageChannel sendDemoMessageChannel;

/**
 * Sends a single message with payload "OK" through the injected output
 * channel and prints the boolean returned by {@code send}.
 */
@Test
public void Demo() {
    boolean sent = sendDemoMessageChannel.send(
            MessageBuilder.withPayload("OK").build());
    System.out.println(sent);
}

6.接收消息

/**
 * Handles messages arriving on the {@code Demo.DEMO_INPUT} channel.
 *
 * <p>An empty payload is reported and the message is then skipped;
 * otherwise the payload is printed.
 *
 * @param message the received message carrying a String payload
 */
@StreamListener(Demo.DEMO_INPUT)
public void insertQuotationK(Message<String> message) {
    String payload = message.getPayload();
    if (StringUtils.isEmpty(payload)) {
        System.out.println("receiver data is empty !");
        System.out.println(400 + "failed");
        // Stop here: without this return, the "received" line below was
        // also printed for an empty payload right after reporting failure.
        return;
    }
    System.out.println("kafka收到" + payload);
}

7.结束咯,如果出现异常,请留言。