天天看點

SpringBoot應用可以同時處理多少請求?

作者:架構師成長曆程

前言

前兩天面試的時候,面試官問我:一個ip發請求過來,是一個ip對應一個線程嗎?我突然愣住了,對于SpringBoot如何處理請求好像從來沒仔細思考過,是以面試結束後就仔細研究了一番,現在就來探讨一下這個問題。

正文

我們都知道,SpringBoot預設的内嵌容器是Tomcat,也就是我們的程式實際上是運作在Tomcat裡的。是以與其說SpringBoot可以處理多少請求,到不如說Tomcat可以處理多少請求。

關于Tomcat的預設配置,都在spring-configuration-metadata.json檔案中,對應的配置類則是org.springframework.boot.autoconfigure.web.ServerProperties。

SpringBoot應用可以同時處理多少請求?

和處理請求數量相關的參數有四個:

SpringBoot應用可以同時處理多少請求?
  • server.tomcat.threads.min-spare:最少的工作線程數,預設大小是10。該參數相當于長期工,如果并發請求的數量達不到10,就會依次使用這幾個線程去處理請求。
  • server.tomcat.threads.max:最多的工作線程數,預設大小是200。該參數相當于臨時工,如果并發請求的數量在10到200之間,就會使用這些臨時工線程進行處理。
  • server.tomcat.max-connections:最大連接配接數,預設大小是8192。表示Tomcat可以處理的最大請求數量,超過8192的請求就會被放入到等待隊列。
  • server.tomcat.accept-count:等待隊列的長度,預設大小是100。

舉個例子說明一下這幾個參數之間的關系:

SpringBoot應用可以同時處理多少請求?

如果把Tomcat比作一家飯店的話,那麼一個請求其實就相當于一位客人。min-spare就是廚師(長期工);max是廚師總數(長期工+臨時工);max-connections就是飯店裡的座位數量;accept-count是門口小闆凳的數量。來的客人優先坐到飯店裡面,然後廚師開始忙活,如果長期工可以幹的完,就讓長期工幹,如果長期工幹不完,就再讓臨時工幹。圖中畫的廚師一共15人,飯店裡有30個座位,也就是說,如果現在來了20個客人,那麼就會有5個人先在飯店裡等着。如果現在來了35個人,飯店裡坐不下,就會讓5個人先到門口坐一下。如果來了50個人,那麼飯店座位+門口小闆凳一共40個,是以就會有10人離開。

也就是說,SpringBoot同時所能處理的最大請求數量是max-connections+accept-count,超過該數量的請求直接就會被丢掉。

紙上得來終覺淺,絕知此事要躬行。

上面隻是理論結果,現在通過一個實際的小例子來示範一下到底是不是這樣:

建立一個SpringBoot的項目,在application.yml裡配置一下這幾個參數,因為預設的數量太大,不好測試,是以配小一點:

server:
   tomcat:
     threads:
       # 最少線程數
       min-spare: 10
       # 最多線程數
       max: 15
     # 最大連接配接數
     max-connections: 30
     # 最大等待數
     accept-count: 10
複制代碼           

再來寫一個簡單的接口:

@GetMapping("/test")
     public Response test1(HttpServletRequest request) throws Exception {
         log.info("ip:{},線程:{}", request.getRemoteAddr(), Thread.currentThread().getName());
         Thread.sleep(500);
         return Response.buildSuccess();
     }
複制代碼           

代碼很簡單,隻是列印了一下線程名,然後休眠0.5秒,這樣肯定會導緻部分請求處理一次性處理不了而進入到等待隊列。

然後我用Apifox建立了一個測試用例,去模拟100個請求:

SpringBoot應用可以同時處理多少請求?

觀察一下測試結果:

SpringBoot應用可以同時處理多少請求?

從結果中可以看出,由于設定的 max-connections+accept-count 的和是40,是以有60個請求會被丢棄,這和我們的預期是相符的。由于最大線程是15,也就是有25個請求會先等待,等前15個處理完了再處理15個,最後在處理10個,也就是将40個請求分成了15,15,10這樣三批進行處理。

SpringBoot應用可以同時處理多少請求?

再從控制台的列印日志可以看到,線程的最大編号是15,這也印證了前面的想法。

總結一下:如果并發請求數量低于server.tomcat.threads.max,則會被立即處理,超過的部分會先進行等待,如果數量超過max-connections與accept-count之和,則多餘的部分則會被直接丢棄。

延伸:并發問題是如何産生的

到目前為止,就已經搞明白了SpringBoot可以同時處理多少請求的問題。但是在這裡我還想基于上面的例子再延伸一下,就是為什麼并發場景下會出現一些值和我們預期的不一樣?

設想有以下場景:廚師們用一個賬本記錄一共做了多少道菜,每個廚師做完菜都記錄一下,每次記錄都是将賬本上的數字先抄到草稿紙上,計算x+1等于多少,然後将計算的結果寫回到賬本上。

SpringBoot應用可以同時處理多少請求?

Spring容器中的Bean預設是單例的,也就是說,處理請求的Controller、Service執行個體就隻有一份。在并發場景下,将cookSum定義為全局變量,是所有線程共享的,當一個線程讀到了cookSum=20,然後計算,寫回前另一個線程也讀到是20,兩個線程都加1後寫回,最終cookSum就變成了21,但是實際上應該是22,因為加了兩次。

private int cookSum = 0;
 
 @GetMapping("/test")
 public Response test1(HttpServletRequest request) throws Exception {
     // 做菜。。。。。。
     cookSum += 1;
     log.info("做了{}道菜", cookSum);
     Thread.sleep(500);
     return Response.buildSuccess();
 }
複制代碼           
SpringBoot應用可以同時處理多少請求?

如果要避免這樣的情況發生,就涉及到加鎖的問題了,就不在這裡讨論了。

若你想更進一步提升并發性能不妨看看下面的反應式程式設計!

反應式程式設計

1.1. 阻塞可能造成浪費

現代應用程式可以覆寫大量并發使用者,盡管現代硬體的功能不斷提高,但現代軟體的性能仍然是一個關鍵問題。

總的來說,有兩種方法可以提高程式的性能:

  • 并行化以使用更多線程和更多硬體資源。
  • 在如何使用目前資源方面尋求更高的效率。

通常,Java 開發人員使用阻塞代碼編寫程式。這種做法很好,直到出現性能瓶頸。然後是時候引入額外的線程,運作類似的阻塞代碼。但是這種資源使用率的擴充會很快引入争用和并發問題。

更糟糕的是,阻塞會浪費資源。如果您仔細觀察,一旦程式涉及一些延遲(特别是 I/O,例如資料庫請求或網絡調用),資源就會被浪費,因為線程(可能很多線程)現在處于空閑狀态,等待資料。是以并行化方法不是靈丹妙藥。有必要通路硬體的全部功能,但推理也很複雜,并且容易浪費資源。

1.2. 使用異步(Asynchronicity)

前面提到的第二種方法,尋求更高的效率,可以解決資源浪費問題。通過編寫異步、非阻塞代碼,您可以讓執行切換到另一個使用相同底層資源的活動任務,并在異步處理完成後傳回到目前程序。

但是如何在 JVM 上生成異步代碼呢?Java 提供了兩種異步程式設計模型:

  • Callbacks:異步方法沒有傳回值,但是需要一個額外的回調參數(lambda或者匿名函數),當結果可用時調用這個參數。一個著名的例子是Swing’s的EventListener層次結構。
  • Futures:異步方法立即傳回一個Future。異步程序計算T值,通過Future對象包裝對T值的通路。該值不是立即可用的,可以輪詢該對象,直到該值可用為止。例如,ExecutorService使用Future對象,運作**Callable**任務。

但是這兩種技術都有他們的局限性,回調難以組合在一起,很快就會導緻代碼難以閱讀和維護(這種情況稱為"回調地獄(Callback Hell)")

Future對象要比callbacks好一些,但是Future在組合(composition)上任然比較困難。盡管Java 8通過CompletableFuture進行了改進。編排多個Future對象是可行的,但是這并不容易。并且Future還有其他問題:

  • 調用get()方法很容易導緻Future對象出現另一種阻塞情況。
  • 不支援惰性計算。
  • 缺乏對多值和進階錯誤處理的支援。

1.3. 從指令式程式設計到反應式程式設計

反應式-函數式程式設計解決的問題就是并發和并行。更通俗地說,它解決了回調地獄問題。回調地獄是以指令式的方式來處理反應式和異步用例帶來的問題。反應式程式設計,比如RxJava實作,受到了函數式程式設計的影響,并且會使用聲明式的方式來避免反應式-指令式代碼常見的問題。

響應式庫,如Reactor,Rxjava旨在解決JVM上”經典”異步方法的這些缺點,同時關注一些額外的方面:

  • 可組合性(Composability )和可讀性(readability)
  • 資料作為一個流(flow ),使用豐富的操作符(operators)進行操作
  • 在你訂閱(subscribe)之前什麼都不會發生,延遲釋出。
  • 背壓能力(Backpressure )或消費者向生産者發出排放速度過高信号的能力
  • 與并發無關的進階但高價值的抽象(High level but high value abstraction that is concurrency-agnostic)

2. Reactor Project 如何運作

Reactor的核心是Flux /Mono類型,它代表了資料或事件的流。它的目的是實作推送(反應式),但是也可以用于拉取(互動式)。它是延遲執行的(lazy),不是立即執行的(eager)。它可以同步使用,也可以異步使用。它能夠代表随着時間推移産生的0個、1個、多個或者無窮個值或事件。

2.1. Flux 原理

當執行subscribe方法時,釋出者會回調訂閱者的onSubscribe方法,這個方法中,通常訂閱者會借助傳入的Subscription向釋出者請求n個資料。然後釋出者通過不斷調用訂閱者的onNext方法向訂閱者發出最多n個資料。如果資料全部發完,則會調用onComplete告知訂閱者流已經發完;如果有錯誤發生,則通過onError發出錯誤資料,同樣也會終止流。

SpringBoot應用可以同時處理多少請求?
  1. 首先,使用類似Flux .just的方法建立釋出者後,會建立一個具體的釋出者(Publisher),如Flux Array。
  2. 當使用.subscribe訂閱這個釋出者時,首先會new一個具有相應邏輯的****Subscription(如ArraySubscription,這個Subscription定義了如何處理下遊的request,以及如何“發出資料”);每種同類型的Subscriber都會對應一種類型的Subscription。
  3. 然後釋出者将這個Subscription通過訂閱者的.onSubscribe方法傳給訂閱者;
  4. 在訂閱者的.onSubscribe方法中,需要通過Subscription發起第一次的請求.request
  5. Subscription收到請求,就可以通過回調訂閱者的onNext方法發出元素了,有多少發多少,但不能超過請求的個數;
  6. 訂閱者在onNext中通常定義對元素的處理邏輯,處理完成之後,可以繼續發起請求;
  7. 釋出者根據繼續滿足訂閱者的請求;
  8. 直至釋出者的序列結束,通過訂閱者的onComplete予以告知;當然序列發送過程中如果有錯誤,則通過訂閱者的onError予以告知并傳遞錯誤資訊;這兩種情況都會導緻序列終止,訂閱過程結束。

2.2. 操作符原理

操作符 :隻對資料做搬運和加工,對下遊是作為釋出者(Publisher),傳遞上遊的資料到下遊;對上遊是作為訂閱者(Subscriber),傳遞下遊的請求到上遊。

SpringBoot應用可以同時處理多少請求?

3. 建立 Flux

3.1. 以簡單方法建立 Flux

3.1.1 empty()

訂閱後立即完成,不釋出任何的值。

@Test
    public void emptyTest() {
        PrintUtil.println("Before");
        Flux.empty()
                .subscribe(PrintUtil::println, PrintUtil::println, () -> PrintUtil.println("complete"));
        PrintUtil.println("After");
    }

2021-08-31 18:04:016 [Thread-Name-main], Before
2021-08-31 18:04:016 [Thread-Name-main], complete
2021-08-31 18:04:016 [Thread-Name-main], After
複制代碼           

3.1.2 never()

不釋出任何的通知,無論是值,還是完成或失敗。這個流适用于測試。

@Test
    public void errorTest() {
        PrintUtil.println("Before");
        Flux.error(new RuntimeException("emitter an error"))
                .subscribe(PrintUtil::println, PrintUtil::println, () -> PrintUtil.println("complete"));
        PrintUtil.println("After");
    }

2021-08-31 18:09:023 [Thread-Name-main], Before
2021-08-31 18:09:024 [Thread-Name-main], After
複制代碼           

3.1.3 error()

立即給每個訂閱者發送一個onError()通知。不釋出任何的值,按照契約,也不會發送onCompleted()通知

@Test
    public void neverTest() {
        PrintUtil.println("Before");
        Flux.never()
                .subscribe(PrintUtil::println, PrintUtil::println, () -> PrintUtil.println("complete"));
        PrintUtil.println("After");
    }

2021-08-31 18:08:024 [Thread-Name-main], Before
2021-08-31 18:08:024 [Thread-Name-main], java.lang.RuntimeException: emitter an error
2021-08-31 18:08:024 [Thread-Name-main], After
複制代碼           

3.1.4 range

從 start 開始生成n個整型數字。例如,range(3, 3) 将會釋出3、4和5,然後正常完成。每個訂閱者會接收到一組相同的數字。

@Test
    public void rangeTest() {
        PrintUtil.println("Before");
        Flux.range(3, 3)
                .subscribe(PrintUtil::println);
        PrintUtil.println("After");
    }

2021-08-31 17:58:051 [Thread-Name-main], Before
2021-08-31 17:58:052 [Thread-Name-main], 3
2021-08-31 17:58:052 [Thread-Name-main], 4
2021-08-31 17:58:052 [Thread-Name-main], 5
2021-08-31 17:58:052 [Thread-Name-main], After
複制代碼           

print語句的順序也是值得關注的。Before 和After 消息是由main用戶端線程列印出來的,這一點倒不令人驚訝。但是,請注意訂閱也是發生在用戶端線程中的,subscribe() 實際上會阻塞用戶端線程,直到所有的事件都被接收。除非某些操作符需要,否則RxJava不會隐式地線上程池中運作代碼。

3.1.5 interval

interval() 會生成一個long類型數字的序列,從零開始,每個數字之間有固定的時間間隔。某種程度上,interval()類似于 ScheduledExecutorService 中的 scheduleAtFixedRate()。你可以想象一下 interval() 的多種使用場景,比如定期輪詢資料、重新整理使用者界面或者模組化時間的推移。

@Test
    public void intervalTest() throws InterruptedException {
        Flux.interval(Duration.ofSeconds(1))
                .map(input -> {
                    if (input < 3) return "tick " + input;
                    throw new RuntimeException("boom");
                })
                .onErrorReturn("Uh oh")
                .subscribe(log::info);

        TimeUnit.SECONDS.sleep(5);
    }

18:17:18.548 [parallel-1] INFO wangxw.operator.OperatorTest - tick 0
18:17:19.549 [parallel-1] INFO wangxw.operator.OperatorTest - tick 1
18:17:20.548 [parallel-1] INFO wangxw.operator.OperatorTest - tick 2
18:17:21.564 [parallel-1] INFO wangxw.operator.OperatorTest - Uh oh
複制代碼           

3.2. 以程式設計方式建立 Flux

3.2.1 Synchronous :generate on-by-one

public void testGenerate() {
        Flux<String> flux = Flux.generate(
                () -> 0, // 初始state值
                (state, sink) -> {
                    sink.next("3 x " + state + " = " + 3 * state); // 産生資料是同步的,每次産生一個資料
                    if (state == 10) {
                        sink.complete();
                    }
                    return state + 1; // 改變狀态
                },
                (state) -> System.out.println("state: " + state)); // 最後狀态值
        // 訂閱時觸發requset->sink.next順序産生資料
        // 生産一個資料消費一個
        flux.subscribe(System.out::println);
    }
複制代碼           

3.2.2 Asynchronous and Multi-threaded: create

Flux .cretate() 操作符可以用于橋接監聽器模型等現有API轉為響應式流模型,支援推拉結合的模式。cretate操作符建立的BufferAsyncSink中維護了一個SpscQueue。

  • 推模式:當監聽觸發時,調用sink.next(o),将元素放入SpscQueue,随後立即取出drain(排水)給消費者。
  • **拉模式:**發生在消費者定閱時,當生産者有資料可用時将通過拉模式,拉取資料。

3.2.2.1. 示意圖

SpringBoot應用可以同時處理多少請求?
  1. 首先,使用Flux .create的方法建立釋出者後,會建立一個具體的釋出者(Flux Create)。
  2. 當使用.subscribe訂閱這個釋出者時,首先會new一個具有相應邏輯的****BufferAsyncSink。
  3. 然後釋出者将這個BufferAsyncSink通過訂閱者的.onSubscribe方法傳給訂閱者;并回調Flux Create中的Consumer,執行 sink.onRequest(requestConsumer) 将 requestConsumer 指派給Sink。
  4. 在訂閱者的.onSubscribe方法中,需要通過BufferAsyncSink發起request,執行requsetConsumer,此時是拉資料
  5. 通過異步調用sink.next(o),将資料推送到sqscQueue中,再 poll 出來是 推資料;

3.2.2.2. 代碼示例

@Test
    public void testCreate() throws InterruptedException {
        MyEventProcessor<String> myEventProcesser = new MyEventProcessor<>();
        Flux.create(emitter -> {
            myEventProcesser.register(new MyEventListener<String>() {
                @Override
                public void onDataChunk(MyEvent<String> event) {
                    emitter.next(event);
                }

                @Override
                public void processComplete() {
                    emitter.complete();
                }
            });

            emitter.onRequest(n -> { // n subscribe.requset時調用
                List<String> messages = getHistory(n);
                messages.forEach(PrintUtil::println);
            });
        }).subscribe(PrintUtil::println, PrintUtil::println); // 這時候還沒有任何事件産生;

        for (int i = 0; i < 20; i++) {  // 6
            TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000));
            myEventProcesser.newEvent(new MyEvent<>(new Date(), "Event" + i));
        }
        myEventProcesser.processComplete();
    }
複制代碼           

3.2.3. Asynchronous but single-threaded: push

push is a middle ground between generate and create which is suitable for processing events from a single producer. It is similar to create in the sense that it can also be asynchronous and can manage backpressure using any of the overflow strategies supported by create. However, only one producing thread may invoke next, complete or error at a time.

根據官方的描述,push介于create和 generate 之間。異步生成序列,支援回壓。但是同一時刻隻能有一個線程調用next、compete、error。

3.2.3.2. 示意圖

SpringBoot應用可以同時處理多少請求?

push 和 create 的唯一不同在于Flux Create.CreateMode不同。 Flux .push使用的是CreateMode.PUSH_ONLY,而Flux .create使用的是Flux Create.CreateMode.PUSH_PULL。

public void subscribe(CoreSubscriber<? super T> actual) {
		BaseSink<T> sink = createSink(actual, backpressure);

		actual.onSubscribe(sink);
		try {
			source.accept(
					createMode == CreateMode.PUSH_PULL ? new SerializedFluxSink<>(sink) :
							sink);
		}
		catch (Throwable ex) {
			Exceptions.throwIfFatal(ex);
			sink.error(Operators.onOperatorError(ex, actual.currentContext()));
		}
	}
複制代碼           

BufferAsyncSink 内部維護了一個SpscLinkedArrayQueue(單生産者單消費者)隊列,隻支援單線程的sink源,sink先将元素發射到SpscLinkedArrayQueue中,然後觸發drain,其實就是從SpscLinkedArrayQueue中poll元素并向消費者傳遞,中間增加SpscLinkedArrayQueue作為中轉。

SerializedSink 内部除了維系一個BufferAsyncSink 作委托外,還維護一個MpscLinkedQueue(多生産者單消費者)隊列,顯然支援多線程的源生産。并發生産元素的時候先将元素push到MpscLinkedQueue,再從MpscLinkedQueue彈出到SpscLinkedArrayQueue中,最終由一個線程傳遞給消費者。

public FluxSink<T> next(T t) {
			Objects.requireNonNull(t, "t is null in sink.next(t)");
			if (sink.isTerminated() || done) {
				Operators.onNextDropped(t, sink.currentContext());
				return this;
			}
            //通過WIP保證僅有一個線程将next委托給sink(BufferAsyncSink)來處理 
			if (WIP.get(this) == 0 && WIP.compareAndSet(this, 0, 1)) {
				try {
					sink.next(t);
				}
				catch (Throwable ex) {
					Operators.onOperatorError(sink, ex, t, sink.currentContext());
				}
				if (WIP.decrementAndGet(this) == 0) { // 
					return this;
				}
			}
			else {
                //多線程并發生産元素時,并發的其他線程直接将元素發射到mpscQueue
				this.mpscQueue.offer(t);
				if (WIP.getAndIncrement(this) != 0) {
					return this;
				}
			}
            //這裡其實又把mpsc隊列中的元素取出放到spsc中
            //簡單點可以這麼了解,多源生産模式下,元素先發射到mpsc中,單消費者取出元素從放到spsc中,中間多加了一個過渡
			drainLoop();
			return this;
		}
複制代碼           

目前沒有GET到兩者的差別,這樣做的好處到底是什麼?

3.2.4. Cleaning up after push() or create()

兩個回調函數,onDispose和onCancel,在取消或終止時執行任何清理。onDispose可用于在Flux 完成、出現錯誤或被取消時執行清理。onCancel可用于在使用onDispose進行清理之前執行任何特定于取消的操作。

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel()) 
        .onDispose(() -> channel.close())  
    });
複制代碼           
  • onCancel 首先調用,僅用于取消信号。
  • onDispose 為完成、錯誤或取消信号調用。

4. 從回調 API 到 Flux 流

關于流,我最喜歡的一個例子就是Twitter的狀态更新,也就是所謂的推文(tweet)。每秒都會有數千個使用者更新,很多更新會伴随着地理位置、語言和其他中繼資料。為了完成這個練習,将會使用開源的Twitter4J庫,它使用基于回調的API将新推文的子集推送過來。實時讀取推文的最簡單的可運作樣例如下所示。

import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

import java.util.concurrent.TimeUnit;


/**
 * @Author: wangxw
 * @Date: 2021/08/31
 * @Description:
 */
@Slf4j
public class Callback2FluxTest {

    public void add() throws InterruptedException {
        TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
        twitterStream.addListener(new StatusListener() {
            @Override
            public void onStatus(Status status) {
                log.info("Status: {} ", status);
            }

            @Override
            public void onException(Exception e) {
                log.error("Error callback ", e);

            }
            // 其他回調
        });

        twitterStream.sample();
        TimeUnit.SECONDS.sleep(10);
        twitterStream.shutdown();
    }   
複制代碼           

調用twitterStream.sample()将會啟動一個背景線程,該線程會登入Twitter并等待新的消息。每次有推文出現,onStatus回調就會執行。執行過程可能會跨線程,是以不能依賴異常抛出的機制,而是使用onException()通知。在休眠10秒之後,通過shutdown()關閉流并清理底層的資源,比如HTTP連接配接或線程。

整體而言,它看上去并沒有那麼糟糕,這個程式的問題在于什麼都不做。在現實生活中,你可能會以某種方式處理每條Status消息(推文),比如儲存到資料庫中或者提供給一個機器學習算法。從技術上來講,你可以将這些邏輯放到回調中,但是這樣就将基礎設施調用和業務邏輯耦合在一起了。簡單地将功能委托給一個單獨的類會更好一些,但很遺憾的是無法重用。我們真正想要的是技術領域(消費HTTP連接配接中的資料)和業務領域(解釋輸入的資料)的清晰分離。是以,我們建構了第二層回調。

void consume(Consumer<Status> onStatus, Consumer<Exception> onException) throws InterruptedException {
        TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
        twitterStream.addListener(new StatusListener() {
            @Override
            public void onStatus(Status status) {
                onStatus.accept(status);
            }

            @Override
            public void onException(Exception e) {
                onException.accept(e);

            }
            // 其他回調
        });

        twitterStream.sample();
        TimeUnit.SECONDS.sleep(10);
        twitterStream.shutdown();
    }
複制代碼           

通過添加這個額外的抽象層,現在能夠以各種方式重用consume()方法。假設不再是進行日志記錄,而是要進行持久化、分析或欺詐檢測。

但是這隻将問題在層級結構中進行了提升。如果想要記錄每秒推文的數量該怎麼辦?或者隻想消費前5條資料,又該怎樣實作?如果想要有多個監聽器,又會發生什麼情況?前述每種情況都會打開一個新的HTTP連接配接。最後不得不提的是,API不允許完成後再取消訂閱,以免帶來資源洩漏的風險。我們正在努力朝着基于Rx的API的方向努力。此時,不再傳遞回調到可能要執行的地方,而是傳回一個Flux 并允許每人按需對其進行訂閱。但是,需要記住的一點是,如下的實作還是會為每個Subscriber打開一個新的網絡連接配接。

public void flux() {
        Flux<Status> flux = Flux.<Status>create(emmiter -> {
            TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
            twitterStream.addListener(new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    emmiter.next(status);
                }

                @Override
                public void onException(Exception e) {
                    emmiter.error(e);

                }
                // 其他回調
            });
            emmiter.onDispose(() -> twitterStream.shutdown()); // 關閉twitterStream
        }).doOnSubscribe(s->log.info("doOnSubscribe"));

        flux.subscribe(status -> log.info("Status: {} ", status),
                ex -> log.error("Error callback ", ex));
    }
複制代碼           

以上代碼與consume(...)的巨大差别在于,不必将回調作為參數傳遞給observe()。相反,樣例可以傳回Flux ,将它到處傳遞,然後在某個地方進行存儲,并且隻要需要就可以随時随地使用。還可以将這個Flux 與其他的Flux 進行組合。還未讨論的一個方面是資源清理,有人取消訂閱時,應當關閉TwitterStream,以避免資源洩漏。

public class LazyTwitterFlux {

    private final Set<FluxSink<? super Status>> fluxSinks = new CopyOnWriteArraySet<>();

    private final Flux<Status> flux = Flux.create(emmiter -> {
        registrer(emmiter);
        emmiter.onDispose(() -> unregistrer(emmiter));
    });

    private final TwitterStream twitterStream;

    public LazyTwitterFlux() {
        this.twitterStream = TwitterStreamFactory.getSingleton();
        twitterStream.addListener(new StatusListener() {
            @Override
            public void onStatus(Status status) {
                fluxSinks.forEach(sink -> sink.next(status));
            }

            @Override
            public void onException(Exception e) {
                fluxSinks.forEach(fluxSink -> fluxSink.error(e));

            }
            // 其他回調
        });
    }

    public Flux<Status> flux() {
        return flux;
    }

    private synchronized void registrer(FluxSink<? super Status> fluxSink) {
        fluxSinks.add(fluxSink);
        if (fluxSinks.isEmpty()) {
            twitterStream.sample();
        }
    }

    private synchronized void unregistrer(FluxSink<? super Status> fluxSink) {
        fluxSinks.remove(fluxSink);
        if (fluxSinks.isEmpty()) {
            twitterStream.shutdown();
        }
    }

}

    @Test
    public void lazy() {
        LazyTwitterFlux lazyTwitterFlux = new LazyTwitterFlux();

        Flux<Status> flux1 = lazyTwitterFlux.flux()
                .doOnSubscribe(s -> log.info("doOnSubscribe"));
        Flux<Status> flux2 = lazyTwitterFlux.flux()
                .doOnSubscribe(s -> log.info("doOnSubscribe"));

        flux1.subscribe(status -> log.info("Status: {} ", status),
                ex -> log.error("Error callback ", ex));
        flux2.subscribe(status -> log.info("Status: {} ", status),
                ex -> log.error("Error callback ", ex));
    }
複制代碼           

fluxSinks 的集線程安全地存儲目前已訂閱的Subscriber集合。每次新的Subscriber出現,将其添加到一個集中,并連接配接到底層的事件源上。相反,最後的Subscriber消失時,就關閉上遊的源。這裡的關鍵在于始終隻有一個到上遊系統的連接配接,而不是為每個訂閱者都建立連接配接。這個實作能夠正常運作而且比較健壯,但看起來過于低層級并且易于出錯。對subscribers的通路必須使用synchronized同步,而且集合本身必須支援安全地疊代。對register()的調用必須發生在通過 reregister() 登出回調之前,否則,後者可能會在注冊之前調用。将一個上遊源多路複用到多個Flux 的通用場景,肯定有更好的方式來實作。幸而,至少有兩種這樣的機制。Rx緻力于減少這樣危險的樣闆代碼并抽象出并發性。6.4 節将使用 ConnectableFlux 的refCount() 實作單次訂閱。

5. Hot 和 Cold 類型的 Flux

在得到一個 Flux 執行個體之後,很重要的一點是要了解它是 hot 類型的還是 cold 類型的。它們的API和語義是相同的,但是使用Flux 的方法取決于它的類型。

5.1. Cold 類型

cold類型的Flux 完全是延遲(lazy)執行的,并且在有人對其感興趣時才會開始釋出事件。如果沒有觀察者,那麼Flux 隻是一個靜态的資料結構。這也意味着,每個訂閱者都會接收到屬于自己的流的副本,因為事件是延遲生成的,并且一般不會采取任何形式的緩存。cold類型的Flux 通常來源于Flux .create(),按照語義,它不會啟用任何邏輯,而是推遲到有人實際對其監聽才會執行。從某種程度上來說,cold 類型的 Flux 依賴Subscriber。cold類型的 Flux 的樣例除了create()之外,還包括Flux .just()、from()和range()。訂閱一個cold類型的Flux 通常還涉及create()中的副作用,比如查詢資料庫或打開連接配接。

5.2. Hot 類型

hot類型的Flux 則與之不同。在得到這種類型的Flux 的時候,不管是否有Subscriber,它都可能已經開始釋出事件了。即便沒有人監聽,事件可能會丢失,Flux 依然會往下遊推送事件。通常情況下,可以完全控制cold類型的Flux ,但是 hot類型的Flux 是獨立于消費者的。Subscriber出現時,hot類型的Flux 的行為類似于電話竊聽(wire tap),透明地釋出流經它的事件。Subscriber的出現和消失并不會改變Flux 的行為,它是完全解耦和獨立的。

hot類型的Flux 通常發生在完全無法控制事件源的場景下。這種Flux 的樣例包括滑鼠移動、鍵盤輸入或按鈕點選。

依賴事件傳遞時,hot類型和cold類型Flux 的差異就變得非常重要了。不管立即訂閱還是幾個小時之後訂閱cold類型的Flux ,你都會獲得完整且一緻的事件集。但如果Flux 是hot類型的,那麼你就無法確定能接收到所有事件。稍後将介紹一些技術,它們能夠確定每個訂閱者都能接收到所有事件,例如cache()操作符在技術上來講,可以緩沖來自hot類型Flux 的所有事件,讓後續的訂閱者都能接收到相同的事件序列。但是,在理論上,它消耗的記憶體量是沒有限制的,是以在緩存hot類型的Flux 時要非常小心。

