天天看点

RxJava 实例1.定义2.作用3. 特点4.使用5.优雅实现其他方法observeOn:观察者的执行线程subscribeOn:被观察者的执行线程RXView 防抖

1.定义

RxJava

 是一个 基于事件流、实现异步操作的库

2.作用

用于实现异步操作,类似于 

Android

中的 

AsyncTask

 、

Handler+

new Thread的作用

3. 特点

由于 

RxJava

的使用方式是:基于事件流的链式调用,所以使得 

RxJava

  • 逻辑简洁
  • 实现优雅
  • 使用简单

更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁 & 优雅

  • Rxjava

    原理 基于 一种扩展的观察者模式
  • Rxjava

    的扩展观察者模式中有4个角色:
角色 作用 类比
被观察者(Observable) 产生事件 顾客
观察者(Observer) 接收事件,并给出响应动作 厨房
订阅(Subscribe) 连接 被观察者 & 观察者 服务员
事件(Event) 被观察者 & 观察者 沟通的载体 菜式

4.使用

导包

implementation 'io.reactivex.rxjava2:rxjava:2.1.3'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'com.jakewharton.rxbinding2:rxbinding:2.2.0'
           

RxJava 实例1.定义2.作用3. 特点4.使用5.优雅实现其他方法observeOn:观察者的执行线程subscribeOn:被观察者的执行线程RXView 防抖

 1.定义被观察者

三种方式:Observable.create、Observable.just、Observable.fromArray

public void buyFood(){
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    e.onNext("potato");
                    e.onNext("tomato");
                    e.onNext("noodles");
                    e.onComplete();
            }
        });

        Observable<String> observable1 = Observable.just("potato","tomato","noodles");
        String [] foods = new String[]{"potato","tomato","noodles"};
        Observable<String> observable2 = Observable.fromArray(foods);
    }
           

2.定义观察者

使用observer或者subscriber

Observer<String> mObserver = new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {

        }

        @Override
        public void onNext(@NonNull String s) {

        }

        @Override
        public void onError(@NonNull Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    };
           
Subscriber<String> mStringSubscriber = new Subscriber<String>() {
        @Override
        public void onSubscribe(Subscription s) {
            
        }

        @Override
        public void onNext(String s) {

        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {

        }
    };
           

特别注意:2种方法的区别,即Subscriber 抽象类与Observer 接口的区别 

// 相同点:二者基本使用方式完全一致(实质上,在RxJava的 subscribe 过程中,Observer总是会先被转换成Subscriber再使用)

// 不同点:Subscriber抽象类对 Observer 接口进行了扩展,新增了两个方法:

    // 1\. onStart():在还未响应事件前调用,用于做一些初始化工作

    // 2\. unsubscribe():用于取消订阅。在该方法被调用后,观察者将不再接收 & 响应事件

    // 调用该方法前,先使用 isUnsubscribed() 判断状态,确定被观察者Observable是否还持有观察者Subscriber的引用,如果引用不能及时释放,就会出现内存泄露

3.订阅

observable.subscribe(observer);

5.优雅实现

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    e.onNext("potato");
                    e.onNext("tomato");
                    e.onNext("noodles");
                    e.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull String s) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
           

其他方法

observeOn:观察者的执行线程

observeOn 作用于该操作符之后直到出现新的observeOn操作符

subscribeOn:被观察者的执行线程

subscribeOn 作用于该操作符之前的 Observable 的创建操符作以及 doOnSubscribe 操作符 ,换句话说就是 doOnSubscribe 以及 Observable 的创建操作符总是被其之后最近的 subscribeOn 控制

实例

//将字符串转换成double类型
    String path = "12.3";
    public void test(){
        //输入事件
        Observable.just(path)
                //对事件进行处理
                .map(new Function<String, Double>() {
            @Override
            public Double apply(@NonNull String s) throws Exception {
                return Double.parseDouble(s);
            }
        })
                //指定被观察者执行线程
                .subscribeOn(Schedulers.io())
                //指定观察者执行线程
                .observeOn(AndroidSchedulers.mainThread())
                //订阅观察者
                .subscribe(
                new Observer<Double>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Double aDouble) {

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }
           

Rxjava中的

Scheduler

相当于线程控制器,Rxjava通过它来指定每一段代码应该运行在什么样的线程。Rxjava提供了5种调度器:

  • .io()
  • .computation()
  • .immediate()
  • .newThread()
  • .trampoline()

另外,Android还有一个专用的

AndroidSchedulers.mainThread()

指定操作将在Android主线程运行。Rxjava通过

subscribeOn()

observeOn()

两个方法来对线程进行控制,

subscribeOn()

指定

subscribe()

时间发生的线程,也就是事件产生的线程,

observeOn()

指定

Subscriber

做运行的线程,也就是消费事件的线程。

Schedulers.io() 用于I/O操作,比如:读写文件,数据库,网络交互等等。行为模式和

newThread()

差不多,重点需要注意的是线程池是无限制的,大量的I/O调度操作将创建许多个线程并占用内存

Schedulers.computation()计算工作默认的调度器,这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。

Schedulers.immediate()  这个调度器允许你立即在当前线程执行你指定的工作。这是默认的

Scheduler

Schedulers.newThread()    它为指定任务启动一个新的线程。

Schedulers.trampoline()

当我们想在当前线程执行一个任务时,并不是立即,我们可以用

trampoline()

将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。

RXView 防抖

private void test3() {
        RxView.clicks(控件).throttleFirst(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Object o) {
                Log.e(TAG, "onNext: 响应事件....." );
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
           

RxJava篇(应用场景) - 简书 (jianshu.com)