天天看點

rxjava操作符彙總

轉摘自作者: maplejaw(https://blog.csdn.net/maplejaw_/article/details/52396175) 

本篇隻解析标準包中的操作符。對于擴充包,由于使用率較低,如有需求,請讀者自行查閱文檔。

本篇文章主要是介紹Rxjava的操作符,不對rxjava的原理與實作進行說明!

建立操作

以下操作符用于建立Observable。

  • create: 使用OnSubscribe從頭建立一個Observable,這種方法比較簡單。需要注意的是,使用該方法建立時,建議在OnSubscribe#call方法中檢查訂閱狀态,以便及時停止發射資料或者運算。
    Observable.create(new Observable.OnSubscribe<String>() {
    
            @Override
            public void call(Subscriber<? super String> subscriber) {
    
                subscriber.onNext("item1");
                subscriber.onNext("item2");
                subscriber.onCompleted();
            }
        });
               
  • from: 将一個Iterable, 一個Future, 或者一個數組,内部通過代理的方式轉換成一個Observable。Future轉換為

    OnSubscribe

    是通過

    OnSubscribeToObservableFuture

    進行的,Iterable轉換通過

    OnSubscribeFromIterable

    進行。數組通過

    OnSubscribeFromArray

    轉換。 
    rxjava操作符彙總
    //Iterable
        List<String> list=new ArrayList<>();
        ...
        Observable.from(list)
                .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
    
            }
        });
    
        //Future
         Future<String> futrue= Executors.newSingleThreadExecutor().submit(new Callable<String>() {
    
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return "maplejaw";
            }
        });
    
        Observable.from(futrue)
                  .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
    
            }
        });
    ;
               
  • just: 将一個或多個對象轉換成發射這個或這些對象的一個Observable。如果是單個對象,内部建立的是

    ScalarSynchronousObservable

    對象。如果是多個對象,則是調用了from方法建立。
  • empty: 建立一個什麼都不做直接通知完成的Observable
  • error: 建立一個什麼都不做直接通知錯誤的Observable
  • never: 建立一個什麼都不做的Observable
    Observable observable1=Observable.empty();//直接調用onCompleted。
        Observable observable2=Observable.error(new RuntimeException());//直接調用onError。這裡可以自定義異常
        Observable observable3=Observable.never();//啥都不做
               
  • timer: 建立一個在給定的延時之後發射資料項為0的

    Observable<Long>

    ,内部通過

    OnSubscribeTimerOnce

    工作
    Observable.timer(1000,TimeUnit.MILLISECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("JG",aLong.toString()); // 0
                    }
                });
               
  • interval: 建立一個按照給定的時間間隔發射從0開始的整數序列的

    Observable<Long>

    ,内部通過

    OnSubscribeTimerPeriodically

    工作。
    Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                         //每隔1秒發送資料項,從0開始計數
                         //0,1,2,3....
                    }
                });
               
  • range: 建立一個發射指定範圍的整數序列的

    Observable<Integer>

    Observable.range(2,5).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d("JG",integer.toString());// 2,3,4,5,6 從2開始發射5個資料
            }
        });
               
  • defer: 隻有當訂閱者訂閱才建立Observable,為每個訂閱建立一個新的Observable。内部通過

    OnSubscribeDefer

    在訂閱時調用Func0建立Observable。
    Observable.defer(new Func0<Observable<String>>() {
            @Override
            public Observable<String> call() {
                return Observable.just("hello");
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d("JG",s);
            }
        });
               

合并操作

以下操作符用于組合多個Observable。

