本系列文章索引《響應式Spring的道法術器》
前情提要 響應式流 | Reactor 3快速上手 | 響應式流規範
2.9 Processor
Processor
既是一種特别的釋出者(
Publisher
)又是一種訂閱者(
Subscriber
)。 是以你能夠訂閱一個
Processor
,也可以調用它們提供的方法來手動插入資料到序列,或終止序列。
前面一直在聊響應式流的四個接口中的三個:
Publisher
、
Subscriber
Subscription
,唯獨
Processor
遲遲沒有提及。原因在于想用好它們不太容易,多數情況下,我們應該進行避免使用
Processor
,通常來說僅用于一些特殊場景。
2.9.1 使用 Sink 來線程安全地生成流
比起直接使用Processor,更好的方式是通過調用
sink()
來得到它的Sink。這個Sink是線程安全的,可以用于在應用程式中多線程并發地生成資料。例如,通過
UnicastProcessor
得到一個線程安全的 sink:
UnicastProcessor<Integer> processor = UnicastProcessor.create();
FluxSink<Integer> sink = processor.sink(overflowStrategy);
多個線程可以并發地通過下邊的方法生成資料到sink。
sink.next(n);
看到這裡是不是感覺跟
generate
生成資料流的方式很像?是以Reactor官方建議,當你想要使用Processor的時候,首先看看能否用
generate
實作同樣的功能,或者看看是否有相應的操作符可以達到你想要的效果。
2.9.2 Reactor 内置的 Processor
Reactor Core 内置多種 Processor。這些 processor 具有不同的文法,大概分為三類。
- 直接的(direct)(DirectProcessor 和 UnicastProcessor):這些 processors 隻能通過直接 調用 Sink 的方法來推送資料。
- 同步的(synchronous)(EmitterProcessor 和 ReplayProcessor):這些 processors 既可以直接調用 Sink 方法來推送資料,也可以通過訂閱到一個上遊的釋出者來同步地産生資料。
- 異步的(asynchronous)(WorkQueueProcessor 和 TopicProcessor):這些 processors 可以将從多個上遊釋出者得到的資料推送下去。由于使用了 RingBuffer 的資料結構來緩存多個來自上遊的資料,是以更加有健壯性。
異步的 processor 在執行個體化的時候最複雜,因為有許多不同的選項。是以它們暴露出一個 Builder 接口。 而簡單的 processors 有靜态的工廠方法。
1)DirectProcessor
DirectProcessor
可以将信号分發給零到多個訂閱者(Subscriber)。它是最容易執行個體化的,使用靜态方法 create() 即可。另一方面,它的不足是無法處理背壓。是以,當
DirectProcessor
推送的是 N 個元素,而至少有一個訂閱者的請求個數少于 N 的時候,就會發出一個
IllegalStateException
。
一旦 Processor 結束(通常通過調用它的 Sink 的 error(Throwable) 或 complete() 方法), 雖然它允許更多的訂閱者訂閱它,但是會立即向它們重新發送終止信号。
2)UnicastProcessor
UnicastProcessor
可以使用一個内置的緩存來處理背壓。代價就是它最多隻能有一個訂閱者(上一節的例子通過
publish
轉換成了
ConnectableFlux
,是以可以接入兩個訂閱者)。
UnicastProcessor
有多種選項,是以提供多種不同的
create
靜态方法。例如,它預設是 無限的(unbounded) :如果你在在訂閱者還沒有請求資料的情況下讓它推送資料,它會緩存所有資料。
可以通過提供一個自定義的 Queue 的具體實作傳遞給 create 工廠方法來改變預設行為。如果給出的隊列是有限的(bounded), 并且緩存已滿,而且未收到下遊的請求,processor 會拒絕推送資料。
在上邊“有限的”例子中,還可以在構造 processor 的時候提供一個回調方法,這個回調方法可以在每一個 被拒絕推送的元素上調用,進而讓開發者有機會清理這些元素。
3)EmitterProcessor
EmitterProcessor
能夠向多個訂閱者發送資料,并且可以對每一個訂閱者進行背壓處理。它本身也可以訂閱一個釋出者并同步獲得資料。
最初如果沒有訂閱者,它仍然允許推送一些資料到緩存,緩存大小由
bufferSize
定義。 之後如果仍然沒有訂閱者訂閱它并消費資料,對
onNext
的調用會阻塞,直到有訂閱者接入 (這時隻能并發地訂閱了)。
是以第一個訂閱者會收到最多
bufferSize
個元素。然而之後,後續接入的訂閱者隻能擷取到它們開始訂閱之後推送的資料。這個内部的緩存會繼續用于背壓的目的。
預設情況下,如果所有的訂閱者都取消了訂閱,它會清空内部緩存,并且不再接受更多的訂閱者。這一點可以通過 create 靜态工廠方法的 autoCancel 參數來配置。
4)ReplayProcessor
ReplayProcessor
會緩存直接通過自身的 Sink 推送的元素,以及來自上遊釋出者的元素, 并且後來的訂閱者也會收到重發(replay)的這些元素。
可以通過多種配置方式建立它:
- 緩存一個元素(cacheLast)。
- 緩存一定個數的曆史元素(create(int)),所有的曆史元素(create())。
- 緩存基于時間窗期間内的元素(createTimeout(Duration))。
- 緩存基于曆史個數和時間窗的元素(createSizeOrTimeout(int, Duration))。
5)TopicProcessor
TopicProcessor
是一個異步的 processor,它能夠重發來自多個上遊釋出者的元素, 這需要在建立它的時候配置
shared
(build() 的 share(boolean) 配置)。
如果你企圖在并發環境下通過并發的上遊釋出者調用的
TopicProcessor
onNext
,或
onComplete
方法,就必須配置
onError
。否則,并發調用就是非法的,進而 processor 是完全相容響應式流規範的。
shared
TopicProcessor
能夠對多個訂閱者發送資料。它通過對每一個訂閱者關聯一個線程來實作這一點, 這個線程會一直執行直到 processor 發出
onError
或
onComplete
信号,或關聯的訂閱者被取消。 最多可以接受的訂閱者個數由構造者方法
executor
指定,通過提供一個有限線程數的
ExecutorService
來限制這一個數。
這個 processor 基于一個
RingBuffer
資料結構來存儲已發送的資料。每一個訂閱者線程 自行管理其相關的資料在
RingBuffer
中的索引。
這個 processor 也有一個
autoCancel
構造器方法:如果設定為
true
(預設的),那麼當 所有的訂閱者取消之後,上遊釋出者也就被取消了。
6)WorkQueueProcessor
WorkQueueProcessor
也是一個異步的 processor,也能夠重發來自多個上遊釋出者的元素, 同樣在建立時需要配置
shared
(它多數構造器配置與
TopicProcessor
相同)。
它放松了對響應式流規範的相容,但是好處就在于相對于
TopicProcessor
來說需要更少的資源。 它仍然基于
RingBuffer
,但是不再要求每一個訂閱者都關聯一個線程,是以相對于
TopicProcessor
來說更具擴充性。
代價在于分發模式有些差別:來自訂閱者的請求會彙總在一起,并且這個 processor 每次隻對一個 訂閱者發送資料,是以需要循環(round-robin)對訂閱者發送資料,而不是一次全部發出的模式(無法保證完全公平的循環分發)。