RxJava背壓主要用來解決異步訂閱關系中,被觀察者發送事件的速度與觀察者接收事件的速度不比對的問題。所謂背壓,即生産者的速度大于消費者的速度帶來的問題,比如在Android中常見的點選事件,點選過快則經常會造成點選兩次的效果。在大多數情況下,由于被觀察者發送事件速度太快,而觀察者來不及響應處理所有事件,進而導緻事件丢失、OOM等異常。
RxJava 2.0中對背壓的支援
主要是通過Flowable類來實作的,Flowable其實就是被觀察者(Observable)的一種新實作,用來解決RxJava 1.0中無法解決的背壓問題。
在使用上,Flowable基本類似于 Observable:
public void simpleUse() {
// 建立被觀察者Flowable
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
Log.d(TAG, "發送事件 1");
emitter.onNext(1);
Log.d(TAG, "發送事件 2");
emitter.onNext(2);
Log.d(TAG, "發送事件 3");
emitter.onNext(3);
Log.d(TAG, "發送完成");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)//傳入背壓參數BackpressureStrategy
.subscribeOn(Schedulers.io()) // 設定被觀察者在io線程中進行
.observeOn(AndroidSchedulers.mainThread()) // 設定觀察者在主線程中進行
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
//觀察者通過request擷取事件
s.request(3);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
其中,觀察者通過Subscription.request擷取事件, request決定了觀察者能夠接收多少個事件, 如設定了s.request(3),這就說明觀察者能夠接收3個事件(多出的事件存放在緩存區)。 對比Observer傳入的Disposable參數,Subscription參數同樣具備Disposable的作用,即調用Subscription.cancel()切斷連接配接類似于Disposable.dispose()方法 同樣的,不同的是Subscription增加了void request(long n)方法。
需要注意的:對于不同工作線程的訂閱關系,如果觀察者沒有設定Subscription.request,則說明觀察者不接受事件。但此時被觀察者仍然可以繼續發送事件,隻不過發送的事件存放在緩沖區。其中,Flowable預設緩沖區的隊列大小為128,即存儲128個事件,超出會報錯。
例子:觀察者不接收事件,被觀察者發送事件存放到緩存區, 觀察者需要的時候再按需擷取
@Override
public void onClick(View v) {
switch (v.getId()) {
case R.id.btn_test:
//每次點選按鈕擷取2個事件,直到擷取完畢
mSubscription.request(2);
break;
default:
break;
}
}
Subscription mSubscription;
private void send() {
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
Log.d(TAG, "發送事件 1");
emitter.onNext(1);
Log.d(TAG, "發送事件 2");
emitter.onNext(2);
Log.d(TAG, "發送事件 3");
emitter.onNext(3);
Log.d(TAG, "發送事件 4");
emitter.onNext(4);
Log.d(TAG, "發送完成");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io()) // 設定被觀察者在io線程中進行
.observeOn(AndroidSchedulers.mainThread()) // 設定觀察者在主線程中進行
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
// 儲存Subscription對象,等待點選按鈕時觀察者再接收事件
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
同步訂閱中的背壓問題
如果是同步訂閱關系,則不存在緩沖區,在同步訂閱當中,被觀察者發送一個事件後,要等到觀察者接收以後才能繼續發送下一個事件:
public void test() {
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
// 發送3個事件
Log.d(TAG, "發送了事件1");
emitter.onNext(1);
Log.d(TAG, "發送了事件2");
emitter.onNext(2);
Log.d(TAG, "發送了事件3");
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(3);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件 " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出:
是以同步訂閱關系中沒有流速不一緻的問題,因為是同步的,會阻塞等待,但是卻會出現被觀察者發送事件數量 大于觀察者接收事件數量的問題。例如,觀察者隻接受3個事件,但被觀察者卻發送了4個事件就會出問題:
public void test() {
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
// 發送3個事件
Log.d(TAG, "發送了事件1");
emitter.onNext(1);
Log.d(TAG, "發送了事件2");
emitter.onNext(2);
Log.d(TAG, "發送了事件3");
emitter.onNext(3);
Log.d(TAG, "發送了事件4");
emitter.onNext(4);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(3);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件 " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出:
解決方法:控制被觀察者發送事件的數量,主要通過FlowableEmitter類的requested()方法實作,被觀察者通過 FlowableEmitter.requested()可獲得觀察者自身接收事件的能力,進而根據該資訊控制事件發送速度,進而達到了觀察者反向控制被觀察者的效果。
public void test2() {
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
// 調用emitter.requested()擷取目前觀察者需要接收的事件數量
long n = emitter.requested();
Log.d(TAG, "觀察者可接收事件" + n);
// 根據emitter.requested()的值,即目前觀察者需要接收的事件數量來發送事件
for (int i = 0; i < n; i++) {
Log.d(TAG, "發送了事件" + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
// 設定觀察者每次能接受10個事件
s.request(10);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
在同步訂閱情況中使用FlowableEmitter.requested()時,有以下幾種使用特性需要注意的:
可疊加性: 觀察者可連續要求接收事件,被觀察者會進行疊加并一起發送
Subscription.request(a1);
Subscription.request(a2);
FlowableEmitter.requested()的傳回值 = a1 + a2
1
2
3
4
public void test3() {
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
// 調用emitter.requested()擷取目前觀察者需要接收的事件數量
Log.d(TAG, "觀察者可接收事件" + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(10); // 第1次設定觀察者每次能接受10個事件
s.request(20); // 第2次設定觀察者每次能接受20個事件
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出:
實時更新性:即每次發送事件後,emitter.requested()會實時更新觀察者能接受的剩餘事件數量
假如一開始觀察者要接收10個事件,發送了1個後,會實時更新為9個
僅計算Next事件,complete & error事件不算。
Subscription.request(10);
// FlowableEmitter.requested()的傳回值 = 10
FlowableEmitter.onNext(1); // 發送了1個事件
// FlowableEmitter.requested()的傳回值 = 9
1
2
3
4
5
public void test4() {
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
// 1. 調用emitter.requested()擷取目前觀察者需要接收的事件數量
Log.d(TAG, "觀察者可接收事件數量 = " + emitter.requested());
// 2. 每次發送事件後,emitter.requested()會實時更新觀察者能接受的事件
// 即一開始觀察者要接收10個事件,發送了1個後,會實時更新為9個
Log.d(TAG, "發送了事件 1");
emitter.onNext(1);
Log.d(TAG, "發送了事件1後, 還需要發送事件數量 = " + emitter.requested());
Log.d(TAG, "發送了事件 2");
emitter.onNext(2);
Log.d(TAG, "發送事件2後, 還需要發送事件數量 = " + emitter.requested());
Log.d(TAG, "發送了事件 3");
emitter.onNext(3);
Log.d(TAG, "發送事件3後, 還需要發送事件數量 = " + emitter.requested());
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
// 設定觀察者每次能接受10個事件
s.request(10);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出:
異常:當FlowableEmitter.requested()減到0時,代表觀察者已經不可接收事件,若此時被觀察者繼續發送事件,則會抛出MissingBackpressureException異常。
public void test5() {
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
Log.d(TAG, "觀察者可接收事件數量 = " + emitter.requested());
Log.d(TAG, "發送了事件 1");
emitter.onNext(1);
Log.d(TAG, "發送了事件1後, 還需要發送事件數量 = " + emitter.requested());
Log.d(TAG, "發送了事件 2");
emitter.onNext(2);
Log.d(TAG, "發送事件2後, 還需要發送事件數量 = " + emitter.requested());
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
// 設定觀察者每次能接受1個事件
s.request(1);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出:
若觀察者沒有設定可接收事件數量,即無調用Subscription.request
那麼被觀察者預設觀察者可接收事件數量 = 0,即FlowableEmitter.requested()的傳回值 = 0。
背壓政策的選擇
政策名稱含義
BackpressureStrategy.ERROR
當緩存區大小存滿(預設緩存區大小128),被觀察者仍然繼續發送下一個事件時,直接抛出異常MissingBackpressureException
BackpressureStrategy.MISSING
當緩存區大小存滿,被觀察者仍然繼續發送下一個事件時,抛出異常MissingBackpressureException , 提示緩存區滿了
BackpressureStrategy.BUFFER
當緩存區大小存滿,被觀察者仍然繼續發送下一個事件時,緩存區大小設定無限大, 即被觀察者可無限發送事件,但實際上是存放在緩存區
BackpressureStrategy.DROP
當緩存區大小存滿,被觀察者仍然繼續發送下一個事件時, 超過緩存區大小(128)的事件會被全部丢棄
BackpressureStrategy.LATEST
當緩存區大小存滿,被觀察者仍然繼續發送下一個事件時,隻儲存最新/最後發送的事件, 其他超過緩存區大小(128)的事件會被全部丢棄
BackpressureStrategy.ERROR:
public void backpressureStrategyError() {
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
// 發送 129個事件
for (int i = 0;i< 129; i++) {
emitter.onNext(i);
Log.d(TAG, "發送了事件" + i);
}
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出:
可以看出當發送第129個事件時,抛出異常了,實際上被觀察者發送了129個事件,而觀察者隻能接收到128個。
BackpressureStrategy.MISSING:
public void backpressureStrategyMissing() {
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
// 發送 129個事件
for (int i = 0;i< 129; i++) {
emitter.onNext(i);
Log.d(TAG, "發送了事件" + i);
}
emitter.onComplete();
}
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出:
也會抛出異常,感覺貌似跟BackpressureStrategy.ERROR沒啥差別
BackpressureStrategy.BUFFER:
public void backpressureStrategyBuffer() {
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
// 發送 150個事件
for (int i = 0; i< 150; i++) {
emitter.onNext(i);
Log.d(TAG, "發送了事件" + i);
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出:
此時調用s.request(n);是可以拉取到資料的,前面兩種不行
BackpressureStrategy.DROP:
public void backPressureStrategyDrop() {
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
// 發送150個事件
for (int i = 0;i< 150; i++) {
emitter.onNext(i);
Log.d(TAG, "發送了事件" + i);
}
emitter.onComplete();
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
//嘗試拉取150個事件,但隻能取到128個事件
s.request(150);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出:
可以出隻接收到127, 127後面的都沒了, 即被丢棄了
BackpressureStrategy.LATEST:
public void backPressureStrategyLatest() {
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
// 發送150個事件
for (int i = 0;i< 150; i++) {
emitter.onNext(i);
Log.d(TAG, "發送了事件" + i);
}
emitter.onComplete();
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
//嘗試拉取150個事件,但隻能取到 前128 + 最後1個(第150個) 事件
s.request(150);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出:
onBackpressure操作符
通過Flowable.create()方法建立Flowable時,第二個參數可以指定背壓政策常量,當手動建立Flowable時,無法指定這個參數,這時就會出現問題:
public void test6() {
// 通過interval自動建立被觀察者Flowable(從0開始每隔1ms發送1個事件)
Flowable.interval(1, TimeUnit.MILLISECONDS) //interval操作符會預設新開1個新的工作線程
.observeOn(Schedulers.newThread()) //觀察者同樣工作在一個新開線程中
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
//預設可以接收Long.MAX_VALUE個事件
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
try {
Thread.sleep(1000);
// 每次延時1秒再接收事件
// 因為發送事件 = 延時1ms,接收事件 = 延時1s,出現了發送速度 & 接收速度不比對的問題
// 緩存區很快就存滿了128個事件,進而抛出MissingBackpressureException異常,請看下圖結果
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出:
為了解決這個問題,RxJava提供了對應背壓政策模式的操作符:
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()
示例:
public void onBackpressureBuffer() {
// 通過interval自動建立被觀察者Flowable(從0開始每隔1ms發送1個事件)
Flowable.interval(1, TimeUnit.MILLISECONDS) //interval操作符會預設新開1個新的工作線程
.onBackpressureBuffer()//此處選擇Buffer背壓模式,即緩存區大小無限制
.observeOn(Schedulers.newThread()) //觀察者同樣工作在一個新開線程中
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
//預設可以接收Long.MAX_VALUE個事件
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出:
如果不設定onBackpressurexxx方法,則預設采用BackpressureStrategy.ERROR模式,即超出緩沖區大小報異常。
其中,onBackpressureBuffer有多個重載方法:
例如onBackpressureBuffer(int capacity, boolean delayError)第一個參數可以指定buffer緩存區的容量大小,第二個參數為true可以指定當超出緩存區大小時直到下遊觀察者消費完緩存區的所有事件時才會抛出異常(即延時抛出),為false則超出時立即抛出異常。
下面是一些官方的示意圖可以幫助了解onBackpressure操作符的含義:
可以避免背壓問題的操作符
通過一些操作符微調參數可以來應對突發的observable發送爆發性資料(一會沒有,一會很多)就像下圖所示:
RxJava操作符中比如 sample( ) 、 throttleLast( )、 throttleFirst( )、 throttleWithTimeout( ) 、 debounce( ) 等允許你通過調節速率來改變Observable發射消息的速度。
sample( ) 或 throttleLast( )操作符定期收集observable發送的資料items,并發射出最後一個資料item:
throttleFirst( )操作符跟sample( ) 有點類似,但是并不是把觀測到的最後一個item發送出去,而是把該時間段第一個item發送出去:
debounce( )或throttleWithTimeout( )操作符會隻發送兩個在規定間隔内的時間發送的序列的最後一個: