天天看點

Reactive Programming 一種技術,各自表述

前言

作為一名 Java 開發人員,尤其是 Java 服務端工程師,對于 Reactive Programming 的概念似乎相對陌生。随着 Java 9 以及 Spring Framework 5 的相繼釋出,Reactive 技術逐漸開始被廣大從業人員所注意,筆者作為其中一員,更渴望如何了解 Reactive Programming,以及它所帶來的哪些顯著的程式設計變化,更為重要的是,怎麼将其用于實際生産環境,解決目前面臨的問題。然而,随着時間的推移和了解的深入,筆者對 Reactive Programming 的熱情逐漸被澆息,對它的未來保持謹慎樂觀的态度。

本文從了解 Reactive Programming 的角度出發,盡可能地保持理性和中立的态度,讨論 Reactive Programming 的實質。

初始 Reactive

筆者第一次接觸 Reactive 技術的時間還要回溯到 2015年末,當時部分應用正使用 Hystrix 實作服務熔斷,而 Hystrix 底層依賴是 RxJava 1.x,RxJava 是 Reactive 在 Java 程式設計語言的擴充架構。當時接觸 Reactive 隻能算上一種間接的接觸,根據 Hystrix 特性來了解 Reactive 技術,感覺上,Hystrix 逾時和信号量等特性與 Java 并發架構(J.U.C)的關系密切,進而認為 Reactive 是 J.U.C 的擴充。随後,筆者便參考了兩本關于 Reactive Java 程式設計方面的書:《Reactive Java Programming》和《Reactive Programming with RxJava》。遺憾的是,兩者盡管詳細地描述 RxJava 的使用方法,然而卻沒有把 Reactive 使用場景讨論到要點上,如《Reactive Programming with RxJava》所給出的使用場景說明:

When You Need Reactive Programming

Reactive programming is useful in scenarios such as the following:

  • Processing user events such as mouse movement and clicks, keyboard typing,GPS signals changing over time as users move with their device, device gyroscope signals, touch events, and so on.
  • Responding to and processing any and all latency-bound IO events from disk or network, given that IO is inherently asynchronous ...
  • Handling events or data pushed at an application by a producer it cannot control ...

實際上,以上三種使用場景早已在 Java 生态中完全地實作并充分地實踐,它們對應的技術分别是 Java AWT/Swing、NIO/AIO 以及 JMS(Java 消息服務)。那麼,再談 RxJava 的價值又在哪裡呢?如果讀者是初學者,或許還能蒙混過關。好奇心促使筆者重新開始踏上探索 Reactive 之旅。

了解 Reactive

2017年 Java 技術生态中,最有影響力的釋出莫過于 Java 9 和 Spring 5,前者主要支援子產品化,次要地提供了 Flow API 的支援,後者則将”身家性命“壓在 Reactive 上面,認為 Reactive 是未來的趨勢,它以 Reactive 架構 Reactor 為基礎,逐漸建構一套完整的 Reactive 技術棧,其中以 WebFlux 技術為最引人關注,作為替代 Servlet Web 技術棧的核心特性,承載了多年 Spring 逆轉 Java EE 的初心。于是,業界開始大力地推廣 Reactive 技術,于是筆者又接觸到一些關于 Reactive 的講法。

關于 Reactive 的一些講法

其中筆者挑選了以下三種出鏡率最高的講法:

  • Reactive 是異步非阻塞程式設計
  • Reactive 能夠提升程式性能
  • Reactive 解決傳統程式設計模型遇到的困境

第一種說法描述了功能特性,第二種說法表達了性能收效,第三種說法說明了終極目地。下面的讨論将圍繞着這三種講法而展開,深入地探讨 Reactive Programming 的實質,并且了解為什麼說 Reactive Programming 是”一種技術,各自表述“。

同時,讨論的方式也一反常态,并不會直奔主題地解釋什麼 Reactive Programming,而是從問題的角度出發,從 Reactive 規範和架構的論點,了解傳統程式設計模型中所遇到的困境,逐漸地揭開 Reactive 神秘的面紗。其中 Reactive 規範是 JVM Reactive 擴充規範

Reactive Streams JVM

,而 Reactive 實作架構則是最典型的實作:

  • Java 9 Flow API
  • RxJava

傳統程式設計模型中的某些困境

Reactor 認為阻塞可能是浪費的

3.1. Blocking Can Be Wasteful

Modern applications can reach huge numbers of concurrent users, and, even though the capabilities of modern hardware have continued to improve, performance of modern software is still a key concern.

There are broadly two ways one can improve a program’s performance:

  1. parallelize: use more threads and more hardware resources.
  2. seek more efficiency in how current resources are used.

Usually, Java developers write programs using blocking code. This practice is fine until there is a performance bottleneck, at which point the time comes to introduce additional threads, running similar blocking code. But this scaling in resource utilization can quickly introduce contention and concurrency problems.

Worse still, blocking wastes resources.

So the parallelization approach is not a silver bullet.

将以上 Reactor 觀點歸納如下,它認為:

  1. 阻塞導緻性能瓶頸和浪費資源
  2. 增加線程可能會引起資源競争和并發問題
  3. 并行的方式不是銀彈(不能解決所有問題)

第三點基本上是廢話,前面兩點則較為容易了解,為了減少了解的偏差,以下讨論将結合示例說明。

了解阻塞的弊端

假設有一個資料加載器,分别加載配置、使用者資訊以及訂單資訊,如下圖所示:

  • 圖示
Reactive Programming 一種技術,各自表述
  • Java 實作
public class DataLoader {

    public final void load() {
        long startTime = System.currentTimeMillis(); // 開始時間
        doLoad(); // 具體執行
        long costTime = System.currentTimeMillis() - startTime; // 消耗時間
        System.out.println("load() 總耗時:" + costTime + " 毫秒");
    }

    protected void doLoad() { // 串行計算
        loadConfigurations();    //  耗時 1s
        loadUsers();                  //  耗時 2s
        loadOrders();                // 耗時 3s
    } // 總耗時 1s + 2s  + 3s  = 6s

    protected final void loadConfigurations() {
        loadMock("loadConfigurations()", 1);
    }

    protected final void loadUsers() {
        loadMock("loadUsers()", 2);
    }

    protected final void loadOrders() {
        loadMock("loadOrders()", 3);
    }

    private void loadMock(String source, int seconds) {
        try {
            long startTime = System.currentTimeMillis();
            long milliseconds = TimeUnit.SECONDS.toMillis(seconds);
            Thread.sleep(milliseconds);
            long costTime = System.currentTimeMillis() - startTime;
            System.out.printf("[線程 : %s] %s 耗時 :  %d 毫秒\n",
                    Thread.currentThread().getName(), source, costTime);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        new DataLoader().load();
    }

}           
  • 運作結果
[線程 : main] loadConfigurations() 耗時 :  1005 毫秒
[線程 : main] loadUsers() 耗時 :  2002 毫秒
[線程 : main] loadOrders() 耗時 :  3001 毫秒
load() 總耗時:6031 毫秒           
  • 結論

由于加載過程串行執行的關系,導緻消耗實作線性累加。Blocking 模式即串行執行 。

不過 Reactor 也提到,以上問題可通過并行的方式來解決,不過編寫并行程式較為複雜,那麼其中難點在何處呢?

了解并行的複雜

再将以上示例由串行調整為并行,如下圖所示:

Reactive Programming 一種技術,各自表述
  • Java 代碼
public class ParallelDataLoader extends DataLoader {

    protected void doLoad() {  // 并行計算
        ExecutorService executorService = Executors.newFixedThreadPool(3); // 建立線程池
        CompletionService completionService = new ExecutorCompletionService(executorService);
        completionService.submit(super::loadConfigurations, null);      //  耗時 >= 1s
        completionService.submit(super::loadUsers, null);               //  耗時 >= 2s
        completionService.submit(super::loadOrders, null);              //  耗時 >= 3s

        int count = 0;
        while (count < 3) { // 等待三個任務完成
            if (completionService.poll() != null) {
                count++;
            }
        }
        executorService.shutdown();
    }  // 總耗時 max(1s, 2s, 3s)  >= 3s