注意,為了使結構更加清晰以及縮小代碼量,之後的例子部分地方将會使用Lambda表達式書寫,如果你對Lambda表達式不太熟悉的話,可以閱讀JAVA8 Lambda表達式完全解析這篇文章。
  • concat: 按順序連接配接多個Observables。需要注意的是

    Observable.concat(a,b)

    等價于

    a.concatWith(b)

    Observable<Integer> observable1=Observable.just(1,2,3,4);
        Observable<Integer>  observable2=Observable.just(4,5,6);
    
        Observable.concat(observable1,observable2)
                .subscribe(item->Log.d("JG",item.toString()));//1,2,3,4,4,5,6
               
  • startWith: 在資料序列的開頭增加一項資料。

    startWith

    的内部也是調用了

    concat

    Observable.just(1,2,3,4,5)
                .startWith(6,7,8)
        .subscribe(item->Log.d("JG",item.toString()));//6,7,8,1,2,3,4,5
               
  • merge: 将多個Observable合并為一個。不同于concat,merge不是按照添加順序連接配接,而是按照時間線來連接配接。其中

    mergeDelayError

    将異常延遲到其它沒有錯誤的Observable發送完畢後才發射。而

    merge

    則是一遇到異常将停止發射資料,發送onError通知。 
    rxjava操作符彙總
  • zip: 使用一個函數組合多個Observable發射的資料集合,然後再發射這個結果。如果多個Observable發射的資料量不一樣,則以最少的Observable為标準進行壓合。内部通過

    OperatorZip

    進行壓合。
    Observable<Integer>  observable1=Observable.just(1,2,3,4);
    Observable<Integer>  observable2=Observable.just(4,5,6);
    
    
        Observable.zip(observable1, observable2, new Func2<Integer, Integer, String>() {
            @Override
            public String call(Integer item1, Integer item2) {
                return item1+"and"+item2;
            }
        })
        .subscribe(item->Log.d("JG",item)); //1and4,2and5,3and6
               
  • combineLatest: 。當兩個Observables中的任何一個發射了一個資料時,通過一個指定的函數組合每個Observable發射的最新資料(一共兩個資料),然後發射這個函數的結果。類似于zip,但是,不同的是zip隻有在每個Observable都發射了資料才工作,而combineLatest任何一個發射了資料都可以工作,每次與另一個Observable最近的資料壓合。具體請看下面流程圖。 

    zip工作流程 

    rxjava操作符彙總
    combineLatest工作流程 
    rxjava操作符彙總

過濾操作

  • filter: 過濾資料。内部通過

    OnSubscribeFilter

    過濾資料。
    Observable.just(3,4,5,6)
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer>4;
                    }
                })
        .subscribe(item->Log.d("JG",item.toString())); //5,6 
               
  • ofType: 過濾指定類型的資料,與filter類似,
    Observable.just(1,2,"3")
                .ofType(Integer.class)
                .subscribe(item -> Log.d("JG",item.toString()));
               
  • take: 隻發射開始的N項資料或者一定時間内的資料。内部通過

    OperatorTake

    OperatorTakeTimed

    過濾資料。
    Observable.just(3,4,5,6)
                .take(3)//發射前三個資料項
                .take(100, TimeUnit.MILLISECONDS)//發射100ms内的資料
               
  • takeLast: 隻發射最後的N項資料或者一定時間内的資料。内部通過

    OperatorTakeLast

    OperatorTakeLastTimed

    過濾資料。takeLastBuffer和takeLast類似,不同點在于takeLastBuffer會收內建List後發射。
    Observable.just(3,4,5,6)
                .takeLast(3)
                .subscribe(integer -> Log.d("JG",integer.toString()));//4,5,6
               
  • takeFirst:提取滿足條件的第一項。内部實作源碼如下:
    public final Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
          return filter(predicate).take(1); //先過濾,後提取
    }
               
  • first/firstOrDefault:隻發射第一項(或者滿足某個條件的第一項)資料,可以指定預設值。
    Observable.just(3,4,5,6)
                .first()
                .subscribe(integer -> Log.d("JG",integer.toString()));//3
    
        Observable.just(3,4,5,6)
                   .first(new Func1<Integer, Boolean>() {
                       @Override
                       public Boolean call(Integer integer) {
                           return integer>3;
                       }
                   }) .subscribe(integer -> Log.d("JG",integer.toString()));//4
               
  • last/lastOrDefault:隻發射最後一項(或者滿足某個條件的最後一項)資料,可以指定預設值。
  • skip:跳過開始的N項資料或者一定時間内的資料。内部通過

    OperatorSkip

    OperatorSkipTimed

    實作過濾。
    Observable.just(3,4,5,6)
                   .skip(1)
                .subscribe(integer -> Log.d("JG",integer.toString()));//4,5,6
               
  • skipLast:跳過最後的N項資料或者一定時間内的資料。内部通過

    OperatorSkipLast

    OperatorSkipLastTimed

    實作過濾。
  • elementAt/elementAtOrDefault:發射某一項資料,如果超過了範圍可以的指定預設值。内部通過

    OperatorElementAt

    過濾。
