1.定义
RxJava
是一个 基于事件流、实现异步操作的库
2.作用
用于实现异步操作,类似于
Android
中的
AsyncTask
、
Handler+
new Thread的作用
3. 特点
由于
RxJava
的使用方式是:基于事件流的链式调用,所以使得
RxJava
:
- 逻辑简洁
- 实现优雅
- 使用简单
更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁 & 优雅
-
原理 基于 一种扩展的观察者模式Rxjava
-
的扩展观察者模式中有4个角色:Rxjava
角色 | 作用 | 类比 |
---|---|---|
被观察者(Observable) | 产生事件 | 顾客 |
观察者(Observer) | 接收事件,并给出响应动作 | 厨房 |
订阅(Subscribe) | 连接 被观察者 & 观察者 | 服务员 |
事件(Event) | 被观察者 & 观察者 沟通的载体 | 菜式 |
4.使用
导包
implementation 'io.reactivex.rxjava2:rxjava:2.1.3'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'com.jakewharton.rxbinding2:rxbinding:2.2.0'
RxJava 实例1.定义2.作用3. 特点4.使用5.优雅实现其他方法observeOn:观察者的执行线程subscribeOn:被观察者的执行线程RXView 防抖
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnL2cjNhVjYzkzMwImN4QmM4AjZxQTZ4MzYzIzY4EWYiF2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
1.定义被观察者
三种方式:Observable.create、Observable.just、Observable.fromArray
public void buyFood(){
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("potato");
e.onNext("tomato");
e.onNext("noodles");
e.onComplete();
}
});
Observable<String> observable1 = Observable.just("potato","tomato","noodles");
String [] foods = new String[]{"potato","tomato","noodles"};
Observable<String> observable2 = Observable.fromArray(foods);
}
2.定义观察者
使用observer或者subscriber
Observer<String> mObserver = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
};
Subscriber<String> mStringSubscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
特别注意:2种方法的区别,即Subscriber 抽象类与Observer 接口的区别
// 相同点:二者基本使用方式完全一致(实质上,在RxJava的 subscribe 过程中,Observer总是会先被转换成Subscriber再使用)
// 不同点:Subscriber抽象类对 Observer 接口进行了扩展,新增了两个方法:
// 1\. onStart():在还未响应事件前调用,用于做一些初始化工作
// 2\. unsubscribe():用于取消订阅。在该方法被调用后,观察者将不再接收 & 响应事件
// 调用该方法前,先使用 isUnsubscribed() 判断状态,确定被观察者Observable是否还持有观察者Subscriber的引用,如果引用不能及时释放,就会出现内存泄露
3.订阅
observable.subscribe(observer);
5.优雅实现
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("potato");
e.onNext("tomato");
e.onNext("noodles");
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
其他方法
observeOn:观察者的执行线程
observeOn 作用于该操作符之后直到出现新的observeOn操作符
subscribeOn:被观察者的执行线程
subscribeOn 作用于该操作符之前的 Observable 的创建操符作以及 doOnSubscribe 操作符 ,换句话说就是 doOnSubscribe 以及 Observable 的创建操作符总是被其之后最近的 subscribeOn 控制
实例
//将字符串转换成double类型
String path = "12.3";
public void test(){
//输入事件
Observable.just(path)
//对事件进行处理
.map(new Function<String, Double>() {
@Override
public Double apply(@NonNull String s) throws Exception {
return Double.parseDouble(s);
}
})
//指定被观察者执行线程
.subscribeOn(Schedulers.io())
//指定观察者执行线程
.observeOn(AndroidSchedulers.mainThread())
//订阅观察者
.subscribe(
new Observer<Double>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Double aDouble) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
Rxjava中的
Scheduler
相当于线程控制器,Rxjava通过它来指定每一段代码应该运行在什么样的线程。Rxjava提供了5种调度器:
- .io()
- .computation()
- .immediate()
- .newThread()
- .trampoline()
另外,Android还有一个专用的
AndroidSchedulers.mainThread()
指定操作将在Android主线程运行。Rxjava通过
subscribeOn()
和
observeOn()
两个方法来对线程进行控制,
subscribeOn()
指定
subscribe()
时间发生的线程,也就是事件产生的线程,
observeOn()
指定
Subscriber
做运行的线程,也就是消费事件的线程。
Schedulers.io() 用于I/O操作,比如:读写文件,数据库,网络交互等等。行为模式和
newThread()
差不多,重点需要注意的是线程池是无限制的,大量的I/O调度操作将创建许多个线程并占用内存
Schedulers.computation()计算工作默认的调度器,这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。
Schedulers.immediate() 这个调度器允许你立即在当前线程执行你指定的工作。这是默认的
Scheduler
。
Schedulers.newThread() 它为指定任务启动一个新的线程。
Schedulers.trampoline()
当我们想在当前线程执行一个任务时,并不是立即,我们可以用
trampoline()
将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。
RXView 防抖
private void test3() {
RxView.clicks(控件).throttleFirst(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
Log.e(TAG, "onNext: 响应事件....." );
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
RxJava篇(应用场景) - 简书 (jianshu.com)