天天看點

【java并發工具-分工】CompletionService:批量執行異步任務

CompletionService

    • 前言
    • 1.CompletionService原理:
    • 2.如何建立CompletionService?
    • 3.使用CompletionService完成詢價系統的優化。
    • 4.CompletionService接口介紹
    • 5.利用CompletionService實作dubbo中的Forking Cluster

前言

怎麼個批量擷取異步任務?

看下面詢價系統的執行個體代碼:

如果異步查詢電商S2的時間非常短,因為get()方法是阻塞方法,是以必須要等待f1.get()擷取成功之後才可以向下執行。

ExecutorService executor = Executors.newFixedThreadPool(3); //建立線程池
// 異步向電商S1詢價
Future<Integer> f1 =   executor.submit(    ()->getPriceByS1());
// 異步向電商S2詢價
Future<Integer> f2 =   executor.submit(    ()->getPriceByS2());
// 異步向電商S3詢價
Future<Integer> f3 =   executor.submit(    ()->getPriceByS3());    
// 擷取電商S1報價并儲存
r=f1.get();
executor.execute(()->save(r));  
// 擷取電商S2報價并儲存
r=f2.get();
executor.execute(()->save(r));  
// 擷取電商S3報價并儲存  
r=f3.get();
executor.execute(()->save(r));
           

那麼怎麼才可以獲得先執行結束的任務結果呢?

  1. 你可能會想到阻塞隊列,把get()擷取結果異步執行,并存放到阻塞隊列中,這樣每次從阻塞隊列中取出先執行結束的線程結果。
  2. JAVA SDK并發包中提供了的CompletionService就可以完成上面的操作,下面就來介紹CompletionService工具。

1.CompletionService原理:

其實CompletionService内部也是維護了一個阻塞隊列,當任務執行結束就把任務的執行結果加入到阻塞隊列中,不同的是它是把任務執行結果的Future對象加入到阻塞隊列中了,而上面的代碼是把任務的執行結果放入了隊列中。

2.如何建立CompletionService?

CompletionService的接口實作類是ExecutorCompletionService,這個實作類的構造方法有兩個:

1.  ExecutorCompletionService(Executor executor)
2.  ExecutorCompletionService(Executor executor, BlockingQueue<Future<v>> completionQueue)
           

1.需要傳入線程池。

2.除了需要傳入一個線程池,還需要傳入一個阻塞隊列,任務的執行結果就是放在completionQueue隊列中的。

如果不指定阻塞隊列,預設使用無界的LinkedBlockQueue,任務執行結果的Future就是放在這裡面的。

3.使用CompletionService完成詢價系統的優化。

// 建立線程池
ExecutorService executor =   Executors.newFixedThreadPool(3);
// 建立CompletionService
CompletionService<Integer> cs = new   ExecutorCompletionService<>(executor);
// 異步向電商S1詢價
cs.submit(()->getPriceByS1());
// 異步向電商S2詢價
cs.submit(()->getPriceByS2());
// 異步向電商S3詢價
cs.submit(()->getPriceByS3());
// 将詢價結果異步儲存到資料庫
for (int i=0; i<3; i++) {
  Integer r = cs.take().get();   //擷取cs阻塞隊列中的Future對象,然後獲得結果int。
  executor.execute(()->save(r));
}
           

4.CompletionService接口介紹

CompletionService接口提供5個方法:

Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take()  throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit)   throws InterruptedException;
           

兩個submit送出方法。

  1. 一個參數是Callable,前面執行個體代碼中送出任務就是用的它;
  2. 另外一個參數是Runnalbe,result,和Future一文中的其中一種submit方法類似。

剩下的三個方法都與阻塞隊列有關。take(),poll()方法都是從阻塞隊列中擷取并移除一個元素.

  1. take():如果阻塞隊列是空,調用take方法的線程會陷入阻塞。
  2. poll(): 如果阻塞隊列是空,調用poll方法會傳回null值,
  3. 并且poll()方法還支援逾時等待的方式,如果等待逾時後,隊列還是空,就還是傳回null值。

5.利用CompletionService實作dubbo中的Forking Cluster

Dubbo中有一種Forking叢集,在這種叢集模式下,支援并行地調用多個查詢服務,隻要有一個服務傳回結果,整個服務就可以傳回了。

例如你需要一個地圖的服務,為了保證高可用和高性能,你可以并行調用三個地圖服務商的API,然後隻要有一個服務傳回,這個服務就可以傳回了。

下面看代碼執行個體:

geocoder(addr) {
  //并行執行以下3個查詢服務, 
  r1=geocoderByS1(addr);
  r2=geocoderByS2(addr);
  r3=geocoderByS3(addr);
  //隻要r1,r2,r3有一個傳回
  //則傳回
  return r1|r2|r3;
}
           

利用CompletionService實作Forking這種叢集模式:

// 建立線程池
ExecutorService executor =  Executors.newFixedThreadPool(3);
// 建立CompletionService
CompletionService<Integer> cs =  new ExecutorCompletionService<>(executor);
// 用于儲存Future對象
List<Future<Integer>> futures =  new ArrayList<>(3);
//送出異步任務,并儲存future到futures 
futures.add(  cs.submit(()->geocoderByS1()));//添加 異步任務傳回的Future結果
futures.add(  cs.submit(()->geocoderByS2()));
futures.add(  cs.submit(()->geocoderByS3()));
// 擷取最快傳回的任務執行結果
Integer r = 0;
try {
  // 隻要有一個成功傳回,則break
  for (int i = 0; i < 3; ++i) {
    r = cs.take().get();
    //簡單地通過判空來檢查是否成功傳回
    if (r != null) {
      break;
    }
  }
} finally {
  //取消所有任務
  for(Future<Integer> f : futures)
    f.cancel(true);
}
// 傳回結果
return r;
           

更多:鄧新