天天看點

RxJava使用介紹-概念

RxJava系列教程:

​​1. RxJava使用介紹​​​ ​​【視訊教程】​​​

​​​2. RxJava操作符​​​

  ​​​• Creating Observables(Observable的建立操作符)​​​ ​​【視訊教程】​​​

  ​​​• Transforming Observables(Observable的轉換操作符)​​​ ​​【視訊教程】​​​

  ​​​• Filtering Observables(Observable的過濾操作符)​​​ ​​【視訊教程】​​​

  ​​​• Combining Observables(Observable的組合操作符)​​​ ​​【視訊教程】​​​

  ​​​• Error Handling Operators(Observable的錯誤處理操作符)​​​ ​​【視訊教程】​​​

  ​​​• Observable Utility Operators(Observable的輔助性操作符)​​​ ​​【視訊教程】​​​

  ​​​• Conditional and Boolean Operators(Observable的條件和布爾操作符)​​​ ​​【視訊教程】​​​

  ​​​• Mathematical and Aggregate Operators(Observable數學運算及聚合操作符)​​​ ​​【視訊教程】​​​

  ​​​• 其他如observable.toList()、observable.connect()、observable.publish()等等;​​​ ​​【視訊教程】​​​

​​​3. RxJava Observer與Subcriber的關系​​​ ​​【視訊教程】​​​

​​​4. RxJava線程控制(Scheduler)​​​ ​​【視訊教程】​​​

​​​5. RxJava 并發之資料流發射太快如何辦(背壓(Backpressure))​​​ ​​【視訊教程】​​

RxJava到底是什麼?使用RxJava到底有什麼好處呢?其實RxJava是ReactiveX中使用Java語言實作的版本,目前ReactiveX已經實作的語言版本有:

Java: RxJava

JavaScript: RxJS

C#: Rx.NET

C#(Unity): UniRx

Scala: RxScala

Clojure: RxClojure

C++: RxCpp

Ruby: Rx.rb

Python: RxPY

Groovy: RxGroovy

JRuby:RxJRuby

Kotlin: RxKotlin

可以看出ReactiveX在開發應用中如此的火爆。那到底什麼是ReactiveX呢?簡單來說,ReactiveX就是”觀察者模式+疊代器模式+函數式程式設計”,它擴充了觀察者模式,通過使用可觀察的對象序列流來表述一系列事件,訂閱者進行占點觀察并對序列流做出反應(或持久化或輸出顯示等等);借鑒疊代器模式,對多個對象序列進行疊代輸出,訂閱者可以依次處理不同的對象序列;使用函數式程式設計思想(functional programming),極大簡化問題解決的步驟。

RxJava的基本概念

RxJava最核心的兩個東西就是Observables(被觀察者,也就是事件源)和Subscribers(觀察者),由Observables發出一系列的事件,Subscribers進行訂閱接收并進行處理,看起來就好像是設計模式中的觀察者模式,但是跟觀察者模式不同的地方就在于,如果沒有觀察者(即Subscribers),Observables是不會發出任何事件的。

由于Observables發出的事件并不僅限于一個,有可能是多個的,如何確定每一個事件都能發送到Subscribers上進行處理呢?這裡就借鑒了設計模式的疊代器模式,對事件進行疊代輪詢(next()、hasNext()),在疊代過程中如果出現異常則直接抛出(throws Exceptions),下表是Observable和疊代器(Iterable)的對比:

事件(event) 疊代器(Iterable) Observable
接收資料 T next() onNext(T)
發現錯誤 throws Exception onError(Exception)
疊代完成 !hasNext() onCompleted()

與疊代器模式不同的地方在于,疊代器模式在事件處理上采用的是“同步/拉式”的方式,而Observable采用的是“異步/推式”的方式,對于Subscriber(觀察者)而言,這種方式會更加靈活。