    public static void main(String[] args) {
        new ParallelDataLoader().load();
    }

}           
[線程 : pool-1-thread-1] loadConfigurations() 耗時 :  1003 毫秒
[線程 : pool-1-thread-2] loadUsers() 耗時 :  2005 毫秒
[線程 : pool-1-thread-3] loadOrders() 耗時 :  3005 毫秒
load() 總耗時:3068 毫秒           

明顯地,程式改造為并行加載後,性能和資源使用率得到提升,消耗時間取最大者,即三秒,由于線程池操作的消耗,整體時間将略增一點。不過,以上實作為什麼不直接使用

Future#get()

方法強制所有任務執行完畢,然後再統計總耗時?

Reactor 這方面的看法并沒有向讀者清晰地表達全秒,不過這還不是全部,聽聽它接下來的說法。

認為異步不一定能夠救贖

3.2. Asynchronicity to the Rescue?

The second approach (mentioned earlier), seeking more efficiency, can be a solution to the resource wasting problem. By writing asynchronous, non-blocking code, you let the execution switch to another active task using the same underlying resources and later come back to the current process when the asynchronous processing has finished.

Java offers two models of asynchronous programming:

  • Callbacks: Asynchronous methods do not have a return value but take an extra

    callback

    parameter (a lambda or anonymous class) that gets called when the result is available. A well known example is Swing’s

    EventListener

    hierarchy.
  • Futures: Asynchronous methods return a

    Future<T>

    immediately. The asynchronous process computes a

    T

    value, but the

    Future

    object wraps access to it. The value is not immediately available, and the object can be polled until the value is available. For instance,

    ExecutorService

    running

    Callable<T>

    tasks use

    Future

    objects.

Are these techniques good enough? Not for every use case, and both approaches have limitations.

Callbacks are hard to compose together, quickly leading to code that is difficult to read and maintain (known as "Callback Hell").

Futures are a bit better than callbacks, but they still do not do well at composition, despite the improvements brought in Java 8 by 

CompletableFuture

.

再次将以上觀點歸納,它認為:

  • Callbacks 是解決非阻塞的方案,然而他們之間很難組合,并且快速地将代碼引導至 "Callback Hell" 的不歸路
  • Futures 相對于 Callbacks 好一點,不過還是無法組合,不過  

    CompletableFuture

    能夠提升這方面的不足

以上 Reactor 的觀點僅給出了結論,沒有解釋現象,其中場景設定也不再簡單直白,從某種程度上,這也側面地說明,Reactive Programming 實際上是”高端玩家“的遊戲。接下來,本文仍通過示例的方式,試圖解釋"Callback Hell" 問題以及

Future

的限制。

了解 "Callback Hell"
  • Java GUI 示例
public class JavaGUI {

    public static void main(String[] args) {
        JFrame jFrame = new JFrame("GUI 示例");
        jFrame.setBounds(500, 300, 400, 300);
        LayoutManager layoutManager = new BorderLayout(400, 300);
        jFrame.setLayout(layoutManager);
        jFrame.addMouseListener(new MouseAdapter() { // callback 1
            @Override
            public void mouseClicked(MouseEvent e) {
                System.out.printf("[線程 : %s] 滑鼠點選,坐标(X : %d, Y : %d)\n",
                        currentThreadName(), e.getX(), e.getY());
            }
        });
        jFrame.addWindowListener(new WindowAdapter() {  // callback 2
            @Override
            public void windowClosing(WindowEvent e) {
                System.out.printf("[線程 : %s] 清除 jFrame... \n", currentThreadName());
                jFrame.dispose(); // 清除 jFrame
            }

            @Override
            public void windowClosed(WindowEvent e) {
                System.out.printf("[線程 : %s] 退出程式... \n", currentThreadName());
                System.exit(0); // 退出程式
            }
        });
        System.out.println("目前線程:" + currentThreadName());
        jFrame.setVisible(true);
    }

