天天看點

【32】RxJava與RxAndroidRxJava與RxAndroid

RxJava與RxAndroid

文章目錄

  • RxJava與RxAndroid
    • 1.為什麼要學習RxJava與RxAndroid
      • 1.1RxJava能提高工作效率
      • 1.2RxJava能優雅解決複雜業務場景
      • 1.3RxJava使用越來越流行
    • 2.這門課程能夠學到什麼
      • 2.1了解什麼是響應式程式設計
      • 2.2了解RxJava與RxAndroid到底是什麼
      • 2.3了解RxJava的曆史來源
      • 2.4清楚RxJava與RxAndroid關系
      • 2.5學會在項目中使用RxJava和RxAndroid
    • 3.相關資源
    • 4.響應式程式設計概述
      • 4.1什麼是響應式程式設計?
      • 4.2響應式程式設計關鍵概念
        • 4.2.1事件
      • 4.3響應式程式設計的使用場景
        • 4.3.1UI(通用)
        • 4.3.2微軟的響應式擴充程式設計(函數響應式程式設計)
    • 5.RxJava概述
      • 5.1RxJava是什麼?
        • 5.1.1是一個異步資料處理庫
        • 5.1.2是一個擴充的觀察者模式
      • 5.2RxJava曆史來源
      • 5.3RxJava特點
        • 5.3.1 Jar包<1MB
        • 5.3.2輕量級架構
        • 5.3.3 支援Java 8 lambda
        • 5.3.4支援Java 6+ & Android2.3 +
        • 5.3.5支援異步和同步
      • 5.4擴充的觀察者模式
        • 5.4.1 onCompleted()事件
        • 5.4.2onError()事件
        • 5.4.3組合而不是嵌套,避免陷入回調地獄.
    • 6.RxAndroid概述
      • 6.1RxAndroid是什麼?
      • 6.2Schedulers(排程器)
        • 6.2.1解決Android主線程問題【針對Android】
        • 6.2.2解決多線程問題
    • 7.RxJava與觀察者模式
      • 7.1觀察者模式的四大要素
        • 7.1.1 Observable被觀察者
        • 7.1.2 Observer(觀察者)
        • 7.1.3 subscribe(訂閱)
        • 7.1.4 事件
      • 7.2觀察者模式
      • 7.3 RxJava擴充的觀察者模式
    • 8.總結
      • 8.1RxJava是什麼?
      • 8.2RxJava的曆史來源
      • 8.3RxJava特點
      • 8.4RxAndroid的概念
      • 8.5RxJava的排程器(Schedulers)
      • 8.6觀察者模式四大要素
      • 8.6RxJava擴充的觀察者模式
    • 9.RxJava hello world
    • 10.操作符
      • 10.1操作符的分類
        • 10.1.1 Creating Observables (建立Observable)
        • 10.1.2 Transforming Observables(轉換Observable)
        • 10.1.3 Filtering Observables(過濾Observable)
        • 10.1.4 Combining Observables(組合Observable)
        • 10.1.5 Error Handling Operators(處理錯誤)
      • 10.2 Creating Observables (建立Observable)
        • 10.2.1 Create
        • 10.2.2 Just
        • 10.2.3 From
        • 10.2.4 Defer
        • 10.2.5 Empty/Never/Throw
        • 10.2.6 Interval
        • 10.2.7 Range
        • 10.2.8 Repeat
        • 10.2.9 Start
        • 10.3 Transforming Observables(轉換Observable)
        • 10.3.1 Map
        • 10.3.2 flatMap
        • 10.3.3 GroupBy
        • 10.3.4 Buffer
        • 10.3.5 Scan
        • 10.3.6 Window
      • 10.4 Filtering Observables(過濾Observable)
        • 10.4.1 Debounce
        • 10.4.2 Distinct
        • 10.4.3 ElementAt
        • 10.4.4 Filter
        • 10.4.5 First
        • 10.4.6 IgnoreElements
        • 10.4.7 Last
        • 10.4.8 Sample
        • 10.4.9 Skip
        • 10.4.10 SkipLast
        • 10.4.11 Take
        • 10.4.12 TakeLast
      • 10.5 Combining Observables(組合Observable)
        • 10.5.1 Zip
        • 10.5.2 Merge
        • 10.5.3 StartWith
        • 10.5.4 CombineLatest
        • 10.5.5 Join
        • 10.5.6 SwitchOnNext
      • 10.6 Error Handling Operators(處理錯誤)
        • 10.6.1 Catch
        • 10.6.2 Retry
    • 11.Schedulers(排程器)
      • 11.1 Schedulers(排程器)是什麼?
      • 11.2 Schedulers(排程器)種類
        • 11.2.1 io()
        • 11.2.2 computation()
        • 11.2.3 immediate()
        • 11.2.4 newThread()
        • 11.2.5 trampoline()
    • 12.Android Schedulers
      • 12.1 Android Schedulers是什麼?
      • 12.2 代碼示例
    • 13.非阻塞I/O操作
      • 13.1 非阻塞I/O操作圖檔儲存
    • 14.SubscribeOn and ObserveOn
      • 14.1 SubscribeOn
      • 14.2 ObserveOn

1.為什麼要學習RxJava與RxAndroid

1.1RxJava能提高工作效率

1.2RxJava能優雅解決複雜業務場景

1.3RxJava使用越來越流行

2.這門課程能夠學到什麼

2.1了解什麼是響應式程式設計

2.2了解RxJava與RxAndroid到底是什麼

2.3了解RxJava的曆史來源

它是為了解決什麼問題而來的。
           

2.4清楚RxJava與RxAndroid關系

2.5學會在項目中使用RxJava和RxAndroid

3.相關資源

RxJava文檔:https://github.com/ReactiveX/RxJava/wiki
RxJava中文文檔:https://mcxiaoke.gitbooks.io/rxdocs/content
RxJava經典資料:https//github.com/lzyzsd/Awesome-RxJava
           

4.響應式程式設計概述

4.1什麼是響應式程式設計?

是一種基于異步資料流概念的程式設計模式.
解說:資料流通俗一點講它其實就是像我們現實生活中的河流一樣,這些河流是可以被我們進行觀察的,也可以對這些河流進行過濾,以及可以進行其他各種操作,同時我們也可以把這條河流與另外的河流進行合并,然後組成一條新的河流,總之它其實是一種新的程式設計模式。
           

4.2響應式程式設計關鍵概念

4.2.1事件

解說: 它其實是軟體界于我們生活中的事件,通俗的講就是把現實生活中的事件搬到軟體中的事件。也可以像現實生活中的事件一樣,可以對這個事件進行等待,同時這個事件可以觸發一個新的過程.
例如學習RxJava,其實可以了解為一件事情,它也可以觸發一個學習的過程。同時這個事件也可以觸發另外的事件。
           

4.3響應式程式設計的使用場景

4.3.1UI(通用)

例如現在手機APP的一個網絡請求,還有手機上的一個輸入系統,例如系統的一個對話框響應,這些都是響應式程式設計的使用場景。
也就是把我們現實生活中的這種事件場景發送到我們的軟體裡面來。
           

4.3.2微軟的響應式擴充程式設計(函數響應式程式設計)

微軟RX庫
即.NET響應式的擴充,即一組我們可以觀察的,基于異步的和基于事件的它的一個驅動程式。
           

5.RxJava概述

5.1RxJava是什麼?

5.1.1是一個異步資料處理庫

5.1.2是一個擴充的觀察者模式

5.2RxJava曆史來源

是Netflix公司遇到問題,這個公司不滿足于使用者量的增長,想找一種什麼樣的架構來代替他們原來的架構,是以他們借鑒了微軟公司提出的響應式程式設計。
目标:重構目前架構來減少REST調用的次數
嘗試:微軟RX遷移到JVM
           

5.3RxJava特點

5.3.1 Jar包<1MB

5.3.2輕量級架構

入侵我們項目不重
           

5.3.3 支援Java 8 lambda

5.3.4支援Java 6+ & Android2.3 +

5.3.5支援異步和同步

5.4擴充的觀察者模式

5.4.1 onCompleted()事件

傳統的觀察者模式是沒有事件結束的通知的,也沒有事件錯誤的通知的模式.
這個方法會在事件結束之後,把通知發送到觀察者。
           

5.4.2onError()事件

把事件錯誤資訊發送給觀察者。
           

5.4.3組合而不是嵌套,避免陷入回調地獄.

6.RxAndroid概述

6.1RxAndroid是什麼?

RxAndroid是RxJava針對Android平台的一個擴充,用于Android開發.
針對Android平台提供響應式擴充元件,快速、易于開發Android應用程式。
           

6.2Schedulers(排程器)

解說:排程器簡單說是RxJava可以指定一些相應的操作在某一個線程,比如說在子線程或者是在主線程進行操作。
           

6.2.1解決Android主線程問題【針對Android】

例如在主線程進行UI操作,可以通過排程器來指定UI的更新,就是在主線程來進行操作,而不是在其他的線程。
           

6.2.2解決多線程問題

解說:Android的UI操作必須是在主線程,而排程器還可以解決一個多線程的問題.
在做Android開發的時候,涉及到網絡請求資料,網絡請求資料是一個耗時的操作,Android中耗時的操作都必須放到子線程中來進行一個操作,如果不是放在子線程,那肯定對Android主線程造成卡頓現象。也就是說排程器解決了一個子線程與主線程之間的一個通迅問題。
在Android開發過程中為了解決主線程與子線程的一個通迅問題需要通過Handler,如果使用了RxJava,使用了RxAndroid,這個庫就通過排程器來指定我們這些操作是在主線程當中還是在子線程中,即一句代碼就可以解決了,就不像平常用Handler寫一長串的代碼。
例:
           
Observable.just(1,2,3,4)//IO線程,由subscribeOn()指定
	.subscribeOn(Schedulers.io())
	.observeOn(Schedulers.newThread())
	.map(mapOperator)//新線程,由observerOn()指定
	.observeOn(Schedulers.io())
	.map(mapOperator2)//IO線程,由observeOn()指定
	.observeOn(AndroidSchedulers.mainThread)
	.subscribe(subscriber);//Android主線程,由obServeOn()指定
           

7.RxJava與觀察者模式

7.1觀察者模式的四大要素

7.1.1 Observable被觀察者

7.1.2 Observer(觀察者)

7.1.3 subscribe(訂閱)

解說:訂閱的意思是我們的觀察者要觀察被觀察者,
           

7.1.4 事件

解說:觀察者訂閱被觀察者,遇到被觀察者發出事件,觀察者就可以接收得到,
           

7.2觀察者模式

觀察者訂閱被觀察者,其中就是訂閱它的事件.
           

7.3 RxJava擴充的觀察者模式

onNext()就是訂閱的一個事件
onCompleted()
onError()
解說:RxJava擴充的觀察者模式多了兩個方法,一個是當事件流完成之後,它會回調onCompleted()這個方法,當這些流在流動的過程中如果出現了異常,那麼它會回調我們觀察者中的onError()方法。
           

8.總結

8.1RxJava是什麼?

其實是一個異步操作的一個庫,同時它也是一個支援觀察者模式的一個擴充。
           

8.2RxJava的曆史來源

Netflix公司解決目前系統不能适應使用者增長量,他們希望借鑒微軟響應式程式設計的模式,來對目前系統進行一個新的架構,RxJava其實就是借鑒微軟響應式程式設計支援Java語言擴充出來的一個庫.
           

8.3RxJava特點

它是一個比較輕量級的一個庫,同時jar包比較小,小于1MB,它支援java 8 的lanboda表達式,同時它也支援同步與異步的互動。
           

8.4RxAndroid的概念

RxAndroid是RxJava在Android平台的一個擴充庫,這個主要是針對Android平台開發的一個庫。
           

8.5RxJava的排程器(Schedulers)

主要來解決線程之間的一個通信問題。一句很簡單的代碼就可以指定具體操作是在子線程還是在主線程。
           

8.6觀察者模式四大要素

被觀察者(Observable)、觀察者(Observer)、subscribe訂閱、事件

觀察者與被觀察者之間是通過訂閱來關聯的,關聯之間是些什麼東西呢?是通過事件來進行驅動的。
           

8.6RxJava擴充的觀察者模式

RxJava是根據觀察者模式來進行擴充的,它多了兩個回調方法,onCompleted(),onError().
           

9.RxJava hello world

