天天看点

rxjava背压_RxJava背压

RxJava背压主要用来解决异步订阅关系中,被观察者发送事件的速度与观察者接收事件的速度不匹配的问题。所谓背压,即生产者的速度大于消费者的速度带来的问题,比如在Android中常见的点击事件,点击过快则经常会造成点击两次的效果。在大多数情况下,由于被观察者发送事件速度太快,而观察者来不及响应处理所有事件,从而导致事件丢失、OOM等异常。

RxJava 2.0中对背压的支持

主要是通过Flowable类来实现的,Flowable其实就是被观察者(Observable)的一种新实现,用来解决RxJava 1.0中无法解决的背压问题。

在使用上,Flowable基本类似于 Observable:

public void simpleUse() {

// 创建被观察者Flowable

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

Log.d(TAG, "发送事件 1");

emitter.onNext(1);

Log.d(TAG, "发送事件 2");

emitter.onNext(2);

Log.d(TAG, "发送事件 3");

emitter.onNext(3);

Log.d(TAG, "发送完成");

emitter.onComplete();

}

}, BackpressureStrategy.ERROR)//传入背压参数BackpressureStrategy

.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行

.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

//观察者通过request获取事件

s.request(3);

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

其中,观察者通过Subscription.request获取事件, request决定了观察者能够接收多少个事件, 如设置了s.request(3),这就说明观察者能够接收3个事件(多出的事件存放在缓存区)。 对比Observer传入的Disposable参数,Subscription参数同样具备Disposable的作用,即调用Subscription.cancel()切断连接类似于Disposable.dispose()方法 同样的,不同的是Subscription增加了void request(long n)方法。

需要注意的:对于不同工作线程的订阅关系,如果观察者没有设置Subscription.request,则说明观察者不接受事件。但此时被观察者仍然可以继续发送事件,只不过发送的事件存放在缓冲区。其中,Flowable默认缓冲区的队列大小为128,即存储128个事件,超出会报错。

例子:观察者不接收事件,被观察者发送事件存放到缓存区, 观察者需要的时候再按需获取

@Override

public void onClick(View v) {

switch (v.getId()) {

case R.id.btn_test:

//每次点击按钮获取2个事件,直到获取完毕

mSubscription.request(2);

break;

default:

break;

}

}

Subscription mSubscription;

private void send() {

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

Log.d(TAG, "发送事件 1");

emitter.onNext(1);

Log.d(TAG, "发送事件 2");

emitter.onNext(2);

Log.d(TAG, "发送事件 3");

emitter.onNext(3);

Log.d(TAG, "发送事件 4");

emitter.onNext(4);

Log.d(TAG, "发送完成");

emitter.onComplete();

}

}, BackpressureStrategy.ERROR)

.subscribeOn(Schedulers.io()) // 设置被观察者在io线程中进行

.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

// 保存Subscription对象,等待点击按钮时观察者再接收事件

mSubscription = s;

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

同步订阅中的背压问题

如果是同步订阅关系,则不存在缓冲区,在同步订阅当中,被观察者发送一个事件后,要等到观察者接收以后才能继续发送下一个事件:

public void test() {

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 发送3个事件

Log.d(TAG, "发送了事件1");

emitter.onNext(1);

Log.d(TAG, "发送了事件2");

emitter.onNext(2);

Log.d(TAG, "发送了事件3");

emitter.onNext(3);

emitter.onComplete();

}

}, BackpressureStrategy.ERROR)

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

s.request(3);

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件 " + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

输出:

rxjava背压_RxJava背压

所以同步订阅关系中没有流速不一致的问题,因为是同步的,会阻塞等待,但是却会出现被观察者发送事件数量 大于观察者接收事件数量的问题。例如,观察者只接受3个事件,但被观察者却发送了4个事件就会出问题:

public void test() {

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 发送3个事件

Log.d(TAG, "发送了事件1");

emitter.onNext(1);

Log.d(TAG, "发送了事件2");

emitter.onNext(2);

Log.d(TAG, "发送了事件3");

emitter.onNext(3);

Log.d(TAG, "发送了事件4");

emitter.onNext(4);

emitter.onComplete();

}

}, BackpressureStrategy.ERROR)

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

s.request(3);

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件 " + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

输出:

rxjava背压_RxJava背压

解决方法:控制被观察者发送事件的数量,主要通过FlowableEmitter类的requested()方法实现,被观察者通过 FlowableEmitter.requested()可获得观察者自身接收事件的能力,从而根据该信息控制事件发送速度,从而达到了观察者反向控制被观察者的效果。

public void test2() {

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 调用emitter.requested()获取当前观察者需要接收的事件数量

long n = emitter.requested();

Log.d(TAG, "观察者可接收事件" + n);

// 根据emitter.requested()的值,即当前观察者需要接收的事件数量来发送事件

for (int i = 0; i < n; i++) {

Log.d(TAG, "发送了事件" + i);

emitter.onNext(i);

}

}

}, BackpressureStrategy.ERROR)

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

// 设置观察者每次能接受10个事件

s.request(10);

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

在同步订阅情况中使用FlowableEmitter.requested()时,有以下几种使用特性需要注意的:

可叠加性: 观察者可连续要求接收事件,被观察者会进行叠加并一起发送

Subscription.request(a1);

Subscription.request(a2);

FlowableEmitter.requested()的返回值 = a1 + a2

1

2

3

4

public void test3() {

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 调用emitter.requested()获取当前观察者需要接收的事件数量

Log.d(TAG, "观察者可接收事件" + emitter.requested());

}

}, BackpressureStrategy.ERROR)

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

s.request(10); // 第1次设置观察者每次能接受10个事件

s.request(20); // 第2次设置观察者每次能接受20个事件

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

输出:

rxjava背压_RxJava背压

实时更新性:即每次发送事件后,emitter.requested()会实时更新观察者能接受的剩余事件数量

假如一开始观察者要接收10个事件,发送了1个后,会实时更新为9个

仅计算Next事件,complete & error事件不算。

Subscription.request(10);

// FlowableEmitter.requested()的返回值 = 10

FlowableEmitter.onNext(1); // 发送了1个事件

// FlowableEmitter.requested()的返回值 = 9

1

2

3

4

5

public void test4() {

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 1. 调用emitter.requested()获取当前观察者需要接收的事件数量

Log.d(TAG, "观察者可接收事件数量 = " + emitter.requested());

// 2. 每次发送事件后,emitter.requested()会实时更新观察者能接受的事件

// 即一开始观察者要接收10个事件,发送了1个后,会实时更新为9个

Log.d(TAG, "发送了事件 1");

emitter.onNext(1);

Log.d(TAG, "发送了事件1后, 还需要发送事件数量 = " + emitter.requested());

Log.d(TAG, "发送了事件 2");

emitter.onNext(2);

Log.d(TAG, "发送事件2后, 还需要发送事件数量 = " + emitter.requested());

Log.d(TAG, "发送了事件 3");

emitter.onNext(3);

Log.d(TAG, "发送事件3后, 还需要发送事件数量 = " + emitter.requested());

emitter.onComplete();

}

}, BackpressureStrategy.ERROR)

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

// 设置观察者每次能接受10个事件

s.request(10);

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

输出:

rxjava背压_RxJava背压

异常:当FlowableEmitter.requested()减到0时,代表观察者已经不可接收事件,若此时被观察者继续发送事件,则会抛出MissingBackpressureException异常。

public void test5() {

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

Log.d(TAG, "观察者可接收事件数量 = " + emitter.requested());

Log.d(TAG, "发送了事件 1");

emitter.onNext(1);

Log.d(TAG, "发送了事件1后, 还需要发送事件数量 = " + emitter.requested());

Log.d(TAG, "发送了事件 2");

emitter.onNext(2);

Log.d(TAG, "发送事件2后, 还需要发送事件数量 = " + emitter.requested());

emitter.onComplete();

}

}, BackpressureStrategy.ERROR)

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

