天天看點

SpringCloud學習之SpringCloudStream&內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka

一、關于Spring-Cloud-Stream

  Spring Cloud Stream本質上就是整合了Spring Boot和Spring Integration,實作了一套輕量級的消息驅動的微服務架構。通過使用Spring Cloud Stream,可以有效地簡化開發人員對消息中間件的使用複雜度,讓系統開發人員可以有更多的精力關注于核心業務邏輯的處理。

  在這裡我先放一張官網的圖:

SpringCloud學習之SpringCloudStream&內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka

  應用程式通過Spring Cloud Stream注入到輸入和輸出通道與外界進行通信。根據此規則我們很容易的實作消息傳遞,訂閱消息與消息中轉。并且當需要切換消息中間件時,幾乎不需要修改代碼,隻需要變更配置就行了。

  在用例圖中 Inputs代表了應用程式監聽消息 、outputs代表發送消息、binder的話大家可以了解為将應用程式與消息中間件隔離的抽象,類似于三層架構下利用dao屏蔽service與資料庫的實作的原理。

  springcloud預設提供了rabbitmq與kafka的實作。

二、springcloud內建kafka

1、添加gradle依賴:

SpringCloud學習之SpringCloudStream&內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
SpringCloud學習之SpringCloudStream&內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
dependencies{
    compile('org.springframework.cloud:spring-cloud-stream')
    compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')
    compile('org.springframework.kafka:spring-kafka')
}      

View Code

2、定義一個接口:

  spring-cloud-stream已經給我們定義了最基本的輸入與輸出接口,他們分别是 Source,Sink, Processor

  Sink接口:

SpringCloud學習之SpringCloudStream&內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
SpringCloud學習之SpringCloudStream&內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}      

  Source接口:

SpringCloud學習之SpringCloudStream&內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
SpringCloud學習之SpringCloudStream&內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}      

  Processor接口:

SpringCloud學習之SpringCloudStream&內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
SpringCloud學習之SpringCloudStream&內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
package org.springframework.cloud.stream.messaging;

public interface Processor extends Source, Sink {
}      

  這裡面Processor這個接口既定義輸入通道又定義了輸出通道。同時我們也可以自己定義通道接口,代碼如下:

SpringCloud學習之SpringCloudStream&內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
SpringCloud學習之SpringCloudStream&內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
package com.bdqn.lyrk.shop.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface ShopChannel {

    /**
     * 發消息的通道名稱
     */
    String SHOP_OUTPUT = "shop_output";

    /**
     * 消息的訂閱通道名稱
     */
    String SHOP_INPUT = "shop_input";

    /**
     * 發消息的通道
     *
     * @return
     */
    @Output(SHOP_OUTPUT)
    MessageChannel sendShopMessage();

    /**
     * 收消息的通道
     *
     * @return
     */
    @Input(SHOP_INPUT)
    SubscribableChannel recieveShopMessage();


}      

3、定義服務類

SpringCloud學習之SpringCloudStream&內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
SpringCloud學習之SpringCloudStream&內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
package com.bdqn.lyrk.shop.server;

import com.bdqn.lyrk.shop.channel.ShopChannel;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ShopService {

    @Resource(name = ShopChannel.SHOP_OUTPUT)
    private MessageChannel sendShopMessageChannel;

    @GetMapping("/sendMsg")
    public String sendShopMessage(String content) {
        boolean isSendSuccess = sendShopMessageChannel.
                send(MessageBuilder.withPayload(content).build());
        return isSendSuccess ? "發送成功" : "發送失敗";
    }

    @StreamListener(ShopChannel.SHOP_INPUT)
    public void receive(Message<String> message) {
        System.out.println(message.getPayload());
    }
}      

  這裡面大家注意 @StreamListener。這個注解可以監聽輸入通道裡的消息内容,注解裡面的屬性指定我們剛才定義的輸入通道名稱,而MessageChannel則可以通過

輸出通道發送消息。使用@Resource注入時需要指定我們剛才定義的輸出通道名稱

4、定義啟動類

SpringCloud學習之SpringCloudStream&amp;內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
SpringCloud學習之SpringCloudStream&amp;內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
package com.bdqn.lyrk.shop;

import com.bdqn.lyrk.shop.channel.ShopChannel;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding(ShopChannel.class)
public class ShopServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ShopServerApplication.class, args);
    }
}      

  注意@EnableBinding注解,這個注解指定剛才我們定義消息通道的接口名稱,當然這裡也可以傳多個相關的接口

5、定義application.yml檔案

SpringCloud學習之SpringCloudStream&amp;內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
SpringCloud學習之SpringCloudStream&amp;內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka
spring:
  application:
    name: shop-server
  cloud:
    stream:
      bindings:
        #配置自己定義的通道與哪個中間件互動
        shop_input: #ShopChannel裡Input和Output的值
          destination: zhibo #目标主題
        shop_output:
          destination: zhibo
      default-binder: kafka #預設的binder是kafka
  kafka:
    bootstrap-servers: localhost:9092 #kafka服務位址
    consumer:
      group-id: consumer1
    producer:
      key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      client-id: producer1
server:
  port: 8100      

  這裡是重頭戲,我們必須指定所有通道對應的消息主題,同時指定預設的binder為kafka,緊接着定義Spring-kafka的外部化配置,在這裡指定producer的序列化類為ByteArraySerializer

啟動程式成功後,我們通路 http://localhost:8100/sendMsg?content=2 即可得到如下結果

SpringCloud學習之SpringCloudStream&amp;內建kafka一、關于Spring-Cloud-Stream二、springcloud內建kafka