簡介
Reactor是reactivex家族的一個非常重要的成員,Reactor是第四代的reactive library,它是基于Reactive Streams标準基礎上開發的,主要用來建構JVM環境下的非阻塞應用程式。
今天給大家介紹一下Reactor。
Reactor簡介
Reactor是基于JVM的非阻塞API,他直接跟JDK8中的API相結合,比如:CompletableFuture,Stream和Duration等。
它提供了兩個非常有用的異步序列API:Flux和Mono,并且實作了Reactive Streams的标準。
并且還可以和reactor-netty相結合,作為一些異步架構的底層服務,比如我們非常熟悉的Spring MVC 5中引入的WebFlux。
我們知道WebFlux的底層使用的是reactor-netty,而reactor-netty又引用了Reactor。是以,如果你在POM中引入了webFlux依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
那麼項目将會自動引入Reactor。
如果你用的不是Spring webflux,沒關系,你可以直接添加下面的依賴來使用Reactor:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
reactive programming的發展史
最最開始的時候微軟為.NET平台建立了Reactive Extensions (Rx) library。接着RxJava實作了JVM平台的Reactive。
然後Reactive Streams标準出現了,它定義了Java平台必須滿足的的一些規範。并且已經內建到JDK9中的java.util.concurrent類中。
在Flow中定義了實作Reactive Streams的四個非常重要的元件,分别是Publisher,Subscriber,Subscription和Processor。
Iterable-Iterator 和Publisher-Subscriber的差別
一般來說reactive在面向對象的程式設計語言中是以觀察者模式的擴充來使用的。
我們來具體看一下這個觀察者模式的實作,以Publisher和Subscriber為例:
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
上面定義了兩個接口,Publisher和Subscriber,Publisher的作用就是subscribe到subscriber。
而subscriber定義了4個on方法,用來觸發特定的事件。
那麼Publisher中的subscribe是怎麼觸發Subscriber的onSubscribe事件呢?
很簡單,我們看一個具體的實作:
public void subscribe(Flow.Subscriber<? super T> subscriber) {
Subscription sub;
if (throwable != null) {
assert iterable == null : "non-null iterable: " + iterable;
sub = new Subscription(subscriber, null, throwable);
} else {
assert throwable == null : "non-null exception: " + throwable;
sub = new Subscription(subscriber, iterable.iterator(), null);
}
subscriber.onSubscribe(sub);
if (throwable != null) {
sub.pullScheduler.runOrSchedule();
}
}
上面的例子是PullPublisher的subscribe實作。我們可以看到,在這個subscribe中觸發了subscriber.onSubscribe方法。而這就是觀察者模式的秘密。
或者說,當Publisher調用subscribe的時候,是主動push subscriber的onSubscribe方法。
熟悉Iterable-Iterator模式的朋友應該都知道,Iterator模式,其實是一個主動的pull模式,因為需要不斷的去調用next()方法。是以它的控制權是在調用方。
為什麼要使用異步reactive
在現代應用程式中,随着使用者量的增多,程式員需要考慮怎麼才能提升系統的處理能力。
傳統的block IO的方式,因為需要占用大量的資源,是以是不适合這樣的場景的。我們需要的是NO-block IO。
JDK中提供了兩種異步程式設計的模型:
第一種是Callbacks,異步方法可以通過傳入一個Callback參數的形式來在Callback中執行異步任務。比較典型的像是java Swing中的EventListener。
第二中就是使用Future了。我們使用Callable來送出一個任務,然後通過Future來拿到它的運作結果。
這兩種異步程式設計會有什麼問題呢?
callback的問題就在于回調地獄。熟悉JS的朋友應該很了解這個回調地獄的概念。
簡單點講,回調地獄就是在callback中又使用了callback,進而造成了這種callback的層級調用關系。
而Future主要是對一個異步執行的結果進行擷取,它的 get()實際上是一個block操作。并且不支援異常處理,也不支援延遲計算。
當有多個Future的組合應該怎麼處理呢?JDK8 實際上引入了一個CompletableFuture類,這個類是Future也是一個CompletionStage,CompletableFuture支援then的級聯操作。不過CompletableFuture提供的方法不是那麼的豐富,可能滿足不了我的需求。
于是我們的Reactor來了。
Flux
Reactor提供了兩個非常有用的操作,他們是 Flux 和 Mono。 其中Flux 代表的是 0 to N 個響應式序列,而Mono代表的是0或者1個響應式序列。
我們看一個Flux是怎麼transfer items的:
先看下Flux的定義:
public abstract class Flux<T> implements Publisher<T>
可以看到Flux其實就是一個Publisher,用來産生異步序列。
Flux提供了非常多的有用的方法,來處理這些序列,并且提供了completion和error的信号通知。
相應的會去調用Subscriber的onNext, onComplete, 和 onError 方法。
Mono
我們看下Mono是怎麼transfer items的:
看下Mono的定義:
public abstract class Mono<T> implements Publisher<T>
Mono和Flux一樣,也是一個Publisher,用來産生異步序列。
Mono因為隻有0或者1個序列,是以隻會觸發Subscriber的onComplete和onError方法,沒有onNext。
另一方面,Mono其實可以看做Flux的子集,隻包含Flux的部分功能。
Mono和Flux是可以互相轉換的,比如Mono#concatWith(Publisher)傳回一個Flux,而 Mono#then(Mono)傳回一個Mono.
Flux和Mono的基本操作
我們看下Flux建立的例子:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
可以看到Flux提供了很多種建立的方式,我們可以自由選擇。
再看看Flux的subscribe方法:
Disposable subscribe();
Disposable subscribe(Consumer<? super T> consumer);
Disposable subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
Disposable subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
Disposable subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
subscribe可以一個參數都沒有,也可以多達4個參數。
看下沒有參數的情況:
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
numbersFromFiveToSeven.subscribe();
注意,沒有參數并不表示Flux的對象不被消費,隻是不可見而已。
看下帶參數的情況:consumer用來處理on each事件,errorConsumer用來處理on error事件,completeConsumer用來處理on complete事件,subscriptionConsumer用來處理on subscribe事件。
前面的3個參數很好了解,我們來舉個例子:
Flux<Integer> ints3 = Flux.range(1, 4);
ints3.subscribe(System.out::println,
error -> System.err.println("Error " + error),
() -> System.out.println("Done"),
sub -> sub.request(2));
我們建構了從1到4的四個整數的Flux,on each就是列印出來,如果中間有錯誤的話,就輸出Error,全部完成就輸出Done。
那麼最後一個subscriptionConsumer是做什麼用的呢?
subscriptionConsumer accept的是一個Subscription對象,我們看下Subscription的定義:
public interface Subscription {
public void request(long n);
public void cancel();
}
Subscription 定義了兩個方法,用來做初始化用的,我們可以調用request(n)來決定這次subscribe擷取元素的最大數目。
比如上面我們的例子中,雖然建構了4個整數,但是最終輸出的隻有2個。
上面所有的subscribe方法,都會傳回一個Disposable對象,我們可以通過Disposable對象的dispose()方法,來取消這個subscribe。
Disposable隻定義了兩個方法:
public interface Disposable {
void dispose();
default boolean isDisposed() {
return false;
}
dispose的原理是向Flux 或者 Mono發出一個停止産生新對象的信号,但是并不能保證對象産生馬上停止。
有了Disposable,當然要介紹它的工具類Disposables。
Disposables.swap() 可以建立一個Disposable,用來替換或者取消一個現有的Disposable。
Disposables.composite(…)可以将多個Disposable合并起來,在後面統一做處理。
總結
本文介紹了Reactor的基本原理和兩非常重要的元件Flux和Mono,下一篇文章我們會繼續介紹Reactor core的一些更加進階的用法。敬請期待。
本文的例子
learn-reactive本文作者:flydean程式那些事
本文連結:
http://www.flydean.com/introduction-to-reactor/本文來源:flydean的部落格
歡迎關注我的公衆号:「程式那些事」最通俗的解讀,最深刻的幹貨,最簡潔的教程,衆多你不知道的小技巧等你來發現!