// 设置观察者每次能接受1个事件

s.request(1);

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

输出:

rxjava背压_RxJava背压

若观察者没有设置可接收事件数量,即无调用Subscription.request

那么被观察者默认观察者可接收事件数量 = 0,即FlowableEmitter.requested()的返回值 = 0。

背压策略的选择

策略名称含义

BackpressureStrategy.ERROR

当缓存区大小存满(默认缓存区大小128),被观察者仍然继续发送下一个事件时,直接抛出异常MissingBackpressureException

BackpressureStrategy.MISSING

当缓存区大小存满,被观察者仍然继续发送下一个事件时,抛出异常MissingBackpressureException , 提示缓存区满了

BackpressureStrategy.BUFFER

当缓存区大小存满,被观察者仍然继续发送下一个事件时,缓存区大小设置无限大, 即被观察者可无限发送事件,但实际上是存放在缓存区

BackpressureStrategy.DROP

当缓存区大小存满,被观察者仍然继续发送下一个事件时, 超过缓存区大小(128)的事件会被全部丢弃

BackpressureStrategy.LATEST

当缓存区大小存满,被观察者仍然继续发送下一个事件时,只保存最新/最后发送的事件, 其他超过缓存区大小(128)的事件会被全部丢弃

BackpressureStrategy.ERROR:

public void backpressureStrategyError() {

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 发送 129个事件

for (int i = 0;i< 129; i++) {

emitter.onNext(i);

Log.d(TAG, "发送了事件" + i);

}

emitter.onComplete();

}

}, BackpressureStrategy.ERROR)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

输出:

rxjava背压_RxJava背压

可以看出当发送第129个事件时,抛出异常了,实际上被观察者发送了129个事件,而观察者只能接收到128个。

BackpressureStrategy.MISSING:

public void backpressureStrategyMissing() {

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 发送 129个事件

for (int i = 0;i< 129; i++) {

emitter.onNext(i);

Log.d(TAG, "发送了事件" + i);

}

emitter.onComplete();

}

}, BackpressureStrategy.MISSING)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

输出:

rxjava背压_RxJava背压

也会抛出异常,感觉貌似跟BackpressureStrategy.ERROR没啥区别

BackpressureStrategy.BUFFER:

public void backpressureStrategyBuffer() {

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 发送 150个事件

for (int i = 0; i< 150; i++) {

emitter.onNext(i);

Log.d(TAG, "发送了事件" + i);

}

emitter.onComplete();

}

}, BackpressureStrategy.BUFFER)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

输出:

rxjava背压_RxJava背压

此时调用s.request(n);是可以拉取到数据的,前面两种不行

BackpressureStrategy.DROP:

public void backPressureStrategyDrop() {

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 发送150个事件

for (int i = 0;i< 150; i++) {

emitter.onNext(i);

Log.d(TAG, "发送了事件" + i);

}

emitter.onComplete();

}

}, BackpressureStrategy.DROP)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

//尝试拉取150个事件,但只能取到128个事件

s.request(150);

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

输出:

rxjava背压_RxJava背压

可以出只接收到127, 127后面的都没了, 即被丢弃了

BackpressureStrategy.LATEST:

public void backPressureStrategyLatest() {

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter emitter) throws Exception {

// 发送150个事件

for (int i = 0;i< 150; i++) {

emitter.onNext(i);

Log.d(TAG, "发送了事件" + i);

}

emitter.onComplete();

}

}, BackpressureStrategy.LATEST)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

//尝试拉取150个事件,但只能取到 前128 + 最后1个(第150个) 事件

s.request(150);

}

@Override

public void onNext(Integer integer) {

Log.d(TAG, "接收到了事件" + integer);

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

输出:

rxjava背压_RxJava背压

onBackpressure操作符

通过Flowable.create()方法创建Flowable时,第二个参数可以指定背压策略常量,当手动创建Flowable时,无法指定这个参数,这时就会出现问题:

public void test6() {

// 通过interval自动创建被观察者Flowable(从0开始每隔1ms发送1个事件)

Flowable.interval(1, TimeUnit.MILLISECONDS) //interval操作符会默认新开1个新的工作线程

.observeOn(Schedulers.newThread()) //观察者同样工作在一个新开线程中

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

//默认可以接收Long.MAX_VALUE个事件

s.request(Long.MAX_VALUE);

}

@Override

public void onNext(Long aLong) {

Log.d(TAG, "onNext: " + aLong);

try {

Thread.sleep(1000);

// 每次延时1秒再接收事件

// 因为发送事件 = 延时1ms,接收事件 = 延时1s,出现了发送速度 & 接收速度不匹配的问题

// 缓存区很快就存满了128个事件,从而抛出MissingBackpressureException异常,请看下图结果

} catch (InterruptedException e) {

e.printStackTrace();

}

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

输出:

rxjava背压_RxJava背压

为了解决这个问题,RxJava提供了对应背压策略模式的操作符:

onBackpressureBuffer()

onBackpressureDrop()

onBackpressureLatest()

示例:

public void onBackpressureBuffer() {

// 通过interval自动创建被观察者Flowable(从0开始每隔1ms发送1个事件)

Flowable.interval(1, TimeUnit.MILLISECONDS) //interval操作符会默认新开1个新的工作线程

.onBackpressureBuffer()//此处选择Buffer背压模式,即缓存区大小无限制

.observeOn(Schedulers.newThread()) //观察者同样工作在一个新开线程中

.subscribe(new Subscriber() {

@Override

public void onSubscribe(Subscription s) {

Log.d(TAG, "onSubscribe");

//默认可以接收Long.MAX_VALUE个事件

s.request(Long.MAX_VALUE);

}

@Override

public void onNext(Long aLong) {

Log.d(TAG, "onNext: " + aLong);

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

@Override

public void onError(Throwable t) {

Log.w(TAG, "onError: ", t);

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

}

输出:

rxjava背压_RxJava背压

如果不设置onBackpressurexxx方法,则默认采用BackpressureStrategy.ERROR模式,即超出缓冲区大小报异常。

其中,onBackpressureBuffer有多个重载方法:

rxjava背压_RxJava背压

例如onBackpressureBuffer(int capacity, boolean delayError)第一个参数可以指定buffer缓存区的容量大小,第二个参数为true可以指定当超出缓存区大小时直到下游观察者消费完缓存区的所有事件时才会抛出异常(即延时抛出),为false则超出时立即抛出异常。

下面是一些官方的示意图可以帮助理解onBackpressure操作符的含义:

rxjava背压_RxJava背压
rxjava背压_RxJava背压
rxjava背压_RxJava背压

可以避免背压问题的操作符

通过一些操作符微调参数可以来应对突发的observable发送爆发性数据(一会没有,一会很多)就像下图所示:

rxjava背压_RxJava背压

RxJava操作符中比如 sample( ) 、 throttleLast( )、 throttleFirst( )、 throttleWithTimeout( ) 、 debounce( ) 等允许你通过调节速率来改变Observable发射消息的速度。

sample( ) 或 throttleLast( )操作符定期收集observable发送的数据items,并发射出最后一个数据item:

rxjava背压_RxJava背压

throttleFirst( )操作符跟sample( ) 有点类似,但是并不是把观测到的最后一个item发送出去,而是把该时间段第一个item发送出去:

rxjava背压_RxJava背压

debounce( )或throttleWithTimeout( )操作符会只发送两个在规定间隔内的时间发送的序列的最后一个:

rxjava背压_RxJava背压