天天看點

SpringCloud實戰9-Stream消息驅動

官方定義 Spring Cloud Stream 是一個建構消息驅動微服務的架構。

  應用程式通過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 互動,通過我們配置來 binding ,而 Spring Cloud Stream 的 binder 負責與消息中間件互動。是以,我們隻需要搞清楚如何與 Spring Cloud Stream 互動就可以友善使用消息驅動的方式。

通過使用Spring Integration來連接配接消息代理中間件以實作消息事件驅動。Spring Cloud Stream 為一些供應商的消息中間件産品提供了個性化的自動化配置實作,引用了釋出-訂閱、消費組、分區的三個核心概念。目前僅支援RabbitMQ、Kafka。

這裡還要講解一下什麼是Spring Integration  ? Integration  內建

企業應用內建(EAI)是內建應用之間資料和服務的一種應用技術。四種內建風格:

  1.檔案傳輸:兩個系統生成檔案,檔案的有效負載就是由另一個系統處理的消息。該類風格的例子之一是針對檔案輪詢目錄或FTP目錄,并處理該檔案。

  2.共享資料庫:兩個系統查詢同一個資料庫以擷取要傳遞的資料。一個例子是你部署了兩個EAR應用,它們的實體類(JPA、Hibernate等)共用同一個表。

  3.遠端過程調用:兩個系統都暴露另一個能調用的服務。該類例子有EJB服務,或SOAP和REST服務。

  4.消息:兩個系統連接配接到一個公用的消息系統,互相交換資料,并利用消息調用行為。該風格的例子就是衆所周知的中心輻射式的(hub-and-spoke)JMS架構。

 為什麼需要SpringCloud Stream消息驅動呢?

  比方說我們用到了RabbitMQ和Kafka,由于這兩個消息中間件的架構上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分區,這些中間件的差異性導緻我們實際項目開發給我們造成了一定的困擾,我們如果用了兩個消息隊列的其中一種,

後面的業務需求,我想往另外一種消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統耦合了,這時候springcloud Stream給我們提供了一種解耦合的方式。

如下是官方文檔提供的架構圖所示:

SpringCloud實戰9-Stream消息驅動

Spring Cloud Stream由一個中間件中立的核組成。應用通過Spring Cloud Stream插入的input(相當于消費者consumer,它是從隊列中接收消息的)和output(相當于生産者producer,它是從隊列中發送消息的。)通道與外界交流。

通道通過指定中間件的Binder實作與外部代理連接配接。業務開發者不再關注具體消息中間件,隻需關注Binder對應用程式提供的抽象概念來使用消息中間件實作業務即可。

Binder

  通過定義綁定器作為中間層,實作了應用程式與消息中間件(Middleware)細節之間的隔離。通過向應用程式暴露統一的Channel通過,是的應用程式不需要再考慮各種不同的消息中間件的實作。當需要更新消息中間件,或者是更換其他消息中間件産品時,我們需要做的就是更換對應的Binder綁定器而不需要修改任何應用邏輯 。甚至可以任意的改變中間件的類型而不需要修改一行代碼。目前隻提供了RabbitMQ和Kafka的Binder實作。

Springcloud Stream還有個好處就是像Kafka一樣引入了一點分區的概念,像RabbitMQ不支援分區的隊列,你用了SpringCloud Stream技術,它就會幫RabbitMQ引入了分區的特性,SpringCloud Stream就是天然支援分區的,我們用起來還是很友善的。後面會詳細講解

接下來進行一個Demo進行演練。

首先我們要在先前的工程中建立三個子子產品,分别是springcloud-stream,springcloud-stream1,springcloud-stream2  這三個子產品,其中springcloud-stream作為生産者進行發消息子產品,springcloud-stream1,springcloud-stream2作為消息接收子產品。

如下圖所示:

SpringCloud實戰9-Stream消息驅動

 分别在springcloud-stream,springcloud-stream1,springcloud-stream2  這三個子產品引入如下依賴:

    <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>1.3.0.RELEASE</version>
        </dependency>

      
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
    </dependency>      

接着進行application.yml進行配置如下:

server:
  port: 7888
spring:
  application:
    name: producer
  cloud:
    stream:
      kafka:
        binder:
#Kafka的消息中間件伺服器
          brockers: localhost:9092
#Zookeeper的節點,如果叢集,後面加,号分隔
          zk-nodes: localhost:2181
