公司项目中网络框架使用了目前主流的Retrofit+Okhttp+RxJava框架进行开发,三者联合使用极大简化了网络请求及请求结果的处理。对于RxJava其繁多的操作符让人眼花缭乱,但是只有掌握了这些知识,项目中才能运用自如。鉴于此,本篇将系统的学习总结RxJava2.x中的知识体系。
Create
create操作符作用为创建一个Observable对象(上游):
io.reactivex.Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext();
e.onNext();
e.onNext();
e.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer i) {
Log.d(TAG, "onNext receive: " + i);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError receive: " + e.getLocalizedMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete receive!");
}
});
注:
- 创建
的同时,需要传入Observable
对象,里面有个ObservableOnSubscribe
回调函数,用于向下游发射事件subscribe()
- 上游调用
之后下游将停止接受新的事件,但是并不阻止上游继续调用onComplete()
发送事件onNext()
-
是链接上游与下游的桥梁,传入的subscribe()
对象即下游接受事件的对象Observer
-
与onCompete()
两者互斥,不可以同时触发onError()
Map
将上游发送的事件变换成新的Observable对象:
io.reactivex.Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext();
e.onNext();
e.onNext();
e.onComplete();
}
})
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "the " + integer;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String str) {
Log.d(TAG, "onNext receive: " + str);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError receive: " + e.getLocalizedMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete receive!");
}
});
Zip
zip操作符的作用主要是将多个Observable对象发射的事件进行合并,而且每个Observable发射的事件都是按照先后顺序进行组合的,因此变换后发射的事件个数由上述Observable中发射的事件个数最少的Observable决定:
io.reactivex.Observable
.zip(io.reactivex.Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
if (!e.isDisposed()) {
e.onNext("A");
Log.d(TAG, "String emit : A\n");
e.onNext("B");
Log.d(TAG, "String emit : B\n");
e.onNext("C");
Log.d(TAG, "String emit : C\n");
}
}
}),
io.reactivex.Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
if (!e.isDisposed()) {
e.onNext();
Log.d(TAG, "Integer emit : 1\n");
e.onNext();
Log.d(TAG, "Integer emit : 2\n");
e.onNext();
Log.d(TAG, "Integer emit : 3\n");
e.onNext();
Log.d(TAG, "Integer emit : 4\n");
e.onNext();
Log.d(TAG, "Integer emit : 5\n");
}
}
}), new BiFunction<String, Integer, String>() {
@Override
public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
return s + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "Observer accept: " + s);
}
});
Concat
concat操作符将多个Observable连接成一个Observable,并按照顺序发射事件:
io.reactivex.Observable
.concat(io.reactivex.Observable.just(, , ), io.reactivex.Observable.just(, , ))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "conact: " + integer);
}
});
FlatMap
flatmap的作用是将一个Observable转换成多个Observables,然后将多个Observables再装进单独的Observable中。该操作符不能保证事件的有序性。
io.reactivex.Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext();
e.onNext();
e.onNext();
e.onNext();
e.onNext();
e.onNext();
e.onNext();
e.onNext();
}
})
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
Log.d(TAG, "after flatMap: " + integer);
return io.reactivex.Observable
.just("converted " + integer)
.delay(, TimeUnit.SECONDS);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
下游接收的事件无序,log如下:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
ConcatMap
concatMap功能与flatMap相同,唯一区别是concatMap保证了发射事件的顺序。
Distinct
该操作符功能如示意图,去重:
io.reactivex.Observable
.just(, , , , , , , , )
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
Filter
filter顾名思义,是个过滤器,过滤掉回调函数
test()
返回
false
的值:
io.reactivex.Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = ; i < ; i++) {
e.onNext(i);
}
}
})
.filter(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer s) throws Exception {
return s % == ;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
Log:
- :: -/com.esimtek.fortest D/MainActivity: accept:
- :: -/com.esimtek.fortest D/MainActivity: accept:
- :: -/com.esimtek.fortest D/MainActivity: accept:
- :: -/com.esimtek.fortest D/MainActivity: accept:
- :: -/com.esimtek.fortest D/MainActivity: accept:
Buffer
buffer操作符有两个参数
buffer(count, skip)
,其中
count
为需要被发射的事件的最大个数,
skip
作用是确定每个每个buffer起始位置的索引值,当count与skip的值相等时,效果同
buffer(count)
:
io.reactivex.Observable
.just(, , , , , , , )
.buffer(, )
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.d(TAG, "accept: " + integers);
}
});
Log如下:
- :: -/com.esimtek.fortest D/MainActivity: accept: [, , ]
- :: -/com.esimtek.fortest D/MainActivity: accept: [, , ]
- :: -/com.esimtek.fortest D/MainActivity: accept: [, , ]
- :: -/com.esimtek.fortest D/MainActivity: accept: [, ]
Timer
timer操作符实现计时器功能,默认执行在工作线程:
Log.d(TAG, "start time: " + System.currentTimeMillis()
+ "\ncurr thread: " + Thread.currentThread().getName());
io.reactivex.Observable
.timer(, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "end time: " + System.currentTimeMillis()
+ "\n end thread: " + Thread.currentThread().getName());
}
});
Log:
- :: -/com.esimtek.fortest D/MainActivity: start time:
curr thread: main
- :: -/com.esimtek.fortest D/MainActivity: end time:
end thread: RxComputationThreadPool-
Interval
interval操作符用于间隔指定时间执行某个操作,其接受三个参数:initalDelay(首次发射延时), period(发射间隔时间), timeUnit(时间单位):
mDisposable = io.reactivex.Observable
.interval(, , TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
}
});
这里有个问题,就是当
accept()
方法中如果存在UI操作,由于持有
Context
引用可能会造成
Activity
内存泄露。因此需要在
Activity
的
OnDestory()
方法中执行
mDisposable.dispose()
:
private Disposable mDisposable;
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null && !mDisposable.isDisposed())
mDisposable.dispose();
}
Skip
如示意图,skip接受一个参数
count
,这个参数指定了跳过
count
个数目开始接收。
Take
take操作符接受一个参数count,代表至多接受count个参数。
Just
just操作符代表一次发射数据,并执行
onNext
方法。