package com.gdc.rxjava;
	import rx.Observable;
	import rx.Observable.OnSubscribe;
	import rx.Subscriber;
	
	public class HelloWorld {
	
		public static void main(String[] args) {
			//s1.建立被觀察者
			Observable myObservable = Observable.create(new OnSubscribe<String>(){
	
				@Override
				public void call(Subscriber<? super String> subscriber) {
					subscriber.onNext("hello world!!!");
					subscriber.onCompleted();
				}});
			
			//s2.建立觀察者
			Subscriber subscriber = new Subscriber<String>(){
	
				@Override
				public void onCompleted() {
					System.out.println("onCompleted()");
				}
	
				@Override
				public void onError(Throwable arg0) {
					System.out.println("onError()");
				}
	
				@Override
				public void onNext(String arg0) {
					System.out.println("onNext():" + arg0);
				}
				
			};
			
			//s3.訂閱事件
			myObservable.subscribe(subscriber);
			
		}
	}
           
onCompleted()與onError()能同時獲得調用嗎?
onCompleted()與onError()隻能回調其中的一個。回調onError()或者onCompleted()方法就代碼這個事件已經結束.
           

10.操作符

10.1操作符的分類

10.1.1 Creating Observables (建立Observable)

10.1.2 Transforming Observables(轉換Observable)

10.1.3 Filtering Observables(過濾Observable)

10.1.4 Combining Observables(組合Observable)

10.1.5 Error Handling Operators(處理錯誤)

很多操作符可以同時組合使用得到我們希望得到的結果。
           

10.2 Creating Observables (建立Observable)

10.2.1 Create

private static void create(){
		Observable.create(new OnSubscribe<String>(){

			@Override
			public void call(Subscriber<? super String> subscriber) {
				subscriber.onNext("中華人民共和國 RxJava學習");
			}}).subscribe(new Subscriber<String>() {

				@Override
				public void onCompleted() {
					System.out.println("onCompleted()");
				}

				@Override
				public void onError(Throwable arg0) {
					System.out.println("onError():" + arg0);
				}

				@Override
				public void onNext(String arg0) {
					System.out.println("onNext():" + arg0);
				}
			});
	}
           

10.2.2 Just

private static void just() {
		Observable.just("中華人民共和國 RxJava學習").subscribe(new Subscriber<String>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(String arg0) {
				System.out.println("onNext():" + arg0);
			}
		});
	}
           

10.2.3 From

轉換成各種類型的其他的對象或者資料類型,轉換為Observables對象,它可以接收數組與清單.
           
private static void from() {
		//1: 數組、
		//2:清單
		//3:Iterable
		//4:Future 
		//5:Future,Scheduler
		//6:Future,timout,timeUnit
		Integer[] a = new Integer[] { 1, 2, 3, 4, 5, 6, 7 };
		//支援2
		ArrayList<Integer> items = new ArrayList<Integer>();
		items.add(1);
		items.add(2);
		items.add(3);
		items.add(4);
		items.add(5);
		items.add(6);
		Observable.from(items).subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}
		});
	}
           

10.2.4 Defer

在我們沒有去調用subscribe方法之前不會去建立Observable,直到調用了subscribe()這個方法之後才會建立Observable。
解說:簡單了解就是可以延遲的意思。
與前幾個操作符create,just,from的差別是,當調用subscribe()才會建立Observable對象。
           
private static String valuestr;

	private static void defer() {
		//Observable observable = Observable.just(valuestr);
		
		Observable observable = Observable.defer(new Func0<Observable<String>>() {

			@Override
			public Observable<String> call() {
				return Observable.just(valuestr);
			}
		});
		
		valuestr = "中華人民共和國RxJava學習"; 

		observable.subscribe(new Subscriber<String>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(String arg0) {
				System.out.println("onNext():" + arg0);
			}
		});
	}
	
           

10.2.5 Empty/Never/Throw

(1)Empty 是建立一個空的,沒有任何資料項的一個Observable對象。
如果直接調用則會調用onCompleted()
           
private static void empty(){
		Observable.empty().subscribe(new Subscriber() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}			

			@Override
			public void onNext(Object arg0) {
				
				System.out.println("onNext()");
			}
		});
	}
           
(2)Never
是不進行任何回調觀察者的方法
           
private static void never(){
		Observable.never().subscribe(new Subscriber() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}			

			@Override
			public void onNext(Object arg0) {
				
				System.out.println("onNext()");
			}
		});
	}
           
(3)throw
是一個錯誤的Observable對象,它會回調onError()的方法。
           
private static void throwtest() {
		Observable.error(new NullPointerException()).subscribe(new Subscriber() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Object arg0) {

				System.out.println("onNext()");
			}
		});
	}
           

10.2.6 Interval

解說:即定時器,每間隔一段時間來發射一次,即為一個定時器。
           