#如果設定為false,就不會自動建立Topic 有可能你Topic還沒建立就直接調用了。
          auto-create-topics: true
      bindings:
#這裡用stream給我們提供的預設output,後面會講到自定義output
        output:
#消息發往的目的地        
            destination: stream-demo
#消息發送的格式,接收端不用指定格式,但是發送端要            
            content-type: text/plain      

接下來進行第一個springcloud-stream子產品的代碼編寫,在該子產品下定義一個SendService,如下:

package hjc.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;

/**
 * Created by cong on 2018/5/28.
 */
//這個注解給我們綁定消息通道的,Source是Stream給我們提供的,可以點進去看源碼,可以看到output和input,這和配置檔案中的output,input對應的。
@EnableBinding(Source.class)

public class SendService {

    @Autowired
    private Source source;


    public void sendMsg(String msg){
        source.output().send(MessageBuilder.withPayload(msg).build());
    }

}      

springcloud-stream 的controller層代碼如下:

package hjc.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Created by cong 2018/5/28
 */
@RestController
public class ProducerController {


    @Autowired
    private SendService sendService;


    @RequestMapping("/send/{msg}")
    public void send(@PathVariable("msg") String msg){
        sendService.sendMsg(msg);
    }
}      

接下來進行springcloud-stream1,springcloud-stream2兩個子產品的代碼編寫

首先需要引入的依賴,上面已經提到。

接着進行springcloud-stream1和springcloud-stream2子產品application.yml的配置,如下:

springcloud-stream1配置如下:

server:
  port: 7889
spring:
  application:
    name: consumer_1
  cloud:
    stream:
      kafka:
        binder:
          brockers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
#input是接收,注意這裡不能再像前面一樣寫output了
          input:
            destination: stream-demo      

springcloud-stream2子產品application.yml的配置如下:

server:
  port: 7890
spring:
  application:
    name: consumer_2
  cloud:
    stream:
      kafka:
        binder:
          brockers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
          input:
            destination: stream-demo      

好了接下來進行springcloud-stream1子產品和springcloud-stream2子產品的消息接受代碼的編寫,springcloud-stream1子產品和springcloud-stream2子產品的消息接受代碼都是一樣的,如下:

//消息接受端,stream給我們提供了Sink,Sink源碼裡面是綁定input的,要跟我們配置檔案的imput關聯的。
@EnableBinding(Sink.class)
public class RecieveService {

    @StreamListener(Sink.INPUT)
    public void recieve(Object payload){
        System.out.println(payload);
    }

}      

好了接着我們首先要啟動上一篇随筆所提到的zookeeper,和Kafka,如下:

SpringCloud實戰9-Stream消息驅動

接着分别現後啟動啟動springcloud-stream,springcloud-stream1,springcloud-stream2,子產品運作結果如下:

首先進行springcloud-stream子產品的通路,如下:

SpringCloud實戰9-Stream消息驅動

回車後可以看到,Kafka CommitId,說明消息發送成功,再看一下,那兩個消息接受子產品的輸出,如下:

SpringCloud實戰9-Stream消息驅動
SpringCloud實戰9-Stream消息驅動

可以看到這兩消息子產品都接收到了消息并且列印了出來。

好了到現在為止,我們進行了一個簡單的消息發送和接收,用的是Stream給我們提供的預設Source,Sink,接下來我們要自己進行自定義,這種方式在工作中還是用的比較多的,因為我們要往不同的消息通道發消息,

必然不能全都叫input,output的,那樣的話就亂套了,是以首先自定義一個接口,如下:

/**
 * Created by cong on 2018/5/28.
 */
public interface MySource {

    @Output("myOutput")
    MessageChannel myOutput();

}      

這裡要注意一下,可以看到上面的代碼,其中myOutput是要和你的配置檔案的消息發送端配置對應的,是以修改springcloud-stream中application.yml配置,如下:

server:
  port: 7888
spring:
  application:
    name: producer
  cloud:
    stream:
      kafka:
        binder:
#Kafka的消息中間件伺服器
          brockers: localhost:9092
#Zookeeper的節點,如果叢集,後面加,号分隔
          zk-nodes: localhost:2181
#如果設定為false,就不會自動建立Topic 有可能你Topic還沒建立就直接調用了。
          auto-create-topics: true
      bindings:
#自定義output
        myOutput:
#消息發往的目的地
            destination: stream-demo
#消息發送的格式,接收端不用指定格式,但是發送端要
            content-type: text/plain      