6. ConnectableFlux

有時,我們可能不希望僅将某些處理延遲到某一個訂閱者的訂閱時間,而是希望其中幾個訂閱者會合,然後再觸發訂閱和資料生成(這有點類似于CountDownLantch)。

ConnectableFlux 就是為此而生,Flux API 中有兩種常用的傳回ConnectableFlux 的方式:publish 和replay。

  1. publish會嘗試滿足各個不同訂閱者的需求(也就是回壓),并綜合這些請求回報給源。假設有某個訂閱者的需求為 0,釋出者會暫停向所有訂閱者發出元素。
  2. replay将對第一個訂閱後産生的資料進行緩存,最多緩存數量取決于配置(時間/緩存大小)。 它會對後續接入的訂閱者重新發送資料。

ConnectableFlux 提供了多種對訂閱的管理方式。包括:

  • connect() 當有足夠的訂閱接入後,可以對 Flux 手動執行一次。它會觸發對上遊源的訂閱。
  • autoConnect(n) 與 connect() 類似,不過是在有 n 個訂閱的時候自動觸發。
  • refCount(n) 不僅能夠在訂閱者接入的時候自動觸發,還會檢測訂閱者的取消動作。如果訂閱者全部取消訂閱,則會将源“斷開連接配接”,再有新的訂閱者接入的時候才會繼續“連上”釋出者。
  • refCount(int, Duration) 增加了一個倒計時:一旦訂閱者數量太低了,它會等待 Duration參數指定的時間,如果沒有新的訂閱者接入才會與源斷開連接配接。

6.1. connect 例子

@Test
    public void connectTest() throws InterruptedException {
        Flux<String> source = Flux.range(1, 3)
                .map(Object::toString)
                .doOnSubscribe(s -> log.info("subscribed to source"));

        ConnectableFlux<String> co = source.publish();

        co.subscribe(log::info);
        co.subscribe(log::info);

        log.info("done subscribing");
        TimeUnit.SECONDS.sleep(1);

        log.info("will now connect");
        co.connect();
    }

16:06:03.494 [main] INFO wangxw.flux.FluxTest - done subscribing
16:06:04.496 [main] INFO wangxw.flux.FluxTest - will now connect
16:06:04.498 [main] INFO wangxw.flux.FluxTest - subscribed to source
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 1
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 1
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 2
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 2
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 3
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 3
複制代碼           

隻有當connect() 之後上遊才會發出資料。

6.2. autoConnect(n) 例子

@Test
    public void autoConnect() throws InterruptedException {
        Flux<String> source = Flux.range(1, 3)
                .map(Object::toString)
                .doOnSubscribe(s -> log.info("subscribed to source"));

        Flux<String> co = source.publish().autoConnect(2);

        log.info("subscribed first");
        co.subscribe(log::info);

        TimeUnit.SECONDS.sleep(1);

        log.info("subscribing second");
        co.subscribe(log::info);
    }

17:18:09.468 [main] INFO wangxw.flux.FluxTest - subscribed first
17:18:10.475 [main] INFO wangxw.flux.FluxTest - subscribing second
17:18:10.486 [main] INFO wangxw.flux.FluxTest - subscribed to source
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 1
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 1
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 2
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 2
17:18:10.493 [main] INFO wangxw.flux.FluxTest - 3
17:18:10.493 [main] INFO wangxw.flux.FluxTest - 3
複制代碼           

當兩個訂閱者都完成訂閱之後,上遊才收到訂閱請求,并開始發出資料。

6.3. refCount() 例子

@Test
    public void refCountTest() throws InterruptedException {
        Flux<String> source = Flux.interval(Duration.ofMillis(500))
                .map(Object::toString)
                .doOnSubscribe(s -> log.info("doOnSubscribe"))
                .doOnCancel(() -> log.info("doOnCancel"));

        Flux<String> flux = source.publish().refCount(2, Duration.ofSeconds(2));

        log.info("subscribed first");
        Disposable s1 = flux.subscribe(x -> log.info("s1:" + x));

        TimeUnit.SECONDS.sleep(1);
        log.info("subscribed second");
        Disposable s2 = flux.subscribe(x -> log.info("s2:" + x));

        TimeUnit.SECONDS.sleep(1);
        log.info("subscribed first disposable");
        s1.dispose();

        TimeUnit.SECONDS.sleep(1);
        log.info("subscribed second disposable"); // 所有的訂閱者都取消了
        s2.dispose();

        TimeUnit.SECONDS.sleep(1); // 在2s内 s3進行了訂閱
        log.info("subscribed third");
        Disposable s3 = flux.subscribe(x -> log.info("s3:" + x));

        TimeUnit.SECONDS.sleep(1);
        log.info("subscribed third disposable");
        s3.dispose(); // 所有訂閱者都取消了 disconnect

        TimeUnit.SECONDS.sleep(3); 
        log.info("subscribed fourth"); // 3s 後(超過了2s)s4、s5訂閱,觸發connect
        Disposable sub4 = flux.subscribe(l -> log.info("s4: " + l));
        TimeUnit.SECONDS.sleep(1);
        log.info("subscribed  fifth");
        Disposable sub5 = flux.subscribe(l -> log.info("s5: " + l));
        TimeUnit.SECONDS.sleep(2);
    }

17:29:23.044 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed first
17:29:24.052 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed second
17:29:24.067 [main] INFO wangxw.flux.ConnectableFluxTest - doOnSubscribe
17:29:24.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s1:0
17:29:24.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:0
17:29:25.076 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s1:1
17:29:25.076 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:1
17:29:25.076 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed first disposable
17:29:25.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:2
17:29:26.094 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:3
17:29:26.094 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed second disposable
17:29:27.101 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed third
17:29:27.101 [main] INFO wangxw.flux.ConnectableFluxTest - s3:4
17:29:27.101 [main] INFO wangxw.flux.ConnectableFluxTest - s3:5
17:29:27.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s3:6
17:29:28.075 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s3:7
17:29:28.102 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed third disposable
17:29:30.103 [parallel-3] INFO wangxw.flux.ConnectableFluxTest - doOnCancel // 注意時間 2s 後執行了 doOnCancel
17:29:31.103 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed fourth
17:29:32.104 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed  fifth
17:29:32.104 [main] INFO wangxw.flux.ConnectableFluxTest - doOnSubscribe
17:29:32.606 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 0
17:29:32.606 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 0
17:29:33.107 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 1
17:29:33.107 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 1
17:29:33.605 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 2
17:29:33.605 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 2
17:29:34.105 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 3
17:29:34.105 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 3
複制代碼           

本例中,refCount() 設定為最少兩個訂閱者接入是才開始發出資料,當所有訂閱者都取消時,如果不能在兩秒内接入新的訂閱者,則上遊會斷開連接配接。

上邊的例子中,随着前兩個訂閱者相繼取消訂閱,第三個訂閱者及時(在2秒内)開始訂閱,是以上遊會繼續發出資料,而且根據輸出可以看出是“hot flux”。

當第三個訂閱者取消後,第四個訂閱者沒能及時開始訂閱,是以上遊釋出者斷開連接配接。當第五個訂閱者訂閱之後,第四和第五個訂閱者相當于開始了新一輪的訂閱。

6.4. 使用refCount()實作單次訂閱

ConnectableFlux 以一種有意思的方式來協調多個Subscriber,并共享一個底層的訂閱。還記得最初借助LazyTwitterFlux 建立單個延遲執行的對底層資源的連接配接嗎?必須要手動跟蹤所有的Subscriber,如果第一個訂閱者出現或最後一個訂閱者離開,就建立連接配接或斷開連接配接。ConnectableFlux 是Flux 的一個特殊類型,能夠確定底層始終最多隻有一個Subscriber,但實際上它又允許多個Subscriber共享相同的底層資源。

Subject是建立Flux的必要方式,而ConnectableFlux會保護原始的上遊Flux,并確定最多隻能有一個Subscriber可以接觸到它。不管有多少Subscriber連接配接到ConnectableFlux,系統隻會打開一個Flux的訂閱,這個訂閱是基于該Flux建立的。

@Test
    public void refCounted() {
        Flux<Status> flux = Flux.<Status>create(emmiter -> {
            log.info("Establishing connection");
            twitterStream.addListener(new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    emmiter.next(status);
                }

                @Override
                public void onException(Exception e) {
                    emmiter.error(e);

                }
                
                // 其他回調
            });
            emmiter.onDispose(() -> {
                log.info("Disconnecting");
                twitterStream.shutdown();
            });
        }).doOnSubscribe(s -> log.info("doOnSubscribe"))
                .doOnComplete(() -> log.info("doOnComplete"));

        Flux<Status> refCounted = flux.publish().refCount();


        Disposable s1 = refCounted.subscribe(status -> log.info("Status: {} ", status),
                ex -> log.error("Error callback ", ex));
        Disposable s2 = refCounted.subscribe(status -> log.info("Status: {} ", status),
                ex -> log.error("Error callback ", ex));

        s1.dispose();
        s2.dispose();
    }

17:51:21.610 [main] INFO wangxw.flux.Callback2FluxTest - doOnSubscribe
17:51:21.614 [main] INFO wangxw.flux.Callback2FluxTest - Establishing connection
17:51:21.616 [main] INFO wangxw.flux.Callback2FluxTest - Disconnecting
複制代碼           

直到真正有第一個Subscriber的時候,連接配接才會建立。但是,更重要的在于,第二個Subscriber不會初始化新的連接配接,它甚至不會接觸到原始的Flux。publish(). refCount()會将底層的Flux 串聯包裝起來,并攔截所有的訂閱。

操作符

defer

延遲建立

@Test
    public void deferTest() throws InterruptedException {
        Flux<String> flux1 = Flux.just(PrintUtil.println(new Date()));

        Flux<String> flux2 = Flux.defer(() -> Flux.just(PrintUtil.println(new Date())));

        flux1.subscribe(x -> log.info("s1: " + x));
        flux2.subscribe(x -> log.info("s2: " + x));

        TimeUnit.SECONDS.sleep(3);

        flux1.subscribe(x -> log.info("s3: " + x));
        flux2.subscribe(x -> log.info("s4: " + x));
    }