private static void interval(){
		Observable.interval(1000,TimeUnit.MICROSECONDS).subscribe(new Subscriber() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Object arg0) {

				System.out.println("onNext()");
			}
		});
	}
           

10.2.7 Range

它是建立一個在一個範圍之内的一個整型的一個資料項的Observable對象。
           
private static void range(){
		Observable observable = Observable.range(1, 5);
		observable.subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext()" + arg0);
			}
		});
	}
           

10.2.8 Repeat

建立一個Observable對象,發射一個特定資料項,即重複發送。
           
private static void repeat(){
		Observable observable = Observable.range(1, 5).repeat(2);
		observable.subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext()" + arg0);
			}
		});
	}
           

10.2.9 Start

建立一個Observable對象,發送開始的資料類型.
           

####10.2.10 Timer

是一個定時器,每間隔一段時間進行一次發送。

10.3 Transforming Observables(轉換Observable)

10.3.1 Map

是将一個對象轉換成另外一個我們想轉換成的對象。即資料類型的轉換。
是一個一對一的轉換.傳回的是具體的轉換的資料類型.
           
private static void testTransform() {
		//将整型資料轉換成String類型輸出
		Observable.just(1,2,3).map(new Func1<Integer, String>() {

			@Override
			public String call(Integer arg0) {
				return arg0 + "";
			}
		}).subscribe(new Subscriber<String>(){

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()" );
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(String arg0) {
				System.out.println("onNext():" + arg0);
			}});
	}
           

10.3.2 flatMap

是一個一對多的資料轉換.它傳回的是一個Observable對象,它的裡面包含的是我們具體想轉換的資料類型。
其實它是把一個對象
轉換後傳回Observable對象,
然後再把這個Observable對象flat成總的一個Observable對象,然後再将資料發送給觀察者。
在Android開發中經常遇到一個網絡請求依賴于另一個網絡請求,像經常會遇到一個需求,
通過擷取到token,然後才根據這個token點選擷取資料清單這樣的場景。
           
private static void testFlatMap() {
		
		Observable.just(1, 2, 3,4,5).flatMap(new Func1<Integer, Observable<? extends String>>() {

			@Override
			public Observable<? extends String> call(Integer arg0) {

				return Observable.just(arg0 + "");
			}
		}).subscribe(new Subscriber<String>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(String arg0) {
				System.out.println("onNext():" + arg0);
			}
		});
	}
           

10.3.3 GroupBy

分組:它會通過我們指定的規則對資料清單進行一個分組,然後将它發送到觀察者這邊來。
即可以對資料進行分組。
           
private static void testGroupBy(){
		Observable.just(1, 2, 3,4,5).groupBy(new Func1<Integer, Integer>() {

			@Override
			public Integer call(Integer arg0) {
				//分組規則
				return arg0 % 2;
			}
		}).subscribe(new Observer<GroupedObservable<Integer, Integer>>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(GroupedObservable<Integer, Integer> arg0) {

				arg0.subscribe(new Subscriber<Integer>() {

					@Override
					public void onCompleted() {
						
					}

					@Override
					public void onError(Throwable arg0) {
						
					}

					@Override
					public void onNext(Integer data) {
						System.out.println("group:" + arg0.getKey() + " data:" + data);
					}
				});
			}
		});
	}
           

10.3.4 Buffer

它可以一次性将Observable對象轉換成多個資料項,然後将多個資料項發送給觀察者。也就是可以将六個資料項,可以訂閱3個,一次将3個資料發送給觀察者。然後通過觀察者列印出來。

//随機生成5個數字,一次性訂閱兩次,然後列印出來,通過運作得知,隻需要3次即可以把資料全部列印出來
           
Observable.range(1,5).buffer(2).subscribe(new Subscriber<List<Integer>>() {

		@Override
		public void onCompleted() {
			System.out.println("onCompleted()");
		}

		@Override
		public void onError(Throwable arg0) {
			System.out.println("onError():" + arg0);
		}

		@Override
		public void onNext(List<Integer> arg0) {
			System.out.println("onNext():" + arg0);
		}
	});
           

10.3.5 Scan

它是通過累加,即有一個序列的資料,它每一次把前面的資料累加,然後發送到觀察者。
           
