CompletionService
-
- 前言
- 1.CompletionService原理:
- 2.如何建立CompletionService?
- 3.使用CompletionService完成詢價系統的優化。
- 4.CompletionService接口介紹
- 5.利用CompletionService實作dubbo中的Forking Cluster
前言
怎麼個批量擷取異步任務?
看下面詢價系統的執行個體代碼:
如果異步查詢電商S2的時間非常短,因為get()方法是阻塞方法,是以必須要等待f1.get()擷取成功之後才可以向下執行。
ExecutorService executor = Executors.newFixedThreadPool(3); //建立線程池
// 異步向電商S1詢價
Future<Integer> f1 = executor.submit( ()->getPriceByS1());
// 異步向電商S2詢價
Future<Integer> f2 = executor.submit( ()->getPriceByS2());
// 異步向電商S3詢價
Future<Integer> f3 = executor.submit( ()->getPriceByS3());
// 擷取電商S1報價并儲存
r=f1.get();
executor.execute(()->save(r));
// 擷取電商S2報價并儲存
r=f2.get();
executor.execute(()->save(r));
// 擷取電商S3報價并儲存
r=f3.get();
executor.execute(()->save(r));
那麼怎麼才可以獲得先執行結束的任務結果呢?
- 你可能會想到阻塞隊列,把get()擷取結果異步執行,并存放到阻塞隊列中,這樣每次從阻塞隊列中取出先執行結束的線程結果。
- JAVA SDK并發包中提供了的CompletionService就可以完成上面的操作,下面就來介紹CompletionService工具。
1.CompletionService原理:
其實CompletionService内部也是維護了一個阻塞隊列,當任務執行結束就把任務的執行結果加入到阻塞隊列中,不同的是它是把任務執行結果的Future對象加入到阻塞隊列中了,而上面的代碼是把任務的執行結果放入了隊列中。
2.如何建立CompletionService?
CompletionService的接口實作類是ExecutorCompletionService,這個實作類的構造方法有兩個:
1. ExecutorCompletionService(Executor executor)
2. ExecutorCompletionService(Executor executor, BlockingQueue<Future<v>> completionQueue)
1.需要傳入線程池。
2.除了需要傳入一個線程池,還需要傳入一個阻塞隊列,任務的執行結果就是放在completionQueue隊列中的。
如果不指定阻塞隊列,預設使用無界的LinkedBlockQueue,任務執行結果的Future就是放在這裡面的。
3.使用CompletionService完成詢價系統的優化。
// 建立線程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 建立CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 異步向電商S1詢價
cs.submit(()->getPriceByS1());
// 異步向電商S2詢價
cs.submit(()->getPriceByS2());
// 異步向電商S3詢價
cs.submit(()->getPriceByS3());
// 将詢價結果異步儲存到資料庫
for (int i=0; i<3; i++) {
Integer r = cs.take().get(); //擷取cs阻塞隊列中的Future對象,然後獲得結果int。
executor.execute(()->save(r));
}
4.CompletionService接口介紹
CompletionService接口提供5個方法:
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
兩個submit送出方法。
- 一個參數是Callable,前面執行個體代碼中送出任務就是用的它;
- 另外一個參數是Runnalbe,result,和Future一文中的其中一種submit方法類似。
剩下的三個方法都與阻塞隊列有關。take(),poll()方法都是從阻塞隊列中擷取并移除一個元素.
- take():如果阻塞隊列是空,調用take方法的線程會陷入阻塞。
- poll(): 如果阻塞隊列是空,調用poll方法會傳回null值,
- 并且poll()方法還支援逾時等待的方式,如果等待逾時後,隊列還是空,就還是傳回null值。
5.利用CompletionService實作dubbo中的Forking Cluster
Dubbo中有一種Forking叢集,在這種叢集模式下,支援并行地調用多個查詢服務,隻要有一個服務傳回結果,整個服務就可以傳回了。
例如你需要一個地圖的服務,為了保證高可用和高性能,你可以并行調用三個地圖服務商的API,然後隻要有一個服務傳回,這個服務就可以傳回了。
下面看代碼執行個體:
geocoder(addr) {
//并行執行以下3個查詢服務,
r1=geocoderByS1(addr);
r2=geocoderByS2(addr);
r3=geocoderByS3(addr);
//隻要r1,r2,r3有一個傳回
//則傳回
return r1|r2|r3;
}
利用CompletionService實作Forking這種叢集模式:
// 建立線程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 建立CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 用于儲存Future對象
List<Future<Integer>> futures = new ArrayList<>(3);
//送出異步任務,并儲存future到futures
futures.add( cs.submit(()->geocoderByS1()));//添加 異步任務傳回的Future結果
futures.add( cs.submit(()->geocoderByS2()));
futures.add( cs.submit(()->geocoderByS3()));
// 擷取最快傳回的任務執行結果
Integer r = 0;
try {
// 隻要有一個成功傳回,則break
for (int i = 0; i < 3; ++i) {
r = cs.take().get();
//簡單地通過判空來檢查是否成功傳回
if (r != null) {
break;
}
}
} finally {
//取消所有任務
for(Future<Integer> f : futures)
f.cancel(true);
}
// 傳回結果
return r;
更多:鄧新