15:25:24.629 [main] INFO wangxw.operator.OperatorTest - s1: 2021-09-06 15:25:024
15:25:24.630 [main] INFO wangxw.operator.OperatorTest - s2: 2021-09-06 15:25:024
15:25:27.633 [main] INFO wangxw.operator.OperatorTest - s3: 2021-09-06 15:25:024
15:25:27.634 [main] INFO wangxw.operator.OperatorTest - s4: 2021-09-06 15:25:027
複制代碼           

delayElements

@Test
    public void delayTest() throws InterruptedException {
        Flux.just("1", "2").delayElements(Duration.ofSeconds(2))
                .subscribe(log::info);

        TimeUnit.SECONDS.sleep(5);
    }

16:02:40.482 [parallel-1] INFO wangxw.operator.OperatorTest - 1
16:02:42.483 [parallel-2] INFO wangxw.operator.OperatorTest - 2
複制代碼           

延遲釋出,在運作這個程式的時候,即便進行了訂閱,應用程式也會立即終止而不展現任何的結果,這是因為事件的釋出是在背景異步運作的,是以需要在最後添加任意一個sleep()。

map

@Test
    public void mapTest() throws InterruptedException {
        Flux.just(1, 2, 3, 4)
                .map(i -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i * 2 + "";
                })
                .log()
                .subscribe(log::info);
    }

16:07:24.005 [main] INFO reactor.Flux.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
16:07:24.007 [main] INFO reactor.Flux.MapFuseable.1 - | request(unbounded)
16:07:25.007 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(2)
16:07:25.008 [main] INFO wangxw.operator.OperatorTest - 2
16:07:26.009 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(4)
16:07:26.009 [main] INFO wangxw.operator.OperatorTest - 4
16:07:27.009 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(6)
16:07:27.009 [main] INFO wangxw.operator.OperatorTest - 6
16:07:28.009 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(8)
16:07:28.009 [main] INFO wangxw.operator.OperatorTest - 8
16:07:28.012 [main] INFO reactor.Flux.MapFuseable.1 - | onComplete()
複制代碼           

map 是同步執行的,會阻塞用戶端線程。

flatMap

flatMap()是Rx中最重要的操作符之一。乍看上去,它類似于map(),但是它對每個元素的轉換都會傳回另外一個(内嵌的)Flux 。鑒于Flux 可以代表另外一個異步操作,我們很快就意識到flatMap()可以為上遊的每個事件執行異步計算(fork執行)并将結果加入進來。

flatMap 本身并不是異步的,但是内嵌的Flux可以執行異步操作。

@Test
    public void flatMapTest() throws InterruptedException {
        Function<Integer, Publisher<String>> mapper = i -> Flux.just(i * 2 + "").delayElements(Duration.ofSeconds(1));

        Flux.just(1, 2, 3, 4)
                .flatMap(mapper)
                .subscribe(log::info);
        
        TimeUnit.SECONDS.sleep(10);
    }

17:23:42.566 [parallel-2] INFO wangxw.operator.OperatorTest - 4
17:23:42.566 [parallel-1] INFO wangxw.operator.OperatorTest - 2
17:23:42.568 [parallel-3] INFO wangxw.operator.OperatorTest - 6
17:23:42.572 [parallel-4] INFO wangxw.operator.OperatorTest - 8
複制代碼           

從本質上來講,flatMap()接收一個随時間(事件)出現的值的主(master)序列(Flux ),然後将每個事件分别替換為獨立的子序列。這些子序列彼此之間是不相關的,并且與生成它們的主序列中的事件也是不相關的。更确切地說,此時擁有的不再是單個主序列,而是一組Flux ,其中每個都是獨立運作的,并且随着時間的推移出現和消失。是以,flatMap()并不能對子事件抵達下遊操作符/訂閱者的順序給出任何的保證。

@Test
    public void flatMap3Test() throws InterruptedException {
        Flux.just(DayOfWeek.SUNDAY, DayOfWeek.MONDAY)
                .flatMap(this::loadRecordFor)
                .subscribe(log::info);

        TimeUnit.SECONDS.sleep(5);
    }

    private Flux<String> loadRecordFor(DayOfWeek dow) {
        switch (dow) {
            case SUNDAY:
                return Flux.interval(Duration.ofMillis(90))
                        .take(5)
                        .map(i -> "Sun" + i);
            case MONDAY:
                return Flux.interval(Duration.ofMillis(65))
                        .take(5)
                        .map(i -> "Mon" + i);
            default:
                return Flux.empty();
        }
    }

17:31:38.893 [parallel-2] INFO wangxw.operator.OperatorTest - Mon0
17:31:38.918 [parallel-1] INFO wangxw.operator.OperatorTest - Sun0
17:31:38.957 [parallel-2] INFO wangxw.operator.OperatorTest - Mon1
17:31:39.007 [parallel-1] INFO wangxw.operator.OperatorTest - Sun1
17:31:39.023 [parallel-2] INFO wangxw.operator.OperatorTest - Mon2
17:31:39.087 [parallel-2] INFO wangxw.operator.OperatorTest - Mon3
17:31:39.096 [parallel-1] INFO wangxw.operator.OperatorTest - Sun2
17:31:39.152 [parallel-2] INFO wangxw.operator.OperatorTest - Mon4
17:31:39.188 [parallel-1] INFO wangxw.operator.OperatorTest - Sun3
17:31:39.277 [parallel-1] INFO wangxw.operator.OperatorTest - Sun4
複制代碼           

控制flatMap 的并發性

假設你有大量使用者的一個清單,它們被包裝在Flux 中。每個User有一個loadProfile()方法,該方法會通過HTTP請求傳回一個Flux 執行個體。我們的目标是盡快擷取所有使用者概況(profile), flatMap()就是為了實作該目标而設計的,可以對上遊的值進行并發計算,如下所示:

@Test
    public void flatMap4Test() {
        List<User> users = new ArrayList<>();
        Flux.fromIterable(users)
                .flatMap(User::loadProfile);

    }

    static class User {

        public Flux<Profile> loadProfile() {
            // 發送HTTP請求 異步執行
            return Flux.empty();
        }
    }

    static class Profile {

    }
複制代碼           

乍看上去這種方式非常不錯。Flux是從一個使用from()操作符的固定List生成的。是以,訂閱它的時候,會将所有的使用者立即釋放出來。對于每個新User,flatMap()都會調用loadProfile()并傳回Flux 。然後,flatMap()透明地訂閱每個新的Flux,将所有的Profile事件轉發至下遊。訂閱内部Flux 就相當于發起新的HTTP連接配接。是以,假設我們有10000個使用者,那就會突然發起10000個并發的HTTP請求。如果所有的這些請求都通路相同的伺服器,預計得到的情況無外乎如下幾種:

  • 拒絕連接配接。
  • 長時間等待和逾時。
  • 伺服器停機。
  • 遇到限速或者被加入黑名單。
  • 整體的延遲增加。
  • 用戶端的問題,包括太多打開(open)狀态的Socket、線程,以及過多的記憶體消耗。

增加并發會在一定的程度上得到回報,但如果你嘗試運作太多并發操作,最終将會導緻大量的上下文切換、過高的記憶體和CPU占用,以及整體性能的下降。

Flux.fromIterable(users)
               .flatMap(User::loadProfile, 10);
複制代碼           

flatMap()有一個非常簡單的重載形式,能夠限制内部流的并發訂閱總數。參數concurrency 限制了内部Flux 的訂閱數量。在實踐中,flatMap()接收前10個User時,它會為每個User調用loadProfile(),但是來自上遊的第11個User出現時,flatMap()不會再調用loadProfile()。相反,它會等正在運作的内部流完成。是以,concurrency 參數限制了flatMap()生成的背景任務的數量。

concatMap(f)在語義上是與flatMap(f, 1)(也就是concurrency 值為1的flatMap())等價的。

利用flatMap計算笛卡爾積

根據兩個流中的所有值生成笛卡兒積。例如,可能有兩個Flux ,一個代表棋盤的行(rank,從1到8),另一個代表棋盤的列(file,從a到h)。應該能夠找到棋盤上所有64個可能的方格。Flux 将會精确地釋出64個事件:針對a它會生成a1、a2...a8,然後是b1、b2等,直到最後達到h7和h8。這是flatMap()另一個非常有意思的樣例,每列(file)都會生成該列對應的所有可能的方格。

@Test
    public void cartesianTest() {
        Flux<Integer> oneToEight = Flux.range(1, 8);
        Flux<String> ranks = oneToEight.map(Objects::toString);
        Flux<String> files = oneToEight.map(x -> 'a' + x - 1)
                .map(ascii -> (char) ascii.intValue())
                .map(ch -> Character.toString(ch));

        Flux<String> squares = files
                .flatMap(file -> ranks.map(rank -> file + rank));

        squares.subscribe(log::info);
    }
複制代碼           

concatMap

concatMap() 可以保持下遊事件的順序,使其與上遊事件的順序完全契合。

@Test
    public void flatMap3Test() throws InterruptedException {
        Flux.just(DayOfWeek.SUNDAY, DayOfWeek.MONDAY)
                .concatMap(this::loadRecordFor)
                .subscribe(log::info);

        TimeUnit.SECONDS.sleep(5);
    }

    private Flux<String> loadRecordFor(DayOfWeek dow) {
        switch (dow) {
            case SUNDAY:
                return Flux.interval(Duration.ofMillis(90))
                        .take(5)
                        .map(i -> "Sun" + i);
            case MONDAY:
                return Flux.interval(Duration.ofMillis(65))
                        .take(5)
                        .map(i -> "Mon" + i);
            default:
                return Flux.empty();
        }
    }

17:27:15.161 [parallel-1] INFO wangxw.operator.OperatorTest - Sun0
17:27:15.250 [parallel-1] INFO wangxw.operator.OperatorTest - Sun1
17:27:15.340 [parallel-1] INFO wangxw.operator.OperatorTest - Sun2
17:27:15.432 [parallel-1] INFO wangxw.operator.OperatorTest - Sun3
17:27:15.520 [parallel-1] INFO wangxw.operator.OperatorTest - Sun4
17:27:15.587 [parallel-2] INFO wangxw.operator.OperatorTest - Mon0
17:27:15.652 [parallel-2] INFO wangxw.operator.OperatorTest - Mon1
17:27:15.716 [parallel-2] INFO wangxw.operator.OperatorTest - Mon2
17:27:15.783 [parallel-2] INFO wangxw.operator.OperatorTest - Mon3
17:27:15.848 [parallel-2] INFO wangxw.operator.OperatorTest - Mon4
複制代碼           