Observable.just(3,4,5,6)
                 .elementAt(2)
        .subscribe(item->Log.d("JG",item.toString())); //5
           
  • ignoreElements:丢棄所有資料,隻發射錯誤或正常終止的通知。内部通過

    OperatorIgnoreElements

    實作。
  • distinct:過濾重複資料,内部通過

    OperatorDistinct

    實作。
    Observable.just(3,4,5,6,3,3,4,9)
           .distinct()
          .subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,9
               
  • distinctUntilChanged:過濾掉連續重複的資料。内部通過

    OperatorDistinctUntilChanged

    實作
    Observable.just(3,4,5,6,3,3,4,9)
           .distinctUntilChanged()
          .subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,3,4,9
               
  • throttleFirst:定期發射Observable發射的第一項資料。内部通過

    OperatorThrottleFirst

    實作。
    Observable.create(subscriber -> {
            subscriber.onNext(1);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
            subscriber.onNext(2);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
    
            subscriber.onNext(3);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
            subscriber.onNext(4);
            subscriber.onNext(5);
            subscriber.onCompleted();
    
        }).throttleFirst(999, TimeUnit.MILLISECONDS)
                .subscribe(item-> Log.d("JG",item.toString())); //結果為1,3,4
               
  • throttleWithTimeout/debounce:發射資料時,如果兩次資料的發射間隔小于指定時間,就會丢棄前一次的資料,直到指定時間内都沒有新資料發射時 

    才進行發射

    Observable.create(subscriber -> {
            subscriber.onNext(1);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
            subscriber.onNext(2);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
    
            subscriber.onNext(3);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
            subscriber.onNext(4);
            subscriber.onNext(5);
            subscriber.onCompleted();
    
        }).debounce(999, TimeUnit.MILLISECONDS)//或者為throttleWithTimeout(1000, TimeUnit.MILLISECONDS)
                .subscribe(item-> Log.d("JG",item.toString())); //結果為3,5
               
  • sample/throttleLast:定期發射Observable最近的資料。内部通過

    OperatorSampleWithTime

    實作。
    Observable.create(subscriber -> {
            subscriber.onNext(1);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
            subscriber.onNext(2);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
    
            subscriber.onNext(3);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
            subscriber.onNext(4);
            subscriber.onNext(5);
            subscriber.onCompleted();
    
        }).sample(999, TimeUnit.MILLISECONDS)//或者為throttleLast(1000, TimeUnit.MILLISECONDS)
                .subscribe(item-> Log.d("JG",item.toString())); //結果為2,3,5
               
  • timeout: 如果原始Observable過了指定的一段時長沒有發射任何資料,就發射一個異常或者使用備用的Observable。

