天天看點

RxJava【過濾】操作符 filter distinct throttle take skip first MD目錄常用的過濾操作符

Markdown版本筆記 我的GitHub首頁 我的部落格 我的微信 我的郵箱
MyAndroidBlogs baiqiantao baiqiantao bqt20094 [email protected]

RxJava【過濾】操作符 filter distinct throttle take skip first MD

demo位址

參考

目錄

目錄

常用的過濾操作符

filter ofType

distinct distinctUntilChanged

ignoreElements

throttle sample

throttleFirst

throttleLast

sample

throttleLatest

debounce throttleWithTimeout

timeout

timeout(long,TimeUnit)

timeout(long,TimeUnit,Observable)

timeout(Function)

take takelast takeUntil takeWhile

take

takeLast

takeUntil takeWhile

skip skipLast skipUntil skipWhile

擷取指定位置的元素

first*

last*

elementAt elementAtOrError

single*

常用的過濾操作符

根據指定【條件】過濾事件

  • filter:過濾掉沒有通過謂詞測試的資料項,隻發射通過測試的
  • ofType:僅發出指定類型的項目
  • distinct:過濾掉重複資料項
  • distinctUntilChanged:過濾掉連續重複的資料
  • ignoreElements:丢棄所有的正常資料,隻發射 onError 或 onCompleted 通知
  • single、singleElement、singleOrError:如果隻發射一個資料則發射這個資料,否則發射預設資料或異常

根據指定【時間】過濾事件

  • throttleFirst:定期發射Observable發射的第一項資料
  • throttleLast、sample:定期發射Observable最近的資料,等于是資料抽樣
  • throttleLatest:定期發射Observable最近的資料,類似于 throttleFirst + throttleLast
  • debounce、throttleWithTimeout:去抖動,隻有當Observable在指定的時間後還沒有發射資料時,才發射一個資料
  • timeout:如果在一個指定的時間段後還沒發射資料,就發射一個異常

根據【位置】保留或跳過事件

  • take、takelast:隻保留前面(後面)的N項資料
  • takeUntil、takeWhile:當滿足指定的條件時開始發射或停止發射
  • skip、skipLast:跳過前面(後面)的N項資料
  • skipUntil、skipWhile:當滿足指定的條件時開始發射或停止發射

擷取【指定位置】的事件

  • first、firstElement、firstOrError、last、lastElement、lastOrError:隻發射第一項(最後一項)資料
  • elementAt、elementAtOrError:隻發發射第N項資料

filter ofType

filter:過濾掉沒有通過謂詞測試的資料項,隻發射通過測試的資料項

Observable.range(1, 10)
    .filter(i -> i % 3 == 0)
    .subscribe(i -> log("" + i));//3,6,9 
           

ofType:僅發出指定類型的項目

Observable.just(1, true, "包青天", 2, "哈哈") 
    .ofType(String.class) 
    .subscribe(i -> log("" + i));//包青天,哈哈 
           

distinct distinctUntilChanged

過濾掉重複資料項

RxJava【過濾】操作符 filter distinct throttle take skip first MD目錄常用的過濾操作符

distinct

:過濾掉重複資料項(隻允許還沒有發射過的資料項通過)

Observable.just(1, 2, 2, 3, 4, 3, 5) 
    .distinct() 
    .subscribe(i -> log("" + i)); //1, 2, 3, 4, 5 
           

可以指定一個函數,這個函數根據原始Observable發射的資料項産生一個Key,然後,比較這些Key而不是資料本身,來判定兩個資料是否是不同的。

Observable.just("a", "bcd", "e", "fg", "h") 
      .distinct(String::length) //通過比較函數的傳回值而不是資料本身來判定兩個資料是否是不同的 
      .subscribe(i -> log("" + i)); //a,bcd,fg 
           

distinctUntilChanged

:過濾掉連續重複的資料,同樣可以指定一個判斷是否是相同資料的函數

Observable.just(1, 2, 2, 3, 4, 3, 5) 
    .distinctUntilChanged() 
    .subscribe(i -> log("" + i)); //1, 2, 3, 4, 3, 5 
           

