簡介
- ExecutorCompletionService類是CompletionService接口的實作,ExecutorCompletionService内部管理者一個已完成任務的阻塞隊列(可以自定義),ExecutorCompletionService引用了一個Executor, 用來執行任務
- submit()方法最終會委托給内部的executor去執行任務
- take/poll方法的工作都委托給内部的已完成任務阻塞隊列
- take 如果阻塞隊列中有已完成的任務, take方法就傳回任務的結果, 否則阻塞等待任務完成
- poll方法不同
- poll方法不會阻塞等待結果,擷取時如果是沒有結果,則傳回null,可以執行在指定時間内等待
使用場景與注意:
- 在送出一個批次任務後,擷取并使用這些任務的結果時使用。
- 和自己記錄所有送出的任務的future不同,ExecutorCompletionService傾向于按照任務的完成先後順序給予結果,而碼者一般按照任務的先後順序,是以理論上ExecutorCompletionService等待的時間更少,處理效率更高。
- 但是: 因為其會記錄線程執行的結果,是以如果執行了但是沒有使用,則會造成線程結果的堆積,造成記憶體洩露。
- 為什麼會洩露 ?
- 因為結果不是強制處理的,是可以堆積在記憶體中持續增長的。
使用示例
@Test
public void test(){
ExecutorService executor = Executors.newFixedThreadPool(10);
ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService(executor);
IntStream.range(0,10).forEach(it -> executorCompletionService.submit(() -> it));
IntStream.range(0,10).forEach(it -> {
try {
int value = executorCompletionService.take().get();
System.out.print(value);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
}
//結果輸出的值并不确定,和執行完成的順序有關
1203457689
源碼簡析
基于jdk1.8.
CompletionService
ps:// ExecutorCompletionService 完成的結果隊列如果自定義,可以考慮加入一個最大長度的限制。在生成環境中給予提示、警告、錯誤三種門檻值,并記錄相應日志或者提示,防止記憶體洩露并及時處理。
/**
* 将異步任務的生産和完成的結果處理做解耦,按照任務的完成順序
* 處理,例如來管理異步i/o.
* 記憶體一緻性: 任務送出先與線程中的操作,線程中操作閑魚take傳回
*/
public interface CompletionService<V> {
/**
* 送出一個任務
*/
Future<V> submit(Callable<V> task);
/**
* 執行任務并傳回預置結果
*/
Future<V> submit(Runnable task, V result);
/**
* 等待直到任務完成傳回結果,同時移除原存儲的結果
* @throws InterruptedException if interrupted while waiting
*/
Future<V> take() throws InterruptedException;
/**
* 等待并移出下一個Future,或者傳回null
*
* @return the Future representing the next completed task, or
* {@code null} if none are present
*/
Future<V> poll();
/**
* 同poll,但是運作定義有最大等待時間
*/
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
ExecutorCompletionService
package java.util.concurrent;
/**
* 該類是輕量的,适合處理臨時任務
*/
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
//ps:完成結果隊列,容易記憶體洩露
private final BlockingQueue<Future<V>> completionQueue;
/**
* QueueingFuture 加入了完成任務時加入隊列的功能,見down()
*/
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
/**
* ps: 特别判斷了參數是否是 AbstractExecutorService 類型
* @throws NullPointerException if executor is {@code null}
*/
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
/**
* 自定義的完成隊列,但是可能在使用add加入是報錯(如果實作的不比對的話)
*/
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
//ps: 統一原則,統一轉為RunnableFuture,統一處理
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
//ps: 直接使用了隊列的take
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
}