天天看点

Android RxJava2(三)组合操作符

Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。因此在学习过程中全面的了解了下RxJava的组合操作符。

merge()

mergeArray()

concat()

concatArray()

mergeArrayDelayError() & concatArrayDelayError()

startWith() & startWithArray()

zip()

combineLatest() & combineLatestDelayError()

reduce()

count()

collect()

merge()

原理图: 

方法:

public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)

1

作用: 

将两个Observable发射的事件序列组合并成一个时间序列,就想是一个Observable发射的一样,合并后数据是无序的。 

代码:

        //这里的ob1和ob2将在下述所有示例中使用,将不再特殊描述

        final String[] str = new String[]{"a","b","c"};

        final int[] ints = new int[]{1,2,3,4,5};

        Observable ob1 = Observable.interval(500, TimeUnit.MILLISECONDS).map(new Function<Long,String>() {

            @Override

            public String apply(Long aLong) throws Exception {

                return str[aLong.intValue()];

            }

        }).take(str.length);

        Observable ob2 = Observable.interval(300,TimeUnit.MILLISECONDS)

                .map(new Function<Long,Integer>() {

                    @Override

                    public Integer apply(Long aLong) throws Exception {

                        return ints[aLong.intValue()];

                    }

                }).take(ints.length);

       Observable.merge(ob1,ob2).subscribe(new Observer() {

            @Override

            public void onSubscribe(Disposable d) {

            }

            @Override

            public void onNext(Object value) {

                Log.e("---", String.valueOf(value));

            }

            @Override

            public void onError(Throwable e) {

            }

            @Override

            public void onComplete() {

            }

        });

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

上述代码中第一个Observable作用是每500毫秒从字符串数组中取一个元素,第二个Observable的作用是每300毫秒从整型数组中取一个元素,打印结果为:

06-06 22:48:53.410 17668-17707/ E/—: 1 

06-06 22:48:53.610 17668-17706/ E/—: a 

06-06 22:48:53.710 17668-17707/ E/—: 2 

06-06 22:48:54.010 17668-17707/ E/—: 3 

06-06 22:48:54.110 17668-17706/ E/—: b 

06-06 22:48:54.310 17668-17707/ E/—: 4 

06-06 22:48:54.610 17668-17706/ E/—: c 

06-06 22:48:54.610 17668-17707/ E/—: 5

mergeArray()

原理图: 

方法:

public static <T> Observable<T> mergeArray(ObservableSource<? extends T>... sources) 

1

作用: 

作用和merge类似,只不过是组合多个Observable

concat()

原理图: 

方法:

public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)

...

 public static <T> Observable<T> concat(

            ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,

            ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)

1

2

3

4

5

作用: 

功能和merge类似,也是用于将多个Observable合并,最多支持4个,但是concat是有序的,也就是说前一个Observable没发射完是不会发射后一个Observable的数据的。 

代码: 

将上述代码里面的merge直接换成concat

       Observable.concat(ob1,ob2).subscribe(new Observer() {

            @Override

            public void onSubscribe(Disposable d) {

            }

            @Override

            public void onNext(Object value) {

                Log.e("---", String.valueOf(value));

            }

            @Override

            public void onError(Throwable e) {

            }

            @Override

            public void onComplete() {

            }

        });

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

代码执行结果如下:

06-06 23:01:16.189 27785-27822/ E/---: a

06-06 23:01:16.689 27785-27822/ E/---: b

06-06 23:01:17.189 27785-27822/ E/---: c

06-06 23:01:17.490 27785-27872/ E/---: 1

06-06 23:01:17.790 27785-27872/ E/---: 2

06-06 23:01:18.090 27785-27872/ E/---: 3

06-06 23:01:18.390 27785-27872/ E/---: 4

06-06 23:01:18.690 27785-27872/ E/---: 5

1

2

3

4

5

6

7

8

concatArray()

方法:

 public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)

1

作用: 

作用同concat类似,将多个Observable进行数据合并

mergeArrayDelayError() & concatArrayDelayError()

方法:

public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)

public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)

1

2

作用: 