ignoreElements

丢棄所有的正常資料,隻發射 onError 或 onCompleted 通知

Observable.create(emitter -> { 
   emitter.onNext(1); 
   emitter.onError(new Throwable("異常")); 
}).ignoreElements() 
      .subscribe(() -> log("onComplete"), e -> log(e.getMessage())); // 異常 
           

throttle sample

throttle [ˈθrɑ:tl] n.節流閥; 喉嚨,氣管; [機] 風門; vt.扼殺,壓制;勒死,使窒息;使節流;調節; vi.節流,減速; 窒息;

在某段時間内,隻發送該段時間内第1次事件 / 最後1次事件

注意:如果自上次采樣以來,原始Observable沒有發射任何資料,則傳回的Observable在那段時間内也不會發射任何資料。

throttleFirst

在每個采樣周期内,throttleFirst 總是隻發射原始Observable的第一項資料。周期内可能沒有發射任何資料。

RxView.clicks(v) 
      .map(o -> format.format(new Date())) 
      .doOnEach(notification -> log("點選事件的時間為:" + notification.getValue())) 
      .throttleFirst(1, TimeUnit.SECONDS) 
      .subscribe(s -> log("防止按鈕重複點選,值為:" + s)); 
           
點選事件的時間為:22:31:08 304 
防止按鈕重複點選,值為:22:31:08 304 
點選事件的時間為:22:31:08 516 
點選事件的時間為:22:31:08 764 
點選事件的時間為:22:31:09 247 
點選事件的時間為:22:31:09 590 
防止按鈕重複點選,值為:22:31:09 590 
           

throttleLast

定期發射Observable最近的資料,等于是定時對資料進行抽樣。

sample 和 throttleLast 操作符定時檢視一個Observable,然後發射自上次采樣以來它最近發射的資料。

RxView.clicks(v) 
      .map(o -> format.format(new Date())) 
      .doOnEach(notification -> log("點選事件的時間為:" + notification.getValue())) 
      .throttleLast(1, TimeUnit.SECONDS) 
      .subscribe(s -> log("定時對資料進行抽樣,值為:" + s)); 
           
點選事件的時間為:22:33:03 809 
點選事件的時間為:22:33:04 371 
點選事件的時間為:22:33:04 552 
定時對資料進行抽樣,值為:22:33:04 552 
點選事件的時間為:22:33:04 943 
點選事件的時間為:22:33:05 463 
定時對資料進行抽樣,值為:22:33:05 463 
           

sample

相同參數的 sample 的效果和 throttleLast 完全一緻:

public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit) { 
    return sample(intervalDuration, unit); 
} 
           

不過 sample 有一個帶 boolean 類型參數的變體

RxJava【過濾】操作符 filter distinct throttle take skip first MD目錄常用的過濾操作符

參數 emitLast:

  • 如果為true且上遊完成[upstream completes]且仍有未采樣的項目可用,則該項目在完成之前發送到下遊。
  • 如果為false,則忽略未采樣的最後一項。

throttleLatest

感覺就是 throttleFirst + throttleLast 的組合效果,即:第一個資料項立即發射出去,後續則隻發射周期内最後一個資料項。

RxJava【過濾】操作符 filter distinct throttle take skip first MD目錄常用的過濾操作符

和 sample 一樣,其也有一個帶 boolean 類型參數的變體

RxView.clicks(v) 
      .map(o -> format.format(new Date())) 
      .doOnEach(notification -> log("點選事件的時間為:" + notification.getValue())) 
      .throttleLatest(1, TimeUnit.SECONDS) 
      .subscribe(s -> log("擷取最接近采樣點的值,值為:" + s)); 
           
點選事件的時間為:22:37:36 202 
擷取最接近采樣點的值,值為:22:37:36 202 
點選事件的時間為:22:37:36 526 
點選事件的時間為:22:37:36 853 
點選事件的時間為:22:37:37 204 
擷取最接近采樣點的值,值為:22:37:37 204 
點選事件的時間為:22:37:37 540 
點選事件的時間為:22:37:38 045 
擷取最接近采樣點的值,值為:22:37:38 045 
           