條件/布爾操作

  • all: 判斷所有的資料項是否滿足某個條件,内部通過

    OperatorAll

    實作。
    Observable.just(2,3,4,5)
                .all(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer>3;
                    }
                })
        .subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean aBoolean) {
                Log.d("JG",aBoolean.toString()); //false
            }
        })
        ;
               
  • exists: 判斷是否存在資料項滿足某個條件。内部通過

    OperatorAny

    實作。
    Observable.just(2,3,4,5)
                .exists(integer -> integer>3)
                .subscribe(aBoolean -> Log.d("JG",aBoolean.toString())); //true
               
  • contains: 判斷在發射的所有資料項中是否包含指定的資料,内部調用的其實是

    exists

    Observable.just(2,3,4,5)
                .contains(3)
                .subscribe(aBoolean -> Log.d("JG",aBoolean.toString())); //true
               
  • sequenceEqual: 用于判斷兩個Observable發射的資料是否相同(資料,發射順序,終止狀态)。
    Observable.sequenceEqual(Observable.just(2,3,4,5),Observable.just(2,3,4,5))
                .subscribe(aBoolean -> Log.d("JG",aBoolean.toString()));//true
               
  • isEmpty: 用于判斷Observable發射完畢時,有沒有發射資料。有資料false,如果隻收到了onComplete通知則為true。
    Observable.just(3,4,5,6)
                   .isEmpty()
                  .subscribe(item -> Log.d("JG",item.toString()));//false
               
  • amb: 給定多個Observable,隻讓第一個發射資料的Observable發射全部資料,其他Observable将會被忽略。
    Observable<Integer> observable1=Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    subscriber.onError(e);
                }
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.computation());
    
        Observable<Integer> observable2=Observable.create(subscriber -> {
            subscriber.onNext(3);
            subscriber.onNext(4);
            subscriber.onCompleted();
        });
    
        Observable.amb(observable1,observable2)
        .subscribe(integer -> Log.d("JG",integer.toString())); //3,4
               
  • switchIfEmpty: 如果原始Observable正常終止後仍然沒有發射任何資料,就使用備用的Observable。
    Observable.empty()
                .switchIfEmpty(Observable.just(2,3,4))
        .subscribe(o -> Log.d("JG",o.toString())); //2,3,4
               
  • defaultIfEmpty: 如果原始Observable正常終止後仍然沒有發射任何資料,就發射一個預設值,内部調用的switchIfEmpty。
  • takeUntil: 當發射的資料滿足某個條件後(包含該資料),或者第二個Observable發送完畢,終止第一個Observable發送資料。
    Observable.just(2,3,4,5)
                .takeUntil(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer==4;
                    }
                }).subscribe(integer -> Log.d("JG",integer.toString())); //2,3,4
               
  • takeWhile: 當發射的資料滿足某個條件時(不包含該資料),Observable終止發送資料。
    Observable.just(2,3,4,5)
                .takeWhile(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer==4;
                    }
                })
                .subscribe(integer -> Log.d("JG",integer.toString())); //2,3
               
  • skipUntil: 丢棄Observable發射的資料,直到第二個Observable發送資料。(丢棄條件資料)
  • skipWhile: 丢棄Observable發射的資料,直到一個指定的條件不成立(不丢棄條件資料)

聚合操作

  • reduce: 對序列使用reduce()函數并發射最終的結果,内部使用

    OnSubscribeReduce

    實作。
    Observable.just(2,3,4,5)
                .reduce(new Func2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer sum, Integer item) {
                        return sum+item;
                    }
                })
                .subscribe(integer -> Log.d("JG",integer.toString()));//14
               
  • collect: 使用

    collect

    收集資料到一個可變的資料結構。
    Observable.just(3,4,5,6)
                   .collect(new Func0<List<Integer>>() { //建立資料結構
    
                       @Override
                       public List<Integer> call() {
                           return new ArrayList<Integer>();
                       }
                   }, new Action2<List<Integer>, Integer>() { //收集器
                       @Override
                       public void call(List<Integer> integers, Integer integer) {
                           integers.add(integer);
                       }
                   })
                  .subscribe(new Action1<List<Integer>>() {
                      @Override
                      public void call(List<Integer> integers) {
    
                      }
                  });
               
  • count/countLong: 計算發射的數量,内部調用的是

    reduce

    .

轉換操作

  • toList: 收集原始Observable發射的所有資料到一個清單,然後傳回這個清單.
    Observable.just(2,3,4,5)
                .toList()
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
    
                    }
                });
               
  • toSortedList: 收集原始Observable發射的所有資料到一個有序清單,然後傳回這個清單。
    Observable.just(6,2,3,4,5)
                .toSortedList(new Func2<Integer, Integer, Integer>() {//自定義排序
                    @Override
                    public Integer call(Integer integer, Integer integer2) {
                        return integer-integer2; //>0 升序 ,<0 降序
                    }
                })
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        Log.d("JG",integers.toString()); // [2, 3, 4, 5, 6]
                    }
                });
               
  • toMap: 将序列資料轉換為一個Map。我們可以根據資料項生成key和生成value。
    Observable.just(6,2,3,4,5)
                .toMap(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        return "key:" + integer; //根據資料項生成map的key
                    }
                }, new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        return "value:"+integer; //根據資料項生成map的kvalue
                    }
                }).subscribe(new Action1<Map<String, String>>() {
            @Override
            public void call(Map<String, String> stringStringMap) {
                Log.d("JG",stringStringMap.toString()); // {key:6=value:6, key:5=value:5, key:4=value:4, key:2=value:2, key:3=value:3}
            }
        });
    
               
  • toMultiMap: 類似于toMap,不同的地方在于map的value是一個集合。

