天天看點

盤點常用的Android開發庫(4 ) -- Rxjava2的基本使用

一、簡介

RxJava是Reactive Extensions(Rx)的Java VM實作:該庫用于通過使用可觀察的序列來組成異步和基于事件的程式。

它擴充了觀察者模式以支援資料/事件序列,并添加了運算符,使您可以聲明性地将序列組合在一起,同時消除了對低級線程,同步,線程安全和并發資料結構等問題的擔憂。

RxJava是輕量級的。它作為單個JAR實施,僅關注可觀察的抽象和相關的高階函數。RxJava支援Java 6或更高版本以及基于JVM的語言,例如Groovy,Clojure,JRuby,Kotlin和Scala。

RxJava有以下五個基類,我們可以在之基礎上發現RxJava裡的運算符。這五個基類分别是:

  1. Flowable:用于實作Reactive-Streams模式,并提供了工廠方法,中間運算符以及使用反應式資料流的能力。支援背壓(Backpressure)。
  2. Observable:Observable類是不支援背壓的,它是Reactive的一個抽象類,它提供了工廠方法,中間運算符以及消費同步和/或異步資料流的功能。
  3. Single:Single類為單個值響應實作Reactive Pattern。Single和Observable類似,所不同的是Single隻能發出一個值,要麼發射成功要麼發射失敗,也沒有“onComplete”作為完成時的回調。
  4. Comletable:Completable類表示延遲計算,沒有任何值,隻表示完成或異常。Completable的行為類似于Observable,在計算完成後隻能發出完成或錯誤信号,由onComplete或onError接口來處理,沒有onNext或onSuccess等回調接口。
  5. Maybe:Maybe類表示延遲計算和單個值的發射,這個值可能根本沒有或異常。

二、使用

基本使用

Rxjava迄今為止已經更新到3代版本了,不過常用的還是2代版本,也就是我們常說的Rxjava2,是以本文主要介紹Rxjava2的相關内功和原理分析。

首先第一步肯定是将Rxjava2導入到項目中去,通常是通過作為Gradle編譯依賴項導入的。

compile 'io.reactivex.rxjava2:rxjava:2.1.0'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
           

Rxjava的使用需要滿足三個基本要素,其他的都是通過運算符在三要素的基礎上添加功能,這三個基本要素分别是:

1.建立Observable(被觀察者)

Observable observable =  Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                //TODO 通過調用onNext() onComlete()onError進行發送事件
            }
        });
           

2.建立Observer(觀察者)

Observer observer = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                //TODO 準備工作
            }

            @Override
            public void onNext(@NonNull String s) {
                //TODO 接收onNext事件并處理
            }

            @Override
            public void onError(@NonNull Throwable e) {
                //TODO 接收onError事件并處理
            }

            @Override
            public void onComplete() {
                //TODO 接收onComplete事件并處理
            }
        };
           

3.subscribe(訂閱)

observable.subscribe(observer);
           

任何形式的Rxjava使用都可以拆分成以上三個部分,當然在實際的使用中為了友善可讀大多會被寫成鍊式調用。 

Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            ......
            }
        }).subscribe(new Observer<Integer>() {
            ......

            @Override
            public void onSubscribe(@NonNull Disposable d) {
               ......
            }

            @Override
            public void onNext(@NonNull Integer integer) {
               ......
            }

            @Override
            public void onError(@NonNull Throwable e) {
               ......
            }

            @Override
            public void onComplete() {
               ......
            }
        });
           

線程排程

Scheduler

在RxJava的預設規則中,事件的發送和消費都是在同一個線程的,而Rxjava的目的就是為了更好地解決異步問題,是以Rxjava引入了一個概念

Scheduler來實作線程排程。

那麼如何去進行線程排程呢?簡單一句話就是:subscribeOn()指定發送事件的線程,而observeOn()指定接收事件的線程。

需要注意的是:

  • 多次指定上遊的線程隻有第一次指定的有效, 也就是說多次調用subscribeOn()隻有第一次的有效。 
  • 多次指定下遊的線程是可以的, 也就是說每調用一次observeOn() , 下遊的線程就會切換一次。 
observable.subscribeOn(Schedulers.newThread())     
         .subscribeOn(Schedulers.io())              
         .observeOn(AndroidSchedulers.mainThread()) 
         .observeOn(Schedulers.io())                
         .subscribe(consumer);
           

那麼Rxjava中的線程又有哪些呢?

  • Schedulers.io() :代表io操作的線程, 通常用于網絡,讀寫檔案等io密集型的操作 
  • Schedulers.computation() :代表CPU計算密集型的操作, 例如需要大量計算的操作 
  • Schedulers.newThread() :代表一個正常的新線程 
  • AndroidSchedulers.mainThread() :代表Android的主線程

三、操作符