第一個事件(Sunday)從上遊出現的時候,concatMap()會訂閱loadRecordsFor()産生的Flux ,并将産生的所有事件傳遞到下遊。這個内部流完成時,concatMap()會等待下一個上遊事件(Monday)并重複以上過程。concatMap()不會涉及任何的并發性,但是它保證了上遊事件的順序,避免出現重疊。

flatMap()内部使用了merge()操作符,同時訂閱所有的子Flux ,對它們不做任何的區分。這也是下遊事件互相交叉的原因。但是,concatMap()可以在技術上使用concat()操作符。concat()隻會先訂閱第一個底層的Flux ,隻有第一個完成之後,才會訂閱第二個。

merge

按照順序合并流

SpringBoot應用可以同時處理多少請求?
@Test
    public void mergeTest() throws InterruptedException {
        Flux<String> flux1 = Flux.interval(Duration.ofMillis(300)).map(x -> "p1: " + x);
        Flux<String> flux2 = Flux.interval(Duration.ofMillis(500)).map(x -> "p2: " + x);
        Flux<String> mergeFlux = Flux.merge(flux1, flux2);

        mergeFlux.subscribe(log::info);

        TimeUnit.SECONDS.sleep(2);
    }

19:03:12.339 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 0
19:03:12.541 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 0
19:03:12.641 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 1
19:03:12.939 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 2
19:03:13.040 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 1
19:03:13.240 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 3
19:03:13.540 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 4
19:03:13.542 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 2
19:03:13.841 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 5
19:03:14.040 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 3
複制代碼           

需要注意,任何底層Flux 出現的錯誤都會立即傳遞到Observer。可以使用merge()的mergeDelayError()變種形式來推遲錯誤,這樣直到其他的流都完成,錯誤通知才會釋出。mergeDelayError()甚至能夠保證收集所有的異常,而不僅僅是第一個,并将它們封裝到rx.exceptions.CompositeException。

zip

壓縮(zipping)指的是将兩個(或更多)流組合起來的操作,在這個過程中,某個流中的每個事件必須要與其他流對應的事件進行成對組合。下遊事件是通過組合每個流中的第一個事件,然後再組合第二個事件,以此類推生成的。

SpringBoot應用可以同時處理多少請求?

concat

按順序訂閱,等待第一個流完成,再依次訂閱下一個。

SpringBoot應用可以同時處理多少請求?
@Test
    public void concatTest () throws InterruptedException {
        Flux<String> flux1 = Flux.just("1","2","3").delayElements(Duration.ofSeconds(1));
        Flux<String> flux2 = Flux.just("4","5","6");

        Flux<String> flux = Flux.concat(flux1, flux2);

        flux.subscribe(log::info);

        TimeUnit.SECONDS.sleep(3);
    }

16:43:21.315 [parallel-1] INFO wangxw.operator.OperatorTest - 1
16:43:22.317 [parallel-2] INFO wangxw.operator.OperatorTest - 2
16:43:23.318 [parallel-3] INFO wangxw.operator.OperatorTest - 3
16:43:23.318 [parallel-3] INFO wangxw.operator.OperatorTest - 4
16:43:23.318 [parallel-3] INFO wangxw.operator.OperatorTest - 5
16:43:23.319 [parallel-3] INFO wangxw.operator.OperatorTest - 6
複制代碼           

錯誤處理操作符

在 try-catch 塊中處理異常最常用的幾種方法:

  1. 捕獲并傳回一個靜态預設值。
  2. 捕獲并動态計算回退值。
  3. 捕獲并執行一個回退方法。
  4. 捕獲,包裝為一個BusinessException,然後重新抛出。
  5. 捕獲,記錄一個特定的錯誤日志,然後重新抛出。
  6. 使用 finally 塊清理資源或 Java 7 “try-with-resource” 構造。

靜态回退值(Static Fallback Value)

在 Reactor 中與“捕獲并傳回靜态預設值”的等價的是onErrorReturn. 以下示例顯示了如何使用它:

try {
  return doSomethingDangerous(10);
}
catch (Throwable error) {
  return "RECOVERED";
}

public String doSomethingDangerous(int i) {
    if (i == 10) {
        throw new BusinessException();
     }
     return i + "";
}
複制代碼           

在 Reactor 中等價的是:

Flux<String> flux = Flux.just(10)
      	.map(this::doSomethingDangerous)
      	.onErrorReturn("RECOVERED");
flux.subscribe(log::info, e -> log.error("error", e));

00:07:57.556 [main] INFO wangxw.operator.ErrorHandleOperatorTest - RECOVERED
複制代碼           

還可以執行選擇執對異常執行一個Predicate來決定是否恢複,如下面的示例:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10"); 
複制代碼           

僅當異常消息為“boom10”時,執行回退。

回退方法(Fallback Method)

在Reactor中與“捕獲并執行一個回退方法”等價的是 onErrorResume ,例如從外部但不可靠的服務中擷取資料,當外部服務異常時則從緩存中擷取資料作為回退值,示例如下:

String v1;
try {
  v1 = callExternalService("key1");
}
catch (Throwable error) {
  v1 = getFromCache("key1");
}

String v2;
try {
  v2 = callExternalService("key2");
}
catch (Throwable error) {
  v2 = getFromCache("key2");
}
複制代碼           

在 Reactor 中等價的是:

Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k) // 每個鍵都調用外部服務
        .onErrorResume(e -> getFromCache(k)) // 外部服務執行失敗執行回退
    );
複制代碼           

和 onErrorReturn 一樣,onErrorResume 也有一些變體,可以讓你根據異常的類型或Predicate來篩選傳回的異常。它接受一個Function的事實并且允許根據遇到的錯誤選擇切換到不同的回退方法。下面的例子展示了如何做到這一點:

Flux.just("timeout1", "unknown", "key2")
    .flatMap(k -> callExternalService(k)
        .onErrorResume(error -> { // 發生錯誤是,動态選擇如何繼續
            if (error instanceof TimeoutException) 
                return getFromCache(k); // 如果逾時,則從通路本地緩存
            else if (error instanceof UnknownKeyException)  
                return registerNewEntry(k, "DEFAULT"); // 如果 key 不存在則建立一個新的條目
            else
                return Flux.error(error); // 在其他情況下重新抛
        })
    );
複制代碼           

動态回退值(Dynamic Fallback Value)

指令式示例如下:

try {
  Value v = erroringMethod();
  return MyWrapper.fromValue(v);
}
catch (Throwable error) {
  return MyWrapper.fromError(error);
}
複制代碼           

Reactor 代碼如下:

erroringFlux.onErrorResume(error -> Mono.just( 
        MyWrapper.fromError(error) 
));
複制代碼           

捕獲并重新抛出(Catch and Rethrow)

指令式示例如下:

try {
  return callExternalService(k);
}
catch (Throwable error) {
  throw new BusinessException("oops, SLA exceeded", error);
}
複制代碼           

在“fallback method”示例中,flatMap中的最後一行提示我們如何以反應方式實作相同的目标,如下所示:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(original -> Flux.error(
            new BusinessException("oops, SLA exceeded", original))
    );
複制代碼           

但是,有一種更直接的方法,即可以通過onErrorMap實作相同的效果:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
複制代碼           

記錄日志(Log or React on the Side)

對于希望錯誤繼續傳播但在不修改序列的情況下對其作出反應(例如,記錄錯誤)的情況,可以使用doError操作符。這相當于“捕獲,記錄一個特定的錯誤日志,然後重新抛出”模式,如下例所示:

try {
  return callExternalService(k);
}
catch (RuntimeException error) {
  //make a record of the error
  log("uh oh, falling back, service failed for key " + k);
  throw error;
}
複制代碼           

doOnError運算符以及所有以doOn為字首的操作符具有一定的“副作用(side-effect)”。它們允許在不修改序列的情況下檢視序列的事件。

與前面顯示的指令式示例一樣,以下示例仍然傳播錯誤,但確定我們至少記錄了外部服務發生故障:

LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
    .flatMap(k -> callExternalService(k) 
        .doOnError(e -> {
            failureStat.increment();
            log("uh oh, falling back, service failed for key " + k); // 記錄日志
        })
        
    );
複制代碼           

使用 Resources 和 Finally 塊 (Using Resources and the Finally Block)

  • 使用 finally 塊清理資源
Stats stats = new Stats();
stats.startTimer();
try {
  doSomethingDangerous();
}
finally {
  stats.stopTimerAndRecordTiming();
}
複制代碼           
  • 使用 try-with-resource 文法
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
  return disposableInstance.toString();
}
複制代碼           

兩者都有各自的 Reactor 等價文法:doFinally 和 using。

doFinally 是當序列終止(使用 onComplete 或 onError )或取消時希望執行的副作用。它給你一個提示,什麼樣的終止觸發了副作用。以下示例顯示了如何使用 doFinally:

Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();

Flux<String> flux =
Flux.just("foo", "bar")
    .doOnSubscribe(s -> stats.startTimer())
    .doFinally(type -> { 
        stats.stopTimerAndRecordTiming();
        if (type == SignalType.CANCEL) 
          statsCancel.increment();
    })
    .take(1); 
複制代碼           

重試 (Retrying)

Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .retry(1)
    .elapsed() 
    .subscribe(System.out::println, System.err::println);

[259,tick 0]
[251,tick 1]
[250,tick 2]
[504,tick 0]
[250,tick 1]
[250,tick 2]
java.lang.RuntimeException: boom
複制代碼           

從前面的例子中可以看出,retry(1)隻是重新訂閱了interval 一次,從 0 重新開始計時。第二次,由于異常仍然發生,它放棄重試并向下遊傳播錯誤。

還有一個更進階的版本的重試 retryWhen,它使用一個“伴生(companion) ”的Flux來判斷某個特定的失敗是否應該重試。這個伴生的Flux是由操作符建立的,但由使用者進行裝飾,以便自定義重試條件。

伴生的 Flux 是一個 Flux,它傳遞了重試的政策/函數,是retryWhen唯一的參數。使用者定義該函數并使其傳回一個新的Publisher<?>。Retry類是一個抽象類,我們可以使用**Retry.from(Function)**來生成伴生的Flux。