變換操作

  • map: 對Observable發射的每一項資料都應用一個函數來變換。
    Observable.just(6,2,3,4,5)
                .map(integer -> "item:"+integer)
                .subscribe(s -> Log.d("JG",s));//item:6,item:2....
               
  • cast: 在發射之前強制将Observable發射的所有資料轉換為指定類型
  • flatMap: 将Observable發射的資料變換為Observables集合,然後将這些Observable發射的資料平坦化的放進一個單獨的Observable,内部采用merge合并。
    Observable.just(2,3,5)
                .flatMap(new Func1<Integer, Observable<String>>() {
                    @Override
                    public Observable<String> call(Integer integer) {
                        return Observable.create(subscriber -> {
                            subscriber.onNext(integer*10+"");
                            subscriber.onNext(integer*100+"");
                            subscriber.onCompleted();
                        });
                    }
                })
        .subscribe(o -> Log.d("JG",o)) //20,200,30,300,50,500
               
  • flatMapIterable: 和flatMap的作用一樣,隻不過生成的是Iterable而不是Observable。
    Observable.just(2,3,5)
                .flatMapIterable(new Func1<Integer, Iterable<String>>() {
                    @Override
                    public Iterable<String> call(Integer integer) {
                        return Arrays.asList(integer*10+"",integer*100+"");
                    }
                }).subscribe(new Action1<String>() {
                  @Override
                  public void call(String s) {
    
                  }
        });
               
  • concatMap: 類似于flatMap,由于内部使用concat合并,是以是按照順序連接配接發射。
  • switchMap: 和flatMap很像,将Observable發射的資料變換為Observables集合,當原始Observable發射一個新的資料(Observable)時,它将取消訂閱前一個Observable。
    Observable.create(new Observable.OnSubscribe<Integer>() {
    
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for(int i=1;i<4;i++){
                    subscriber.onNext(i);
                    Utils.sleep(500,subscriber);//線程休眠500ms
                }
    
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.newThread())
          .switchMap(new Func1<Integer, Observable<Integer>>() {
                 @Override
               public Observable<Integer> call(Integer integer) {
                       //每當接收到新的資料,之前的Observable将會被取消訂閱
                        return Observable.create(new Observable.OnSubscribe<Integer>() {
                            @Override
                            public void call(Subscriber<? super Integer> subscriber) {
                                subscriber.onNext(integer*10);
                                Utils.sleep(500,subscriber);
                                subscriber.onNext(integer*100);
                                subscriber.onCompleted();
                            }
                        }).subscribeOn(Schedulers.newThread());
                    }
                })
                .subscribe(s -> Log.d("JG",s.toString()));//10,20,30,300
    
    
    
               
  • scan: 與reduce很像,對Observable發射的每一項資料應用一個函數,然後按順序依次發射每一個值。
    Observable.just(2,3,5)
                .scan(new Func2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer sum, Integer item) {
                        return sum+item;
                    }
                })
        .subscribe(integer -> Log.d("JG",integer.toString())) //2,5,10
               
  • groupBy: 将Observable分拆為Observable集合,将原始Observable發射的資料按Key分組,每一個Observable發射一組不同的資料。
    Observable.just(2,3,5,6)
                .groupBy(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {//分組
                        return integer%2==0?"偶數":"奇數";
                    }
                })
        .subscribe(new Action1<GroupedObservable<String, Integer>>() {
            @Override
            public void call(GroupedObservable<String, Integer> o) {
    
                o.subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("JG",o.getKey()+":"+integer.toString()); //偶數:2,奇數:3,...
                    }
                });
            }
        })
               
  • buffer: 它定期從Observable收集資料到一個集合,然後把這些資料集合打包發射,而不是一次發射一個
    Observable.just(2,3,5,6)
                .buffer(3)
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
    
                    }
                })
               
  • window: 定期将來自Observable的資料分拆成一些Observable視窗,然後發射這些視窗,而不是每次發射一項。