debounce throttleWithTimeout

僅在過了一段指定的時間還沒發射資料時才發射一個資料,Debounce操作符會過濾掉發射速率過快的資料項。

去抖動,隻有當Observable在指定的時間後還沒有發射資料時,才發射一個資料

注意:這個操作符會會接着最後一項資料發射原始Observable的onCompleted通知,即使這個通知發生在你指定的時間視窗内(從最後一項資料的發射算起)。也就是說,onCompleted通知不會觸發限流。

debounce操作符的一個變體通過對原始Observable的每一項應用一個函數進行限流,這個函數傳回一個Observable。如果原始Observable在這個新生成的Observable終止之前發射了另一個資料,debounce會抑制(suppress)這個資料項。

RxTextView.textChanges(et) 
      .doOnEach(notification -> log("值為:" + notification.getValue())) 
      .debounce(2000, TimeUnit.MILLISECONDS) //防抖動,去除發送頻率過快的項 
      .subscribe(s -> log("2秒鐘内沒有發生變化才發送,值為:" + s), e -> log("異常"), () -> log("完成")); 
           
18:57:20.799: 值為:1 
18:57:21.539: 值為:12 
18:57:22.413: 值為:123 
18:57:23.245: 值為:1234 
18:57:25.179: 值為:12345 
18:57:26.916: 值為:123456 
18:57:28.918: 2秒鐘内沒有發生變化才發送,值為:123456 
18:57:30.205: 值為:1234567 
18:57:32.206: 2秒鐘内沒有發生變化才發送,值為:1234567 
           

timeout

如果原始Observable過了指定的一段時長沒有發射任何資料,Timeout操作符會以一個onError通知終止這個Observable。

timeout(Function itemTimeoutIndicator) 
timeout(Function itemTimeoutIndicator, ObservableSource other) 
timeout(long timeout, TimeUnit timeUnit) 
timeout(long timeout, TimeUnit timeUnit, ObservableSource other) 
timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) 
timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, ObservableSource other) 
timeout(ObservableSource firstTimeoutIndicator, Function itemTimeoutIndicator) 
timeout(ObservableSource firstTimeoutIndicator, Function itemTimeoutIndicator, ObservableSource other) 
           

以下案例均用到了如下代碼:

Observable<Integer> observable = Observable.create(emitter -> { 
   emitter.onNext(1); 
   SystemClock.sleep(150L + 100L * new Random().nextInt(2)); 
   emitter.onNext(2); 
   if (new Random().nextBoolean()) emitter.onComplete(); 
}); 
           

timeout(long,TimeUnit)

每當原始Observable發射了一項資料,timeout就啟動一個計時器,如果計時器超過了指定指定的時長而原始Observable沒有發射另一項資料,timeout就抛出TimeoutException,以一個錯誤通知終止Observable。

observable.timeout(200, TimeUnit.MILLISECONDS) 
      .subscribe(i -> log("" + i), e -> log("逾時了:" + e.getClass().getSimpleName()), () -> log("完成")); 
           

可能的結果有:

【1,2,完成】【1,2,逾時了:TimeoutException】【1,逾時了:TimeoutException】

timeout(long,TimeUnit,Observable)

可以在逾時時切換到一個指定的Observable,而不是發錯誤通知。

timeout(long timeout, TimeUnit timeUnit, ObservableSource other) 
           

案例:

observable.timeout(200, TimeUnit.MILLISECONDS, observer -> observer.onNext(3)) 
      .subscribe(i -> log("" + i), e -> log("逾時了:" + e.getClass().getSimpleName()), () -> log("完成")); 
           

可能的結果有:

【1,2,完成】【1,2,3】【1,3】

timeout(Function)