在mergeArray()和concatArray()两个方法中,如果其中一个Observable发送了一个Error事件,那么就会停止发送事件,如果想onError()事件延迟到所有Observable都发送完事件后再执行,就可以使用mergeArrayDelayError()和concatArrayDelayError() 

代码: 

下面通过代码测试下如果中途发送onError,Observable是否会中断发送

       Observable.mergeArray(ob1,Observable.create(new ObservableOnSubscribe() {

           @Override

           public void subscribe(ObservableEmitter e) throws Exception {

               e.onNext(1);

               e.onError(new NumberFormatException());

           }

       })).subscribe(new Observer() {

            @Override

            public void onSubscribe(Disposable d) {

            }

            @Override

            public void onNext(Object value) {

                Log.e("---", String.valueOf(value));

            }

            @Override

            public void onError(Throwable e) {

                Log.e("---","--onError");

            }

            @Override

            public void onComplete() {

            }

        });

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

上述执行结果为如下:

06-06 23:18:58.930 8575-8575/ E/---: 1

06-06 23:18:58.930 8575-8575/ E/---: --onError

1

2

可以发现使用mergeArray()时如果中途发送onError()会中断数据的发送,下面将mergeArray改成mergeArrayDelayError

       Observable.mergeArrayDelayError(ob1,Observable.create(new ObservableOnSubscribe() {

           @Override

           public void subscribe(ObservableEmitter e) throws Exception {

               e.onNext(1);

               e.onError(new NumberFormatException());

           }

       })).subscribe(new Observer() {

            @Override

            public void onSubscribe(Disposable d) {

            }

            @Override

            public void onNext(Object value) {

                Log.e("---", String.valueOf(value));

            }

            @Override

            public void onError(Throwable e) {

                Log.e("---","onError");

            }

            @Override

            public void onComplete() {

            }

        });

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

更改之后的执行结果如下:

06-06 23:14:18.191 5668-5668/ E/---: 1

06-06 23:14:18.691 5668-5700/ E/---: a

06-06 23:14:19.191 5668-5700/ E/---: b

06-06 23:14:19.691 5668-5700/ E/---: c

06-06 23:14:19.691 5668-5700/ E/---: --onError

1

2

3

4

5

startWith() & startWithArray()

原理图: 

方法:

public final Observable<T> startWith(ObservableSource<? extends T> other)

public final Observable<T> startWithArray(T... items)

1

2

作用: 

用于在源Observable发射的数据前插入另一个Observable发射的数据 

代码:

       ob1.startWith(ob2).subscribe(new Observer() {

            @Override

            public void onSubscribe(Disposable d) {

            }

            @Override

            public void onNext(Object value) {

                Log.e("---", String.valueOf(value));

            }

            @Override

            public void onError(Throwable e) {

            }

            @Override

            public void onComplete() {

            }

        });

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

代码执行结果如下:

06-06 23:07:16.014 32379-32405/ E/---: 1

06-06 23:07:16.314 32379-32405/ E/---: 2

06-06 23:07:16.614 32379-32405/ E/---: 3

06-06 23:07:16.913 32379-32405/ E/---: 4

06-06 23:07:17.214 32379-32405/ E/---: 5

06-06 23:07:17.716 32379-32444/ E/---: a

06-06 23:07:18.215 32379-32444/ E/---: b

06-06 23:07:18.714 32379-32444/ E/---: c

1

2

3

4

5

6

7

8

zip()

原理图: 

方法:

public static <T1, T2, R> Observable<R> zip(

            ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,

            BiFunction<? super T1, ? super T2, ? extends R> zipper)

1

2

3

作用: 

用来合并两个Observable发射的事件,根据BiFunction函数生成一个新的值发射出去。当其中一个Observable发送数据结束或者出现异常后,另一个Observable也将停止发送数据。也就是说正常的情况下数据长度会与两个Observable中最少事件的数量一样。 

代码: 

简单的将两个Observable的数据进行拼接

       Observable.zip(ob1, ob2, new BiFunction<String,Integer,String>() {

           @Override

           public String apply(String s, Integer integer) throws Exception {

               return s+integer;

           }

       }).subscribe(new Observer() {

            @Override

            public void onSubscribe(Disposable d) {

            }

            @Override

            public void onNext(Object value) {

                Log.e("---", String.valueOf(value));

            }

            @Override

            public void onError(Throwable e) {

                Log.e("---","--onError");

            }

            @Override

            public void onComplete() {

            }

        });

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