錯誤處理/重試機制

  • onErrorResumeNext: 當原始Observable在遇到錯誤時,使用備用Observable。。
    Observable.just(1,"2",3)
        .cast(Integer.class)
        .onErrorResumeNext(Observable.just(1,2,3))
        .subscribe(integer -> Log.d("JG",integer.toString())) //1,2,3
        ;
               
  • onExceptionResumeNext: 當原始Observable在遇到異常時,使用備用的Observable。與

    onErrorResumeNext

    類似,差別在于

    onErrorResumeNext

    可以處理所有的錯誤,onExceptionResumeNext隻能處理異常。
  • onErrorReturn: 當原始Observable在遇到錯誤時發射一個特定的資料。
    Observable.just(1,"2",3)
                .cast(Integer.class)
                .onErrorReturn(new Func1<Throwable, Integer>() {
                    @Override
                    public Integer call(Throwable throwable) {
                        return 4;
                    }
                }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d("JG",integer.toString());1,4
            }
        });
               
  • retry: 當原始Observable在遇到錯誤時進行重試。
    Observable.just(1,"2",3)
        .cast(Integer.class)
        .retry(3)
        .subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"))
        ;//1,1,1,1,onError
               

連接配接操作

ConnectableObservable

與普通的Observable差不多,但是可連接配接的Observable在被訂閱時并不開始發射資料,隻有在它的connect()被調用時才開始。用這種方法,你可以等所有的潛在訂閱者都訂閱了這個Observable之後才開始發射資料。 

ConnectableObservable.connect()

訓示一個可連接配接的Observable開始發射資料. 

Observable.publish()

将一個Observable轉換為一個可連接配接的Observable 

Observable.replay()

確定所有的訂閱者看到相同的資料序列的

ConnectableObservable

,即使它們在Observable開始發射資料之後才訂閱。 

ConnectableObservable.refCount()

讓一個可連接配接的Observable表現得像一個普通的Observable。

ConnectableObservable<Integer> co= Observable.just(1,2,3)
                .publish();

        co .subscribe(integer -> Log.d("JG",integer.toString()) );
        co.connect();//此時開始發射資料
           

阻塞操作

BlockingObservable

是一個阻塞的Observable。普通的Observable 轉換為 BlockingObservable,可以使用 

Observable.toBlocking( )

方法或者

BlockingObservable.from( )

方法。内部通過

CountDownLatch

實作了阻塞操作。

以下的操作符可以用于BlockingObservable,如果是普通的Observable,務必使用Observable.toBlocking()轉為阻塞Observable後使用,否則達不到預期的效果。

  • forEach: 對BlockingObservable發射的每一項資料調用一個方法,會阻塞直到Observable完成。
  • first/firstOrDefault/last/lastOrDefault:這幾個操作符之前有介紹過。也可以用于阻塞操作。
  • single/singleOrDefault:如果Observable終止時隻發射了一個值,傳回那個值,否則抛出異常或者發射預設值。
  • mostRecent:傳回一個總是傳回Observable最近發射的資料的Iterable。
  • next: 傳回一個Iterable,會阻塞直到Observable發射了第二個值,然後傳回那個值。
  • latest: 傳回一個iterable,會阻塞直到或者除非Observable發射了一個iterable沒有傳回的值,然後傳回這個值
  • toFuture: 将Observable轉換為一個Future
  • toIterable:将一個發射資料序列的Observable轉換為一個Iterable。
  • getIterator:将一個發射資料序列的Observable轉換為一個Iterator