Rxjava的看起來很像設計模式中的觀察者模式,但是有一點明顯不同,那就是如果一個Observerble沒有任何的的Subscriber,那麼這個Observable是不會發出任何事件的。

補充概念
  • Observable:發射源,英文釋義“可觀察的”,在觀察者模式中稱為“被觀察者”或“可觀察對象”;
  • Observer:接收源,英文釋義“觀察者”,沒錯!就是觀察者模式中的“觀察者”,可接收Observable、Subject發射的資料;
  • Subject:Subject是一個比較特殊的對象,既可充當發射源,也可充當接收源,為避免初學者被混淆,本章将不對Subject做過多的解釋和使用,重點放在Observable和Observer上,先把最基本方法的使用學會,後面再學其他的都不是什麼問題;
  • Subscriber:“訂閱者”,也是接收源,那它跟Observer有什麼差別呢?Subscriber實作了Observer接口,比Observer多了一個最重要的方法unsubscribe( ),用來取消訂閱,當你不再想接收資料了,可以調用unsubscribe( )方法停止接收,Observer 在 subscribe() 過程中,最終也會被轉換成 Subscriber 對象,一般情況下,建議使用Subscriber作為接收源;
  • Subscription :Observable調用subscribe( )方法傳回的對象,同樣有unsubscribe( )方法,可以用來取消訂閱事件;
  • Action0:RxJava中的一個接口,它隻有一個無參call()方法,且無傳回值,同樣還有Action1,Action2…Action9等,Action1封裝了含有 1 個參的call()方法,即call(T t),Action2封裝了含有 2 個參數的call方法,即call(T1 t1,T2 t2),以此類推;
  • Func0:與Action0非常相似,也有call()方法,但是它是有傳回值的,同樣也有Func0、Func1…Func9;

Hello World

建立一個Observable對象很簡單,直接調用Observable.create即可。

Observable<String> mObservable = Observable.create(  
    new Observable.OnSubscribe<String>() {  
        @Override  
        public void call(Subscriber<? super String> sub) {  
            sub.onNext("Hello, world!");  
            sub.onCompleted();      

這裡定義的Observable對象僅僅發出一個Hello World字元串,然後就結束了。接着我們建立一個Subscriber來處理Observable對象發出的字元串。

Subscriber<String> mSubscriber = new Subscriber<String>() {  
    @Override  
    public void onNext(String s) { System.out.println(s); }  

    @Override  
    public void onCompleted() { }  

    @Override  
    public void onError(Throwable e) { }  
};        

這裡subscriber僅僅就是列印observable發出的字元串。通過subscribe函數就可以将我們定義的mObservable對象和mSubscriber對象關聯起來,這樣就完成了mSubscriber對mObservable的訂閱。

mObservable.subscribe(mSubscriber);        

一旦mSubscriber訂閱了mObservable,mObservable就是調用mSubscriber對象的onNext和onComplete方法,mSubscriber就會列印出Hello World!

更簡潔的代碼(RxJava的流式API調用)

上面的代碼最終可以寫成這樣:

//這就是RxJava的流式API調用
Observable.just("Hello, world!")  
    .subscribe(new Action1<String>() {  
        @Override  
        public void call(String s) {  
              System.out.println(s);  
        }  
    });        

just是RxJava中的建立操作符,RxJava的強大性就來自于它所定義的操作符。後續文章我們會逐一介紹RxJava的操作符。

上面就是一個非常簡易的RxJava流式API的調用:同一個調用主體一路調用下來,一氣呵成。

由于被觀察者産生事件,是事件的起點,那麼開頭就是用Observable這個主體調用來建立被觀察者,産生事件,為了保證流式API調用規則,就直接讓Observable作為唯一的調用主體,一路調用下去。

流程圖如下:

RxJava使用介紹-概念
  • 建立被觀察者,産生事件
  • 設定事件傳遞過程中的過濾,合并,變換等加工操作。
  • 訂閱一個觀察者對象,實作事件最終的處理。