private static void testScan() {
		// 1到5的和
		Observable.range(1, 5).scan(new Func2<Integer, Integer, Integer>() {

			@Override
			public Integer call(Integer sum, Integer arg1) {
				// sum:目前資料的和,arg1:每一次需要累加的資料
				return sum + arg1;
			}
		}).subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}
		});
	}
           

10.3.6 Window

它與buffer差不多,Window是傳回一個Observable對象,它是必須根據指定的規則然後将資料聚集到一個清單,然後可以通過時間間隔将資料發送給觀察者。
           

10.4 Filtering Observables(過濾Observable)

10.4.1 Debounce

在操作間隔一定的時間内沒有做任何操作,那麼這個資料才會發送給觀察者。
在一個資料項清單中,這個資料項空閑多長時間之後,如果沒有發生操作,那麼就發送該資料項給觀察者。
           
private static void testDebounce(){
		Observable.create(new OnSubscribe<Integer>(){

			@Override
			public void call(Subscriber<? super Integer> arg0) {
				try {
					for(int i = 0 ; i < 10; i++){
						Thread.sleep(1000);
						arg0.onNext(i);
					}
					arg0.onCompleted();
				} catch (InterruptedException e) {
					arg0.onError(e);
				}
			}}).debounce(1,TimeUnit.SECONDS).subscribe(new Subscriber<Integer>(){

				@Override
				public void onCompleted() {
					System.out.println("onCompleted()");
				}

				@Override
				public void onError(Throwable arg0) {
					System.out.println("onError():" + arg0);
				}

				@Override
				public void onNext(Integer arg0) {
					System.out.println("onNext():" + arg0);
				}
				
			});
	}
           

10.4.2 Distinct

這是一個去重的操作符,即去除重複資料的操作符,當資料清單中存在重複資料,需要去除重複資料的情況,就可以使用該操作符。
資料項的某一個操作,當産生1的時候,它有一個時間間隔,然後2、3、4、5它們之間的時間間隔還沒有到我們指定的時間,我們一個資料項之後的時間間隔,必須要在我們指定時間内沒有發生第二個操作之後,我們才發送資料項到觀察者。
           
private static void testDistinct(){
		Observable.just(1,2,3,2,3).distinct().subscribe(new Subscriber<Integer>(){

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}
			
		});
	}
           

10.4.3 ElementAt

這個操作符用于取指定位置的資料,這個操作符類似于Java清單List的get方法擷取指定位置的資料。
           
private static void testElementAt(){
		Observable.just(1,2,3,2,3).elementAt(3).subscribe(new Subscriber<Integer>(){

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}
			
		});
	}
           

10.4.4 Filter

是過濾的意思,這個操作符按照我們指定的一個規則進行條件過濾,得到我們想要的資料,過濾的條件是我們可以自定義的。
           
private static void testFilter() {
		Observable.just(1, 2, 3, 2, 3).distinct().filter(new Func1<Integer, Boolean>() {

			@Override
			public Boolean call(Integer arg0) {
				// 在此指定過濾規則
				return arg0 > 2;
			}
		}).subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}

		});
	}
           

10.4.5 First

指的是清單中的第一個資料.
           
private static void testFirst() {
		Observable.just(9, 2, 3, 2, 3).distinct().first().subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}

		});
	}
           

10.4.6 IgnoreElements

意思是忽略掉清單中的所有元素不向觀察者發送任何的資料項,隻回調我們觀察者中的onCompleted()或者是onError()方法。即不會執行onNext()這個方法.
           
private static void testIgnoreElements() {
		Observable.create(new OnSubscribe<Integer>() {

			@Override
			public void call(Subscriber<? super Integer> arg0) {
				arg0.onNext(123);
				throw new NullPointerException();
			}
		}).ignoreElements().subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}

		});
	}

    private static void testIgnoreElements() {
		Observable.just(123).ignoreElements().subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}

		});
	}
           

10.4.7 Last

private static void testLast() {
		Observable.just(9, 2, 3, 2, 3).distinct().last().subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}

		});
	}
           

10.4.8 Sample

是一個取樣,取樣操作符,這個操作符讓我們可以對資料源進行采集,當我們采集到這個資料之後,才發送到觀察者。
Sample操作符類似于一個定時器,是定時的對資料源進行定時的取樣,當取樣的資料放到清單中,然後一段時間後将取到的資料發送到觀察者進行處理。
           