Rxjava中的操作符有很多,下面就羅列出一些常用的,不需要記憶,隻需要了解運用。

Create

create操作符應該是最常見的操作符了,主要用于産生一個Obserable被觀察者對象。

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
               ...
            }
        });
           

Map

map基本上算是Rxjava中最簡單的操作符之一了,它的作用是對發送的每一個事件應用一個函數,使得每一個事件都按照指定的函數去變化。通俗來講就是将一個Observable通過某種函數關系,轉換成另一個Observable。下面的例子就是将一個發送事件類型為Interger的Observable通過Function函數轉換成了發送事件類型為String的Observable。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                ......
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(@NonNull Integer integer) throws Exception {
                return "This is result " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                ......
            }
        });
           

Zip 

zip顧名思義專用于壓縮合并事件,該合并不是連接配接,而是兩兩配對,也就意味着最終配對出的Observable發射事件數目隻和少的那個相同。

需要注意的是:

  • zip 組合事件的過程就是分别從發射器 A 和發射器 B 各取出一個事件來組合,并且一個事件隻能被使用一次,組合的順序是嚴格按照事件發送的順序來進行的。
  • 最終接收器收到的事件數量是和發送器發送事件最少的那個發送器的發送事件數目相同。
Observable.zip(Observable1, Observable2, new BiFunction<String, Integer, String>() {
            @Override
            public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                //通過函數生成zip後的結果
                return s + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                ......
            }
        });
           

Concat

concat操作符作用是将兩個Observable連接配接成一個Observable。注意它是有順序的,必須等到第一個Observable中的所有事件發送完成後才會開始發送第二個Observable中的事件。如下圖發送的順序是1、2、3、4、5、6。

Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                       ......
                    }
                });
           

Merge

merge的作用是把多個Observable結合起來,它和concat的差別在于,它不具有順序性,不用等到第一個Observable發送完所有事件才去發送其他的Observable中的事件。 如下圖發送的順序是未知的。

Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        ......
                    }
                });
           

FlatMap

FlatMap可以把一個Observable通過某種方法轉換為多個Observables,然後再把這些分散的Observables裝進一個單一的Observable。需要注意的是FlatMap并不能保證事件的順序,如果需要保證,需要我們用到concatMap。

concatMap

concatMap與FlatMap的唯一差別是concatMap保證了順序。

distinct

distinct這個操作符非常簡單、通俗、易懂,就是簡單的去重。下面例子的事件隻會發送5次,1、2、3、4、5。

Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                       ......
                    }
                });
           

Filter

Filter,顧名思義,過濾器,可以接受一個參數,讓其過濾掉不符合我們條件的事件。

Observable.just(7, 13, 45, -51, 9, 10,-7)
          .filter(new Predicate<Integer>() {
               @Override
               public boolean test(@NonNull Integer integer) throws Exception {
                   return integer >= 10;
               }
           }).subscribe(new Consumer<Integer>() {
           @Override
           public void accept(@NonNull Integer integer) throws Exception {
               ......
           }
        });
           

timer

timer相當于一個定時任務,其接受兩個參數,定時時間和時間機關。預設在新線程中。

Observable.timer(2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread()) // timer 預設在新線程,是以需要切換回主線程
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                       ......
                    }
                });
           

interval

interval操作符用于間隔時間執行某個操作,其接受三個參數,分别是第一次發送延遲,間隔時間,時間機關。預設在新線程中。

Observable.interval(3,2, TimeUnit.SECONDS)
               .subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread()) // 由于interval預設在新線程,是以我們應該切回主線程
               .subscribe(new Consumer<Long>() {
                   @Override
                   public void accept(@NonNull Long aLong) throws Exception {
                        ......
                   }
               });
           

Just

Just就是一個簡單的Observable依次調用onNext()方法。

Observable.just("1", "2", "3")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                       ......
                    }
                });
           

doOnNext

它的作用是讓訂閱者在接收到資料之前幹點有意思的事情。比如儲存個資料什麼的。

Observable.just(1, 2, 3, 4)
          .doOnNext(new Consumer<Integer>() {
              @Override
              public void accept(@NonNull Integer integer) throws Exception {
                  //TODO 儲存資料
              }
          }).subscribe(new Consumer<Integer>() {
          @Override
          public void accept(@NonNull Integer integer) throws Exception {
              ......
          }
      });
           

四、參考連結

RxJava2 五大重要角色介紹

這可能是最好的RxJava 2.x 教程

五、補充

什麼是觀察者模式?

定義對象間的一種一對多的依賴關系,一個對象(目标對象)的狀态發生改變,所有的依賴對象(觀察者對象)都将得到通知,進行廣播通知。

什麼是背壓?

背壓是指在異步場景中,被觀察者發送事件速度遠快于觀察者的處理速度的情況下,一種告訴上遊的被觀察者降低發送速度的政策。