公司項目中網絡架構使用了目前主流的Retrofit+Okhttp+RxJava架構進行開發,三者聯合使用極大簡化了網絡請求及請求結果的處理。對于RxJava其繁多的操作符讓人眼花缭亂,但是隻有掌握了這些知識,項目中才能運用自如。鑒于此,本篇将系統的學習總結RxJava2.x中的知識體系。
Create
create操作符作用為建立一個Observable對象(上遊):
io.reactivex.Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext();
e.onNext();
e.onNext();
e.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer i) {
Log.d(TAG, "onNext receive: " + i);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError receive: " + e.getLocalizedMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete receive!");
}
});
注:
- 建立
的同時,需要傳入Observable
對象,裡面有個ObservableOnSubscribe
回調函數,用于向下遊發射事件subscribe()
- 上遊調用
之後下遊将停止接受新的事件,但是并不阻止上遊繼續調用onComplete()
發送事件onNext()
-
是連結上遊與下遊的橋梁,傳入的subscribe()
對象即下遊接受事件的對象Observer
-
與onCompete()
兩者互斥,不可以同時觸發onError()
Map
将上遊發送的事件變換成新的Observable對象:
io.reactivex.Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext();
e.onNext();
e.onNext();
e.onComplete();
}
})
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "the " + integer;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String str) {
Log.d(TAG, "onNext receive: " + str);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError receive: " + e.getLocalizedMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete receive!");
}
});
Zip
zip操作符的作用主要是将多個Observable對象發射的事件進行合并,而且每個Observable發射的事件都是按照先後順序進行組合的,是以變換後發射的事件個數由上述Observable中發射的事件個數最少的Observable決定:
io.reactivex.Observable
.zip(io.reactivex.Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
if (!e.isDisposed()) {
e.onNext("A");
Log.d(TAG, "String emit : A\n");
e.onNext("B");
Log.d(TAG, "String emit : B\n");
e.onNext("C");
Log.d(TAG, "String emit : C\n");
}
}
}),
io.reactivex.Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
if (!e.isDisposed()) {
e.onNext();
Log.d(TAG, "Integer emit : 1\n");
e.onNext();
Log.d(TAG, "Integer emit : 2\n");
e.onNext();
Log.d(TAG, "Integer emit : 3\n");
e.onNext();
Log.d(TAG, "Integer emit : 4\n");
e.onNext();
Log.d(TAG, "Integer emit : 5\n");
}
}
}), new BiFunction<String, Integer, String>() {
@Override
public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
return s + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "Observer accept: " + s);
}
});
Concat
concat操作符将多個Observable連接配接成一個Observable,并按照順序發射事件:
io.reactivex.Observable
.concat(io.reactivex.Observable.just(, , ), io.reactivex.Observable.just(, , ))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "conact: " + integer);
}
});
FlatMap
flatmap的作用是将一個Observable轉換成多個Observables,然後将多個Observables再裝進單獨的Observable中。該操作符不能保證事件的有序性。
io.reactivex.Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext();
e.onNext();
e.onNext();
e.onNext();
e.onNext();
e.onNext();
e.onNext();
e.onNext();
}
})
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
Log.d(TAG, "after flatMap: " + integer);
return io.reactivex.Observable
.just("converted " + integer)
.delay(, TimeUnit.SECONDS);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
下遊接收的事件無序,log如下:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: after flatMap:
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
- :: -/com.esimtek.fortest D/MainActivity: accept: converted
ConcatMap
concatMap功能與flatMap相同,唯一差別是concatMap保證了發射事件的順序。
Distinct
該操作符功能如示意圖,去重:
io.reactivex.Observable
.just(, , , , , , , , )
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
Filter
filter顧名思義,是個過濾器,過濾掉回調函數
test()
傳回
false
的值:
io.reactivex.Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = ; i < ; i++) {
e.onNext(i);
}
}
})
.filter(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer s) throws Exception {
return s % == ;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
Log:
- :: -/com.esimtek.fortest D/MainActivity: accept:
- :: -/com.esimtek.fortest D/MainActivity: accept:
- :: -/com.esimtek.fortest D/MainActivity: accept:
- :: -/com.esimtek.fortest D/MainActivity: accept:
- :: -/com.esimtek.fortest D/MainActivity: accept:
Buffer
buffer操作符有兩個參數
buffer(count, skip)
,其中
count
為需要被發射的事件的最大個數,
skip
作用是确定每個每個buffer起始位置的索引值,當count與skip的值相等時,效果同
buffer(count)
:
io.reactivex.Observable
.just(, , , , , , , )
.buffer(, )
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.d(TAG, "accept: " + integers);
}
});
Log如下:
- :: -/com.esimtek.fortest D/MainActivity: accept: [, , ]
- :: -/com.esimtek.fortest D/MainActivity: accept: [, , ]
- :: -/com.esimtek.fortest D/MainActivity: accept: [, , ]
- :: -/com.esimtek.fortest D/MainActivity: accept: [, ]
Timer
timer操作符實作計時器功能,預設執行在工作線程:
Log.d(TAG, "start time: " + System.currentTimeMillis()
+ "\ncurr thread: " + Thread.currentThread().getName());
io.reactivex.Observable
.timer(, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "end time: " + System.currentTimeMillis()
+ "\n end thread: " + Thread.currentThread().getName());
}
});
Log:
- :: -/com.esimtek.fortest D/MainActivity: start time:
curr thread: main
- :: -/com.esimtek.fortest D/MainActivity: end time:
end thread: RxComputationThreadPool-
Interval
interval操作符用于間隔指定時間執行某個操作,其接受三個參數:initalDelay(首次發射延時), period(發射間隔時間), timeUnit(時間機關):
mDisposable = io.reactivex.Observable
.interval(, , TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
}
});
這裡有個問題,就是當
accept()
方法中如果存在UI操作,由于持有
Context
引用可能會造成
Activity
記憶體洩露。是以需要在
Activity
的
OnDestory()
方法中執行
mDisposable.dispose()
:
private Disposable mDisposable;
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null && !mDisposable.isDisposed())
mDisposable.dispose();
}
Skip
如示意圖,skip接受一個參數
count
,這個參數指定了跳過
count
個數目開始接收。
Take
take操作符接受一個參數count,代表至多接受count個參數。
Just
just操作符代表一次發射資料,并執行
onNext
方法。