retryWhen重試周期

  1. 每次發生錯誤(有可能會重試)時,一個 RetrySignal 就會被被釋出到伴生的Flux中,該Flux已經由您的函數修飾。
  2. 如果伴生的Flux 發出一個值,就會發生重試。
  3. 如果伴生的Flux 完成(completes)那麼錯誤會被吞掉,重試周期停止,結果序列也将完成。
  4. 如果伴生的Flux 産生錯誤(error),重試周期停止,結果序列也将産生錯誤(error)。

使用 retryWhen 模拟 retry(3) 的方法,伴生的Flux将吞掉錯誤,這兩種情況之間的差別很重要。

Flux.<String>error( new RuntimeException("boom"))
        .doOnError(e -> System.err.println("on error"))
        .retryWhen(Retry.from(companion ->
                 companion.take(3)))
        .subscribe(System.out::println, System.err::println);

on error
on error
on error
on error
複制代碼           

實際上,上面的這個示例将會産生一個空的Flux,但是它成功的完成了。而retry(3)将會以一個錯誤終止,是以他們的結果并不完全相同。

Flux.<String>error( new RuntimeException("boom"))
        .doOnError(e -> System.err.println("on error"))
        .retry(3)
        .subscribe(System.out::println, System.err::println);
on error
on error
on error
on error
java.lang.RuntimeException: boom
複制代碼           

指數退避重試

将反應式程式設計應用于已有的程式

從阻塞式到反應式

public class PersonDao {
    
    /**
     * 阻塞式的
     * @return
     */
    public List<Person> listPeople() {
        return query("select * from people");
    }

    /**
     * 反應式的
     * @return
     */
    public Flux<Person> rxListPeople() {
        return Flux.fromIterable(query("select * from people"));
    }

    private List<Person> query(String sql) {
        List<Person> people = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Person person = new Person();
            person.setId(i);
            people.add(person);
        }
        return people;
    }
}
複制代碼           

我們對已有的阻塞式 API 變更到了 反應式 API 。根據系統規模的不同,和原有系統的相容性可能是一個大問題。接下來我們通過 buffer 和 **blockLast ** 将反應式代碼和阻塞式代碼組合起來。

@Test
    public void listPeopleTest() {
        // 沒有任何副作用
        Flux<Person> peopleFlux = personDao.rxListPeople();
        Flux<List<Person>> listFlux = peopleFlux.buffer()
                .log();
        List<Person> people = listFlux.blockLast(Duration.ofSeconds(3));
        assert people != null;
        people.forEach(person -> log.info(person.toString()));
    }

11:21:50.598 [main] INFO reactor.Flux.Buffer.1 - onSubscribe(FluxBuffer.BufferExactSubscriber)
11:21:50.601 [main] INFO reactor.Flux.Buffer.1 - request(unbounded)
11:21:50.601 [main] INFO reactor.Flux.Buffer.1 - onNext([Person(id=0), Person(id=1), Person(id=2)])
11:21:50.603 [main] INFO reactor.Flux.Buffer.1 - cancel()
11:21:50.608 [main] INFO reactor.Flux.Buffer.1 - onComplete()
11:21:50.616 [main] INFO wangxw.flux.Block2FluxTest - Person(id=0)
11:21:50.616 [main] INFO wangxw.flux.Block2FluxTest - Person(id=1)
11:21:50.616 [main] INFO wangxw.flux.Block2FluxTest - Person(id=2)
複制代碼           

blockLast 阻塞等待onComplete 回調完成,你可能認為以上代碼隻是封裝和拆封 Flux,而沒有特别明确的目的。但是,這隻是第一步。下一個轉換将會引入一些延遲執行的功能。僅僅存在 Flux 并不意味着會有背景任務或副作用,這與Future不同,Future幾乎總是意味着有某些并發操作在執行(隻有任務送出了才能傳回一個Future)。

擁抱延遲執行

public Flux<Person> rxListPeople() {
        return Flux.defer(() ->
                Flux.fromIterable(query("select * from people")));
    }
複制代碼           

在訂閱之前什麼都不會發生。

從指令式并發到聲明式并發

在企業級應用程式中,顯式的并發并不常見。大多數情況下,每個請求都會由單個線程處理。同一個線程要完成如下工作。

  1. 接收 TCP/IP 連接配接
  2. 解析 HTTP 請求
  3. 調用 Controller 或 Servlet
  4. 阻塞對資料庫的調用
  5. 處理結果
  6. 編碼響應(如 JSON 格式)
  7. 将響應發送至用戶端

如果後端要發起多個獨立的請求,比如通路資料庫,那麼這種分層的模型會影響使用者的延遲,因為它們是序列化執行的(當然可以很容易的并發執行)。除此之外,擴充性也會受到影響。例如,在Tomcat的執行器(executor)中,預設有200個負責處理請求的線程,這意味着處理的并發連接配接不能超過200個。如果流量突然暴增,傳入的連接配接将會排隊,伺服器就會出現更高的延遲。但是,這種情況不會持續下去,Tomcat最終會開始拒絕傳入的流量。

**傳統的架構,在一個線程中執行請求處理的各個步驟也有一些益處,比如能夠提升緩存的本地化以及減少同步的損耗。**令人遺憾的是,**在典型的應用程式中,因為整體的延遲是每層延遲的總和,是以一個有故障的元件可能會對整體的延遲産生負面影響。**此外,有時許多步驟是互相獨立的,可以并發執行。例如,調用多個外部API或執行多個獨立的SQL查詢。例如,以下是一個沒有任何并發功能的程式。

@Slf4j
public class TicketService {
    /**
     * 查詢航班
     *
     * @param flightNo
     * @return
     */
    public Flight lookupFlight(String flightNo) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new Flight(flightNo);
    }

    /**
     * 查詢乘客
     *
     * @param id
     * @return
     */
    public Passenger findPassenger(Long id) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new Passenger(id);
    }

    /**
     * 根據航班和乘客訂票
     *
     * @param flight
     * @param passenger
     * @return
     */
    public Ticket bookTicket(Flight flight, Passenger passenger) {
        return new Ticket(flight, passenger);
    }

    /**
     * 發送郵件
     *
     * @param ticket
     * @return
     */
    public boolean sendEmail(Ticket ticket) throws IOException {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("send email {}", ticket);
        return true;
    }
}
複制代碼           

用戶端代碼如下所示。

@Test
    public void blockBookTicket() throws IOException {
        Flight flight = ticketService.lookupFlight("LOT 783");
        Passenger passenger = ticketService.findPassenger(1L);
        Ticket ticket = ticketService.bookTicket(flight, passenger);
        ticketService.sendEmail(ticket);
    }
複制代碼           

這是非常典型的阻塞式代碼,與衆多應用程式中的代碼都很類似。但是,如果從延遲的角度來看,上述的代碼片段可以分為4個步驟。前兩個步驟互相獨立,隻有第三個步驟(bookTicket())需要 lookupFlight() 和findPassenger() 的傳回值。這裡顯然有機會利用并發的優勢,但是,開發人員很少采用這種方式,因為這需要比較複雜的線程池、Future以及回調。但是,如果API已經相容Rx了,你可以簡單地将遺留的阻塞式代碼包裝到 Flux 中, 如下所示。

public Mono<Flight> rxLookupFlight(String flightNo) {
        return Mono.defer(() ->
                Mono.just(lookupFlight(flightNo)));
    }

    public Mono<Passenger> rxFindPassenger(Long id) {
        return Mono.defer(() ->
                Mono.just(findPassenger(id)));
    }
複制代碼           

從語義上講,rx-方法其實以相同的方式完成了相同的任務,換言之,它們預設都是阻塞的。從用戶端的角度看,除了API更加冗長之外,我們其實沒有得到任何好處。

@Test
    public void rxBookTicket() throws IOException {
        // 隻是占位,不産生任何的副作用
        Mono<Flight> flight = ticketService.rxLookupFlight("LOT 783");
        Mono<Passenger> passenger = ticketService.rxFindPassenger(1L);
        Mono<Ticket> ticket = flight.zipWith(passenger, 
                (f, p) -> ticketService.bookTicket(f, p));
        ticket.subscribe(ticketService::sendEmail);

    }
複制代碼           

無論是傳統的阻塞程式,還是使用 Rx 的程式,它們的運作方式完全相同。不過上述代碼的執行方式是延遲執行的,我們得到了兩個 Flight 和 Passenger 類型的兩個占位符,但是還沒有産生任何的副作用。此時,并沒有執行任何的資料庫查詢或Web服務調用。

在上述代碼中你需要關注 subscribe() 位于何處。不過在通常情況下,你的業務邏輯隻是一直組合Flux,并将它們傳回給某個架構或者腳手架層。實際的訂閱是由Web架構或某些膠水代碼在幕後完成的。自行調用 subscribe() 也算不上糟糕的實踐,但是将訂閱推遲得越遠越好。

為了了解執行的流程,從下往上觀察是一種非常有幫助的方法。我們訂閱了ticket,是以Rx必須透明地訂閱flight和passenger。此時,真正的業務邏輯才會執行。因為兩個Flux 都是cold類型的,并且沒有涉及并發,是以對flight的訂閱會在調用線程中觸發lookupFlight()阻塞方法。當lookupFlight()完成的時候,RxJava就可以訂閱passenger了。此時,它已經通過同步的flight接收到Flight執行個體。rxFindPassenger()會以阻塞的方式調用findPassenger()并接收一個Passenger。經過這個連接配接點之後,資料會往下遊流動。Flight和Passenger執行個體通過提供的lambda表達式(bookTicket)被結合起來,傳遞給ticket.subscribe()。

這裡看上去有不少工作要做,并且運作方式本質上和開始的阻塞式代碼并沒有差別。但是,現在我們不需要修改任何邏輯就能聲明式地應用并發了。

如果業務方法傳回Future(或者CompletableFuture,沒有本質差別),其實系統已經為我們做出了兩個決策。

  • 底層對 lookupFlight() 的調用已經開始,這裡沒有任何延遲執行的空間。我們不會在這個方法上阻塞,但是工作已經啟動。
  • 我們對并發沒有任何控制權。方法的具體實作決定了Future任務是線上程池調用,還是為每個請求建立一個新的線程。

Reactor 給了使用者更多的控制權。實際上Flux 一般都已經是異步的了,但是在個别情況下,還是需要為已有的Flux 添加并發功能。在遇到同步Flux 時,可以自由決定使用何種線程機制的是API的消費者,而不是API的實作者。上述功能都是通過 subscribeOn() 操作符實作的,如下所示。

