已經熟悉了Observable的基本用法,但是如果僅僅隻是“生産-消費”的模型,這就展現不出優勢了,java有100種辦法可以玩這個:)
一、更簡單的多線程
正常情況下,生産者與消費者都在同一個線程裡處理,參考下面的代碼:
final long start = System.currentTimeMillis();
Observable<String> fileSender = Observable.create(emitter -> {
for (int i = 1; i < 6; i++) {
Thread.sleep(1000);
String temp = "thread:" + Thread.currentThread().getId() + " , file " + i + " 的内容";
System.out.println(temp);
emitter.onNext(temp);
}
emitter.onComplete();
});
Observer<String> fileHander = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("準備處理檔案...");
}
@Override
public void onNext(@NonNull String s) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread:" + Thread.currentThread().getId() + " , [" + s + "] 已處理!");
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("師傅,有妖怪!");
}
@Override
public void onComplete() {
System.out.println("總算完事兒,累屎大爺了!");
long end = System.currentTimeMillis();
System.out.println("耗時:" + (end - start));
}
};
fileSender.subscribe(fileHander);
Thread.sleep(60000);
假設生産者在讀取一堆檔案,然後發給消費者處理,通常情況下,這類涉及IO的操作都是很耗時的,我們用sleep(1000)來模拟。
從輸出結果上看,生産者與消費者的thread id相同,耗時約為10s。
fileSender.subscribe(fileHander);
如果上面這行,換成
fileSender.subscribeOn(Schedulers.io()) //生産者處理時,放在io線程中
.observeOn(Schedulers.newThread()) //消費者處理時,用新線程
.subscribe(fileHander);
注:subscribeOn() 是生産者發送子彈的線程, observeOn() 則是消費者(靶子)收子彈的線程,如果有多個消費者,每次調用observeOn() 消費者線程便會切換一次
這樣生産者、消費者就變成不同的線程了,跑一下看看:
可以看到二個線程id不一樣,說明分别在不同的線程裡,而且總耗時明顯縮短了。
二、更平滑的鍊式調用
假設我們有一個經典的線上電商場景:使用者送出訂單後,馬上跳到支付頁面付款。傳統寫法,通常是中規中矩的封裝2個方法,依次調用。用rxjava後,可以寫得更流暢,先做點準備工作:
先定義二個服務接口:訂單服務(OrderService)以及支付服務(PayService)
OrderService.java
public interface OrderService {
Observable<CreateOrderResponse> createOrder(CreateOrderRequest request) throws Exception;
}
PayService.java
public interface PayService {
Observable<PayResponse> payOrder(PayRequest request) throws Exception;
}
然後來二個實作:
OrderServiceImpl
public class OrderServiceImpl implements OrderService {
@Override
public Observable<CreateOrderResponse> createOrder(CreateOrderRequest request) throws InterruptedException {
System.out.println("threadId:" + Thread.currentThread().getId() + ", 訂單建立中:" + request.toString());
CreateOrderResponse response = new CreateOrderResponse();
response.setOrderNo(UUID.randomUUID().toString().replace("-", ""));
response.setOrderStatus("NEW");
response.setOrderAmount(request.getOrderAmount());
response.setOrderDesc(request.getOrderDesc());
return Observable.create(emitter -> emitter.onNext(response));
}
}
PayServiceImpl
public class PayServiceImpl implements PayService {
@Override
public Observable<PayResponse> payOrder(PayRequest request) throws InterruptedException {
System.out.println("threadId:" + Thread.currentThread().getId() + ", 正在請求支付:" + request);
PayResponse response = new PayResponse();
response.setSuccess(true);
response.setOrderNo(request.getOrderNo());
response.setTradeNo(UUID.randomUUID().toString().replace("-", ""));
return Observable.create(emitter -> emitter.onNext(response));
}
}
然後測試一把:
@Test
public void test1() throws Exception {
OrderService orderService = new OrderServiceImpl();
PayService payService = new PayServiceImpl();
orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00))) //建立訂單
//将"建立訂單的Response" 轉換成 "支付訂單的Response"
.flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
//支付完成的處理
.subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"));
Thread.sleep(1000);//等待執行完畢
}
鍊式的寫法,更符合閱讀習慣,注:flatMap這個操作,通俗點講,就是将一種口徑的子彈,轉換成另一種口徑的子彈,然後再繼續發射。
輸出:
threadId:1, 訂單建立中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
threadId:1, 正在請求支付:PayRequest(orderNo=81419b0580d547acbb53955978ace6b8, paymentAmount=8888)
threadId:1, 支付完成
可以看到,預設情況下,建立訂單/支付訂單在同一個線程中,結合前面學到的知識,也可以将它們劃分到不同的線程裡:(雖然就這個場景而言,這樣做的意義不大,因為支付前,肯定要等訂單先送出,這個沒辦法并發處理,這裡隻是意思一下,可以這樣做而已)
@Test
public void test2() throws Exception {
OrderService orderService = new OrderServiceImpl();
PayService payService = new PayServiceImpl();
orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00)))
.subscribeOn(Schedulers.newThread()) //(生産者)建立訂單時,使用新線程
.observeOn(Schedulers.newThread()) //(消費者1)接收訂單時,使用新線程
.flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
.observeOn(Schedulers.newThread()) //(消費者2)接收支付結果時,使用新線程
.subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"));
Thread.sleep(1000);//等待執行完畢
}
threadId:1, 訂單建立中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
threadId:13, 正在請求支付:PayRequest(orderNo=d5ff7890f22f486bb1bf8aa8e4f0a3bf, paymentAmount=8888)
threadId:14, 支付完成
從threadId看,已經是不同的線程了。
上面的代碼,都沒考慮到出錯的情況,如果支付時出異常了,rxjava如何處理呢?
先改下支付的實作,人為抛個異常:
public class PayServiceImpl implements PayService {
@Override
public Observable<PayResponse> payOrder(PayRequest request) throws Exception {
throw new Exception("支付失敗!");
}
}
rxjava裡有一個重載版本,見: io.reactivex.Observable
@CheckReturnValue
@SchedulerSupport("none")
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return this.subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
使用這個版本即可:
@Test
public void test3() throws Exception {
OrderService orderService = new OrderServiceImpl();
PayService payService = new PayServiceImpl();
orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00)))
.flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
.subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"),
//異常處理
err -> System.out.println("支付出錯啦:" + err.getMessage()));
Thread.sleep(1000);//等待執行完畢
}
threadId:1, 訂單建立中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
支付出錯啦:支付失敗!
如果想在訂單建立完後,先做些處理,再進行支付,可以這麼寫:
@Test
public void test4() throws Exception {
OrderService orderService = new OrderServiceImpl();
PayService payService = new PayServiceImpl();
orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00)))
//訂單建立完成後的處理
.doOnNext(response -> System.out.println("訂單建立完成:" + response))
.flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
.subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"),
err -> System.out.println("支付出錯啦:" + err.getMessage()));
Thread.sleep(1000);//等待執行完畢
}
threadId:1, 訂單建立中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
訂單建立完成:CreateOrderResponse(orderNo=8c194b1d07c044a8af3771159e1bb2bf, orderDesc=iphone X, orderAmount=8888, orderStatus=NEW)
支付出錯啦:支付失敗!
最後再說下flatMap與concatMap,看下面二個示例就明白差異:
@Test
public void flatMapTest() throws InterruptedException {
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
for (int i = 0; i < 10; i++) {
emitter.onNext(i);
}
}).flatMap((Function<Integer, ObservableSource<String>>) integer -> Observable.fromArray(integer + "")
.delay(10, TimeUnit.MILLISECONDS)
)
.subscribe(s -> System.out.print(s + " "));
Thread.sleep(5000);
}
輸出:0 1 5 9 2 3 7 4 6 8
@Test
public void concatMapTest() throws InterruptedException {
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
for (int i = 0; i < 10; i++) {
emitter.onNext(i);
}
}).concatMap((Function<Integer, ObservableSource<String>>) integer -> Observable.fromArray(integer + "")
.delay(10, TimeUnit.MILLISECONDS)
)
.subscribe(s -> System.out.print(s + " "));
Thread.sleep(5000);
}
輸出:0 1 2 3 4 5 6 7 8 9
結論:flatMap不保證順序,concatMap能保證順序
作者:
菩提樹下的楊過出處:
http://yjmyzz.cnblogs.com本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。