天天看點

這可能是最好的RxJava 2.x 入門教程(三)

這可能是最好的 RxJava 2.x 入門教程系列專欄

文章連結:

這可能是最好的 RxJava 2.x 入門教程(完結版) 【重磅推出】 這可能是最好的 RxJava 2.x 入門教程(一) 這可能是最好的 RxJava 2.x 入門教程(二) 這可能是最好的 RxJava 2.x 入門教程(三) 這可能是最好的 RxJava 2.x 入門教程(四) 這可能是最好的 RxJava 2.x 入門教程(五) GitHub 代碼同步更新: https://github.com/nanchen2251/RxJava2Examples 為了滿足大家的饑渴難耐,GitHub 将同步更新代碼,主要包含基本的代碼封裝,RxJava 2.x 所有操作符應用場景介紹和實際應用場景,後期除了 RxJava 可能還會增添其他東西,總之,GitHub 上的 Demo 專為大家傾心打造。傳送門:

前言

年輕的老司機們,我這麼勤的為大家分享,卻少有催更的,好吧。其實寫這個系列不是為了吸睛,那咱們繼續寫我們的 RxJava 2.x 的操作符。

正題

distinct

這個操作符非常的簡單、通俗、易懂,就是簡單的去重嘛,我甚至都不想貼代碼,但人嘛,總得持之以恒。

Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("distinct : " + integer + "\n");
                        Log.e(TAG, "distinct : " + integer + "\n");
                    }
                });
           

輸出:

Log 日志顯而易見,我們在經過

dinstinct()

後接收器接收到的事件隻有1,2,3,4,5了。

Filter

信我,

Filter

你會很常用的,它的作用也很簡單,過濾器嘛。可以接受一個參數,讓其過濾掉不符合我們條件的值

Observable.just(1, 20, 65, -5, 7, 19)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer integer) throws Exception {
                        return integer >= 10;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("filter : " + integer + "\n");
                Log.e(TAG, "filter : " + integer + "\n");
            }
        });
           

可以看到,我們過濾器舍去了小于 10 的值,是以最好的輸出隻有 20, 65, 19。

buffer

buffer

操作符接受兩個參數,

buffer(count,skip)

,作用是将

Observable

中的資料按

skip

(步長) 分成最大不超過 count 的

buffer

,然後生成一個

Observable

。也許你還不太了解,我們可以通過我們的示例圖和示例代碼來進一步深化它。

Observable.just(1, 2, 3, 4, 5)
                .buffer(3, 2)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(@NonNull List<Integer> integers) throws Exception {
                        mRxOperatorsText.append("buffer size : " + integers.size() + "\n");
                        Log.e(TAG, "buffer size : " + integers.size() + "\n");
                        mRxOperatorsText.append("buffer value : ");
                        Log.e(TAG, "buffer value : " );
                        for (Integer i : integers) {
                            mRxOperatorsText.append(i + "");
                            Log.e(TAG, i + "");
                        }
                        mRxOperatorsText.append("\n");
                        Log.e(TAG, "\n");
                    }
                });
           

如圖,我們把 1, 2, 3, 4, 5 依次發射出來,經過

buffer

操作符,其中參數

skip

為 2,

count

為 3,而我們的輸出 依次是 123,345,5。顯而易見,我們

buffer

的第一個參數是

count

,代表最大取值,在事件足夠的時候,一般都是取

count

個值,然後每次跳過

skip

個事件。其實看 Log 日志,我相信大家都明白了。

timer

timer

很有意思,相當于一個定時任務。在 1.x 中它還可以執行間隔邏輯,但在 2.x 中此功能被交給了

interval

,下一個會介紹。但需要注意的是,

timer

interval

均預設在新線程。

mRxOperatorsText.append("timer start : " + TimeUtil.getNowStrTime() + "\n");
        Log.e(TAG, "timer start : " + TimeUtil.getNowStrTime() + "\n");
        Observable.timer(2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread()) // timer 預設在新線程,是以需要切換回主線程
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        mRxOperatorsText.append("timer :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                        Log.e(TAG, "timer :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                    }
                });
           

顯而易見,當我們兩次點選按鈕觸發這個事件的時候,接收被延遲了 2 秒。

interval

