天天看点

附2:Reactor 3 之选择合适的操作符——响应式Spring的道法术器

本系列文章索引《响应式Spring的道法术器》

前情提要 Reactor Operators

本节的内容来自我翻译的Reactor 3 参考文档——如何选择操作符。由于部分朋友打开github.io网速比较慢或上不去,贴出来方便大家查阅。

如果一个操作符是专属于

Flux

Mono

的,那么会给它注明前缀。

公共的操作符没有前缀。如果一个具体的用例涉及多个操作符的组合,这里以方法调用的方式展现,

会以一个点(.)开头,并将参数置于圆括号内,比如:

.methodCall(parameter)

1)创建一个新序列,它...

  • 发出一个

    T

    ,我已经有了:

    just

    • ...基于一个

      Optional<T>

      Mono#justOrEmpty(Optional<T>)

    • ...基于一个可能为

      null

      的 T:

      Mono#justOrEmpty(T)

  • T

    ,且还是由

    just

    方法返回
    • ...但是“懒”创建的:使用

      Mono#fromSupplier

      或用

      defer

      包装

      just

  • 发出许多

    T

    ,这些元素我可以明确列举出来:

    Flux#just(T...)

  • 基于迭代数据结构:
    • 一个数组:

      Flux#fromArray

    • 一个集合或 iterable:

      Flux#fromIterable

    • 一个 Integer 的 range:

      Flux#range

    • 一个

      Stream

      提供给每一个订阅:

      Flux#fromStream(Supplier<Stream>)

  • 基于一个参数值给出的源:
    • Supplier<T>

      Mono#fromSupplier

    • 一个任务:

      Mono#fromCallable

      Mono#fromRunnable

    • CompletableFuture<T>

      Mono#fromFuture

  • 直接完成:

    empty

  • 立即生成错误:

    error

    • ...但是“懒”的方式生成

      Throwable

      error(Supplier<Throwable>)

  • 什么都不做:

    never

  • 订阅时才决定:

    defer

  • 依赖一个可回收的资源:

    using

  • 可编程地生成事件(可以使用状态):
    • 同步且逐个的:

      Flux#generate

    • 异步(也可同步)的,每次尽可能多发出元素:

      Flux#create

      Mono#create

      也是异步的,只不过只能发一个)

2)对序列进行转化

  • 我想转化一个序列:
    • 1对1地转化(比如字符串转化为它的长度):

      map

    • ...类型转化:

      cast

    • ...为了获得每个元素的序号:

      Flux#index

    • 1对n地转化(如字符串转化为一串字符):

      flatMap

      + 使用一个工厂方法
    • 1对n地转化可自定义转化方法和/或状态:

      handle

    • 对每一个元素执行一个异步操作(如对 url 执行 http 请求):

      flatMap

      + 一个异步的返回类型为

      Publisher

      的方法
    • ...忽略一些数据:在 flatMap lambda 中根据条件返回一个

      Mono.empty()

    • ...保留原来的序列顺序:

      Flux#flatMapSequential

      (对每个元素的异步任务会立即执行,但会将结果按照原序列顺序排序)
    • ...当 Mono 元素的异步任务会返回多个元素的序列时:

      Mono#flatMapMany

  • 我想添加一些数据元素到一个现有的序列:
    • 在开头添加:

      Flux#startWith(T...)

    • 在最后添加:

      Flux#concatWith(T...)

  • 我想将

    Flux

    转化为集合(一下都是针对

    Flux

    的)
    • 转化为 List:

      collectList

      collectSortedList

    • 转化为 Map:

      collectMap

      collectMultiMap

    • 转化为自定义集合:

      collect

    • 计数:

      count

    • reduce 算法(将上个元素的reduce结果与当前元素值作为输入执行reduce方法,如sum)

      reduce

    • ...将每次 reduce 的结果立即发出:

      scan

    • 转化为一个 boolean 值:
    • 对所有元素判断都为true:

      all

    • 对至少一个元素判断为true:

      any

    • 判断序列是否有元素(不为空):

      hasElements

    • 判断序列中是否有匹配的元素:

      hasElement

  • 我想合并 publishers...
    • 按序连接:

      Flux#concat

      .concatWith(other)

    • ...即使有错误,也会等所有的 publishers 连接完成:

      Flux#concatDelayError

    • ...按订阅顺序连接(这里的合并仍然可以理解成序列的连接):

      Flux#mergeSequential

    • 按元素发出的顺序合并(无论哪个序列的,元素先到先合并):

      Flux#merge

      /

      .mergeWith(other)

    • ...元素类型会发生变化:

      Flux#zip

      Flux#zipWith

    • 将元素组合:
    • 2个 Monos 组成1个

      Tuple2

      Mono#zipWith

    • n个 Monos 的元素都发出来后组成一个 Tuple:

      Mono#zip

    • 在终止信号出现时“采取行动”:
    • 在 Mono 终止时转换为一个

      Mono<Void>

      Mono#and

    • 当 n 个 Mono 都终止时返回

      Mono<Void>

      Mono#when

    • 返回一个存放组合数据的类型,对于被合并的多个序列:
      • 每个序列都发出一个元素时:

        Flux#zip

      • 任何一个序列发出元素时:

        Flux#combineLatest

    • 只取各个序列的第一个元素:

      Flux#first

      Mono#first

      mono.or<br/>(otherMono).or(thirdMono)

      ,`flux.or(otherFlux).or(thirdFlux)
    • 由一个序列触发(类似于

      flatMap

      ,不过“喜新厌旧”):

      switchMap

    • 由每个新序列开始时触发(也是“喜新厌旧”风格):

      switchOnNext

  • 我想重复一个序列:

    repeat

    • ...但是以一定的间隔重复:

      Flux.interval(duration).flatMap(tick -&gt; myExistingPublisher)

  • 我有一个空序列,但是...
    • 我想要一个缺省值来代替:

      defaultIfEmpty

    • 我想要一个缺省的序列来代替:

      switchIfEmpty

  • 我有一个序列,但是我对序列的元素值不感兴趣:

    ignoreElements

    • ...并且我希望用

      Mono

      来表示序列已经结束:

      then

    • ...并且我想在序列结束后等待另一个任务完成:

      thenEmpty

    • ...并且我想在序列结束之后返回一个

      Mono

      Mono#then(mono)

    • ...并且我想在序列结束之后返回一个值:

      Mono#thenReturn(T)

    • Flux

      thenMany

  • 我有一个 Mono 但我想延迟完成...
    • ...当有1个或N个其他 publishers 都发出(或结束)时才完成:

      Mono#delayUntilOther

    • ...使用一个函数式来定义如何获取“其他 publisher”:

      Mono#delayUntil(Function)

  • 我想基于一个递归的生成序列的规则扩展每一个元素,然后合并为一个序列发出:
    • ...广度优先:

      expand(Function)

    • ...深度优先:

      expandDeep(Function)

3)“窥视”(只读)序列

  • 再不对序列造成改变的情况下,我想:
    • 得到通知或执行一些操作:
    • 发出元素:

      doOnNext

    • 序列完成:

      Flux#doOnComplete

      Mono#doOnSuccess

    • 因错误终止:

      doOnError

    • 取消:

      doOnCancel

    • 订阅时:

      doOnSubscribe

    • 请求时:

      doOnRequest

    • 完成或错误终止:

      doOnTerminate

      (Mono的方法可能包含有结果)
      • 但是在终止信号向下游传递 之后 :

        doAfterTerminate

    • 所有类型的信号(

      Signal

      ):

      Flux#doOnEach

    • 所有结束的情况(完成complete、错误error、取消cancel):

      doFinally

    • 记录日志:

      log

  • 我想知道所有的事件:
    • 每一个事件都体现为一个

      single

      对象:
    • 执行 callback:

      doOnEach

    • 每个元素转化为

      single

      materialize

      • ...在转化回元素:

        dematerialize

    • 转化为一行日志:

      log

4)过滤序列

  • 我想过滤一个序列
    • 基于给定的判断条件:

      filter

    • ...异步地进行判断:

      filterWhen

    • 仅限于指定类型的对象:

      ofType

    • 忽略所有元素:

      ignoreElements

    • 去重:
    • 对于整个序列:

      Flux#distinct

    • 去掉连续重复的元素:

      Flux#distinctUntilChanged

  • 我只想要一部分序列:
    • 只要 N 个元素:
    • 从序列的第一个元素开始算:

      Flux#take(long)

      • ...取一段时间内发出的元素:

        Flux#take(Duration)

      • ...只取第一个元素放到

        Mono

        中返回:

        Flux#next()

      • ...使用

        request(N)

        而不是取消:

        Flux#limitRequest(long)

    • 从序列的最后一个元素倒数:

      Flux#takeLast

    • 直到满足某个条件(包含):

      Flux#takeUntil

      (基于判断条件),

      Flux#takeUntilOther

      (基于对 publisher 的比较)
    • 直到满足某个条件(不包含):

      Flux#takeWhile

    • 最多只取 1 个元素:
    • 给定序号:

      Flux#elementAt

    • 最后一个:

      .takeLast(1)

      • ...如果为序列空则发出错误信号:

        Flux#last()

      • ...如果序列为空则返回默认值:

        Flux#last(T)

    • 跳过一些元素:
    • 从序列的第一个元素开始跳过:

      Flux#skip(long)

      • ...跳过一段时间内发出的元素:

        Flux#skip(Duration)

    • 跳过最后的 n 个元素:

      Flux#skipLast

    • Flux#skipUntil

      Flux#skipUntilOther

    • Flux#skipWhile

    • 采样:
    • 给定采样周期:

      Flux#sample(Duration)

      • 取采样周期里的第一个元素而不是最后一个:

        sampleFirst

    • 基于另一个 publisher:

      Flux#sample(Publisher)

    • 基于 publisher“超时”:

      Flux#sampleTimeout

      (每一个元素会触发一个 publisher,如果这个 publisher 不被下一个元素触发的 publisher 覆盖就发出这个元素)
  • 我只想要一个元素(如果多于一个就返回错误)...
    • 如果序列为空,发出错误信号:

      Flux#single()

    • 如果序列为空,发出一个缺省值:

      Flux#single(T)

    • 如果序列为空就返回一个空序列:

      Flux#singleOrEmpty

5)错误处理

  • 我想创建一个错误序列:

    error

    ...
    • ...替换一个完成的

      Flux

      .concat(Flux.error(e))

    • Mono

      .then(Mono.error(e))

    • ...如果元素超时未发出:

      timeout

    • ...“懒”创建:

      error(Supplier&lt;Throwable&gt;)

  • 我想要类似 try/catch 的表达方式:
    • 抛出异常:

      error

    • 捕获异常:
    • 然后返回缺省值:

      onErrorReturn

    • 然后返回一个

      Flux

      Mono

      onErrorResume

    • 包装异常后再抛出:

      .onErrorMap(t -&gt; new RuntimeException(t))

    • finally 代码块:

      doFinally

    • Java 7 之后的 try-with-resources 写法:

      using

      工厂方法
  • 我想从错误中恢复...
    • 返回一个缺省的:
    • 的值:

      onErrorReturn

    • Publisher

      Flux#onErrorResume

      Mono#onErrorResume

    • 重试:

      retry

    • ...由一个用于伴随 Flux 触发:

      retryWhen

  • 我想处理回压错误(向上游发出“MAX”的 request,如果下游的 request 比较少,则应用策略)...
    • 抛出

      IllegalStateException

      Flux#onBackpressureError

    • 丢弃策略:

      Flux#onBackpressureDrop

    • ...但是不丢弃最后一个元素:

      Flux#onBackpressureLatest

    • 缓存策略(有限或无限):

      Flux#onBackpressureBuffer

    • ...当有限的缓存空间用满则应用给定策略:

      Flux#onBackpressureBuffer

      带有策略

      BufferOverflowStrategy

6) 基于时间的操作

  • 我想将元素转换为带有时间信息的

    Tuple2&lt;Long, T&gt;

    • 从订阅时开始:

      elapsed

    • 记录时间戳:

      timestamp

  • 如果元素间延迟过长则中止序列:

    timeout

  • 以固定的周期发出元素:

    Flux#interval

  • 在一个给定的延迟后发出 :static

    Mono.delay

    .
  • 我想引入延迟:
    • 对每一个元素:

      Mono#delayElement

      Flux#delayElements

    • 延迟订阅:

      delaySubscription

7)拆分

Flux

  • 我想将一个

    Flux&lt;T&gt;

    拆分为一个

    Flux&lt;Flux&lt;T&gt;&gt;

    • 以个数为界:

      window(int)

    • ...会出现重叠或丢弃的情况:

      window(int, int)

    • 以时间为界:

      window(Duration)

    • window(Duration, Duration)

    • 以个数或时间为界:

      windowTimeout(int, Duration)

    • 基于对元素的判断条件:

      windowUntil

    • ...触发判断条件的元素会分到下一波(

      cutBefore

      变量):

      .windowUntil(predicate, true)

    • ...满足条件的元素在一波,直到不满足条件的元素发出开始下一波:

      windowWhile

      (不满足条件的元素会被丢弃)
    • 通过另一个 Publisher 的每一个 onNext 信号来拆分序列:

      window(Publisher)

      windowWhen

    • 拆分为一个一个的

      List

      :
    • buffer(int)

      • buffer(int, int)

    • buffer(Duration)

      • buffer(Duration, Duration)

    • bufferTimeout(int, Duration)

    • bufferUntil(Predicate)

      • ...触发判断条件的元素会分到下一个buffer:

        .bufferUntil(predicate, true)

      • ...满足条件的元素在一个buffer,直到不满足条件的元素发出开始下一buffer:

        bufferWhile(Predicate)

    • buffer(Publisher)

      bufferWhen

    • 拆分到指定类型的 "collection":

      buffer(int, Supplier&lt;C&gt;)

  • Flux&lt;T&gt;

    中具有共同特征的元素分组到子 Flux:

    groupBy(Function&lt;T,K&gt;)

    (注意返回值是

    Flux&lt;GroupedFlux&lt;K, T&gt;&gt;

    ,每一个

    GroupedFlux

    具有相同的 key 值

    K

    ,可以通过

    key()

    方法获取)。
    • 在拿到第一个元素前阻塞:

      Flux#blockFirst

    • ...并给出超时时限:

      Flux#blockFirst(Duration)

    • 在拿到最后一个元素前阻塞(如果序列为空则返回 null):

      Flux#blockLast

    • Flux#blockLast(Duration)

    • 同步地转换为

      Iterable&lt;T&gt;

      Flux#toIterable

    • 同步地转换为 Java 8

      Stream&lt;T&gt;

      Flux#toStream

  • 我有一个

    Mono&lt;T&gt;

    ,我想:
    • 在拿到元素前阻塞:

      Mono#block

    • Mono#block(Duration)

    • 转换为

      CompletableFuture&lt;T&gt;

      Mono#toFuture