天天看点

(19)Reactor Processors——响应式Spring的道法术器

本系列文章索引《响应式Spring的道法术器》

前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范

2.9 Processor

Processor

既是一种特别的发布者(

Publisher

)又是一种订阅者(

Subscriber

)。 所以你能够订阅一个

Processor

,也可以调用它们提供的方法来手动插入数据到序列,或终止序列。

前面一直在聊响应式流的四个接口中的三个:

Publisher

Subscriber

Subscription

,唯独

Processor

迟迟没有提及。原因在于想用好它们不太容易,多数情况下,我们应该进行避免使用

Processor

,通常来说仅用于一些特殊场景。

2.9.1 使用 Sink 来线程安全地生成流

比起直接使用Processor,更好的方式是通过调用

sink()

来得到它的Sink。这个Sink是线程安全的,可以用于在应用程序中多线程并发地生成数据。例如,通过

UnicastProcessor

得到一个线程安全的 sink:

UnicastProcessor<Integer> processor = UnicastProcessor.create();
    FluxSink<Integer> sink = processor.sink(overflowStrategy);           

多个线程可以并发地通过下边的方法生成数据到sink。

sink.next(n);           

看到这里是不是感觉跟

generate

生成数据流的方式很像?所以Reactor官方建议,当你想要使用Processor的时候,首先看看能否用

generate

实现同样的功能,或者看看是否有相应的操作符可以达到你想要的效果。

2.9.2 Reactor 内置的 Processor

Reactor Core 内置多种 Processor。这些 processor 具有不同的语法,大概分为三类。

  • 直接的(direct)(DirectProcessor 和 UnicastProcessor):这些 processors 只能通过直接 调用 Sink 的方法来推送数据。
  • 同步的(synchronous)(EmitterProcessor 和 ReplayProcessor):这些 processors 既可以直接调用 Sink 方法来推送数据,也可以通过订阅到一个上游的发布者来同步地产生数据。
  • 异步的(asynchronous)(WorkQueueProcessor 和 TopicProcessor):这些 processors 可以将从多个上游发布者得到的数据推送下去。由于使用了 RingBuffer 的数据结构来缓存多个来自上游的数据,因此更加有健壮性。

异步的 processor 在实例化的时候最复杂,因为有许多不同的选项。因此它们暴露出一个 Builder 接口。 而简单的 processors 有静态的工厂方法。

1)DirectProcessor

DirectProcessor

可以将信号分发给零到多个订阅者(Subscriber)。它是最容易实例化的,使用静态方法 create() 即可。另一方面,它的不足是无法处理背压。所以,当

DirectProcessor

推送的是 N 个元素,而至少有一个订阅者的请求个数少于 N 的时候,就会发出一个

IllegalStateException

一旦 Processor 结束(通常通过调用它的 Sink 的 error(Throwable) 或 complete() 方法), 虽然它允许更多的订阅者订阅它,但是会立即向它们重新发送终止信号。

2)UnicastProcessor

UnicastProcessor

可以使用一个内置的缓存来处理背压。代价就是它最多只能有一个订阅者(上一节的例子通过

publish

转换成了

ConnectableFlux

,所以可以接入两个订阅者)。

UnicastProcessor

有多种选项,因此提供多种不同的

create

静态方法。例如,它默认是 无限的(unbounded) :如果你在在订阅者还没有请求数据的情况下让它推送数据,它会缓存所有数据。

可以通过提供一个自定义的 Queue 的具体实现传递给 create 工厂方法来改变默认行为。如果给出的队列是有限的(bounded), 并且缓存已满,而且未收到下游的请求,processor 会拒绝推送数据。

在上边“有限的”例子中,还可以在构造 processor 的时候提供一个回调方法,这个回调方法可以在每一个 被拒绝推送的元素上调用,从而让开发者有机会清理这些元素。

3)EmitterProcessor

EmitterProcessor

能够向多个订阅者发送数据,并且可以对每一个订阅者进行背压处理。它本身也可以订阅一个发布者并同步获得数据。

最初如果没有订阅者,它仍然允许推送一些数据到缓存,缓存大小由

bufferSize

定义。 之后如果仍然没有订阅者订阅它并消费数据,对

onNext

的调用会阻塞,直到有订阅者接入 (这时只能并发地订阅了)。

因此第一个订阅者会收到最多

bufferSize

个元素。然而之后,后续接入的订阅者只能获取到它们开始订阅之后推送的数据。这个内部的缓存会继续用于背压的目的。

默认情况下,如果所有的订阅者都取消了订阅,它会清空内部缓存,并且不再接受更多的订阅者。这一点可以通过 create 静态工厂方法的 autoCancel 参数来配置。

4)ReplayProcessor

ReplayProcessor

会缓存直接通过自身的 Sink 推送的元素,以及来自上游发布者的元素, 并且后来的订阅者也会收到重发(replay)的这些元素。

可以通过多种配置方式创建它:

  • 缓存一个元素(cacheLast)。
  • 缓存一定个数的历史元素(create(int)),所有的历史元素(create())。
  • 缓存基于时间窗期间内的元素(createTimeout(Duration))。
  • 缓存基于历史个数和时间窗的元素(createSizeOrTimeout(int, Duration))。

5)TopicProcessor

TopicProcessor

是一个异步的 processor,它能够重发来自多个上游发布者的元素, 这需要在创建它的时候配置

shared

(build() 的 share(boolean) 配置)。

如果你企图在并发环境下通过并发的上游发布者调用

TopicProcessor

onNext

onComplete

,或

onError

方法,就必须配置

shared

。否则,并发调用就是非法的,从而 processor 是完全兼容响应式流规范的。

TopicProcessor

能够对多个订阅者发送数据。它通过对每一个订阅者关联一个线程来实现这一点, 这个线程会一直执行直到 processor 发出

onError

onComplete

信号,或关联的订阅者被取消。 最多可以接受的订阅者个数由构造者方法

executor

指定,通过提供一个有限线程数的

ExecutorService

来限制这一个数。

这个 processor 基于一个

RingBuffer

数据结构来存储已发送的数据。每一个订阅者线程 自行管理其相关的数据在

RingBuffer

中的索引。

这个 processor 也有一个

autoCancel

构造器方法:如果设置为

true

(默认的),那么当 所有的订阅者取消之后,上游发布者也就被取消了。

6)WorkQueueProcessor

WorkQueueProcessor

也是一个异步的 processor,也能够重发来自多个上游发布者的元素, 同样在创建时需要配置

shared

(它多数构造器配置与

TopicProcessor

相同)。

它放松了对响应式流规范的兼容,但是好处就在于相对于

TopicProcessor

来说需要更少的资源。 它仍然基于

RingBuffer

,但是不再要求每一个订阅者都关联一个线程,因此相对于

TopicProcessor

来说更具扩展性。

代价在于分发模式有些区别:来自订阅者的请求会汇总在一起,并且这个 processor 每次只对一个 订阅者发送数据,因此需要循环(round-robin)对订阅者发送数据,而不是一次全部发出的模式(无法保证完全公平的循环分发)。