    private static String currentThreadName() { // 目前線程名稱
        return Thread.currentThread().getName();
    }
}           

點選窗體并關閉視窗,控制台輸出如下:

目前線程:main
[線程 : AWT-EventQueue-0] 滑鼠點選,坐标(X : 180, Y : 121)
[線程 : AWT-EventQueue-0] 滑鼠點選,坐标(X : 180, Y : 122)
[線程 : AWT-EventQueue-0] 滑鼠點選,坐标(X : 180, Y : 122)
[線程 : AWT-EventQueue-0] 滑鼠點選,坐标(X : 180, Y : 122)
[線程 : AWT-EventQueue-0] 滑鼠點選,坐标(X : 180, Y : 122)
[線程 : AWT-EventQueue-0] 滑鼠點選,坐标(X : 201, Y : 102)
[線程 : AWT-EventQueue-0] 清除 jFrame... 
[線程 : AWT-EventQueue-0] 退出程式...           

Java GUI 以及事件/監聽模式基本采用匿名内置類實作,即回調實作。從本例可以得出,滑鼠的點選确實沒有被其他線程給阻塞。不過當監聽的次元增多時,Callback 實作也随之增多。Java Swing 事件/監聽是一種典型的既符合異步非阻塞,又屬于 Callback 實作的場景,其并發模型可為同步或異步。不過,在 Java 8 之前,由于接口無法支援

default

方法,當接口方法過多時,通常采用

Adapter

模式作為緩沖方案,達到按需實作的目的。尤其在 Java GUI 場景中。即使将應用的 Java 版本更新到 8 ,由于這些 Adapter ”遺老遺少“實作的存在,使得開發人員仍不得不面對大量而繁瑣的 Callback 折中方案。既然 Reactor 提出了這個問題,那麼它或者 Reactive 能否解決這個問題呢?暫時存疑,下一步是如何了解

Future

了解

Future

的限制

Reactor 的觀點僅羅列

Future

的一些限制,并沒有将它們解釋清楚,接下來用兩個例子來說明其中原委。

限制一:

Future

的阻塞性

在前文示例中,

ParallelDataLoader

利用

CompletionService

API 實作

load*()

方法的并行加載,如果将其調整為

Future

的實作,可能的實作如下:

  • Java

    Future

    阻塞式加載示例
public class FutureBlockingDataLoader extends DataLoader {

    protected void doLoad() {
        ExecutorService executorService = Executors.newFixedThreadPool(3); // 建立線程池
        runCompletely(executorService.submit(super::loadConfigurations));  //  耗時 >= 1s
        runCompletely(executorService.submit(super::loadUsers));           //  耗時 >= 2s
        runCompletely(executorService.submit(super::loadOrders));          //  耗時 >= 3s
        executorService.shutdown();
    } // 總耗時 sum(>= 1s, >= 2s, >= 3s)  >= 6s

    private void runCompletely(Future<?> future) {
        try {
            future.get(); // 阻塞等待結果執行
        } catch (Exception e) {
        }
    }

    public static void main(String[] args) {
        new FutureBlockingDataLoader().load();
    }

}           
[線程 : pool-1-thread-1] loadConfigurations() 耗時 :  1003 毫秒
[線程 : pool-1-thread-2] loadUsers() 耗時 :  2004 毫秒
[線程 : pool-1-thread-3] loadOrders() 耗時 :  3002 毫秒
load() 總耗時:6100 毫秒           

ParallelDataLoader

加載耗時為”3068 毫秒“,調整後的

FutureBlockingDataLoader

則比串行的

DataLoader

加載耗時(“6031 毫秒”)還要長。說明

Future#get()

方法不得不等待任務執行完成,換言之,如果多個任務送出後,傳回的多個 Future 逐一調用

get()

方法時,将會依次 blocking,任務的執行從并行變為串行。這也是之前為什麼

ParallelDataLoader

不采取

Future

的解決方案的原因。

限制二:

Future

不支援鍊式操作

由于

Future

無法實作異步執行結果鍊式處理,盡管

FutureBlockingDataLoader

能夠解決方法資料依賴以及順序執行的問題,不過它将并行執行帶回了阻塞(串行)執行。是以,它不是一個理想實作。不過

CompletableFuture

可以幫助提升

Future

的限制:

  • CompletableFuture

    重構

    Future

    鍊式實作
public class FutureChainDataLoader extends DataLoader {

    protected void doLoad() {
        CompletableFuture
                .runAsync(super::loadConfigurations)
                .thenRun(super::loadUsers)
                .thenRun(super::loadOrders)
                .whenComplete((result, throwable) -> { // 完成時回調
                    System.out.println("加載完成");
                })
                .join(); // 等待完成
    }