可以使用一個函數針對原始Observable的每一項傳回一個新的Observable,如果當這個新的Observable終止時原始Observable還沒有發射另一項資料,就會認為是逾時了,timeout就抛出TimeoutException,以一個錯誤通知終止Observable。

還可以同時指定逾時時切換到的Observable,還可以單獨給第一項設定一個逾時時切換到的Observable。

timeout(Function itemTimeoutIndicator) 
timeout(Function itemTimeoutIndicator, ObservableSource other) 
timeout(ObservableSource firstTimeoutIndicator, Function itemTimeoutIndicator) 
timeout(ObservableSource firstTimeoutIndicator, Function itemTimeoutIndicator, ObservableSource other) 
           

案例:

observable.timeout(i -> Observable.just("這裡并不會發射資料給訂閱者" + i).delay(200, TimeUnit.MILLISECONDS)) 
      .subscribe(i -> log("" + i), e -> log("逾時了:" + e.getClass().getSimpleName()), () -> log("完成")); 
           

可能的結果有:

【1,2,完成】【1,2,逾時了:TimeoutException】【1,逾時了:TimeoutException】

take takelast takeUntil takeWhile

隻發射前面或後面的N項資料,或當滿足指定的條件時開始發射或停止發射

take

隻發射前面的N項資料

RxJava【過濾】操作符 filter distinct throttle take skip first MD目錄常用的過濾操作符

使用Take操作符讓你可以修改Observable的行為,隻傳回前面的N項資料,然後發射完成通知,忽略剩餘的資料。

如果 Observable 發射的資料少于 N 項,那麼take操作生成的Observable不會抛異常或發射onError通知,在完成前它隻會發射相同的少量資料。

還可以指定一個時長,它會隻發射在原始Observable的最開始一段時間内發射的資料。

Observable.range(1, 5) 
      .take(2) 
      .subscribe(i -> log("" + i), e -> log("異常"), () -> log("完成")); //1, 2,完成 
Observable.range(11, 1) 
      .take(2) 
      .subscribe(i -> log("" + i), e -> log("異常"), () -> log("完成")); //11,完成 
Observable.create(emitter -> { 
   emitter.onNext(21); 
   emitter.onNext(22); 
   SystemClock.sleep(150L + 100L * new Random().nextInt(2)); 
   emitter.onNext(23); 
   emitter.onComplete(); 
}).take(200, TimeUnit.MILLISECONDS) 
      .subscribe(i -> log("" + i), e -> log("異常"), () -> log("完成")); //21, 22,完成 或 21, 22,23,完成 
           

takeLast

發射Observable發射的最後N項資料

使用TakeLast操作符修改原始Observable,你可以隻發射Observable發射的後N項資料,忽略前面的資料。

注意:takeLast 操作符會延遲原始 Observable 發射的任何資料項,直到它全部完成。

上述案例在使用 takeLast 操作符時的列印結果為【4,5,完成】【11,完成】【23,完成 或 21, 22,23,完成】

takeLast 還可以添加一些額外的參數,如可以同時指定發射的元素的最大個數以及時長等:

takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) 
           
  • long count:the maximum number of items to emit
  • boolean delayError:如果為true,則由目前Observable發出的 exception 通知會被延遲,直到正常元素[regular elements]被下遊[downstream]消耗為止;如果為false,則立即發射異常通知,并且所有正常元素會被丢棄[dropped]。不指定時都是用的 false 這種情況。
  • int bufferSize:the hint about how many elements to expect to be last。不指定時,預設值為【Math.max(1, Integer.getInteger("rx2.buffer-size", 128))】

takeUntil takeWhile

takeUntil 有兩個重載方法:

  • 傳回一個Observable,它發出源Observable發出的項,其會檢查每個項的指定謂詞[predicate],如果條件滿足則完成。
  • 傳回一個Observable,它發出源Observable發出的項,直到指定的Observable發出一個項為止。
takeUntil(ObservableSource other) //Returns an Observable that emits the items emitted by the source Observable until a second ObservableSource emits an item. 
takeUntil(Predicate stopPredicate) //Returns an Observable that emits items emitted by the source Observable, checks the specified predicate for each item, and then completes when the condition is satisfied. 
           

