天天看点

RxJava2.x学习总结

公司项目中网络框架使用了目前主流的Retrofit+Okhttp+RxJava框架进行开发,三者联合使用极大简化了网络请求及请求结果的处理。对于RxJava其繁多的操作符让人眼花缭乱,但是只有掌握了这些知识,项目中才能运用自如。鉴于此,本篇将系统的学习总结RxJava2.x中的知识体系。

Create

RxJava2.x学习总结

create操作符作用为创建一个Observable对象(上游):

io.reactivex.Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                        e.onNext();
                        e.onNext();
                        e.onNext();

                        e.onComplete();
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer i) {
                        Log.d(TAG, "onNext receive: " + i);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.e(TAG, "onError receive: " + e.getLocalizedMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete receive!");
                    }
                });
           

注:

  • 创建

    Observable

    的同时,需要传入

    ObservableOnSubscribe

    对象,里面有个

    subscribe()

    回调函数,用于向下游发射事件
  • 上游调用

    onComplete()

    之后下游将停止接受新的事件,但是并不阻止上游继续调用

    onNext()

    发送事件
  • subscribe()

    是链接上游与下游的桥梁,传入的

    Observer

    对象即下游接受事件的对象
  • onCompete()

    onError()

    两者互斥,不可以同时触发

Map

RxJava2.x学习总结

将上游发送的事件变换成新的Observable对象:

io.reactivex.Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                        e.onNext();
                        e.onNext();
                        e.onNext();

                        e.onComplete();
                    }
                })
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(@NonNull Integer integer) throws Exception {
                        return "the " + integer;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull String str) {
                        Log.d(TAG, "onNext receive: " + str);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.e(TAG, "onError receive: " + e.getLocalizedMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete receive!");
                    }
                });
           

Zip

RxJava2.x学习总结

zip操作符的作用主要是将多个Observable对象发射的事件进行合并,而且每个Observable发射的事件都是按照先后顺序进行组合的,因此变换后发射的事件个数由上述Observable中发射的事件个数最少的Observable决定:

io.reactivex.Observable
                .zip(io.reactivex.Observable
                                .create(new ObservableOnSubscribe<String>() {
                                    @Override
                                    public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                                        if (!e.isDisposed()) {
                                            e.onNext("A");
                                            Log.d(TAG, "String emit : A\n");

                                            e.onNext("B");
                                            Log.d(TAG, "String emit : B\n");

                                            e.onNext("C");
                                            Log.d(TAG, "String emit : C\n");
                                        }
                                    }
                                }),
                        io.reactivex.Observable
                                .create(new ObservableOnSubscribe<Integer>() {
                                    @Override
                                    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                                        if (!e.isDisposed()) {
                                            e.onNext();
                                            Log.d(TAG, "Integer emit : 1\n");

                                            e.onNext();
                                            Log.d(TAG, "Integer emit : 2\n");

                                            e.onNext();
                                            Log.d(TAG, "Integer emit : 3\n");

                                            e.onNext();
                                            Log.d(TAG, "Integer emit : 4\n");

                                            e.onNext();
                                            Log.d(TAG, "Integer emit : 5\n");
                                        }
                                    }
                                }), new BiFunction<String, Integer, String>() {
                            @Override
                            public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                                return s + integer;
                            }
                        })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "Observer accept: " + s);
                    }
                });
           

Concat

RxJava2.x学习总结

concat操作符将多个Observable连接成一个Observable,并按照顺序发射事件:

io.reactivex.Observable
                .concat(io.reactivex.Observable.just(, , ), io.reactivex.Observable.just(, , ))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "conact: " + integer);
                    }
                });
           

FlatMap

RxJava2.x学习总结

flatmap的作用是将一个Observable转换成多个Observables,然后将多个Observables再装进单独的Observable中。该操作符不能保证事件的有序性。