    public static void main(String[] args) {
        new ChainDataLoader().load();
    }
}           
[線程 : ForkJoinPool.commonPool-worker-1] loadConfigurations() 耗時 :  1000 毫秒
[線程 : ForkJoinPool.commonPool-worker-1] loadUsers() 耗時 :  2005 毫秒
[線程 : ForkJoinPool.commonPool-worker-1] loadOrders() 耗時 :  3001 毫秒
加載完成
load() 總耗時:6093 毫秒           

通過輸出日志分析,

FutureChainDataLoader

并沒有像

FutureBlockingDataLoader

那樣使用三個線程分别執行加載任務,僅使用了一個線程,換言之,這三次加載同一線程完成,并且異步于 main 線程,如下所示:

Reactive Programming 一種技術,各自表述

盡管

CompletableFuture

不僅是異步非阻塞操作,而且還能将 Callback 組合執行,也不存在所謂的 ”Callback Hell“ 等問題。如果強制等待結束的話,又回到了阻塞程式設計的方式。同時,相對于

FutureBlockingDataLoader

實作,重構後的

FutureChainDataLoader

不存在明顯性能提升。

稍作解釋,

CompletableFuture

不僅可以支援

Future

鍊式操作,而且提供三種生命周期回調,即執行回調(Action)、完成時回調(Complete)、和異常回調(Exception),類似于 Spring 4

ListenableFuture

以及 Guava

ListenableFuture

至此,Reactor 的官方參考文檔再沒有出現其他有關”傳統程式設計模型中的某些困境“的描述,或許讀者老爺和我一樣,對 Reactive 充滿疑惑,它真能解決以上問題嗎?當然,監聽則明,偏聽則暗,下面我們再來參考

的觀點。

認為異步系統和資源消費需要特殊處理

Handling streams of data—especially “live” data whose volume is not predetermined—requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be carefully controlled such that a fast data source does not overwhelm the stream destination. Asynchrony is needed in order to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine.

觀點歸納:

  • 流式資料容量難以預判
  • 異步程式設計複雜
  • 資料源和消費端之間資源消費難以平衡

此觀點與 Reactor 相同的部分是,兩者均認為異步程式設計複雜,而前者還提出了資料結構(流式資料)以及資料消費問題。

無論兩者的觀點孰優誰劣,至少說明一個現象,業界對于 Reactive 所解決的問題并非達到一緻,幾乎各說各話。那麼,到底怎樣才算 Reactive Programming 呢?

什麼是 Reactive Programming

關于什麼是 Reactive Programming,下面給出六種管道的定義,嘗試從不同的角度,了解 Reactive Programming 的意涵。首先了解的是“

The Reactive Manifesto

” 中的定義

中的定義

Reactive Systems are: Responsive, Resilient, Elastic and Message Driven.

https://www.reactivemanifesto.org/

該組織對 Reactive 的定義非常簡單,其特點展現在以下關鍵字:

  • 響應的(Responsive)
  • 适應性強的(Resilient)
  • 彈性的(Elastic)
  • 消息驅動的(Message Driven)

不過這樣的定義側重于 Reactive 系統,或者說是設計 Reactive 系統的原則。

維基百科

維基百科作為全世界的權威知識庫,其定義的公允性能夠得到保證:

Reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change. With this paradigm it is possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease, and also communicate that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the changed data flow.
參考位址: https://en.wikipedia.org/wiki/Reactive_programming

維基百科認為 Reactive programming 是一種聲明式的程式設計範式,其核心要素是資料流(data streams )與其傳播變化( propagation of change),前者是關于資料結構的描述,包括靜态的數組(arrays)和動态的事件發射器(event emitters)。由此描述,在筆者腦海中浮現出以下技術視圖:

  • 資料流:Java 8

    Stream

  • 傳播變化:Java

    Observable

    /

    Observer

  • 事件/監聽:Java

    EventObject

