天天看點

(19)Reactor Processors——響應式Spring的道法術器

本系列文章索引《響應式Spring的道法術器》

前情提要 響應式流 | Reactor 3快速上手 | 響應式流規範

2.9 Processor

Processor

既是一種特别的釋出者(

Publisher

)又是一種訂閱者(

Subscriber

)。 是以你能夠訂閱一個

Processor

,也可以調用它們提供的方法來手動插入資料到序列,或終止序列。

前面一直在聊響應式流的四個接口中的三個:

Publisher

Subscriber

Subscription

,唯獨

Processor

遲遲沒有提及。原因在于想用好它們不太容易,多數情況下,我們應該進行避免使用

Processor

,通常來說僅用于一些特殊場景。

2.9.1 使用 Sink 來線程安全地生成流

比起直接使用Processor,更好的方式是通過調用

sink()

來得到它的Sink。這個Sink是線程安全的,可以用于在應用程式中多線程并發地生成資料。例如,通過

UnicastProcessor

得到一個線程安全的 sink:

UnicastProcessor<Integer> processor = UnicastProcessor.create();
    FluxSink<Integer> sink = processor.sink(overflowStrategy);           

多個線程可以并發地通過下邊的方法生成資料到sink。

sink.next(n);           

看到這裡是不是感覺跟

generate

生成資料流的方式很像?是以Reactor官方建議,當你想要使用Processor的時候,首先看看能否用

generate

實作同樣的功能,或者看看是否有相應的操作符可以達到你想要的效果。

2.9.2 Reactor 内置的 Processor

Reactor Core 内置多種 Processor。這些 processor 具有不同的文法,大概分為三類。

  • 直接的(direct)(DirectProcessor 和 UnicastProcessor):這些 processors 隻能通過直接 調用 Sink 的方法來推送資料。
  • 同步的(synchronous)(EmitterProcessor 和 ReplayProcessor):這些 processors 既可以直接調用 Sink 方法來推送資料,也可以通過訂閱到一個上遊的釋出者來同步地産生資料。
  • 異步的(asynchronous)(WorkQueueProcessor 和 TopicProcessor):這些 processors 可以将從多個上遊釋出者得到的資料推送下去。由于使用了 RingBuffer 的資料結構來緩存多個來自上遊的資料,是以更加有健壯性。

異步的 processor 在執行個體化的時候最複雜,因為有許多不同的選項。是以它們暴露出一個 Builder 接口。 而簡單的 processors 有靜态的工廠方法。

1)DirectProcessor

DirectProcessor

可以将信号分發給零到多個訂閱者(Subscriber)。它是最容易執行個體化的,使用靜态方法 create() 即可。另一方面,它的不足是無法處理背壓。是以,當

DirectProcessor

推送的是 N 個元素,而至少有一個訂閱者的請求個數少于 N 的時候,就會發出一個

IllegalStateException

一旦 Processor 結束(通常通過調用它的 Sink 的 error(Throwable) 或 complete() 方法), 雖然它允許更多的訂閱者訂閱它,但是會立即向它們重新發送終止信号。

2)UnicastProcessor

UnicastProcessor

可以使用一個内置的緩存來處理背壓。代價就是它最多隻能有一個訂閱者(上一節的例子通過

publish

轉換成了

ConnectableFlux

,是以可以接入兩個訂閱者)。

UnicastProcessor

有多種選項,是以提供多種不同的

create

靜态方法。例如,它預設是 無限的(unbounded) :如果你在在訂閱者還沒有請求資料的情況下讓它推送資料,它會緩存所有資料。

可以通過提供一個自定義的 Queue 的具體實作傳遞給 create 工廠方法來改變預設行為。如果給出的隊列是有限的(bounded), 并且緩存已滿,而且未收到下遊的請求,processor 會拒絕推送資料。

在上邊“有限的”例子中,還可以在構造 processor 的時候提供一個回調方法,這個回調方法可以在每一個 被拒絕推送的元素上調用,進而讓開發者有機會清理這些元素。

3)EmitterProcessor

EmitterProcessor

能夠向多個訂閱者發送資料,并且可以對每一個訂閱者進行背壓處理。它本身也可以訂閱一個釋出者并同步獲得資料。

最初如果沒有訂閱者,它仍然允許推送一些資料到緩存,緩存大小由

bufferSize

定義。 之後如果仍然沒有訂閱者訂閱它并消費資料,對

onNext

的調用會阻塞,直到有訂閱者接入 (這時隻能并發地訂閱了)。

是以第一個訂閱者會收到最多

bufferSize

個元素。然而之後,後續接入的訂閱者隻能擷取到它們開始訂閱之後推送的資料。這個内部的緩存會繼續用于背壓的目的。

預設情況下,如果所有的訂閱者都取消了訂閱,它會清空内部緩存,并且不再接受更多的訂閱者。這一點可以通過 create 靜态工廠方法的 autoCancel 參數來配置。

4)ReplayProcessor

ReplayProcessor

會緩存直接通過自身的 Sink 推送的元素,以及來自上遊釋出者的元素, 并且後來的訂閱者也會收到重發(replay)的這些元素。

可以通過多種配置方式建立它:

  • 緩存一個元素(cacheLast)。
  • 緩存一定個數的曆史元素(create(int)),所有的曆史元素(create())。
  • 緩存基于時間窗期間内的元素(createTimeout(Duration))。
  • 緩存基于曆史個數和時間窗的元素(createSizeOrTimeout(int, Duration))。

5)TopicProcessor

TopicProcessor

是一個異步的 processor,它能夠重發來自多個上遊釋出者的元素, 這需要在建立它的時候配置

shared

(build() 的 share(boolean) 配置)。

如果你企圖在并發環境下通過并發的上遊釋出者調用

TopicProcessor

onNext

onComplete

,或

onError

方法,就必須配置

shared

。否則,并發調用就是非法的,進而 processor 是完全相容響應式流規範的。

TopicProcessor

能夠對多個訂閱者發送資料。它通過對每一個訂閱者關聯一個線程來實作這一點, 這個線程會一直執行直到 processor 發出

onError

onComplete

信号,或關聯的訂閱者被取消。 最多可以接受的訂閱者個數由構造者方法

executor

指定,通過提供一個有限線程數的

ExecutorService

來限制這一個數。

這個 processor 基于一個

RingBuffer

資料結構來存儲已發送的資料。每一個訂閱者線程 自行管理其相關的資料在

RingBuffer

中的索引。

這個 processor 也有一個

autoCancel

構造器方法:如果設定為

true

(預設的),那麼當 所有的訂閱者取消之後,上遊釋出者也就被取消了。

6)WorkQueueProcessor

WorkQueueProcessor

也是一個異步的 processor,也能夠重發來自多個上遊釋出者的元素, 同樣在建立時需要配置

shared

(它多數構造器配置與

TopicProcessor

相同)。

它放松了對響應式流規範的相容,但是好處就在于相對于

TopicProcessor

來說需要更少的資源。 它仍然基于

RingBuffer

,但是不再要求每一個訂閱者都關聯一個線程,是以相對于

TopicProcessor

來說更具擴充性。

代價在于分發模式有些差別:來自訂閱者的請求會彙總在一起,并且這個 processor 每次隻對一個 訂閱者發送資料,是以需要循環(round-robin)對訂閱者發送資料,而不是一次全部發出的模式(無法保證完全公平的循環分發)。