這樣還不行,還必須改造springcloud-stream消息發送端的SendService這個類,代碼如下:

package hjc.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;

/**
 * Created by cong on 2018/5/28.
 */

@EnableBinding(MySource.class)
public class SendService {
    
    @Autowired
    private MySource source;

    public void sendMsg(String msg){
        source.myOutput().send(MessageBuilder.withPayload(msg).build());
    }

}      

接下來重新啟動那三個子產品,運作結果如下:

SpringCloud實戰9-Stream消息驅動
SpringCloud實戰9-Stream消息驅動

可以看到兩個消息接收端還是依然能接受消息。

接收端的自定義接收也是類似的修改的,這裡就不示範了。

springcloud-stream還給我們提供了一個Processor接口,用于進行消息處理後再進行發送出去,相當于一個消息中轉站。下面我們進行示範

  首先我們需要改造springcloud-stream1子產品,把它作為一個消息中轉站。用于springcloud-stream消息處理後再進行發送給springcloud-stream2子產品

首先修改springcloud-stream1子產品的配置,如下:

server:
  port: 7889
spring:
  application:
    name: consumer_1
  cloud:
    stream:
      kafka:
        binder:
          brockers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
#input是接收,注意這裡不能再像前面一樣寫output了
          input:
            destination: stream-demo
 #進行消息中轉處理後,在進行轉發出去           
          output:
            destination: stream-demo-trans      

接着在建立一個消息中轉類,代碼如下:

package hjc.consumer;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.ServiceActivator;

/**
 * Created by cong on 2018/5/28.
 */
@EnableBinding(Processor.class)
public class TransFormService {

    @ServiceActivator(inputChannel = Processor.INPUT,outputChannel = Processor.OUTPUT)
    public Object transform(Object payload){
        System.out.println("消息中轉站:"+payload);
        return payload;
    }

}      

接着要修改消息中轉站發送消息出去的接收端springcloud-stream2的配置,如下:

server:
  port: 7890
spring:
  application:
    name: consumer_2
  cloud:
    stream:
      kafka:
        binder:
          brockers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
          input:
            destination: stream-demo-trans      

這裡要強調一下,要把先前RecieveService類的綁定注解全都注釋掉,不然,會綁定沖突的,接下來分别重新開機這三個子產品,運作結果如下:

先進性springcloud-stream子產品的通路。

SpringCloud實戰9-Stream消息驅動

中轉站運作結果取下:

SpringCloud實戰9-Stream消息驅動

接下來,看中轉後的的接受端Springcloud-stream2的消息,到底有沒有消息過來,如下:

SpringCloud實戰9-Stream消息驅動

可以看到,中轉後消息被接受到了。

我們還可能會遇到一個場景就是,我們接收到消息後,給别人一個回報ACK,SpringCloud stream 給我們提供了一個SendTo注解可以幫我們幹這些事情。

首先我們先實作一個接口SendToBinder去實作output和input,代碼如下:

package hjc.consumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * Created by cong on 2018/5/28.
 */
public interface SendToBinder {

    @Output("output")
    MessageChannel output();

    @Input("input")
    SubscribableChannel input();

}      

接着再建立一個SendToService類來綁定自己的SendToBinder接口,然後監聽input,傳回ACK表示中轉站收到消息了,再轉發消息出去,代碼如下:

package hjc.consumer;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;

/**
 * Created by cong on 2018/5/28.
 */
@EnableBinding(SendToBinder.class)
public class SendToService {

    @StreamListener("input")
    @SendTo("output")
    public Object receiveFromInput(Object payload){
        System.out.println("中轉消息。。"+payload);
        return "xxxxx";
    }

}      

這裡要注意一點就是,啟動前下那邊之前的用到的哪些綁定注解,先注釋掉,不然與這裡會發生沖突。

運作結果如下:

SpringCloud實戰9-Stream消息驅動
SpringCloud實戰9-Stream消息驅動

可以看到發送端受到一個ACK

可以看到先前的例子,我們都是一端發消息,兩個消息接受者都接收到了,但是有時候有些業務場景我隻想讓其中一個消息接收者接收到消息,那麼該怎麼辦呢?

這時候就涉及一個消息分組(Consumer Groups)的概念了。