    EventListener

這些技術能夠很好地滿足維基百科對于 Reactive 的定義,那麼, Reactive 架構和規範的存在意義又在何方?或許以上定義過于抽象,還無法诠釋 Reactive 的全貌。于是乎,筆者想到了去 Spring 官方找尋答案,正如所願,在 Spring Framework 5 官方參考文檔中找到其中定義。

Spring 5 中的定義

The term "reactive" refers to programming models that are built around reacting to change — network component reacting to I/O events, UI controller reacting to mouse events, etc. In that sense non-blocking is reactive because instead of being blocked we are now in the mode of reacting to notifications as operations complete or data becomes available.
https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-why-reactive

相對于維基百科的定義,Spring 5 WebFlux 章節同樣也提到了變化響應(reacting to change ) ,并且還說明非阻塞(non-blocking)就是 Reactive。同時,其定義的側重點在響應通知方面,包括操作完成(operations complete)和資料可用(data becomes available)。Spring WebFlux 作為 Reactive Web 架構,天然支援非阻塞,不過早在 Servlet 3.1 規範時代皆以實作以上需求,其中包括 Servlet 3.1 非阻塞 API

ReadListener

WriteListener

,以及 Servlet 3.0 所提供的異步上下文

AsyncContext

和事件監聽

AsyncListener

。這些 Servlet 特性正是為 Spring WebFlux 提供适配的以及,是以 Spring WebFlux 能完全相容 Servlet 3.1 容器。筆者不禁要懷疑,難道 Reactive 僅是新包裝的概念嗎?或許就此下結論還為時尚早,不妨在了解一下 ReactiveX 的定義。

ReactiveX

廣泛使用的 RxJava 作為 ReactiveX 的 Java 實作,對于 Reactive 的定義,ReactiveX 具備相當的權威性:

ReactiveX extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.
http://reactivex.io/intro.html

不過,ReactiveX 并沒有直接給 Reactive 下定義,而是通過技術實作手段說明如何實作 Reactive。ReactiveX 作為觀察者模式的擴充,通過操作符(Opeators)對資料/事件序列(Sequences of data and/or events )進行操作,并且屏蔽并發細節(abstracting away…),如線程 API(

Exectuor

Future

Runnable

)、同步、線程安全、并發資料結構以及非阻塞 I/O。該定義的側重點主要關注于實作,包括設計模式、資料結構、資料操作以及并發模型。除設計模式之外,Java 8

Stream

API 具備不少的操作符,包括疊代操作 for-each、map/reduce 以及集合操作

Collector

等,同時,通過

parallel()

sequential()

方法實作并行和串行操作間的切換,同樣屏蔽并發的細節。至于資料結構,

Stream

和資料流或集合序列可以畫上等号。唯獨在設計模式上,

Stream

是疊代器(Iterator)模式實作,而 ReactiveX 則屬于觀察者(Observer)模式的實作。 對此,Reactor 做了進一步地解釋。

The reactive programming paradigm is often presented in object-oriented languages as an extension of the Observer design pattern. One can also compare the main reactive streams pattern with the familiar Iterator design pattern, as there is a duality to the Iterable-Iterator pair in all of these libraries. One major difference is that, while an Iterator is pull-based, reactive streams are push-based.
http ://projectreactor.io/docs/core/release/reference/# intro-reactive

同樣地,Reactor 也提到了觀察者模式(Observer pattern )和疊代器模式(Iterator pattern)。不過它将 Reactive 定義為響應流模式(Reactive streams pattern ),并解釋了該模式和疊代器模式在資料讀取上的差異,即前者屬于推模式(push-based),後者屬于拉模式(pull-based)。難道就因為這因素,就要使用 Reactive 嗎?這或許有些牽強。個人認為,以上組織均沒有坦誠或者簡單地向使用者表達,都采用一種模糊的描述,多少難免讓人覺得故弄玄虛。幸運地是,我從 ReactiveX 官方找到一位前端牛人

André Staltz

,他在學習 Reactive 過程中與筆者一樣,吃了不少的苦頭,在他博文

《The introduction to Reactive Programming you've been missing》

中,他給出了中肯的解釋。

給出的定義

Reactive programming is programming with asynchronous data streams. In a way, this isn't anything new. Event buses or your typical click events are really an asynchronous event stream, on which you can observe and do some side effects. Reactive is that idea on steroids. You are able to create data streams of anything, not just from click and hover events. Streams are cheap and ubiquitous, anything can be a stream: variables, user inputs, properties, caches, data structures, etc.
"What is Reactive Programming?"

他在文章指出,Reactive Programming 并不是新東西,而是司空見慣的混合物,比如事件總監、滑鼠點選事件等。同時,文中也提到異步(asynchronous )以及資料流(data streams)等關鍵字。如果說因為 Java 8 Stream 是疊代器模式的緣故,它不屬于Reactive Programming 範式的話,那麼,Java GUI 事件/監聽則就是 Reactive。那麼,Java 開發人員學習 RxJava、Reactor、或者 Java 9 Flow API 的必要性又在哪裡呢?是以,非常有必要深入探讨 Reactive Programming 的使用場景。

Reactive Programming 使用場景

正如同 Reactive Programming 的定義那樣,各個組織各執一詞,下面仍采用多方引證的方式,尋求 Reactive Programming 使用場景的“最大公約數”。

認為的使用場景

The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary.
https://github.com/reactive-streams/reactive-streams-jvm

認為 Reactive Streams 用于在異步邊界(asynchronous boundary)管理流式資料交換( govern the exchange of stream data)。異步說明其并發模型,流式資料則展現資料結構,管理則強調它們的它們之間的協調。

Spring 5

Reactive and non-blocking generally do not make applications run faster. They can, in some cases, for example if using the 

WebClient

