天天看點

RxJava2.x學習總結

公司項目中網絡架構使用了目前主流的Retrofit+Okhttp+RxJava架構進行開發,三者聯合使用極大簡化了網絡請求及請求結果的處理。對于RxJava其繁多的操作符讓人眼花缭亂,但是隻有掌握了這些知識,項目中才能運用自如。鑒于此,本篇将系統的學習總結RxJava2.x中的知識體系。

Create

RxJava2.x學習總結

create操作符作用為建立一個Observable對象(上遊):

io.reactivex.Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                        e.onNext();
                        e.onNext();
                        e.onNext();

                        e.onComplete();
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer i) {
                        Log.d(TAG, "onNext receive: " + i);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.e(TAG, "onError receive: " + e.getLocalizedMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete receive!");
                    }
                });
           

注:

  • 建立

    Observable

    的同時,需要傳入

    ObservableOnSubscribe

    對象,裡面有個

    subscribe()

    回調函數,用于向下遊發射事件
  • 上遊調用

    onComplete()

    之後下遊将停止接受新的事件,但是并不阻止上遊繼續調用

    onNext()

    發送事件
  • subscribe()

    是連結上遊與下遊的橋梁,傳入的

    Observer

    對象即下遊接受事件的對象
  • onCompete()

    onError()

    兩者互斥,不可以同時觸發

Map

RxJava2.x學習總結

将上遊發送的事件變換成新的Observable對象:

io.reactivex.Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                        e.onNext();
                        e.onNext();
                        e.onNext();

                        e.onComplete();
                    }
                })
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(@NonNull Integer integer) throws Exception {
                        return "the " + integer;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull String str) {
                        Log.d(TAG, "onNext receive: " + str);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.e(TAG, "onError receive: " + e.getLocalizedMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete receive!");
                    }
                });
           

Zip

RxJava2.x學習總結

zip操作符的作用主要是将多個Observable對象發射的事件進行合并,而且每個Observable發射的事件都是按照先後順序進行組合的,是以變換後發射的事件個數由上述Observable中發射的事件個數最少的Observable決定:

io.reactivex.Observable
                .zip(io.reactivex.Observable
                                .create(new ObservableOnSubscribe<String>() {
                                    @Override
                                    public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                                        if (!e.isDisposed()) {
                                            e.onNext("A");
                                            Log.d(TAG, "String emit : A\n");

                                            e.onNext("B");
                                            Log.d(TAG, "String emit : B\n");

                                            e.onNext("C");
                                            Log.d(TAG, "String emit : C\n");
                                        }
                                    }
                                }),
                        io.reactivex.Observable
                                .create(new ObservableOnSubscribe<Integer>() {
                                    @Override
                                    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                                        if (!e.isDisposed()) {
                                            e.onNext();
                                            Log.d(TAG, "Integer emit : 1\n");

                                            e.onNext();
                                            Log.d(TAG, "Integer emit : 2\n");

                                            e.onNext();
                                            Log.d(TAG, "Integer emit : 3\n");

                                            e.onNext();
                                            Log.d(TAG, "Integer emit : 4\n");

                                            e.onNext();
                                            Log.d(TAG, "Integer emit : 5\n");
                                        }
                                    }
                                }), new BiFunction<String, Integer, String>() {
                            @Override
                            public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                                return s + integer;
                            }
                        })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "Observer accept: " + s);
                    }
                });
           

Concat

RxJava2.x學習總結

concat操作符将多個Observable連接配接成一個Observable,并按照順序發射事件:

io.reactivex.Observable
                .concat(io.reactivex.Observable.just(, , ), io.reactivex.Observable.just(, , ))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "conact: " + integer);
                    }
                });
           

FlatMap

RxJava2.x學習總結

flatmap的作用是将一個Observable轉換成多個Observables,然後将多個Observables再裝進單獨的Observable中。該操作符不能保證事件的有序性。

