CompletionService
的主要作用是:按照異步任務的完成順序,逐個擷取到已經完成的異步任務。主要實作是在
ExecutorCompletionService
中。
類圖
核心内部類
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
在
CompletionService
的實作中,将任務
FutureTask
做了擴充,實作了
FutureTask
的
done
方法。當任務完成後會回調這個方法,這時我們在這個方法中将完成的任務放到隊列中,就實作了按照異步任務完成的順序,逐個處理任務的結果了。
核心屬性
// 執行任務的線程池
private final Executor executor;
// 存放已完成的異步任務的阻塞隊列,預設使用 LinkedBlockingQueue
private final BlockingQueue<Future<V>> completionQueue;
構造函數
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
在構造函數中我們至少需要傳入一個
Executor
線程池的實作來執行異步任務,但是建議再傳入一個阻塞隊列,預設的
LinkedBlockingQueue
是一個無界隊列,有記憶體溢出的風險。
submit 送出任務
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
我們可以看到,在送出任務給線程池之前,我們會将任務封裝成
QueueingFuture
任務。當該任務執行完成後會回調執行
done
方法,将任務放到隊列。
擷取已完成的任務
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
-
:如果沒有任務,一直阻塞,直到有新任務進來take
-
:如果沒有任務傳回NULLpoll
示例
public class CompletionServiceTest {
@Test
public void test() throws ExecutionException, InterruptedException {
Random random = new Random();
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor, new LinkedBlockingQueue<>(10));
for (int i = 0; i < 8; i++) {
completionService.submit(() -> {
int time = random.nextInt(1000);
sleep(time);
System.out.println(Thread.currentThread().getName() + " 執行異步任務執行耗時: " + time);
return time;
});
}
while (true) {
System.out.println(Thread.currentThread().getName() + " 主線程擷取到任務結果 " + completionService.take().get());
}
}
public static void sleep(int probe) {
try {
Thread.sleep(probe);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
pool-1-thread-7 執行異步任務執行耗時: 153
main 主線程擷取到任務結果 153
pool-1-thread-5 執行異步任務執行耗時: 208
main 主線程擷取到任務結果 208
pool-1-thread-4 執行異步任務執行耗時: 242
main 主線程擷取到任務結果 242
pool-1-thread-8 執行異步任務執行耗時: 456
main 主線程擷取到任務結果 456
pool-1-thread-1 執行異步任務執行耗時: 567
main 主線程擷取到任務結果 567
pool-1-thread-2 執行異步任務執行耗時: 782
main 主線程擷取到任務結果 782
pool-1-thread-6 執行異步任務執行耗時: 796
main 主線程擷取到任務結果 796
pool-1-thread-3 執行異步任務執行耗時: 976
main 主線程擷取到任務結果 976
我的是8核機器,是以任務的結束時間一定會按照任務的結束時間排序。
源碼
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases