天天看點

RxJava2學習筆記(2)

上一篇

已經熟悉了Observable的基本用法,但是如果僅僅隻是“生産-消費”的模型,這就展現不出優勢了,java有100種辦法可以玩這個:)

一、更簡單的多線程

正常情況下,生産者與消費者都在同一個線程裡處理,參考下面的代碼:

final long start = System.currentTimeMillis();

Observable<String> fileSender = Observable.create(emitter -> {
    for (int i = 1; i < 6; i++) {
        Thread.sleep(1000);
        String temp = "thread:" + Thread.currentThread().getId() + " , file " + i + " 的内容";
        System.out.println(temp);
        emitter.onNext(temp);
    }
    emitter.onComplete();
});

Observer<String> fileHander = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        System.out.println("準備處理檔案...");
    }

    @Override
    public void onNext(@NonNull String s) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("thread:" + Thread.currentThread().getId() + " , [" + s + "] 已處理!");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        System.out.println("師傅,有妖怪!");
    }

    @Override
    public void onComplete() {
        System.out.println("總算完事兒,累屎大爺了!");
        long end = System.currentTimeMillis();
        System.out.println("耗時:" + (end - start));
    }
};

fileSender.subscribe(fileHander);

Thread.sleep(60000);
      

假設生産者在讀取一堆檔案,然後發給消費者處理,通常情況下,這類涉及IO的操作都是很耗時的,我們用sleep(1000)來模拟。

RxJava2學習筆記(2)

從輸出結果上看,生産者與消費者的thread id相同,耗時約為10s。

fileSender.subscribe(fileHander);
      

如果上面這行,換成

fileSender.subscribeOn(Schedulers.io()) //生産者處理時,放在io線程中
        .observeOn(Schedulers.newThread()) //消費者處理時,用新線程
        .subscribe(fileHander); 
      

注:subscribeOn() 是生産者發送子彈的線程, observeOn() 則是消費者(靶子)收子彈的線程,如果有多個消費者,每次調用observeOn() 消費者線程便會切換一次

這樣生産者、消費者就變成不同的線程了,跑一下看看:

RxJava2學習筆記(2)

可以看到二個線程id不一樣,說明分别在不同的線程裡,而且總耗時明顯縮短了。

二、更平滑的鍊式調用

假設我們有一個經典的線上電商場景:使用者送出訂單後,馬上跳到支付頁面付款。傳統寫法,通常是中規中矩的封裝2個方法,依次調用。用rxjava後,可以寫得更流暢,先做點準備工作:

RxJava2學習筆記(2)

先定義二個服務接口:訂單服務(OrderService)以及支付服務(PayService)

OrderService.java

public interface OrderService {
    Observable<CreateOrderResponse> createOrder(CreateOrderRequest request) throws Exception;
}
      

PayService.java

public interface PayService {
    Observable<PayResponse> payOrder(PayRequest request) throws Exception;
}
      

然後來二個實作:

OrderServiceImpl

public class OrderServiceImpl implements OrderService {

    @Override
    public Observable<CreateOrderResponse> createOrder(CreateOrderRequest request) throws InterruptedException {
        System.out.println("threadId:" + Thread.currentThread().getId() + ", 訂單建立中:" + request.toString());
        CreateOrderResponse response = new CreateOrderResponse();
        response.setOrderNo(UUID.randomUUID().toString().replace("-", ""));
        response.setOrderStatus("NEW");
        response.setOrderAmount(request.getOrderAmount());
        response.setOrderDesc(request.getOrderDesc());
        return Observable.create(emitter -> emitter.onNext(response));
    }
}
      

PayServiceImpl

public class PayServiceImpl implements PayService {

    @Override
    public Observable<PayResponse> payOrder(PayRequest request) throws InterruptedException {
        System.out.println("threadId:" + Thread.currentThread().getId() + ", 正在請求支付:" + request);
        PayResponse response = new PayResponse();
        response.setSuccess(true);
        response.setOrderNo(request.getOrderNo());
        response.setTradeNo(UUID.randomUUID().toString().replace("-", ""));
        return Observable.create(emitter -> emitter.onNext(response));
    }
}
      

然後測試一把:

@Test
    public void test1() throws Exception {
        OrderService orderService = new OrderServiceImpl();
        PayService payService = new PayServiceImpl();
        orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00))) //建立訂單
                //将"建立訂單的Response" 轉換成 "支付訂單的Response"
                .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
                //支付完成的處理
                .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"));
        Thread.sleep(1000);//等待執行完畢
    }
      

鍊式的寫法,更符合閱讀習慣,注:flatMap這個操作,通俗點講,就是将一種口徑的子彈,轉換成另一種口徑的子彈,然後再繼續發射。

輸出:

threadId:1, 訂單建立中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
threadId:1, 正在請求支付:PayRequest(orderNo=81419b0580d547acbb53955978ace6b8, paymentAmount=8888)
threadId:1, 支付完成
      

可以看到,預設情況下,建立訂單/支付訂單在同一個線程中,結合前面學到的知識,也可以将它們劃分到不同的線程裡:(雖然就這個場景而言,這樣做的意義不大,因為支付前,肯定要等訂單先送出,這個沒辦法并發處理,這裡隻是意思一下,可以這樣做而已)

@Test
    public void test2() throws Exception {
        OrderService orderService = new OrderServiceImpl();
        PayService payService = new PayServiceImpl();
        orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00)))
                .subscribeOn(Schedulers.newThread())  //(生産者)建立訂單時,使用新線程
                .observeOn(Schedulers.newThread()) //(消費者1)接收訂單時,使用新線程
                .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
                .observeOn(Schedulers.newThread()) //(消費者2)接收支付結果時,使用新線程
                .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"));
        Thread.sleep(1000);//等待執行完畢
    }
      
threadId:1, 訂單建立中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
threadId:13, 正在請求支付:PayRequest(orderNo=d5ff7890f22f486bb1bf8aa8e4f0a3bf, paymentAmount=8888)
threadId:14, 支付完成
      

從threadId看,已經是不同的線程了。

上面的代碼,都沒考慮到出錯的情況,如果支付時出異常了,rxjava如何處理呢?

先改下支付的實作,人為抛個異常:

public class PayServiceImpl implements PayService {

    @Override
    public Observable<PayResponse> payOrder(PayRequest request) throws Exception {
        throw new Exception("支付失敗!");
    }
}
      

rxjava裡有一個重載版本,見: io.reactivex.Observable

@CheckReturnValue
    @SchedulerSupport("none")
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        return this.subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
      

使用這個版本即可:

@Test
    public void test3() throws Exception {
        OrderService orderService = new OrderServiceImpl();
        PayService payService = new PayServiceImpl();
        orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00)))
                .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
                .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"),
                        //異常處理
                        err -> System.out.println("支付出錯啦:" + err.getMessage()));
        Thread.sleep(1000);//等待執行完畢
    }
      
threadId:1, 訂單建立中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
支付出錯啦:支付失敗!        

如果想在訂單建立完後,先做些處理,再進行支付,可以這麼寫:

@Test
    public void test4() throws Exception {
        OrderService orderService = new OrderServiceImpl();
        PayService payService = new PayServiceImpl();
        orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00)))
                //訂單建立完成後的處理
                .doOnNext(response -> System.out.println("訂單建立完成:" + response))
                .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
                .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"),
                        err -> System.out.println("支付出錯啦:" + err.getMessage()));
        Thread.sleep(1000);//等待執行完畢
    }
      
threadId:1, 訂單建立中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
訂單建立完成:CreateOrderResponse(orderNo=8c194b1d07c044a8af3771159e1bb2bf, orderDesc=iphone X, orderAmount=8888, orderStatus=NEW)
支付出錯啦:支付失敗!
      

最後再說下flatMap與concatMap,看下面二個示例就明白差異:

@Test
    public void flatMapTest() throws InterruptedException {
        Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
            for (int i = 0; i < 10; i++) {
                emitter.onNext(i);
            }
        }).flatMap((Function<Integer, ObservableSource<String>>) integer -> Observable.fromArray(integer + "")
                .delay(10, TimeUnit.MILLISECONDS)
        )
                .subscribe(s -> System.out.print(s + " "));
        Thread.sleep(5000);
    }
      

  輸出:0 1 5 9 2 3 7 4 6 8 

@Test
    public void concatMapTest() throws InterruptedException {
        Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
            for (int i = 0; i < 10; i++) {
                emitter.onNext(i);
            }
        }).concatMap((Function<Integer, ObservableSource<String>>) integer -> Observable.fromArray(integer + "")
                .delay(10, TimeUnit.MILLISECONDS)
        )
                .subscribe(s -> System.out.print(s + " "));
        Thread.sleep(5000);
    }
      

  輸出:0 1 2 3 4 5 6 7 8 9

結論:flatMap不保證順序,concatMap能保證順序

作者:

菩提樹下的楊過

出處:

http://yjmyzz.cnblogs.com

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。

繼續閱讀