案例:

Observable.range(10, 5) 
      .flatMap(Observable::just) 
      .takeUntil(l -> l >= 12) //當滿足條件時原始的Observable會停止發射 
      .subscribe(i -> log("" + i), e -> log("異常1"), () -> log("完成1")); //10, 11, 12, 完成 
Observable.interval(200, TimeUnit.MILLISECONDS) 
      .takeUntil(Observable.just("開始發射資料時原始的Observable會停止發射").delay(500, TimeUnit.MILLISECONDS)) 
      .subscribe(i -> log("" + i), e -> log("異常2"), () -> log("完成2")); //0, 1,完成 
           

takeWhile 僅有一個帶 Predicate 參數的方法,其是當不滿足條件時原始的Observable停止發射,例如同樣的案例:

Observable.range(20, 5) 
      .flatMap(Observable::just) 
      .takeWhile(l -> l <= 22) //當不滿足條件時原始的Observable會停止發射 
      .subscribe(i -> log("" + i), e -> log("異常3"), () -> log("完成3")); //20, 21, 22, 完成 
           

skip skipLast skipUntil skipWhile

跳過前面(後面)的N項資料,基本上和 take 的結構一緻

RxJava【過濾】操作符 filter distinct throttle take skip first MD目錄常用的過濾操作符
注意:skipLast 的機制是這樣實作的:延遲原始Observable發射的任何資料項,直到它發射了N項資料(或直到自這次發射之後過了給定的時長)。

案例:

Observable.range(0, 5).skip(2).subscribe(i -> log("" + i)); //2.3.4 
Observable.range(10, 5).skipLast(2).subscribe(i -> log("" + i)); //10,11,12 
Observable.intervalRange(20, 5, 200, 200, TimeUnit.MILLISECONDS) 
      .skip(500, TimeUnit.MILLISECONDS).subscribe(i -> log("" + i)); //22,23,24 
Observable.intervalRange(30, 5, 500, 200, TimeUnit.MILLISECONDS) 
      .skipLast(500, TimeUnit.MILLISECONDS).subscribe(i -> log("" + i)); //30,31 
Observable.intervalRange(40, 5, 200, 200, TimeUnit.MILLISECONDS) 
      .skipUntil(Observable.just("開始發射資料時源Observable發射的資料才不會被丢掉").delay(500, TimeUnit.MILLISECONDS)) 
      .subscribe(i -> log("" + i)); //42,43,44 
Observable.range(50, 5) 
      .delay(1000, TimeUnit.MILLISECONDS) 
      .flatMap(Observable::just) 
      .skipWhile(l -> l <= 52) //當滿足條件時原始的Observable發射的資料會被丢掉 
      .subscribe(i -> log("" + i)); //53,54 
           

擷取指定位置的元素

first*

隻發射滿足條件的第一條資料

RxJava【過濾】操作符 filter distinct throttle take skip first MD目錄常用的過濾操作符
  • first:傳回僅發出源ObservableSource發出的第一個item的Single,如果源ObservableSource不發出任何item,則發出預設item
  • firstOrError:傳回僅發出源ObservableSource發出的第一個item的Single,如果此Observable為empty則發出NoSuchElementException事件
  • firstElement:傳回僅發出源ObservableSource發出的第一個item的Maybe,如果源ObservableSource為空則發出complete事件
Observable.range(0, 5) 
      .doOnEach(notification -> log("first1發射的事件為:" + notification.getValue())) //隻有一個 0 
      .first(100) //此時的結果和使用 firstOrError、firstElement 的效果完全一樣 
      .subscribe(i -> log("first1:" + i), e -> log("first1異常"));  //0 
Observable.empty() 
      .doOnEach(notification -> log("first2發射的事件為:" + notification.getValue())) //null 
      .first(100)  //此時的結果和使用 firstOrError、firstElement 的效果僅僅是結果不一樣 
      .subscribe(o -> log("first2:" + o), e -> log("first2異常")); //100、NoSuchElementException、onComplete 
           