Mono<Flight> flight = ticketService.rxLookupFlight("LOT 783")
                .subscribeOn(Schedulers.boundedElastic());
        Mono<Passenger> passenger = ticketService.rxFindPassenger(1L)
                .subscribeOn(Schedulers.boundedElastic())
                .timeout(Duration.ofSeconds(3)); // 還可以聲明一個逾時
複制代碼           

如果 API 是Reactor驅動的,我們可以在訂閱之前的任何地方插入 subscribeOn() 操作符,并提供一個所謂的Scheduler執行個體,不用花費太大的力氣就能讓兩個方法并發執行。

但是bookTicket( )依然有點美中不足,它傳回的是Ticket,這毫無疑問是阻塞式的。盡管訂票的執行過程可能會非常迅速,但是将其按照 Reactor 的方式進行聲明也是值得的,這樣會讓API更易于演化。

public Mono<Ticket> rxBookTicket(Flight flight, Passenger passenger) {
        return Mono.defer(() ->
                Mono.just(bookTicket(flight, passenger)));
    }
複制代碼           

但是,現在zipWith()傳回的是一個看上去很詭異的Mono<Mono>,根據經驗,每當你看到雙重包裝的類型(比如Optional<Optional<...>>),就意味着在某些地方缺失了對flatMap()的調用。

Mono<Mono<Ticket>> ticket = flight
           .zipWith(passenger, (f, p) -> ticketService.rxBookTicket(f, p));
複制代碼           

我們可以使用flatMap,并為其傳遞一個恒等式函數,如下:

Mono<Ticket> ticket = flight
             .zipWith(passenger, (f, p) -> ticketService.rxBookTicket(f, p))
             .flatMap(abs -> abs);
複制代碼           

我們難免會認為subscribeOn()是在 Reactor 中實作并發的恰當工具。這個操作符的确能夠實作這一點,但盡量還是不要使用它(以及後文描述的publishOn() )。在現實中,Flux 來源于異步源,是以根本就沒有必要進行自定義的排程。這裡使用 subscribeOn() 隻是為了展現如何更新已有的應用程式,進而有選擇性地使用反應式原則。但是,在實踐中,Scheduler和subscribeOn()應該是最後的“武器”。

使用 flatMap()

在上述樣例中,我們必須通過電子郵件發送一個 Ticket 清單,在這裡我們需要注意以下三點:

  1. 這個清單可能很長。
  2. 發送一封郵件需要幾毫秒,甚至幾秒。
  3. 發送失敗的時候,應用程式需要平穩運作,但是最後要報告哪些 ticket 沒有投遞成功。

最後一項需求迅速排除了簡單的tickets.forEach(this::sendEmail)方式,因為這種方式會立即抛出異常,并且不會繼續投遞ticket。是以你隻能使用疊代器,大緻代碼如下:

List<Ticket> faitures = new ArrayList<>();
        for (Ticket ticket : tickets) {
            try {
                ticketService.sendEmail(ticket);
            } catch (Exception e) {
                log.warn("Failed to send {}", ticket, e);
                faitures.add(ticket);
            }
        }
複制代碼           

但是,前面兩個需求并沒有得到解決。我們不需要在一個線程中串行的發送郵件,按照傳統的方式我們可以使用ExecutorService 将每個電子郵件送出為一個獨立的任務,代碼如下:

List<Pair<Ticket, Future<Boolean>>> tasks = tickets.stream()
                .map(ticket -> Pair.of(ticket, ticketService.sendEmailAsync(ticket)))
                .collect(Collectors.toList());

        List<Ticket> failures = tasks.stream().flatMap(pair -> {
            try {
                Future<Boolean> future = pair.getRight(); 
                future.get(1, TimeUnit.SECONDS); // 1s 的發送時間
                return Stream.empty();
            } catch (Exception e) {
                Ticket ticket = pair.getLeft();
                log.warn("Failed to send {}", ticket, e);
                return Stream.of(ticket);
            }
        }).collect(Collectors.toList());

//-----------------------------------------------------------

    ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());


    public Future<Boolean> sendEmailAsync(Ticket ticket) {
        return pool.submit(() -> sendEmail(ticket));
    }    
複制代碼           

首先,我們周遊tickets,并将它們送出到一個線程池中。準确地說,我們調用sendEmailAsync()輔助方法,将對sendEmail()的調用送出到一個線程池中,執行過程會包裝到一個Callable中。更準确地說,Callable的執行個體被先放到線程池前面的一個無界(預設情況下)隊列中。如果任務送出的速度太快,它們将無法及時得到處理,進而線上程池的無界隊列中積壓。這裡缺乏一種減緩送出速度的機制,這也是反應式流和回壓緻力于解決的問題。

為了在發送郵件失敗時能夠重試,是以必須跟蹤哪個Future負責哪個Ticket,這裡以Pair來表示。第二個循環周遊所有的Future,并試圖通過阻塞(get())和等待完成的方式來解除對它們的引用。如果get()成功傳回,将會跳過這個Ticket。但是,如果出現異常,将會傳回與該任務關聯的Ticket執行個體,這樣就能知道它失敗了,稍後再報告它。Stream.flatMap()允許傳回零個或一個元素(實際上可以是任意數量),而Stream.map()通常需要一個元素。

你可能想問,為何需要兩個循環而不是如下的一個循環呢?

tickets.stream()
                .map(ticket -> Pair.of(ticket, ticketService.sendEmailAsync(ticket)))
                .flatMap(pair -> {
                    // ...
                }).collect(Collectors.toList());
複制代碼           

如果你不了解Java 8的Stream是如何運作的,就很難發現這裡有一個非常有意思的bug。因為流與 Flux 類似,它們都是延遲執行的,是以隻有在請求終端操作(terminal operation)的時候(例如collect(toList())),才會針對底層集合中的每個元素依次執行操作。這意味着啟動背景任務的map()操作并沒有針對所有ticket立即執行,而是每次隻執行一個元素,交替使用flatMap()。除此之外,我們實際上啟動了一個Future,阻塞等待,然後啟動第二個Future,阻塞等待,以此類推。

上述代碼涉及很多工作,并且出現錯誤的可能性非常高,更不要提什麼可讀性和可維護性了。那麼 Reactor 能在這個場景中發揮什麼作用那? 首先,我們将發送郵件的 API 更新為使用 Reactor,如下所示:

public Mono<Boolean> rxSendEmail(Ticket ticket){
        // 将阻塞的源包裝為反應式
        return Mono.fromCallable(() -> sendEmail(ticket));
    }
複制代碼           

這裡還沒有涉及并發,它隻是将sendEmail()包裝進了一個 Mono 中。然後可以像前面一樣周遊所有的tickets,它并不涉及任何并發,如下所示:

List<Ticket> failures = Flux.fromIterable(this.tickets)
                .flatMap(ticket ->
                        ticketService.rxSendEmail(ticket)
//                                .flatMap(response -> Mono.<Ticket>empty())
                                .ignoreElement()
                                .ofType(Ticket.class)
                                .doOnError(e -> log.warn("Failed to send {}", ticket, e))
                                .onErrorReturn(ticket)
                .buffer()
                .blockLast(); // 收集發送失敗的tickets
複制代碼           

在以上的樣例中,很容易看到内層的flatMap()忽略了response并傳回了一個空的流。在這樣的場景中,flatMap()就有點大材小用了,更有效的方式是ignoreElements()。ignoreElements()會忽略上遊釋出的值,隻轉發onCompleted()或onError()通知。因為我們忽略實際的響應,隻處理錯誤,是以這裡的ignoreElements()能夠運作得非常好。

我們感興趣的内容都在外層flatMap()中。如果隻是使用flatMap(this::rxSendEmail),代碼也可以運作,隻不過rxSendEmail引發的任何故障都會終結整個流。但是,我們想要“捕獲”所有釋出出來的錯誤,将其收集起來供後續使用。我們使用了與Stream.flatMap()類似的技巧:如果response能夠成功釋出,就将其轉換為一個空的 Flux。它的基本含義就是丢棄成功的ticket。但是,如果遇到故障,樣例會傳回引發故障的ticket。額外的doOnError()回調允許将異常以日志的形式記錄下來。當然,也可以将日志記錄添加到onErrorReturn()操作符中,但是我們發現這種關注點分離的方式更符合函數式的風格。

需要注意,如果将外層的flatMap()替換為concatMap(),我們将會遇到與前文提及的JDK中的Stream類似的bug。flatMap(或merge)會立即訂閱所有的内部流。與之相反,concatMap(或concat)則會依次訂閱每個内部Flux。并且隻要沒有人真正訂閱Flux,它就不會開展任何工作。

到目前為止,一個帶有try-catch的for循環被替換成了更難閱讀、更複雜的 Flux。但是,為了将序列化代碼轉換為多線程計算,我們隻需要再加一個操作符,**通過将外層 flatMap 内嵌的 Flux 使用 subscribeOn() 這個操作符使 Mono 采用異步運作的方式。**如下所示。

List<Ticket> failures = Flux.fromIterable(this.tickets)
                .flatMap(ticket ->
                        ticketService.rxSendEmail(ticket)
//                                .flatMap(response -> Mono.<Ticket>empty())
                                .ignoreElement()
                                .ofType(Ticket.class)
                                .doOnError(e -> log.warn("Failed to send {}", ticket, e))
                                .onErrorReturn(ticket)
                                .subscribeOn(Schedulers.boundedElastic())) // 内嵌的flux執行異步操作
                .buffer()
                .blockLast(); // 收集發送失敗的tickets
複制代碼           

它沒有太多的侵入性,你甚至可能很難發現它的存在。額外的 subscribeOn() 操作符會讓每個單獨的rxSendMail()都在一個特定的Scheduler中執行,這是 Reactor 的優勢之一。

線上程方面,Flux 預設同步執行,但是它能夠實作無縫甚至透明的多線程功能。當然,這并不意味着我們可以在任意位置安全地注入Scheduler。隻不過,它的API更加簡潔,抽象層級也更高。我們隻需要記住 Flux 預設是同步的。但是,我們可以很輕松地改變這種行為,将并發功能用到往常我們認為不可能出現的地方。這對于現存的遺留應用程式很有價值,借助這種功能,可以輕松地對其進行優化。

subscribeOn() 放在外部還是内部也是值得讨論的,此外應該盡可能推遲對Flux的訂閱,一般這會發生在外部世界的Web架構附近。這會大大改變你的思維方式,因為整個業務邏輯都是延遲執行的,直到有人真正想要看到結果的時候才會運作。