io.reactivex.Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                        e.onNext();
                        e.onNext();
                        e.onNext();
                        e.onNext();
                        e.onNext();
                        e.onNext();
                        e.onNext();
                        e.onNext();
                    }
                })
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                        Log.d(TAG, "after flatMap: " + integer);
                        return io.reactivex.Observable
                                .just("converted " + integer)
                                .delay(, TimeUnit.SECONDS);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "accept: " + s);
                    }
                });
           

下游接收的事件无序,log如下:

- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: after flatMap: 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
- :: -/com.esimtek.fortest D/MainActivity: accept: converted 
           

ConcatMap

concatMap功能与flatMap相同,唯一区别是concatMap保证了发射事件的顺序。

Distinct

RxJava2.x学习总结

该操作符功能如示意图,去重:

io.reactivex.Observable
                .just(, , , , , , , , )
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + integer);
                    }
                });
           

Filter

RxJava2.x学习总结

filter顾名思义,是个过滤器,过滤掉回调函数

test()

返回

false

的值:

io.reactivex.Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                        for (int i = ; i < ; i++) {
                            e.onNext(i);
                        }
                    }
                })
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer s) throws Exception {
                        return s %  == ;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + integer);
                    }
                });
           

Log:

- :: -/com.esimtek.fortest D/MainActivity: accept: 
- :: -/com.esimtek.fortest D/MainActivity: accept: 
- :: -/com.esimtek.fortest D/MainActivity: accept: 
- :: -/com.esimtek.fortest D/MainActivity: accept: 
- :: -/com.esimtek.fortest D/MainActivity: accept: 
           

Buffer

RxJava2.x学习总结

buffer操作符有两个参数

buffer(count, skip)

,其中

count

为需要被发射的事件的最大个数,

skip

作用是确定每个每个buffer起始位置的索引值,当count与skip的值相等时,效果同

buffer(count)

io.reactivex.Observable
                .just(, , , , , , , )
                .buffer(, )
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        Log.d(TAG, "accept: " + integers);
                    }
                });
           

Log如下:

- :: -/com.esimtek.fortest D/MainActivity: accept: [, , ]
- :: -/com.esimtek.fortest D/MainActivity: accept: [, , ]
- :: -/com.esimtek.fortest D/MainActivity: accept: [, , ]
- :: -/com.esimtek.fortest D/MainActivity: accept: [, ]
           

Timer

RxJava2.x学习总结

timer操作符实现计时器功能,默认执行在工作线程:

Log.d(TAG, "start time: " + System.currentTimeMillis()
                + "\ncurr thread: " + Thread.currentThread().getName());

        io.reactivex.Observable
                .timer(, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG, "end time: " + System.currentTimeMillis()
                                + "\n end thread: " + Thread.currentThread().getName());
                    }
                });
           

Log:

- :: -/com.esimtek.fortest D/MainActivity: start time: 
                                                                 curr thread: main
- :: -/com.esimtek.fortest D/MainActivity: end time: 
                                                                  end thread: RxComputationThreadPool-
           

Interval

RxJava2.x学习总结

interval操作符用于间隔指定时间执行某个操作,其接受三个参数:initalDelay(首次发射延时), period(发射间隔时间), timeUnit(时间单位):

mDisposable = io.reactivex.Observable
                .interval(, , TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {

                    }
                });
           

这里有个问题,就是当

accept()

方法中如果存在UI操作,由于持有

Context

引用可能会造成

Activity

内存泄露。因此需要在

Activity

OnDestory()

方法中执行

mDisposable.dispose()

:

private Disposable mDisposable;

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null && !mDisposable.isDisposed())
            mDisposable.dispose();
    }
           

Skip

RxJava2.x学习总结

如示意图,skip接受一个参数

count

,这个参数指定了跳过

count

个数目开始接收。

Take

RxJava2.x学习总结

take操作符接受一个参数count,代表至多接受count个参数。

Just

RxJava2.x学习总结

just操作符代表一次发射数据,并执行

onNext

方法。