消息分組(Consumer Groups)

  “Group”,如果使用過 Kafka 的讀者并不會陌生。Spring Cloud Stream 的這個分組概念的意思基本和 Kafka 一緻。微服務中動态的縮放同一個應用的數量以此來達到更高的處理能力是非常必須的。對于這種情況,同一個事件防止被重複消費,

  隻要把這些應用放置于同一個 “group” 中,就能夠保證消息隻會被其中一個應用消費一次。不同的組是可以消費的,同一個組内會發生競争關系,隻有其中一個可以消費。

首先修改該springcloud-stream1子產品的配置,修改代碼如下:

server:
  port: 7889
spring:
  application:
    name: consumer_1
  cloud:
    stream:
      kafka:
        binder:
          brockers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
#input是接收,注意這裡不能再像前面一樣寫output了
          input:
            destination: stream-demo
#分組的組名
            group: group      

接着修改springcloud-stream2子產品的配置,代碼如下:

server:
  port: 7890
spring:
  application:
    name: consumer_2
  cloud:
    stream:
      kafka:
        binder:
          brockers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
          input:
            destination: stream-demo-trans
            group: group      

可以看到springcloud-stream1和springcloud-stream2是屬于同一組的。springcloud-stream子產品的發的消息隻能被springcloud-stream1或springcloud-stream2其中一個接收到,這樣避免了重複消費。

springcloud-stream2子產品代碼恢複成如下代碼:

package hjc.consumer;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

/**
 * Created by cong on 2018/5/28.
 */
//消息接受端,stream給我們提供了Sink,Sink源碼裡面是綁定input的,要跟我們配置檔案的imput關聯的。
@EnableBinding(Sink.class)
public class RecieveService {

    @StreamListener(Sink.INPUT)
    public void recieve(Object payload){
        System.out.println(payload);
    }

}      

springcloud-stream2的接收端代碼不變,依然跟上面代碼一樣。

接着,運作結果如下:

SpringCloud實戰9-Stream消息驅動

控制台如下:

SpringCloud實戰9-Stream消息驅動
SpringCloud實戰9-Stream消息驅動

可以看到隻有其中一個受到消息。避免了消息重複消費。

有時候我們隻想給特定的消費者消費消息,那麼又該真麼做呢?

這是後又涉及到消息分區的概念了。

消息分區()

  Spring Cloud Stream對給定應用的多個執行個體之間分隔資料予以支援。在分隔方案中,實體交流媒介(如:代理主題)被視為分隔成了多個片(partitions)。一個或者多個生産者應用執行個體給多個消費者應用執行個體發送消息并確定相同特征的資料被同一消費者執行個體處理。 

Spring Cloud Stream對分割的程序執行個體實作進行了抽象。使得Spring Cloud Stream 為不具備分區功能的消息中間件(RabbitMQ)也增加了分區功能擴充。

那麼我們就要進行一些配置了,比如我隻想要springcloud-stream2子產品接收到消息,

springcloud-stream2配置如下:

server:
  port: 7890
spring:
  application:
    name: consumer_2
  cloud:
    stream:
      kafka:
        binder:
          brockers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
          input:
            destination: stream-demo-trans
            group: group
            consumer:
#開啟分區
              partitioned: true
#分區數量
      instance-count: 2      

生産者端springcloud-stream子產品配置如下:

server:
  port: 7888
spring:
  application:
    name: producer
  cloud:
    stream:
      kafka:
        binder:
#Kafka的消息中間件伺服器
          brockers: localhost:9092
#Zookeeper的節點,如果叢集,後面加,号分隔
          zk-nodes: localhost:2181
#如果設定為false,就不會自動建立Topic 有可能你Topic還沒建立就直接調用了。
          auto-create-topics: true
      bindings:
#自定義output
        myOutput:
#消息發往的目的地
            destination: stream-demo
#消息發送的格式,接收端不用指定格式,但是發送端要
            content-type: text/plain
            producer:
#分區的主鍵,根據什麼來分區,下面的payload.id隻是一個對象的id用于做為Key,用來說明的。希望不要誤解
              partitionKeyExpression: payload.id
#Key和分區數量進行取模去配置設定消息,這裡分區數量配置為2
              partitionCount: 2      

其他的代碼基本不變,這裡就不示範了。這裡要給大家說明一下,比如分區的Key是一個對象的id,比如說id=1,每次發送消息的對象的id為相同值1,則消息隻會被同一個消費者消費,比如說Key和分區數量取模計算的結果是分到stream2子產品中,那麼下一次進行進行消息發送,

隻要分組的key即id的值依然還是1的話,消息永遠隻會配置設定到stream2子產品中。