如同我們上面可說,

interval

操作符用于間隔時間執行某個操作,其接受三個參數,分别是第一次發送延遲,間隔時間,時間機關。

mRxOperatorsText.append("interval start : " + TimeUtil.getNowStrTime() + "\n");
       Log.e(TAG, "interval start : " + TimeUtil.getNowStrTime() + "\n");
       Observable.interval(3,2, TimeUnit.SECONDS)
               .subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread()) // 由于interval預設在新線程,是以我們應該切回主線程
               .subscribe(new Consumer<Long>() {
                   @Override
                   public void accept(@NonNull Long aLong) throws Exception {
                       mRxOperatorsText.append("interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                       Log.e(TAG, "interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                   }
               });
           

如同 Log 日志一樣,第一次延遲了 3 秒後接收到,後面每次間隔了 2 秒。

然而,心細的小夥伴可能會發現,由于我們這個是間隔執行,是以當我們的Activity 都銷毀的時候,實際上這個操作還依然在進行,是以,我們得花點小心思讓我們在不需要它的時候幹掉它。檢視源碼發現,我們subscribe(Cousumer<? super T> onNext)傳回的是Disposable,我們可以在這上面做文章。

@Override
   protected void doSomething() {
       mRxOperatorsText.append("interval start : " + TimeUtil.getNowStrTime() + "\n");
       Log.e(TAG, "interval start : " + TimeUtil.getNowStrTime() + "\n");
       mDisposable = Observable.interval(3, 2, TimeUnit.SECONDS)
               .subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread()) // 由于interval預設在新線程,是以我們應該切回主線程
               .subscribe(new Consumer<Long>() {
                   @Override
                   public void accept(@NonNull Long aLong) throws Exception {
                       mRxOperatorsText.append("interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                       Log.e(TAG, "interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                   }
               });
   }

   @Override
   protected void onDestroy() {
       super.onDestroy();
       if (mDisposable != null && !mDisposable.isDisposed()) {
           mDisposable.dispose();
       }
   }
           

哈哈,再次驗證,解決了我們的疑惑。

doOnNext

其實覺得

doOnNext

應該不算一個操作符,但考慮到其常用性,我們還是咬咬牙将它放在了這裡。它的作用是讓訂閱者在接收到資料之前幹點有意思的事情。假如我們在擷取到資料之前想先儲存一下它,無疑我們可以這樣實作。

Observable.just(1, 2, 3, 4)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("doOnNext 儲存 " + integer + "成功" + "\n");
                        Log.e(TAG, "doOnNext 儲存 " + integer + "成功" + "\n");
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("doOnNext :" + integer + "\n");
                Log.e(TAG, "doOnNext :" + integer + "\n");
            }
        });
           

skip

skip

很有意思,其實作用就和字面意思一樣,接受一個 long 型參數 count ,代表跳過 count 個數目開始接收。

Observable.just(1,2,3,4,5)
                .skip(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("skip : "+integer + "\n");
                        Log.e(TAG, "skip : "+integer + "\n");
                    }
                });
           

take

take

,接受一個 long 型參數 count ,代表至多接收 count 個資料。

Flowable.fromArray(1,2,3,4,5)
                .take(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("take : "+integer + "\n");
                        Log.e(TAG, "accept: take : "+integer + "\n" );
                    }
                });
           

just

just

,沒什麼好說的,其實在前面各種例子都說明了,就是一個簡單的發射器依次調用

onNext()

方法。

Observable.just("1", "2", "3")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        mRxOperatorsText.append("accept : onNext : " + s + "\n");
                        Log.e(TAG,"accept : onNext : " + s + "\n" );
                    }
                });
           

寫在最後

好吧,本節先講到這裡,下節我們還是繼續講簡單的操作符,雖然我們的教程比較枯燥,現在也不那麼受人關注,但後面的系列我相信大家一定會非常喜歡的,我們下期再見!

代碼全部同步到GitHub:

做不完的開源,寫不完的矯情。歡迎掃描下方二維碼或者公衆号搜尋「nanchen」關注我的微信公衆号,目前多營運 Android ,盡自己所能為你提升。如果你喜歡,為我點贊分享吧~

nanchen

繼續閱讀