private static void testSample() {
		Observable.create(new OnSubscribe<Integer>() {

			@Override
			public void call(Subscriber<? super Integer> arg0) {
				try {
					for (int i = 0; i < 10; i++) {
						Thread.sleep(1000);
						arg0.onNext(i);
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).sample(4, TimeUnit.SECONDS).subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}
		});
	}
           

10.4.9 Skip

跳躍清單資料指定資料項。
例如跳過清單的前兩項,然後取後面的資料,發送給觀察者。
           
private static void testSkip() {
		Observable.just(1, 2, 3, 4, 5).skip(2).skipLast(2).subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}
		});
	}
           

10.4.10 SkipLast

跳過資料清單的最後幾項,比如跳過清單的後兩項資料然後取前面的清單資料,最後将資料發送給觀察者。
           
private static void testSkip() {
		Observable.just(1, 2, 3, 4, 5).skip(2).skipLast(2).subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}
		});
	}
           

10.4.11 Take

資料清單中有很多個資料項列,隻取列有中的前3個資料,那麼就是Take(1,3).
           
private static void testTake() {
		Observable.just(1, 2, 3, 4, 5).take(2).subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}

		});
	}
           

10.4.12 TakeLast

即取資料項清單中的最後幾位資料,可以指定幾項資料,如1、2、3、4、5,取後3位資料,即TakeLast(3,5)取的是最後3、4、5條資料.
           
private static void testTakeLast() {
		Observable.just(1, 2, 3, 4, 5).takeLast(2).subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}

		});
	}
           
過濾型操作符的作用就是尋找出我們想要的資料項,然後再發送給觀察者。
           

10.5 Combining Observables(組合Observable)

10.5.1 Zip

将兩個資料源組合成一個資料源之後然後發送給觀察者。
1,2,3,4,5
A,B,C,D
組合之後
1A,2B,3C,4D
最終将此結果發送給觀察者.
           
private static void testZip(){
		Observable<Integer> observable1 = Observable.just(10,20,30);
		Observable<Integer> observable2 = Observable.just(4,8,12,16);
		Observable.zip(observable1,observable2,new Func2<Integer, Integer, Integer>() {

			@Override
			public Integer call(Integer arg0, Integer arg1) {
				return arg0 + arg1;
			}
		}).subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}

		});
	}
           

10.5.2 Merge

兩個資料源進行組合,組合之後将組合的資料發送給觀察者。組合的資料是無序的,是根據時間的先後順序進行組合的。
按時間的先後順序組合成新的資料源發送給觀察者。
           
private static void testMerge(){
		Observable<Integer> odds = Observable.just(1,3,5);
		Observable<Integer> events = Observable.just(2,4,6);
		Observable.merge(odds,events).subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}
		});
	}
           

10.5.3 StartWith

在一個資料源的最前面進行插入一些資料。
           
private static void startWith() {
		Observable<Integer> first = Observable.just(1, 3, 5);
		Observable<Integer> second = Observable.just(2, 4, 6);
		first.startWith(second).subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}

		});
	}
           

10.5.4 CombineLatest

組合兩個資料源的資料,然後發送給觀察者。按照發送點的最新基數來進行組合,而産生一個新的Observable資料源對象,然後發送給觀察者。
           
private static void testCombineLatest() {
		Observable<Integer> first = Observable.just(1, 3, 5);
		Observable<Integer> second = Observable.just(2, 4, 6);
		first.combineLatest(first, second, new Func2<Integer, Integer, Integer>() {

			@Override
			public Integer call(Integer arg0, Integer arg1) {
				System.out.println("arg0:" + arg0 + "arg1:" + arg1);
				return arg0 + arg1;
			}
		}).subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}

		});
	}
           

10.5.5 Join

兩個發射對象在一定的間隔之内進行按指定規則結合,然後産生新的資料對象,然後發送給觀察者。
類似于數組排列一樣,比如前面的是第一個對象是0,1,2,3,第二個中繼資料對象是a,b,c,就像排列組合一樣,在某一個時間點,我們向觀察者發送資料的時候,可以與1、2、3進行組合,a,b,c可以與它們進行組合,然後産生一個新的資料對象,之後再向觀察者發送資料。
           

10.5.6 SwitchOnNext

有一個資料源對象,它裡面有很多小分支,即有一個大的Observable對象,這個大的Observable對象它産生了很多個小的Observable對象,這個Switch操作符就是要把擁有很多小的Observable資料源對象的對象組合成一個新的資料源對象,然後再發送給觀察者。
會有資料覆寫現象産生.
           

10.6 Error Handling Operators(處理錯誤)

10.6.1 Catch

捕獲異常
操作符方法:
onErrorReturn():
	資料源在某一位置出現異常,當在項目代碼中使用onErrorReturn()這個方法,它會正常的結束這次資料流,它會回調onCompleted()方法。

onErrorResumeNext():
如果某資料項出現異常了,如果使用onErrorResumeNext(),它可以用正常的資料來代替我們要發送給觀察者的資料。差別就是可以用正常的資料做替換,然後發送。

onExceptionResumeNext():
可以讓我們捕獲操作異常的。
           

10.6.2 Retry

重新嘗試修複完善
當錯誤發生的時候,嘗試去恢複這個錯誤以保證正常的發送資料到觀察者。

Retry():
當錯誤發生的時候,嘗試去恢複這個錯誤以保證正常的發送資料到觀察者
retryWhen():
當異常發生的時候,retryWhen()可以設定延長多少時間後,再來繼續報這個錯誤,即發生異常的時候,不報這個錯誤,然後直到資料發送完成之後,我們才把異常資訊回調給觀察者,然後才捕獲到異常資訊。
           

11.Schedulers(排程器)

11.1 Schedulers(排程器)是什麼?

排程器(Schedulers)是RxJava以一種極其簡單的方式解決多線程問題機制.
           

11.2 Schedulers(排程器)種類

11.2.1 io()

這個排程器用于I/O操作
解說:在Android開發中,有很多種耗時的操作,比如網絡操作,對一個磁盤的讀寫操作,這類都屬于耗時的操作,這種耗時的操作就将其指定到I/O操作中。
           

11.2.2 computation()

這個是計算工作預設的排程器,它與I/O操作無關。
例如:buffer(),debounce(),delay(),interval(),sample(),skip();
           

11.2.3 immediate()

這個排程器允許你立即在目前線程執行你指定的工作。
           

11.2.4 newThread()

它為指定任務啟動一個新的線程來執行我們的任務.
           

11.2.5 trampoline()

排程器将會按序列處理隊列,并運作隊列中每一個任務。
例如:應用操作符repeat()與retry()時。
           

12.Android Schedulers

12.1 Android Schedulers是什麼?

AndroidSchedulers是RxAndroid庫提供在安卓平台的排程器(指定觀察者在主線程)
可以指定觀察者在主線程中運作.
将資料的更新指定在主線程中進行綁定顯示。
           

12.2 代碼示例

getApps()
	.onBackpressureBuffer()
	.subscribeOn(Schedulers.io())
	.observeOn(AndroidSchedulers.mainThread())
	.subscribe(new Observer<AppInfo>(){[…]})
           

13.非阻塞I/O操作

13.1 非阻塞I/O操作圖檔儲存

Public static void storeBitmap(Context context,Bitmap bitmap,String filename){
   		Schedulers.io().createWorker().schedule(()->{
      		blockingStoreBitmap(context,bitmap,filename);
		});
	}
           
阻塞 是同步,非阻塞是異步,在不同的線程中操作。
           

14.SubscribeOn and ObserveOn

解說:這兩個操作符可以指定代碼操作在什麼線程上面。指定耗時操作在什麼線程上執行。
           

14.1 SubscribeOn

RxJava提供了subscribeOn()方法來用于每個Observable對象.
即被觀察者對象産生的操作具體的運作在哪個線程上面。
           

14.2 ObserveOn

RxJava提供了subscribeOn()方法來用于每個Subscriber(Observer)對象.
指定觀察者對象的操作所在的線程.
           
DataManager.getRetrofitService()
	.getHomeData(“homeDataVersion”,new HashMap<String,String>())
	.subscribeOn(Schedulers.io())//指定網絡請求在io線程上
	.observeOn(AndroidSchedulers.mainThread())//指定資料傳回在主線程上 
	.subscribe(subscriber);
           
4個東西,被觀察者、觀察者、操作符、排程器
操作符是對我們被觀察者的資料進行的一些操作,操作完成之後将這些資料不管是變化也好,還是過濾也好,等等得到所想要的資料,
所想要的被觀察者對象,然後将其發送給觀察者。
           

繼續閱讀