工具集

  • materialize: 将Observable轉換成一個通知清單。
  • dematerialize: 與上面的作用相反,将通知逆轉回一個Observable。
  • timestamp: 給Observable發射的每個資料項添加一個時間戳。
    Observable.just(1,2,3)
               .timestamp()
               .subscribe(new Action1<Timestamped<Integer>>() {
                   @Override
                   public void call(Timestamped<Integer> timestamped) {
                       Log.d("JG",timestamped.getTimestampMillis()+" "+timestamped.getValue());
                       //1472627510548 1
                       //1472627510549 2
                       //1472627510549 3
                   }
               });
               
  • timeInterval:給Observable發射的兩個資料項間添加一個時間差,實作在

    OperatorTimeInterval

    中 
    rxjava操作符彙總
  • serialize: 強制Observable按次序發射資料并且要求功能是完好的
  • cache: 緩存Observable發射的資料序列并發射相同的資料序列給後續的訂閱者
  • observeOn: 指定觀察者觀察Observable的排程器
  • subscribeOn: 指定Observable執行任務的排程器
  • doOnEach: 注冊一個動作,對Observable發射的每個資料項使用
    Observable.just(2,3)
                .doOnEach(new Action1<Notification<? super Integer>>() {
                    @Override
                    public void call(Notification<? super Integer> notification) {
                        Log.d("JG","--doOnEach--"+notification.toString());
                    }
                })
                .subscribe(integer -> Log.d("JG",integer.toString()));
    //結果為:            
     // --doOnEach--[[email protected] OnNext 2]
    // 2
     // --doOnEach--[[email protected] OnNext 3]
    // 3
    // --doOnEach--[[email protected] OnCompleted]
               
  • doOnCompleted: 注冊一個動作,對正常完成的Observable使用
  • doOnError: 注冊一個動作,對發生錯誤的Observable使用
  • doOnTerminate:注冊一個動作,對完成的Observable使用,無論是否發生錯誤
    Observable.just(2,3)
                .doOnTerminate(new Action0() {
                    @Override
                    public void call() {
                        Log.d("JG","--doOnTerminate--");
                    }
                })
                .subscribe(integer -> Log.d("JG",integer.toString()));
    // 2 , 3 , --doOnTerminate--
               
  • doOnSubscribe: 注冊一個動作,在觀察者訂閱時使用。内部由

    OperatorDoOnSubscribe

    實作,
    rxjava操作符彙總
  • doOnUnsubscribe: 注冊一個動作,在觀察者取消訂閱時使用。内部由

    OperatorDoOnUnsubscribe

    實作,在

    call

    中加入一個解綁動作。 
    rxjava操作符彙總
  • finallyDo/doAfterTerminate: 注冊一個動作,在Observable完成時使用
    Observable.just(2,3)
                .doAfterTerminate(new Action0() {
                    @Override
                    public void call() {
                        Log.d("JG","--doAfterTerminate--");
                    }
                })
                .subscribe(integer -> Log.d("JG",integer.toString()));
    //2,3,  --doAfterTerminate-- 
               
  • delay: 延時發射Observable的結果。即讓原始Observable在發射每項資料之前都暫停一段指定的時間段。效果是Observable發射的資料項在時間上向前整體平移了一個增量(除了onError,它會即時通知)。
  • delaySubscription: 延時處理訂閱請求。實作在

    OnSubscribeDelaySubscription

    中 
    rxjava操作符彙總
  • using: 建立一個隻在Observable生命周期存在的資源,當Observable終止時這個資源會被自動釋放。
    Observable.using(new Func0<File>() {//資源工廠
            @Override
            public File call() {
    
                File file = new File(getCacheDir(), "a.txt");
                if(!file.exists()){
                    try {
                        Log.d("JG","--create--");
                        file.createNewFile();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                return file;
            }
        }, new Func1<File, Observable<String>>() { //Observable
            @Override
            public Observable<String> call(File file) {
                return Observable.just(file.exists() ? "exist" : "no exist");
            }
        }, new Action1<File>() {//釋放資源動作
            @Override
            public void call(File file) {
                if(file!=null&&file.exists()){
                    Log.d("JG","--delete--");
                    file.delete();
                }
            }
        })
        .subscribe(s -> Log.d("JG",s))
        ;
     //--create--
     //exist
     //--delete--
               
  • single/singleOrDefault: 強制傳回單個資料,否則抛出異常或預設資料。

最後

關于RxJava标準庫的操作符已經介紹完畢,純粹當個備忘錄。如有錯誤之處,歡迎指出。