天天看點

響應式程式設計入門(RxJava)響應式程式設計入門(RxJava)

作者:不洗碗工作室 - Marklux

出處:Marklux's Pub

版權歸作者所有,轉載請注明出處

響應式程式設計入門(RxJava)

背景

随着時間的發展,程式設計領域不斷地推出新的技術來嘗試解決已有的問題,**響應式程式設計(流式程式設計)**正是近幾年來非常流行的一個解決方案,如果我們關注一下業界動态,就會發現絕大多數的語言和架構都紛紛開始支援這一程式設計模型:

  • Java 8 => 引入Stream流,Observable 和 Observer 模式
  • Spring 5 => 引入WebFlux,底層全面采用了響應式模型
  • RxJava => 去年長期霸占github最受歡迎的項目第一名

可以預見,響應式程式設計未來必将大規模的應用于開發領域,阿裡内部也已經開始了相關的改造,目前淘寶應用架構已經走上了流式架構更新之路,作為開發,響應式的程式設計範式還是很有必要掌握的,下文将基于RxJava 2.0給出相關概念的簡單介紹和基本程式設計思路的講解(基于《Learning Rx Java》一書前三章總結)

基本思路

關于響應式程式設計(Reactive Programming, 下文簡稱RP)的定義,衆說紛纭。維基百科将其定義為一種程式設計範式,ReactiveX将其定義為一種設計模式的強化(觀察者模式),也有大牛認為RP隻不過是已有各種的輪子的一個組裝...有關RP的本質,我們可以在文章的最後進行簡單的讨論,但從學習的角度而言,我認為最好的方式是将RP看做是一種面向事件和流的程式設計思想,如下:

Java推崇OOP的程式設計思想,是以對于Java程式員而言,程式就是各種對象的組合,程式設計就是控制對象狀态和行為的一個過程。

RP推崇面向流的程式設計的思想,是以對于開發人員而言,無論是事件還是資料,全部都将以流的方式呈現,這些流時而并行,時而交叉,程式設計就是觀察和調控這些流的一個過程。

從現實世界出發

在現實世界中有一個顯而易見的實體現象,那就是一切都在運動(變化),無論是交通、天氣、人,即便是一塊岩石,也會随着地球的自轉而運動。而不同物體的運動之間可能互不幹擾,比如運動在不同道路上的車輛和行人,也可能出現交叉,比如在同一個十字路口相遇的行人和車輛。

回歸到我們的程式設計當中,我們往往将程式抽象成多個過程,和現實世界一樣,這些過程也在變化和運動,它們之間有時可以并行,有時會産生依賴(前置、後置條件),傳統程式設計模型中,我們往往會采用多線程或者異步的方式來處理這些過程,但這樣的方式并不自然。

是以RP從現實世界進行抽象和采樣,将程式分為了以下三種組成:

  1. Observable:可被觀察的事物,也就是事件和資料流
  2. Observer:觀察流的事物
  3. Operator:操作符,對流進行連接配接和過濾等等操作的事物

讓我們用一個最簡單的例子來引入這三個概念:

Observable<String> myStrings =
	Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
    myStrings.map(s -> s.length())
        .subscribe(s -> System.out.println(s));
複制代碼
           

在上面的例子中,

myStrings

就是Observable,

map(s -> s.length())

就是Operator,

subscribe(s -> System.out.println(s))

就是Observer。

這個例子将會把幾個字元串先取長度,然後再逐個輸出。

Observable

Observable簡單來說,就是流,在RxJava中隻有三個最核心的API:

  • onNext()

    :傳遞一個對象
  • onComplete()

    :傳遞完成的“信号”
  • onError()

    :傳遞一個錯誤

基本上所有的流都是這三種方法的一個包裝。

建立

  1. 使用

    Observable.create()

    Observable<String> myStrings = Observable.create(emitter -> {
        emitter.onNext("apple");
        emitter.onNext("bear");
        emitter.onNext("change");
        emitter.onComplete();
    });
    複制代碼
               
  2. 使用

    Observable.just()

    Observable<String> myStrings =
    	Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
    複制代碼
               
    注意這種方法建立的元素數量必須是有限的
  3. 從其他資料源建立,例如

    Observable.fromIterable()

    Observable.range()

Hot & Cold Observables

Cold的流生産的資料是靜态的,好比一張CD,無論什麼時候,無論什麼人來聽,都可以聽到完整的内容。

Observable<String> source =
      Observable.just("Alpha","Beta","Gamma","Delta","Epsilon");
//first observer
source.subscribe(s -> System.out.println("Observer 1 Received: " + s));
//second observer
source.subscribe(s -> System.out.println("Observer 2 Received: " + s));
複制代碼
           

上面兩個subscribe注冊的observer将會得到完全相同的一串流

Hot的流産生的資料是動态的,好比收音機電台,錯過了播放的時段,過去的資料就取不到了。直接建立Hot流需要用到Listener,官方給出了一個JavaFx的例子,但并不合适放在這裡。

事實上,通常通過ConnectableObservable的方式來将一個Cold的流轉換成一個Hot的流:

ConnectableObservable<String> source =
   Observable.just("Alpha","Beta","Gamma","Delta","Epsilon")
   .publish();
   //Set up observer 1
   source.subscribe(s -> System.out.println("Observer 1: " + s));
  //Set up observer 2
  source.map(String::length)
    .subscribe(i -> System.out.println("Observer 2: " + i));
