天天看点

RxJava最简单易懂最全面的总结(一)

学习RxJava之前必须先来看下Java的设置模式之一:观察者模式(Observer Pattern)。

原因:RxJava的都是基于观察者模式构建的。

关于观察者模式论述总结:

意图:定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。

主要解决:一个对象状态改变给其他对象通知的问题,而且要考虑到易用和低耦合,保证高度的协作。

何时使用:一个对象(目标对象,被观察者)的状态发生改变,所有的依赖对象(观察者对象)都将得到通知,进行广播通知。

如何解决:使用面向对象技术,可以将这种依赖关系弱化。

关键代码:在抽象类里有一个 ArrayList 存放观察者们。

下面给出传统观察者模式代码:

代码目录结构很简单,被观察者(Observable)和观察者(Observer)各有一个接口和具体的(Concrete)实现类:

RxJava最简单易懂最全面的总结(一)

被观察者接口:

package com.xw.observerpatterndemo.observable;

import com.xw.observerpatterndemo.observer.Observer;

public interface Observable {
    /**
     * 将观察者注册到被观察者的List集合中
     * @param observer
     */
    public void registerObserver(Observer observer);

    /**
     * 将观察者从被观察者的List集合中删除
     * @param observer
     */
    public void removeObserver(Observer observer);

    /**
     * 当被观察者状态发生改变时通知观察者进行相应的更新(通过观察者对象调用观察者的update()方法)
     */
    public void notifyObservers();
}
           

被观察者的具体实现类:

package com.xw.observerpatterndemo.observable;

import java.util.ArrayList;
import java.util.List;
import com.xw.observerpatterndemo.observer.Observer;

public class ConcreteObservable implements Observable {
    private List<Observer> mObservers;
    private int edition;
    private float cost;
    public ConcreteObservable() {
       mObservers = new ArrayList<>();
    }
    @Override
    public void registerObserver(Observer observer) {
        mObservers.add(observer);
    }
    @Override
    public void removeObserver(Observer observer) {
        int i = mObservers.indexOf(observer);
        if(i>=0){
            mObservers.remove(i);
        }
    }
    @Override
    public void notifyObservers() {
       for(int i=0 ; i < mObservers.size();i++){
           Observer observer = mObservers.get(i);
           observer.update(edition,cost);
       }
    }
    
    //当数据改变时,调用通知方法
    public void setInfomation(int edition,float cost){
        this.edition = edition;
        this.cost =cost;
        notifyObservers();
    }
}
           

观察者(Observer)接口:

package com.xw.observerpatterndemo.observer;

public interface Observer {
    /**
     * 同步更新观察者的状态
     * @param edition
     * @param cost
     */
    public void update(int edition,float cost);
}
           

观察者具体实现类(ConcreteObserver):

package com.xw.observerpatterndemo.observer;
import android.util.Log;

public class ConcreteObserver implements Observer {
    private int edition;
    private float cost;
    private String name;

    public ConcreteObserver(String name) {
        this.name = name;
    }
    
    @Override
    public void update(int edition, float cost) {
       this.edition = edition;
       this.cost = cost;
       buy();
    }

    private void buy() {
        Log.v("ConcreteObserver",name+"购买了"+edition+"期杂志,花了"+cost+"元");
    }
}
           

下面是我用Android studio对上面的类的调用(你可以用eclipse),在MainActivity中:

package com.xw.observerpatterndemo;

import androidx.appcompat.app.AppCompatActivity;

import android.os.Bundle;

import com.xw.observerpatterndemo.observable.ConcreteObservable;
import com.xw.observerpatterndemo.observer.ConcreteObserver;

public class MainActivity2 extends AppCompatActivity {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main2);
        //创建被观察者
        ConcreteObservable concreteObservable = new ConcreteObservable();
        //创建3个观察者
        ConcreteObserver observerA = new ConcreteObserver("A");
        ConcreteObserver observerB = new ConcreteObserver("B");
        ConcreteObserver observerC = new ConcreteObserver("C");

        concreteObservable.registerObserver(observerA);
        concreteObservable.registerObserver(observerB);
        concreteObservable.registerObserver(observerC);
        //concreteObservable.removeObserver(observerA);

        concreteObservable.setInfomation(8,28);
    }
}
           

上面被观察者代码中,一旦它的数据状态发生改变concreteObservable.setInfomation(8,28);上面定义的三个观察者都会随着它更新数据状态,后台输出的结果:

2020-06-01  4267-4267/? V/ConcreteObserver: A购买了8期杂志,花了28.0元
2020-06-01  4267-4267/? V/ConcreteObserver: B购买了8期杂志,花了28.0元
2020-06-01  4267-4267/? V/ConcreteObserver: C购买了8期杂志,花了28.0元
           

熟悉了观察者模式之后再来看RxJava的定义和使用就比较顺其自然了,RxJava的使用通常分为三步:

(1) 创建Observable:

Observable 的字面意思是被观察者,使用 Rxjava时需要创建一个被观察者,它会决定什么时候触发事件以及触发怎样的事件。有点类似上游发送命令,可以在这里决定异步操作模块的顺序和异步操作模块的次数。

(2) 创建 Observer:

Observe :即观察者,它可以在不同的线程中执行任务。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者,可以在未来某个时刻响应被观察者(Observable)的通知,而不需要阻塞等待Observable发射来数据。

(3) 使用 subscribe()进行订阅,即订阅关系:

创建了Observable和 observer 之后,我们还需要使用 subscribe()方法将它们连接起来,这样整个上下游就能衔接起来实现链式调用,产生订阅关系。

上面的三步也进一步说明了RxJava的四要素:

① 被观察者Observable;② 观察者Observer; ③ 订阅关系 subscribe() ;④ 事件;

用一句话概括四者的关系就是:Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

在举例之前,我们需要知道几个知识点:

知识点一:常用的创建被观察者对象的方法方式:

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
           

知识点二:创建方法中参数需要传递一个接口对象:ObservableOnSubscribe<T>,说明如下:

一个具有subscribe()方法的函数接口,该方法接收ObservableEmitter 实例,该实例允许以取消安全的方式推送事件。

你可以继承这个接口并重写相应的方法自定义自己的被观察者对象:

public class FriendMessageObservable implements ObservableOnSubscribe<List<FriendMessage>> {
   ....

    @Override
    public void subscribe(@NotNull ObservableEmitter<List<FriendMessage>> emitter) {
      ...
      ...
    }

   ....
}
           
A functional interface that has a subscribe() method that receives an instance of an ObservableEmitter instance that allows pushing events in a cancellation-safe manner.
Type parameters:
<T> – the value type pushed

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.//调用每个订阅的观察者。
     * @param emitter the safe emitter instance, never null //安全发射器实例,从不为空
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
           

知识点三:被观察者发射接口ObservableEmitter<T>,在subscribe(@NonNull ObservableEmitter emitter)方法中需要参入一个被观察者发射接口对象,解释说明如下:(这些解释是接口类中头部英文注释的翻译)

翻译:RxJava观察器上的抽象,允许将资源与其关联。

翻译:onNext(Object)、onError(Throwable)、tryOnError(Throwable)和onComplete()方法应该以顺序的方式调用,就像观察者的方法一样。如果要确保这一点(以顺序的方式调用),请使用serialize()方法返回的ObservableEmitter,而不是生成器例程提供的原始ObservableEmitter实例。其他方法是线程安全的。

翻译:发射器允许分别通过setDisposable(Disposable) or setCancellable(Cancellable)以一次性或可取消的形式注册单个资源。当下游的取消流或事件生成器逻辑调用onError(Throwable), onComplete() 或 tryOnError(Throwable)成功后,发射器实现将处理/取消此实例。

翻译:一次只能有一个一次性或可取消对象与发射器关联。调用任一设置方法都将处理/取消以前的任何对象。如果需要处理多个资源,可以创建io.reactivex.disposables.CompositeDisposable并将其与发射器关联。

翻译:Cancelable在逻辑上等同于Disposable,但允许使用可以引发选中异常的清理逻辑(例如Java IO组件上的许多close()方法)。由于资源的释放发生在终端事件被传递或序列被取消之后,因此在Cancelable中抛出的异常将通过io.reactivex.plugins.RxJavaPlugins.onError(Throwable)路由到全局错误处理程序。

Abstraction over an RxJava Observer that allows associating a resource with it.

The onNext(Object), onError(Throwable), tryOnError(Throwable) and onComplete() methods should be called in a sequential manner, 
just like the Observer's methods should be.
Use the ObservableEmitter the serialize() method returns instead of the original ObservableEmitter instance provided by the generator routine 
if you want to ensure this. The other methods are thread-safe.

The emitter allows the registration of a single resource, in the form of a Disposable or Cancellable via setDisposable(Disposable) 
or setCancellable(Cancellable) respectively. 
The emitter implementations will dispose/cancel this instance when the downstream(下游的) cancels the flow or after the event generator logic calls onError(Throwable), 
onComplete() or when tryOnError(Throwable) succeeds.


Only one Disposable or Cancellable object can be associated with the emitter at a time. 
Calling either set method will dispose/cancel any previous object. If there is a need for handling multiple resources, 
one can create a io.reactivex.disposables.CompositeDisposable and associate that with the emitter instead.

The Cancellable is logically equivalent to Disposable but allows using cleanup logic that can throw 
a checked exception (such as many close() methods on Java IO components). Since the release of resources happens after the terminal events have been delivered or the sequence gets cancelled, 
exceptions throw within Cancellable are routed to the global error handler via io.reactivex.plugins.RxJavaPlugins.onError(Throwable).

Type parameters:
<T> – the value type to emit

public interface ObservableEmitter<T> extends Emitter<T> {

    /**
     * Sets a Disposable on this emitter; any previous {@link Disposable}
     * or {@link Cancellable} will be disposed/cancelled.
     * @param d the disposable, null is allowed
     */
    void setDisposable(@Nullable Disposable d);

    /**
     * Sets a Cancellable on this emitter; any previous {@link Disposable}
     * or {@link Cancellable} will be disposed/cancelled.
     * @param c the cancellable resource, null is allowed
     */
    void setCancellable(@Nullable Cancellable c);

    /**
     * Returns true if the downstream disposed the sequence or the
     * emitter was terminated via {@link #onError(Throwable)}, {@link #onComplete} or a
     * successful {@link #tryOnError(Throwable)}.
     * <p>This method is thread-safe.
     * @return true if the downstream disposed the sequence or the emitter was terminated
     */
    boolean isDisposed();

    /**
     * Ensures that calls to onNext, onError and onComplete are properly serialized.
     * @return the serialized ObservableEmitter
     */
    @NonNull
    ObservableEmitter<T> serialize();

    /**
     * Attempts to emit the specified {@code Throwable} error if the downstream
     * hasn't cancelled the sequence or is otherwise terminated, returning false
     * if the emission is not allowed to happen due to lifecycle restrictions.
     * <p>
     * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
     * if the error could not be delivered.
     * <p>History: 2.1.1 - experimental
     * @param t the throwable error to signal if possible
     * @return true if successful, false if the downstream is not able to accept further
     * events
     * @since 2.2
     */
    boolean tryOnError(@NonNull Throwable t);
}
           

知识点四:被观察者发射接口ObservableEmitter<T>又继承了Emitter<T>接口,它有三个抽象方法,正是通过这三个方法才可以完成消息数据的传递通知的:

import io.reactivex.annotations.NonNull;

/**
 * Base interface for emitting signals in a push-fashion in various generator-like source
 * operators (create, generate).
 *
 * @param <T> the value type emitted
 */
public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}
           

知识点五:在订阅方法中创建观察者对象:subscribe(Observer<? super T> observer) ,观察者Observer接口中的源码定义如下:

package io.reactivex;

import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;


public interface Observer<T> {

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(@NonNull T t);

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(@NonNull Throwable e);

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}
           

下面简单的解释说明一下:

onNext() :普通事件,相当于 onClick() / onEvent(),所有的产生订阅关系的Observer(观察者)都能通过这个方法接收到传递过来的消息。

onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为结束标志。

onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个,否则会报错。

onSubscribe(Disposable d)里面的Disposable对象要说一下,Disposable英文意思是可随意使用的,用于管理订阅关系的,如果需要取消订阅则可以调用mDisposable.dispose()取消订阅关系。

或者使用简易版的Observer观察者接口,也就是消费者接口Consumer,它只有一个accept()抽象方法,可以自定义需要处理的信息,同样在订阅方法中创建消费者对象:

 subscribe(Consumer<? super T> onNext)

在被观察者接口Observable中订阅方法subscribe()有很多重载方法,这里不再贴出。

package io.reactivex.functions;

/**
 * A functional interface (callback) that accepts a single value.
 * @param <T> the value type
 */
public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
}
           

下面,进入实例举例。按照上面的使用三部曲举一个简单的例子:

import androidx.appcompat.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import android.widget.TextView;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;


public class MainActivity extends AppCompatActivity {
   private TextView textTips;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        textTips = findViewById(R.id.text_tips);

        //第一步:创建被观察者
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("我爱我的故乡!");
                emitter.onNext("那是我无数个夜晚醒来后魂牵梦萦的地方!");
                emitter.onNext("心与心虽远隔千里!");
                emitter.onNext("但与故乡确咫尺之间!");
                emitter.onComplete();
            }
        });

        //第二步:创建观察者
        Observer observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.v("ssss",s);
                textTips.setText(s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };
        //第三步:产生订阅关系:
        observable.subscribe(observer);
    }
}
           

运行程序,我还是在Android Studio中运行的,控制台输出结果如下:

2020-06-02  23617-23617/? V/ssss: 我爱我的故乡!
2020-06-02  23617-23617/? V/ssss: 那是我无数个夜晚醒来后魂牵梦萦的地方!
2020-06-02  23617-23617/? V/ssss: 心与心虽远隔千里!
2020-06-02  23617-23617/? V/ssss: 但与故乡确咫尺之间!
           

被观察者、观察者、订阅方法我们都直观的创建和调用了,那么什么是RxJava中所说的事件呢?

上述的例子中,在“第二步:创建观察者”的对象new Observer<String>() { ...}中,必须实现的四个方法onSubscribe(Disposable d) 、onNext(String s) 、 onError(Throwable e) 、onComplete(),后三者就是RxJava的三个事件。

再回过头来看一下被观察者的创建方式,在Observable.java中:

/**
     *......
     *......
     * @param <T> the element type
     * @param source the emitter that is called when an Observer subscribes to the returned {@code Observable}
     * @return the new Observable instance
     * @see ObservableOnSubscribe
     * @see ObservableEmitter
     * @see Cancellable
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
           

create(ObservableOnSubscribe<T> source)方法用于创建一个被观察者,其中参数对象ObservableOnSubscribe<T> source 可以像上面的例子中那样给定一个匿名对象,而这个类ObservableOnSubscribe<T>的源码如下,它必须实现一个方法:

subscribe(@NonNull ObservableEmitter<T> emitter);

package io.reactivex;

import io.reactivex.annotations.*;

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of an {@link ObservableEmitter} instance that allows pushing
 * events in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
           

这个subscribe(@NonNull ObservableEmitter<T> emitter)方法中传递了一个ObservableEmitter<T>类型的发射器参数emitter,点进这个类型我们会看到这个类的继承关系: public interface ObservableEmitter<T> extends Emitter<T>{  },这个Emitter<T>是我们关心的,因为它实现了几个比较重要的方法,来看源码:

/**
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */
package io.reactivex;

import io.reactivex.annotations.NonNull;

/**
 * Base interface for emitting signals in a push-fashion in various generator-like source
 * operators (create, generate).
 *
 * @param <T> the value type emitted
 */
public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}
           

是不是很惊讶,原来被观察者内部emitter.onNext()等方法是Emitter<T>接口定义的方法,而不是观察者Observer接口中定义的方法,这里是需要明确区分的,有的文章说在被观察者中调用的onNext()、onError()、onComplete()方法是观察者中定义的方法,应该是错的。所以在这里特别说一下。 

我们应该这样理解,在被观察者中发射器对象调用的onNext()、onError()、onComplete()发射信号的方法与观察者中接收信号的方法是一一对应的,就OK了!

emitter.onNext("我爱我的故乡!");
                emitter.onNext("那是我无数个夜晚醒来后魂牵梦萦的地方!");
                emitter.onNext("心与心虽远隔千里!");
                emitter.onNext("但与故乡确咫尺之间!");
                emitter.onComplete();
           

最后给出上面例子的链式写法:

package com.xw.rxjavademo;

import androidx.appcompat.app.AppCompatActivity;

import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.TextView;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;

public class MainActivity extends AppCompatActivity {

    private TextView textTips;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        textTips = findViewById(R.id.text_tips);

        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("我爱我的故乡!");
                emitter.onNext("那是我无数个夜晚醒来后魂牵梦萦的地方!");
                emitter.onNext("心与心虽远隔千里!");
                emitter.onNext("但与故乡确咫尺之间!");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.v("ssss",s);
                textTips.setText(s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }
}
           

或者使用简洁版的观察者接口:消费者接口Consumer<String>作为信号的消费者! 

import androidx.appcompat.app.AppCompatActivity;

import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.TextView;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.functions.Consumer;

public class MainActivity extends AppCompatActivity {

    private TextView textTips;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        textTips = findViewById(R.id.text_tips);

        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("我爱我的故乡!");
                emitter.onNext("那是我无数个夜晚醒来后魂牵梦萦的地方!");
                emitter.onNext("心与心虽远隔千里!");
                emitter.onNext("但与故乡却咫尺之间!");
                emitter.onComplete();
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.v("ssss",s);
                textTips.setText(s);
            }
        });
    }
}
           

在实际的开发中,我们在被观察者接口对象的重写方法subscribe(){}中处理构建需要观察者们观察的随时变动的数据,比如我们将查询数据库的结果作为观察者们需要观察的数据,再比如构建上面的那首诗:“我爱我的故乡,......,但与故乡却咫尺之间!”。

在实际的开发中,我们需要继承接口ObservableOnSubscribe自定义自己的被观察者,并构建被观察的数据:

public class PengYouMessageObservable implements ObservableOnSubscribe<List<PengYouMessage>> {
     
    ......

    @Override
    public void subscribe(@NotNull ObservableEmitter<List<PengYouMessage>> emitter) {
        PengYouMessageManager manager = PengYouMessageManager.getInstance();
        //将查询数据库数据有无新增变化,作为被观察的数据
        List<PengYouMessage> PengYouInfoList = manager.query(mobile);      
        
        if (ListUtil.notEmpty(friendInfoList)) {
            //对朋友消息进行处理等;
        }
        //将消息发射出去
        emitter.onNext(PengYouInfoList);
        emitter.onComplete();
    }

   ......
}
           

在需要处理数据的地方,比如Activity、Fragment的主线程中通过create()方法创建订阅关系,更新UI数据:

.........
        Observable.create(new PengYouMessageObservable( mobile)).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<List<PengYouMessage>>() {
                    @Override
                    public void accept(List<PengYouMessage> pengYouInfoList) {
                        //这里已经是在观察者、消费者对象中,也就是切换到了AndroidSchedulers.mainThread()主线程中了,可以调用适配器更新UI数据了
                        mPengYouAdapter.isUseEmpty(true);
                        mPengYouAdapter.setNewData(pengYouInfoList);
                    }
                });
        updateUIGift();
    }

.........
           

上面算是RxJava的入门使用基础!后面我会总结一下RxJava常用的几种操作符和线程控制Scheduler的内容。也方便以后查阅使用!