last*

隻發射滿足條件的最後一條資料

RxJava【過濾】操作符 filter distinct throttle take skip first MD目錄常用的過濾操作符

如果将以上代碼改為 last* 時,則對于【Observable.empty()】,其結果和 first* 完全一緻;

而對于【Observable.range(0, 5)】,其結果和 first* 相比有一點差別,那就是 doOnEach 中一定會列印一項内容【發射的事件為:null】

Observable.range(10, 5) 
      .doOnEach(notification -> log("last發射的事件為:" + notification.getValue())) //有六個:10,11,12,13,14,null 
      .last(100) 
      .subscribe(i -> log("last:" + i), e -> log("last異常"));  //14 
           

elementAt elementAtOrError

隻發射第N項資料,和 first、last 基本一樣

RxJava【過濾】操作符 filter distinct throttle take skip first MD目錄常用的過濾操作符

first* 方法其實調用的就是 elementAt 方法:

public final Single<T> first(T defaultItem) { 
    return elementAt(0L, defaultItem); 
} 
public final Single<T> firstOrError() { 
    return elementAtOrError(0L); 
} 
public final Maybe<T> firstElement() { 
    return elementAt(0L); 
} 
           

其行為基本上是 first + last 的效果:

  • 如果源ObservableSource存在第 N 項資料,則源 ObservableSource 從第一項資料釋出到第N項資料後就停止發射了,然後新的 ObservableSource 把此第N項資料發射出去;
  • 如果源ObservableSource發射的資料個數小于 N,則還會在源 ObservableSource 釋出完所有資料後再發射一個【值為null】的對象;如果指定了預設資料,則在最最後還會發射預設資料
Observable.range(0, 5) 
      .doOnEach(notification -> log("1發射的事件為:" + notification.getValue())) //0,1 
      .elementAt(1) 
      .subscribe(i -> log("1:" + i), e -> log("1異常"), () -> log("完成"));  //1 
Observable.range(10, 5) 
      .doOnEach(notification -> log("2發射的事件為:" + notification.getValue())) //10,11,12,13,14,null 
      .elementAt(6) 
      .subscribe(i -> log("2:" + i), e -> log("2異常"), () -> log("完成"));  //0,完成 
Observable.range(20, 5) 
      .doOnEach(notification -> log("3發射的事件為:" + notification.getValue())) //20,21,22,23,24,null 
      .elementAt(6, 100) 
      .subscribe(i -> log("3:" + i), e -> log("3異常"));  //100 
Observable.empty() 
      .doOnEach(notification -> log("4發射的事件為:" + notification.getValue())) //null 
      .elementAtOrError(6) 
      .subscribe(i -> log("4:" + i), e -> log("4異常"));  //4異常 
           

single*

如果隻發射一個則發射這個值,如果不是則發射預設資料或異常

  • Maybe
  • Single
  • Single
Observable.range(0, 5) 
      .doOnEach(notification -> log("發射的事件為:" + notification.getValue())) //0,1 
      .singleElement()  //此時的結果和使用 single、singleOrError 的效果完全一樣 
      .subscribe(i -> log("" + i), e -> log("異常"), () -> log("完成"));  //IllegalArgumentException 
Observable.empty().delay(100, TimeUnit.MILLISECONDS) 
      .doOnEach(notification -> log("發射的事件為:" + notification.getValue())) //null 
      .single(100)  //此時的結果和使用 singleElement、singleOrError 的效果僅僅是結果不一樣 
      .subscribe(i -> log("" + i), e -> log("異常"));  //100、完成、NoSuchElementException 
Observable.just(20).delay(200, TimeUnit.MILLISECONDS) 
      .doOnEach(notification -> log("發射的事件為:" + notification.getValue())) //20,null 
      .singleOrError()  //此時的結果和使用 single、singleElement 的效果完全一樣 
      .subscribe(i -> log("" + i), e -> log("異常"));  //20 
           

2018-9-26