//Fire!
  source.connect();
複制代碼
           

通過

publish()

方法可以建立一個

ConnectableObservable

,然後通過

connect()

方法啟動流的傳輸,這時source将會把所有的資料都傳遞給兩個observer。此後如果再給source注冊新的Observer,将不會得到任何資料,因為Hot流不允許資料被重複消費。

Observers

觀察者本身是比較簡單的結構,主要功能由調用方自己去實作。

可以通過實作

Observer

接口的方式去建立一個觀察者,當然更常見的case是通過lambda表達式來建立一個觀察者,就像之前用到的例子一樣,這裡不詳細展開了。

注冊的方式是通過調用Observerbale的

subscribe

方法。

Operators

隻建立流和觀察者并不能有什麼太大的作用,大多時候我們需要通過操作符(Operator)來對流進行各種各樣的操作才能使RP變得有實際意義。

RxJava中提供了非常豐富的操作符,大緻分為以下幾類:

  1. 建立操作符,用于建立流,剛才我們已經用到了其中的幾個,比如Create, Just, Range和Interval
  2. 變換操作符,用于将一個流變換成另一種形式,比如Buffer(将流中的元素打包轉換成集合)、Map(對流中的每個元素執行一個函數),Window(将流中的元素拆分成不同的視窗再發射)
  3. 過濾操作符,過濾掉流中的部分資料以擷取指定的資料元素,比如Filter、First、Distinct
  4. 組合操作符,将多個流融合成一個流,比如And、Then、Merge、Zip
  5. 條件/算術/聚合/轉換操作符 ... 起到各種運算輔助作用
  6. 自定義操作符,由使用者自己建立

如果把每個操作符都展開講一遍,差不多就能出一本書了,可見操作符提供的功能之豐富。下文隻展開一些和背壓有關的操作符。

背壓

所謂背壓,是指異步環境中,生産者的速度大于消費者速度時,消費者反過來控制生産者速度的政策(也稱為回壓),這是一種流控的方式,可以更有效的利用資源、同時防止錯誤的雪崩。

為了實作背壓政策,可以使用以下幾種操作符

  1. Throttling 節流類

    通過操作符來調節Observable發射消息的速度,比如使用

    sample()

    來定期采樣,并發出最後一個資料,或者使用

    throttleWithTimeout()

    來丢棄逾時的資料。但這樣做會丢棄一部分資料,最終消費者拿不到完整的消息。
  2. Buffer & Window 緩沖和視窗類

    使用緩沖

    buffer()

    和視窗

    window()

    暫存那些發送速度過快的消息,等到消息發送速度下降的時候再釋放它們。這主要應用于Observable發送速率不均勻的場景。

除了使用操作符之外,還有一種方式可以實作背壓,那就是響應式拉取(Reactive pull)。

通過在Observer中調用

request(n)

方法,可以實作由消費者來反控生産者的生産,也就是說隻有當消費者請求的時候,生産者才去産生資料,當消費者消費完了資料再去請求新的資料,這樣就不會出現生産速度過快的情況了,如下圖

但是這樣做需要上遊的被觀察者能夠對request請求做出響應,這時候又可以用到幾個操作符來控制Observable的行為:

  1. onBackPressureBuffer

    為Observable發出來的資料制作緩存,産生的資料先放在緩存中,每當有request請求過來時,就從緩存裡取出對應數量的事件傳回。
  2. onBackPressureDrop

    指令Observable丢棄後來的時間,直到Subscriber再次調用request(n)方法的時候,就發送給該subscriber調用時間以後的n個時間。

背壓政策是一個值得深入研究和探讨的領域,基于消費者消息的回壓讓動态限流、斷路成為可能,也因為有了背壓的感覺,應用有機會做到動态的縮擴容。

思考:為什麼需要RP

RP的基本概念介紹的差不多了,現在需要思考一下為什麼需要RP,在服務端怎麼應用RP,以及它所能夠帶來的優勢。

首先,有關RP的本質,我覺得它就是一個異步程式設計的輪子,用觀察者模式的API把異步程式設計的過程變得更加清晰和簡單,這就好比go使用CSP來簡化并發操作一樣,底層其實還是對已有技術的封裝。

那麼問題在于,為什麼要封裝異步程式設計,使用異步程式設計能帶來什麼好處,為了解決這個問題我們又需要回歸原點。

如果要問服務端最大的性能瓶頸是什麼,那答案一定是IO,因為處理一個請求的過程中最耗時的部分就是等待IO,而等待就會造成阻塞,是以如果要提升性能,就不能寫出阻塞的代碼來。

如何才能讓代碼不阻塞?以Java服務端來說,傳統的處理方式無外乎以下兩種:

  1. 使用Thread,把業務代碼和IO代碼放到不同的線程裡跑。但你需要面對并發問題(資源競争),同時根據之前對Java線程排程的分析我們知道這樣對CPU的資源使用率并不高效(上下文切換消耗比較大)。
  2. 使用異步回調,你可以用Callback或者Future來實作,但需要自己去實作排程邏輯,同時Callback這樣的模式寫出來的代碼是不好了解的,有可能出現Callbcak Hell。

是以最終為了解決性能瓶頸,RP給出的辦法就是:

提供一個優秀的異步處理架構,同時簡化編寫異步代碼的流程,最終實作減少阻塞,提升性能的大目标。