 to execute remote calls in parallel. On the whole it requires more work to do things the non-blocking way and that can increase slightly the required processing time.

The key expected benefit of reactive and non-blocking is the ability to scale with a small, fixed number of threads and less memory. That makes applications more resilient under load because they scale in a more predictable way.

Spring 認為 Reactive 和非阻塞通常并非讓應用運作更快速(generally do not make applications run faster),甚至會增加少量的處理時間,是以,它的使用場景則利用較少的資源,提升應用的伸縮性(scale with a small, fixed number of threads and less memory)。

The ReactiveX Observable model allows you to treat streams of asynchronous events with the same sort of simple, composable operations that you use for collections of data items like arrays. It frees you from tangled webs of callbacks, and thereby makes your code more readable and less prone to bugs.

ReactiveX 所描述的使用場景與 Spring 的不同,它沒有從性能入手,而是代碼可讀性和減少 Bugs 的角度出發,解釋了 Reactive Programming 的價值。同時,強調其架構的核心特性:異步(asynchronous)、同順序(same sort)群組合操作(composable operations)。它也間接地說明了,Java 8

Stream

在組合操作的限制,以及操作符的不足。

Composability and readability

Data as a flow manipulated with a rich vocabulary of operators

Nothing happens until you subscribe

Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high

High level but high value abstraction that is concurrency-agnostic

Reactor 同樣強調結構性和可讀性(Composability and readability)和高層次并發抽象(High level abstraction),并明确地表示它提供豐富的資料操作符( rich vocabulary of operators)彌補

Stream

API 的短闆,還支援背壓(Backpressure)操作,提供資料生産者和消費者的消息機制,協調它們之間的産銷失衡的情況。同時,Reactor 采用訂閱式資料消費(Nothing happens until you subscribe)的機制,實作

Stream

所不具備的資料推送機制。

至此,讨論接近尾聲,最後的部分将 Reactive Programming 内容加以總結。

總結 Reactive Programming

Reactive Programming 作為觀察者模式(

Observer

) 的延伸,不同于傳統的指令程式設計方式(

Imperative programming

)同步拉取資料的方式,如疊代器模式(

Iterator

) 。而是采用資料釋出者同步或異步地推送到資料流(Data Streams)的方案。當該資料流(Data Steams)訂閱者監聽到傳播變化時,立即作出響應動作。在實作層面上,Reactive Programming 可結合函數式程式設計簡化面向對象語言文法的臃腫性,屏蔽并發實作的複雜細節,提供資料流的有序操作,進而達到提升代碼的可讀性,以及減少 Bugs 出現的目的。同時,Reactive Programming 結合背壓(Backpressure)的技術解決釋出端生成資料的速率高于訂閱端消費的問題。