天天看点

10章 RxJava源码分析

本篇文章已授权微信公众号

YYGeeker

独家发布转载请标明出处

CSDN学院课程地址
  • RxJava2从入门到精通-初级篇:https://edu.csdn.net/course/detail/10036
  • RxJava2从入门到精通-中级篇:https://edu.csdn.net/course/detail/10037
  • RxJava2从入门到精通-进阶篇:https://edu.csdn.net/course/detail/10038
  • RxJava2从入门到精通-源码分析篇:https://edu.csdn.net/course/detail/10138

10. RxJava源码分析

RxJava源码分析最主要的点在于

  • RxJava是如何从事件流的发送方发送到事件流的接收方的
  • RxJava是如何对操作符进行封装和操作的
  • RxJava是如何随意切换线程的

在分析的过程中,部分源码分析我们会通过手写RxJava的部分代码进行分析,当然也会结合实际RxJava的代码进行分析。其中,手写RxJava的原因是为了简化源代码,让读者方便阅读到主要代码,更快的看懂RxJava的实现思路。在阅读源码之前,我们需要对RxJava的大体概念进行简单的梳理

  • 发射器:Emitter,发射数据的对象
  • 被观察者:Observable,被观察的对象
  • 观察者:Observer,观察的对象
  • 被观察者被订阅时:ObservableOnSubscribe,被订阅时的回调,同时创建出发射器
  • 释放者:Disposable,释放RxJava的对象

RxJava的分析三步骤

  • 创建:被观察者创建的过程
  • 订阅:被观察者订阅观察者的过程
  • 发射:发射器发射的过程

RxJava原理图解

  • 第一排表示各个对象的创建关系,A->B->C->D
  • 第二排表示各个对象的订阅关系,D->C->B->A
  • 第三排表示各个对象的发射关系,A->B->C->D
10章 RxJava源码分析

10.1 RxJava的事件发射原理

知识点:

  • 理解发射数据的过程
  • 理解接收数据的过程
以下是手写RxJava的代码
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(Emitter<String> emitter) {
        emitter.onNext("Hello RxJava");
        emitter.onError();
        emitter.onNext("Hello RxJava");
    }
}).subscribe(new Observabler<String>() {
    @Override
    public void onSubscribe() {
        
    }

    @Override
    public void onNext(String string) {
        System.out.println("onNext=" + string);
    }

    @Override
    public void onError() {
        System.out.println("onError");
    }

    @Override
    public void onComplete() {

    }
});
           

输出结果:在输出onError后,就不会继续收到新的事件流,表示事件已经被释放了

onNext=Hello RxJava
onError
           

1、定义接口

发射器

public interface Emitter<T> {
    void onNext(T t);
    void onError();
}
           

观察者

public interface Observer<T> {
    void onSubscribe();
    void onNext(T t);
    void onError();
    void onComplete();
}
           

被观察者被订阅时

public interface ObservableOnSubscribe<T> {
    void subscribe(Emitter<T> emitter);
}
           

2、实现被观察者

被观察者Observable负责创建、订阅,发射由发射器负责

  • 创建:创建的过程只是将传递进来的参数交给新的ObservableCreate进行管理
  • 订阅:订阅的过程只是实现创建出来的ObservableCreate的subscribeActual方法
public abstract class Observable<T> {

    public static <T> ObservableCreate create(ObservableOnSubscribe<T> observableOnSubscribe) {
        return new ObservableCreate<T>(observableOnSubscribe);
    }

    public void subscribe(Observer<T> observer) {
        subscribeActual(observer);
    }

    public abstract void subscribeActual(Observer<T> observer);
}
           

3、ObservableCreate

ObservableCreate继承自Observable,由于Observable.create返回当前ObservableCreate,所以在subscribe的时候,走的是这里的subscribeActual,subscribeActual中会去创建发射器,并给发射器传递进去observer

public class ObservableCreate<T> extends Observable{

    private ObservableOnSubscribe source;

    public ObservableCreate(ObservableOnSubscribe observableOnSubscribe) {
        this.source = observableOnSubscribe;
    }

    @Override
    public void subscribeActual(Observer observer) {
        //固定的三步曲分析法(个人创建,基本都是这个步骤)
        
        //1、创建发射器
        EmitterCreate<T> emitterCreate = new EmitterCreate<>(observer);
        //2、回调observer的onSubscribe
        observer.onSubscribe();
        //3、回调上一个的subscribe
        source.subscribe(emitterCreate);
    }
}
           

4、EmitterCreate

传递进来的observer即是我们最开始订阅时候new出来的,此时发射数据,就会去调用Observer的onNext方法,这样数据就从发射器中传递到观察者中了。DisposableHelper在后面会讲到,这里只是用作判断是否被释放的一个工具类

public class EmitterCreate<T>
        extends AtomicReference<Disposable>
        implements Emitter<T>, Disposable {

    private Observer<T> observer;

    public EmitterCreate(Observer<T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError() {
        if (!isDisposed()) {
            try {
                observer.onError();
            } finally {
                dispose();
            }
        }
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}
           
以下是RxJava源代码

1、Observable.create

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");//判空
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));//返回自身
}
           

RxJavaPlugins.onAssembly只是对传递进来的参数做判断处理,最终还是返回ObservableCreate,有关RxJavaPlugins的东西最终都是返回自身,RxJavaPlugins后面分析会说到,这里只需要知道他是返回参数本身即可

2、Observable.subscribe

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");//判空
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);//返回自身

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);//回调ObservableCreate的subscribeActual
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ......
        throw npe;
    }
}
           

observable.subscribe和我们手写代码一样,最终调用的是ObservableCreate的subscribeActual方法

3、ObservableCreate

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //1、创建发射器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //2、回调observer的onSubscribe
        observer.onSubscribe(parent);

        try {
            //3、回调ObservableOnSubscribe的subscribe
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
}
           

ObservableCreate和我们手写代码一样,创建发射器,并在发射器中做发射数据等操作

小结

如图所示

10章 RxJava源码分析

10.2 RxJava的事件释放原理

知识点:

  • 理解释放事件的原理

有关RxJava的释放原理是基于

Observable

可以返回

Disposable

对象,只有调用

dispose()

才能释放事件,通过上面的例子,我们知道在发射器里面有

isDisposed()

dispose()

操作,在发射完

onError

事件的情况下,我们会将事件释放,所以在finally会做释放操作,防止后面的事件再次发射

以下是手写RxJava的代码
public class EmitterCreate<T>
        extends AtomicReference<Disposable>
        implements Emitter<T>, Disposable {

    private Observer<T> observer;

    public EmitterCreate(Observer<T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError() {
        if (!isDisposed()) {
            try {
                observer.onError();
            } finally {
                dispose();
            }
        }
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}
           
以下是RxJava源代码
@Override
public void dispose() {
    DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
    return DisposableHelper.isDisposed(get());
}
           

可以发现事件的释放都是通过

DisposableHelper

去触发的,不管是手写RxJava还是源代码,释放RxJava都是通过

DisposableHelper

进行释放,具体看

DisposableHelper

。在我们的演示程序中,我们通过发射

onNext->onError->onNext

的过程,去挖掘事件是怎么被释放掉的

public enum DisposableHelper implements Disposable {
    
    DISPOSED
    ;

    public static boolean isDisposed(Disposable d) {
        return d == DISPOSED;
    }

    public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();//获取参数的Disposable对象
        Disposable d = DISPOSED;//声明一个已经释放的Disposable对象
        if (current != d) {//如果当前未被释放
            current = field.getAndSet(d);//则将当前的Disposable赋值成已经释放过的Disposable对象
            if (current != d) {//如果当前还未被释放
                if (current != null) {//且不为空
                    current.dispose();//则释放当前Disposable对象
                }
                return true;
            }
        }
        return false;
    }
}
           

在事件释放的过程中,

EmitterCreate

本身是个

AtomicReference<Disposable>

,代码通过

get()

去获取

Disposable

对象,其中代码会通过双层判断去做释放,防止在多线程的时候出现抢夺的情况

  • onNext:第一次发射数据时,

    get()

    会获取一个null对象,所以不符合

    d == DISPOSED

  • onEroor:这时候会调用

    dispose()

    去比较当前和释放过的对象,如果不等于,则将当前的对象设置为释放过的值
  • onNext:第二次发射数据时,

    get()

    会获取一个已经释放过的对象,这个时候符合

    d == DISPOSED

其实这里的操作如同设置一个Flag,但由于

Disposable

是对象的形式,且需要保证原子性,

AtomicReference

类型是个最佳选择,能保证对象的原子性

10.3 RxJava的背压原理

知识点:

  • 理解背压实现的本质
  • 理解背压数据项丢弃的本质

背压原理有一部分和RxJava事件发射原理相似,其背压的过程就是在不同策略的发射器去处理当前的数据项而已。在分析背压策略的时候,我们都知道背压是需要手动进行请求才可以将数据发射到观察者中,所以我们会调用

s.request(Long.MAX_VALUE)

让观察者能接收到数据。有些人就会有疑问,为什么有些人平时用背压的时候,不需要去调用

request()

就能接收到数据,原因是有些背压已经在内部默认调用了

s.request(Long.MAX_VALUE)

,所以这里是不用多想的,是一定要调用

s.request(Long.MAX_VALUE)

才能收到数据的。由于不同背压的策略的原理大同小异,主要以Drop策略去分析背压的原理

public static void drop(View view) {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 1000; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Integer integer) {
                    Log.e("TAG", "onNext=" + integer);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {

                }
            });
}
           
以下是RxJava源代码

1、Flowable.create

public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
    ObjectHelper.requireNonNull(source, "source is null");//判空
    ObjectHelper.requireNonNull(mode, "mode is null");//判空
    return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));//返回自身
}
           

Flowable.create跟我们前面是一样的,最后还是会交给新对象FlowableCreate去处理

2、Flowable.subscribe

public final void subscribe(FlowableSubscriber<? super T> s) {
    ObjectHelper.requireNonNull(s, "s is null");
    try {
        Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);

        ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");

        subscribeActual(z);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ......
        throw npe;
    }
}
           

Flowable.subscribe跟我们前面是一样的,最终调用的是FlowableCreate的subscribeActual方法

3、FlowableCreate.subscribeActual

public final class FlowableCreate<T> extends Flowable<T> {

    final FlowableOnSubscribe<T> source;

    final BackpressureStrategy backpressure;

    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        this.source = source;
        this.backpressure = backpressure;
    }

    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        //使用三步曲分析法
        
        BaseEmitter<T> emitter;

        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }

        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
}
           

subscribeActual会根据不同的策略生成不同的发射器,具体的所有策略逻辑都在发射器中体现的

4、DropAsyncEmitter

static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {

    private static final long serialVersionUID = 8360058422307496563L;

    DropAsyncEmitter(Subscriber<? super T> actual) {
        super(actual);
    }

    @Override
    void onOverflow() {
        // nothing to do
    }

}
           

DropAsyncEmitter其实没做什么事情,主要都在其父类中实现了,onOverflow的回调表示事件流溢出的时候的处理,很明显Drop策略就把溢出的数据项直接不做处理,意思就是抛弃掉这个数据项了

static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {

    private static final long serialVersionUID = 338953216916120960L;

    ErrorAsyncEmitter(Subscriber<? super T> actual) {
        super(actual);
    }

    @Override
    void onOverflow() {
        onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
    }

}
           

再看看Error策略,溢出之后就会抛出溢出的异常,其他策略也类似分析,具体父类是如何处理溢出函数的呢

5、NoOverflowBaseAsyncEmitter

abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

    private static final long serialVersionUID = 4127754106204442833L;

    NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
        super(actual);
    }

    @Override
    public final void onNext(T t) {
        if (isCancelled()) {
            return;
        }

        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        
        //这里暂时将get()函数当作是类似于List这种的容器,存储的是当前需要处理的数据项
        if (get() != 0) { //从数据项容器中取值,如果当前有数据项需要处理
            actual.onNext(t); //发射数据项
            BackpressureHelper.produced(this, 1); //对当前存在需要处理的数据项进行-1操作
        } else {
            onOverflow(); //从数据项容器中取值,如果当前没有数据项需要处理,则回调溢出函数
        }
    }

    abstract void onOverflow();
}
           

NoOverflowBaseAsyncEmitter在发射数据项的时候,会去BaseEmitter中的数据项容器去取出数据项,如果存在则处理,不存在则表示溢出,回调溢出函数,那么具体的数据项容器时候怎么存储需要处理的数据项的呢

6、BaseEmitter

abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
    
    private static final long serialVersionUID = 7326289992464377023L;

    final Subscriber<? super T> actual;

    final SequentialDisposable serial;

    BaseEmitter(Subscriber<? super T> actual) {
        this.actual = actual;
        this.serial = new SequentialDisposable();
    }

    @Override
    public void onComplete() {
        complete();
    }

    protected void complete() {
        if (isCancelled()) {
            return;
        }
        try {
            actual.onComplete();
        } finally {
            serial.dispose();
        }
    }

    @Override
    public final void onError(Throwable e) {
        if (!tryOnError(e)) {
            RxJavaPlugins.onError(e);
        }
    }

    @Override
    public final void request(long n) {
        if (SubscriptionHelper.validate(n)) {
            BackpressureHelper.add(this, n);
        }
    }
}
           

BaseEmitter就是一个AtomicLong,如果没学过AtomicLong的话,可以简单理解为一个计数器,get()就是获取当前的Long值,只要不等于0就表示有值。主要还是在

request()

request()

表示此时需要处理的数据项。结合上面NoOverflowBaseAsyncEmitter的中的

BackpressureHelper.produced(this, 1)

和当前BaseEmitter中的

BackpressureHelper.add(this, n)

,可得数据项的容器完全都是由BackpressureHelper去控制,我们只需要对BackpressureHelper的存储和获取做分析,就可以知道当前是否有数据项需要处理

7、BackpressureHelper

public final class BackpressureHelper {

    public static long add(AtomicLong requested, long n) {
        for (;;) {
            long r = requested.get(); //获取当前数据项
            if (r == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            long u = addCap(r, n);//当前数据项 + 新增的数据项
            if (requested.compareAndSet(r, u)) { //设置最新的数据项
                return r;
            }
        }
    }
    
    public static long addCap(long a, long b) {
        long u = a + b;
        if (u < 0L) {
            return Long.MAX_VALUE;
        }
        return u;
    }
    
    public static long produced(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get(); //获取当前数据项
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            long update = current - n; //当前数据项 - 需要发射的数据项(从源码上,n为1)
            if (update < 0L) { //不能为负数
                RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
                update = 0L;
            }
            if (requested.compareAndSet(current, update)) { //设置最新的数据项
                return update;
            }
        }
    }
}
           

BackpressureHelper就是利用AtomicLong的原子性就行简单的计数器操作而已,并没有什么复杂的操作。至此,我们就知道背压的原理原来就是利用AtomicLong计数器和生产消费的模式去决定是否发射当前的数据项而已

10.4 RxJava的常用操作符原理

知识点:

  • 理解map操作符的原理

RxJava常用操作符的代表就是map,分析map源码后,其他的操作符的思想是一样的,只不过是实现逻辑不一致而已。下面我们通过分析map的主要流程去分析map是如何转换字符串的,从上面我们知道Observable的创建、订阅、发射的过程,这次对于重复的内容就不再继续分析,主要是分析中间map是如何回调

apply()

去将数据项转换成字符串的

public void map() {
    //创建被观察者
    Observable
            .create(new ObservableOnSubscribe<String>() {
                @Override
                //默认在主线程里执行该方法
                public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    e.onNext("俊俊俊很帅");
                    e.onNext("你值得拥有");
                    e.onNext("取消关注");
                    e.onNext("但还是要保持微笑");
                    e.onComplete();
                }
            })
            .map(new Function<String, String>() {
                @Override
                public String apply(String s) throws Exception {
                    return "Hello";
                }
            })
            //创建观察者并订阅
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    if (!d.isDisposed()) {
                        d.dispose();
                    }
                }

                @Override
                public void onNext(String s) {
                    System.out.println("onNext=" + s);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("onNext=" + e.getMessage());
                }

                @Override
                public void onComplete() {

                }
            });
}
           
以下是RxJava源代码

1、Observable.map

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
           

从create到map的过程中,create的时候,当前的Observable已经被转换成

ObservableCreate

,再次map的时候,

当前的Observable已经被转换成

ObservableMap

,而且在

ObservableMap

中传递的参数包含

this

,所以当前

ObservableMap

中是嵌套着

ObservableCreate

2、Observable.subscribe

由于当前的Observable是

ObservableMap

,所以Observable.subscribe会回调

ObservableMap

中的

subscribeActual

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));//source是传递进来的ObservableCreate
    }
}
           

ObservableMap

中的

subscribeActual

,会去调用

ObservableCreate

subscribe

方法,最后还是会去回调

ObservableCreate

subscribeActual

,不过这里在回调的过程中增加了一个参数

MapObserver

,这个参数只有在

ObservableCreate

发射器发射的时候才会被调用

3、ObservableCreate.subscribeActual

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
           

ObservableCreate.subscribeActual

中,会接收一个

Observer

的参数,这个时候的

Observer

的参数是从

ObservableMap

中传递过来的

MapObserver

,当

CreateEmitter

发射

onNext

的时候,就会在当前的

MapObserver

对象

onNext

进行处理

4、MapObserver.onNext

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
    final Function<? super T, ? extends U> mapper;

    MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
        super(actual);
        this.mapper = mapper;
    }

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }

        if (sourceMode != NONE) {
            actual.onNext(null);
            return;
        }

        U v;

        try {
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        actual.onNext(v);
    }
    
    ......
}
           

onNext

主要做了两个事情,一个是

mapper.apply(t)

,这个就是

map

操作符所实现的方法,这里就将原来的值转换成新的值,一个是

actual.onNext(v)

,将转换出来的新值

v

继续

onNext

出去,这里的

actual

就是在构造函数中传递进来的

ObservableCreate

,这里就已经将数据项经过map的操作符后继续执行后面正常的发射流程

小结

如图所示

10章 RxJava源码分析

10.5 RxJava的线程切换原理

知识点:

  • 理解在工作线程上为什么能执行耗时操作
  • 理解在UI线程为什么能执行更新UI的操作

沿用上面的例子,在线程切换的过程中,无非就是相当于不同的操作符继续操作数据项而已,根本的实现思路和map等操作符是一样的,也是通过嵌套Observable的过程来执行的,只不过是线程切换的操作符内部实现的逻辑有区别而已。通过我们以往的思路去想,这两个知识点无非就是启动线程池去执行耗时任务,而UI线程则是交给Handler去处理,RxJava线程切换的原理就是这样的

Observable
        .create(new ObservableOnSubscribe<String>() {
            @Override
            //默认在主线程里执行该方法
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("俊俊俊很帅");
                e.onNext("你值得拥有");
                e.onNext("取消关注");
                e.onNext("但还是要保持微笑");
                e.onComplete();
            }
        })
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return "Hello";
            }
        })
        //将被观察者切换到子线程
        .subscribeOn(Schedulers.io())
        //将观察者切换到主线程  需要在Android环境下运行
        .observeOn(AndroidSchedulers.mainThread())
        //创建观察者并订阅
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                if (!d.isDisposed()) {
                    d.dispose();
                }
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext=" + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onNext=" + e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });
           

基础概念:

  • Schedulers:调度器的管理者。管理着多种不同种类的Scheduler
  • Scheduler:调度器。负责线程Worker的创建

    createWorker()

    ,调度Worker的执行

    schedule()

  • Worker:抽象的工作线程。被线程调度器管理,负责线程的创建和执行

在源码中,我们需要先熟悉这三者之间的关系到底是如何运作的

以下是RxJava源代码

1、observeOn()

@CheckReturnValue
@SchedulerSupport("custom")
public final Observable<T> observeOn(Scheduler scheduler) {
    return this.observeOn(scheduler, false, bufferSize());
}

@CheckReturnValue
@SchedulerSupport("custom")
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
}
           

当前的Observable已经被转换成

ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    protected void subscribeActual(Observer<? super T> observer) {
        if (this.scheduler instanceof TrampolineScheduler) {
            this.source.subscribe(observer);
        } else {
            //1、创建工作线程
            Worker w = this.scheduler.createWorker();
            //2、订阅之后,在发射的过程中
            this.source.subscribe(new ObservableObserveOn.ObserveOnObserver(observer, w, this.delayError, this.bufferSize));
        }

    }

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Worker worker;
        final boolean delayError;
        final int bufferSize;
        SimpleQueue<T> queue;
        
        public void onNext(T t) {
            if (!this.done) {
                if (this.sourceMode != 2) {
                    this.queue.offer(t);
                }
                //3、在OnNext中执行
                this.schedule();
            }
        }

        void schedule() {
            if (this.getAndIncrement() == 0) {
                //4、执行工作线程
                this.worker.schedule(this);
            }
        }
    }
}
           

其三者的关系简单的说就是在每次订阅的时候,都会去创建出对应的工作线程,这个工作线程取决于你传递的参数是哪个Worker,在发射器发射的过程中,这个工作线程总会去执行它的回调

schedule

,其实大部分的操作就是在

schedule

里面执行线程。搞懂了三者的关系之后,分析线程切换就简单多了,就相当于工厂一样,给个具体的任务给到具体的工人去执行,很像工厂的流水线,我们已经确定下来了流水线的流程了,这个时候我们就需要去关心参数具体是什么东西了。在阅读subscribeOn、observeOn前,我们先看看这两个方法中的参数都是什么

1、Schedulers.io()

public final class Schedulers {

    @NonNull
    static final Scheduler IO;
    
    static {
        
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
        
        //1、在初始化的时候就构建出了IOTask,initIoScheduler会去执行IOTask的call方法
        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }
    
    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            //2、IOTask的call方法会去获取IoHolder的值
            return IoHolder.DEFAULT;
        }
    }
    
    static final class IoHolder {
        //3、创建IoScheduler
        static final Scheduler DEFAULT = new IoScheduler();
    }
    
    public static Scheduler io() {
        //Schedulers.io():它会去获取前面3步创建出来的IoScheduler对象
        return RxJavaPlugins.onIoScheduler(IO); //返回IO自身
    }
}
           

其正在实现在IoScheduler,其表示管理Io线程的管理者

public final class IoScheduler extends Scheduler {
    
    final AtomicReference<CachedWorkerPool> pool;
    
    static final CachedWorkerPool NONE;
    static {
        ......
        //1、创建CachedWorkerPool
        NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
    }
    
    static final class CachedWorkerPool implements Runnable {
        
        ......
        private final ScheduledExecutorService evictorService;
        private final Future<?> evictorTask;
        
        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            ......
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }
    }
    
    @NonNull
    @Override
    public Worker createWorker() {
        //2、创建具体的线程
        return new EventLoopWorker(pool.get());
    }
    
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            //3、最终会去调用ThreadWorker的scheduleActual
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
    
    //4、由于ThreadWorker没有scheduleActual,在父类中找NewThreadWorker
    static final class ThreadWorker extends NewThreadWorker {
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }

        public long getExpirationTime() {
            return expirationTime;
        }

        public void setExpirationTime(long expirationTime) {
            this.expirationTime = expirationTime;
        }
    }
}
           

NewThreadWorker,最终还是调用

executor.submit()

executor.schedule()

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}
           

2、AndroidSchedulers.mainThread()

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
}
           

返回一个HandlerScheduler,创建单例模式的主线程Handler

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) {
            throw new NullPointerException("run == null");
        } else if (unit == null) {
            throw new NullPointerException("unit == null");
        } else {
            run = RxJavaPlugins.onSchedule(run);
            HandlerScheduler.ScheduledRunnable scheduled = new HandlerScheduler.ScheduledRunnable(this.handler, run);
            this.handler.postDelayed(scheduled, unit.toMillis(delay));
            return scheduled;
        }
    }

    public Worker createWorker() {
        //创建具体工作线程
        return new HandlerScheduler.HandlerWorker(this.handler);
    }

    ......
}
           

就好像我们上面分析的三者关系一样,Schedule最终还是会管理着具体的工作线程

private static final class HandlerWorker extends Worker {
    private final Handler handler;

    private volatile boolean disposed;

    HandlerWorker(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        if (disposed) {
            return Disposables.disposed();
        }

        run = RxJavaPlugins.onSchedule(run);
        //包装新的Runnable交给Handler
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.

        handler.sendMessageDelayed(message, unit.toMillis(delay));

        // Re-check disposed state for removing in case we were racing a call to dispose().
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }

        return scheduled;
    }

    @Override
    public void dispose() {
        disposed = true;
        handler.removeCallbacksAndMessages(this /* token */);
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }
}
           

3、subscribeOn()

理解完参数后,回到我们的分析重点

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
           

subscribeOn就如同普通操作符一样,包装一层

ObservableSubscribeOn

,在subscribe的时候真正走的还是

subscribeActual

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //使用三步曲分析法
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
        s.onSubscribe(parent);
        //3、将第三步的内容放到线程中去执行
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
    
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
    
        @Override
        public void run() {
            //3、回调ObservableOnSubscribe的subscribe
            source.subscribe(parent);
        }
    }
}
           

scheduler.scheduleDirect

中会去执行Scheduler里的方法,这里的scheduler就是IoScheduler

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}
           

回调IoScheduler的

createWorker()

并执行

w.schedule()

小结

如图所示

10章 RxJava源码分析

10.6 RxJava的自定义Operator原理

知识点:

  • 自定义Operator是如何实现的

在讲解之前,让我们先回味下自定义Operator

public class CustomOperator implements ObservableOperator<String, List<String>> {

    @Override
    public Observer<? super List<String>> apply(final Observer<? super String> observer) throws Exception {
        return new Observer<List<String>>() {
            @Override
            public void onSubscribe(Disposable d) {
                observer.onSubscribe(d);
            }

            @Override
            public void onNext(List<String> strings) {
                observer.onNext(strings.toString());
            }

            @Override
            public void onError(Throwable e) {
                observer.onError(e);
            }

            @Override
            public void onComplete() {
                observer.onComplete();
            }
        };
    }
}
           
Observable.create(new ObservableOnSubscribe<List<String>>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<List<String>> e) throws Exception {
      
    }
}).lift(new CustomOperator())
           

自定义Operator如同普通的操作符原理差不多,用的是

lift

的操作符,只不过在

lift

里面将逻辑的执行回调到自定义的Operator的

apply()

以下是RxJava源代码

1、Observable.lift

public final <R> Observable<R> lift(ObservableOperator<? extends R, ? super T> lifter) {
    ObjectHelper.requireNonNull(lifter, "onLift is null");
    return RxJavaPlugins.onAssembly(new ObservableLift<R, T>(this, lifter));
}
           

2、ObservableLift.subscribeActual

public final class ObservableLift<R, T> extends AbstractObservableWithUpstream<T, R> {
    /** The actual operator. */
    final ObservableOperator<? extends R, ? super T> operator;

    public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) {
        super(source);
        this.operator = operator;
    }

    @Override
    public void subscribeActual(Observer<? super R> s) {
        Observer<? super T> observer;
        try {
            observer = ObjectHelper.requireNonNull(operator.apply(s), "Operator " + operator + " returned a null Observer");
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Disposable already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }

        source.subscribe(observer);
    }
}
           

可以看到代码非常快的就将传递进来的参数

operator

执行

apply()

10.7 RxJava的自定义Transformer原理

知识点:

  • 自定义Transformer是如何实现的

在讲解之前,让我们先回味下自定义Transformer

public class NetWorkTransformer implements ObservableTransformer {

    @Override
    public ObservableSource apply(Observable upstream) {
        return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }
}
           
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        
    }
}).compose(new CustomTransformer())
           

自定义Transformer如同普通的操作符原理差不多,用的是

compose

的操作符,只不过在

compose

里面将逻辑的执行回调到自定义的Transformer的

apply()

以下是RxJava源代码

1、Observable.compose

public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
    return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}
           

可以看到代码非常快的就将传递进来的参数

composer

执行

apply()

,这里的

wrap()

只是将代码裹了一层,如果你想简单的理解的话,可以理解为作者的强迫症犯了,只是为了让所有代码看起来都比较规范,不然这里实在和其他操作符的实现不一样,我们可以追进去

wrap()

public static <T> Observable<T> wrap(ObservableSource<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    if (source instanceof Observable) {
        return RxJavaPlugins.onAssembly((Observable<T>)source);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}
           

wrap()

其实是对

composer

操作符做了Hook,因为所有操作符都会被RxJava去Hook住,这里会在下面讲到自定义Plugin原理的时候就明白了

10.8 RxJava的自定义Plugin原理

知识点:

  • 自定义Plugin是如何实现AOP的

在讲解之前,让我们先回味下自定义Plugin

public class CustomObservableAssembly implements Function<Observable, Observable> {
    @Override
    public Observable apply(Observable observable) throws Exception {
        System.out.println("CustomObservableAssembly observable.toString:" + observable.toString());
        return observable;
    }
}
           
RxJavaPlugins.setOnObservableAssembly(new CustomObservableAssembly());
           

在自定义Plugin中,类似于Android的术语Hook,但在这里并不是真正的Hook,而是作者在写RxJava的时候去限定一套规范,让后面的所有操作符或其他操作等,都可以实现Hook的原理

以下是RxJava源代码

1、RxJavaPlugins.setOnObservableAssembly

public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
           

RxJavaPlugins.setOnObservableAssembly

只是对成员变量设置了自定义的值,这个时候

onObservableAssembly

就有了值,默认是为null的。设置完值就表示已经Hook成功了,当操作符执行的时候,是如何回调我们Hook的函数的

2、Observable.create

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
           

create相当于一个操作符,在每个操作符的里面都会去执行一段

RxJavaPlugins.onAssembly

,这里就是RxJava规定的规范,一开始我们只是说返回自身,但是有了Hook之后,就会回调Hook函数,返回已经经过二次加工的自身

3、RxJavaPlugins.onAssembly

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
           

由于我们已经设置了新值,这里的

onObservableAssembly

就不为null,不为null则执行

apply()

apply()

就是我们Hook传进去参数的回调方法

10.9 美团WhiteBoard

美团的WhiteBoard其实是取自美团的开源框架

Shield——开源的移动端页面模块化开发框架

中的代码,其主要作用是应用RxJava的Subject搭起组件间通讯的桥梁。实质上在WhiteBoard中,是将所有的组件的数据和Subject通讯的桥梁保存起来,通过key作为组件的唯一标志。不过比较可惜的是WhiteBoard使用的是RxJava1,不过关系不大,只要读懂里面的源码即可

1、WhiteBoard的初始化

初始化放在Activity/Fragment界面中,相当于通讯的桥梁,每个界面中仅有一个WhiteBoard的实例,并由所有组件共用

public abstract class ShieldFragment extends Fragment implements AgentCellBridgeInterface, DriverInterface {
    
    static final String TAG = ShieldFragment.class.getSimpleName();
    ......
    protected WhiteBoard whiteBoard;

    public ShieldFragment() {
        this.whiteBoard = new WhiteBoard();//初始化
    }

    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        ......
        whiteBoard.onCreate(savedInstanceState);//对应生命周期
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        ......
        whiteBoard.onDestory();//对应生命周期
    }

    @Override
    public void onSaveInstanceState(Bundle outState) {
        super.onSaveInstanceState(outState);
        ......
        whiteBoard.onSaveInstanceState(outState);//对应生命周期
    }

    @Override
    public WhiteBoard getWhiteBoard() {
        return whiteBoard;//获取实例
    }
}
           

2、WhiteBoard监听通知

组件只监听某个key的事件,有通知的时候就能收到

public class MixCellAgent extends LightAgent {
    private MixCell mixCell;
    private Subscription loadingSubscription;
    private Subscription emptySubscription;

    public MixCellAgent(Fragment fragment, DriverInterface bridge, PageContainerInterface pageContainer) {
        super(fragment, bridge, pageContainer);
    }

    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        mixCell = new MixCell(getContext(), this);
        loadingSubscription = getWhiteBoard().getObservable(MixLoadingAgent.KEY_LOADING).filter(new Func1() {
            @Override
            public Object call(Object o) {
                return o instanceof Boolean && ((Boolean) o);
            }
        }).subscribe(new Action1() {
            @Override
            public void call(Object o) {
                loading();
            }
        });

        emptySubscription = getWhiteBoard().getObservable(MixLoadingAgent.KEY_EMPTY).filter(new Func1<Object, Boolean>() {
            @Override
            public Boolean call(Object o) {
                return o instanceof Boolean && ((Boolean) o);
            }
        }).subscribe(new Action1() {
            @Override
            public void call(Object o) {
                mixCell.onEmpty();
            }
        });
        
        ......
    }

    @Override
    public void onDestroy() {
        if (loadingSubscription != null) {
            loadingSubscription.unsubscribe();
            loadingSubscription = null;
        }

        if (emptySubscription != null) {
            emptySubscription.unsubscribe();
        }
        
        ......
    }
}
           

3、WhiteBoard的发送通知

WhiteBoard发送通知就是调用WhiteBoard提供的所有put方法,具体是如何收到消息的,还需要通过WhiteBoard的源码看下

public class MixLoadingAgent extends LightAgent implements MixLoadingCell.MixLoadingListener {
    public static final String KEY_LOADING = "loading";
    public static final String KEY_EMPTY = "empty";
    public static final String KEY_FAILED = "failed";
    public static final String KEY_MORE = "more";
    public static final String KEY_DONE = "done";

    private MixLoadingCell mixLoadingCell;

    public MixLoadingAgent(Fragment fragment, DriverInterface bridge, PageContainerInterface pageContainer) {
        super(fragment, bridge, pageContainer);
        mixLoadingCell = new MixLoadingCell(getContext());
        mixLoadingCell.setOnMixLoadingListener(this);
    }

    @Override
    public SectionCellInterface getSectionCellInterface() {
        return mixLoadingCell;
    }

    @Override
    public void onLoading() {
        getWhiteBoard().putBoolean(KEY_LOADING, true);
    }

    @Override
    public void onEmpty() {
        getWhiteBoard().putBoolean(KEY_EMPTY, true);
    }

    @Override
    public void onFailed() {
        getWhiteBoard().putBoolean(KEY_FAILED, true);
    }

    @Override
    public void onMore() {
        getWhiteBoard().putBoolean(KEY_MORE, true);
    }

    @Override
    public void onDone() {
        getWhiteBoard().putBoolean(KEY_DONE, true);
    }
}
           

4、WhiteBoard的原理

最后只需要获取实例后发送通知即可,

getWhiteBoard().putBoolean(key)

。WhiteBoard原理是只要还是Subject的桥梁的作用

public class WhiteBoard {

    public static final String WHITE_BOARD_DATA_KEY = "White_Board_Data";
    protected Bundle mData;//保存所有组件的数据
    protected HashMap<String, Subject> subjectMap;//保存所有组件的通讯桥梁

    public WhiteBoard() {
        this(null);
    }

    public WhiteBoard(Bundle data) {
        mData = data;
        if (mData == null) {
            mData = new Bundle();//初始化
        }

        subjectMap = new HashMap<>();//初始化
    }

    public void onCreate(Bundle savedInstanceState) {
        if (savedInstanceState != null) {
            mData = savedInstanceState.getBundle(WHITE_BOARD_DATA_KEY);
        }

        if (mData == null) {
            mData = new Bundle();
        }
    }

    public void onSaveInstanceState(Bundle outState) {
        if (outState != null) {

            // here we must save a new copy of the mData into the outState
            outState.putBundle(WHITE_BOARD_DATA_KEY, new Bundle(mData));
        }
    }

    public void onDestory() {
        subjectMap.clear();
        mData.clear();
    }

    //通过key获取某组件的桥梁
    public Observable getObservable(final String key) {

        Subject res = null;
        if (subjectMap.containsKey(key)) {
            res = subjectMap.get(key);
        } else {
            res = PublishSubject.create();
            subjectMap.put(key, res);
        }
        if (getData(key) != null) {
            return res.startWith(getData(key));//带上已经存储过的数据
        } else {
            return res;
        }
    }

    //通过key通知某组件
    protected void notifyDataChanged(String key) {
        if (subjectMap.containsKey(key)) {
            subjectMap.get(key).onNext(mData.get(key));
        }
    }

    //移除组件中的数据
    public void removeData(String key) {
        mData.remove(key);
        notifyDataChanged(key);
    }
    
    //每次put值的时候,就会去通知对应的组件
    public void putBoolean(@Nullable String key, boolean value) {
        mData.putBoolean(key, value);
        notifyDataChanged(key);
    }
    public void putInt(@Nullable String key, int value) {
        mData.putInt(key, value);
        notifyDataChanged(key);
    }
    public void putString(@Nullable String key, @Nullable String value) {
        mData.putString(key, value);
        notifyDataChanged(key);
    }
    ......

    public double getDouble(String key) {
        return mData.getDouble(key);
    }
    public String getString(String key, String defaultValue) {
        return mData.getString(key, defaultValue);
    }
    ......
}