运行结果如下:

06-06 23:29:56.547 14888-14926/ E/---: a1

06-06 23:29:57.049 14888-14926/ E/---: b2

06-06 23:29:57.547 14888-14926/ E/---: c3

1

2

3

combineLatest() & combineLatestDelayError()

原理图: 

可能上面这张图不是太好理解,可以看下面这张图 

方法:

public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)

1

作用: 

用于将两个Observable最近发射的数据经BiFunction函数的规则进行组合,combineLatest()发送事件的序列是与发送的时间线有关的。拿上图解释当发送A之后会从上一个Observ拿最近发送的1进行组合生成‘1A’,当发送2时拿第二个Observable最近发送的数据B组合成‘2B’,接下来到事件C时还是取第一个Observable最近发送的时间2进行组合成‘2C’,以此类推。 

代码:

       Observable.combineLatest(ob1, ob2, new BiFunction<String,Integer,String>() {

           @Override

           public String apply(String s, Integer integer) throws Exception {

               return s + integer;

           }

       }).subscribe(new Observer() {

            @Override

            public void onSubscribe(Disposable d) {

            }

            @Override

            public void onNext(Object value) {

            }

            @Override

            public void onError(Throwable e) {

                Log.e("---","--onError");

            }

            @Override

            public void onComplete() {

            }

        });

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

执行结果如下:

06-06 23:39:58.087 22418-22447/ E/---: a1

06-06 23:39:58.190 22418-22448/ E/---: a2

06-06 23:39:58.488 22418-22448/ E/---: a3

06-06 23:39:58.589 22418-22447/ E/---: b3

06-06 23:39:58.788 22418-22448/ E/---: b4

06-06 23:39:59.088 22418-22447/ E/---: c4

06-06 23:39:59.088 22418-22448/ E/---: c5

1

2

3

4

5

6

7

reduce()

方法:

public final Maybe<T> reduce(BiFunction<T, T, T> reducer)

1

作用: 

与scan()操作符类似,作用是将数据以一定的逻辑聚合起来,这两个的区别在于scan()没处理一次数据将会发送一个事件给观察者,但是reduce()会将所有数据聚合在一起之后才会发送给观察者,还有一点区别就是scan的返回值是Observable,而reduce的返回值是Maybe 

代码:

        Observable.just(1,2,3,4,5).reduce(new BiFunction<Integer, Integer, Integer>() {

            @Override

            public Integer apply(Integer integer, Integer integer2) throws Exception {

                return integer + integer2;

            }

        }).subscribe(new Consumer<Integer>() {

            @Override

            public void accept(Integer integer) throws Exception {

                Log.e("---",integer+"");

            }

        });

1

2

3

4

5

6

7

8

9

10

11

上述代码的作用就是对数组数据进行相加处理,最终输出数据为15

count()

方法:

public final Single<Long> count() 

1

作用: 

统计要发送事件的总数 

代码:

        Observable.just(1,1,2,2).count().subscribe(new Consumer<Long>() {

            @Override

            public void accept(Long aLong) throws Exception {

                Log.e("---",aLong+"");

            }

        });

1

2

3

4

5

6

运行结果为:

E/---: 4

1

collect()

方法:

public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)

1

作用: 

收集数据到一个可变的数据结构中 

代码:

        Observable.just("1","2","3","2")

                .collect(new Callable<List<Integer>>() { //创建数据结构

                    @Override

                    public List<Integer> call() throws Exception {

                        return new ArrayList<Integer>();

                    }

                }, new BiConsumer<List<Integer>, String>() {//收集器

                    @Override

                    public void accept(List<Integer> integers, String s) throws Exception {

                        integers.add(Integer.valueOf(s));

                    }

                }).subscribe(new Consumer<List<Integer>>() {

            @Override

            public void accept(List<Integer> integers) throws Exception {

                Log.e("---",integers+"");

            }

        });

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

打印结果为:

E/---: [1, 2, 3, 2]