io.reactivex.Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                        e.onNext();
                        e.onNext();
                        e.onNext();
                        e.onNext();
                        e.onNext();
                        e.onNext();
                        e.onNext();
                        e.onNext();
                    }
                })
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                        Log.d(TAG, "after flatMap: " + integer);
                        return io.reactivex.Observable
                                .just("converted " + integer)
                                .delay(, TimeUnit.SECONDS);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "accept: " + s);
                    }
                });
           

下遊接收的事件無序,log如下:

- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
           

ConcatMap

concatMap功能與flatMap相同,唯一差別是concatMap保證了發射事件的順序。

Distinct

RxJava2.x學習總結

該操作符功能如示意圖,去重:

io.reactivex.Observable
                .just(, , , , , , , , )
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + integer);
                    }
                });
           

Filter

RxJava2.x學習總結

filter顧名思義,是個過濾器,過濾掉回調函數

test()

傳回

false

的值:

io.reactivex.Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                        for (int i = ; i < ; i++) {
                            e.onNext(i);
                        }
                    }
                })
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer s) throws Exception {
                        return s %  == ;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + integer);
                    }
                });
           

Log:

- :: -/com.esimtek.fortest D/MainActivity: accept: 
- :: -/com.esimtek.fortest D/MainActivity: accept: 
- :: -/com.esimtek.fortest D/MainActivity: accept: 
- :: -/com.esimtek.fortest D/MainActivity: accept: 
- :: -/com.esimtek.fortest D/MainActivity: accept: 
           

Buffer

RxJava2.x學習總結

buffer操作符有兩個參數

buffer(count, skip)

,其中

count

為需要被發射的事件的最大個數,

skip

作用是确定每個每個buffer起始位置的索引值,當count與skip的值相等時,效果同

buffer(count)

io.reactivex.Observable
                .just(, , , , , , , )
                .buffer(, )
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        Log.d(TAG, "accept: " + integers);
                    }
                });
           

Log如下:

- :: -/com.esimtek.fortest D/MainActivity: accept: [, , ]
- :: -/com.esimtek.fortest D/MainActivity: accept: [, , ]
- :: -/com.esimtek.fortest D/MainActivity: accept: [, , ]
- :: -/com.esimtek.fortest D/MainActivity: accept: [, ]
           

Timer

RxJava2.x學習總結

timer操作符實作計時器功能,預設執行在工作線程:

Log.d(TAG, "start time: " + System.currentTimeMillis()
                + "\ncurr thread: " + Thread.currentThread().getName());

        io.reactivex.Observable
                .timer(, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG, "end time: " + System.currentTimeMillis()
                                + "\n end thread: " + Thread.currentThread().getName());
                    }
                });
           

Log:

- :: -/com.esimtek.fortest D/MainActivity: start time: 
                                                                 curr thread: main
- :: -/com.esimtek.fortest D/MainActivity: end time: 
                                                                  end thread: RxComputationThreadPool-
           

Interval

RxJava2.x學習總結

interval操作符用于間隔指定時間執行某個操作,其接受三個參數:initalDelay(首次發射延時), period(發射間隔時間), timeUnit(時間機關):

mDisposable = io.reactivex.Observable
                .interval(, , TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {

                    }
                });
           

這裡有個問題,就是當

accept()

方法中如果存在UI操作,由于持有

Context

引用可能會造成

Activity

記憶體洩露。是以需要在

Activity

OnDestory()

方法中執行

mDisposable.dispose()

:

private Disposable mDisposable;

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null && !mDisposable.isDisposed())
            mDisposable.dispose();
    }
           

Skip

RxJava2.x學習總結

如示意圖,skip接受一個參數

count

,這個參數指定了跳過

count

個數目開始接收。

Take

RxJava2.x學習總結

take操作符接受一個參數count,代表至多接受count個參數。

Just

RxJava2.x學習總結

just操作符代表一次發射資料,并執行

onNext

方法。