CompletionService
的主要作用是:按照异步任务的完成顺序,逐个获取到已经完成的异步任务。主要实现是在
ExecutorCompletionService
中。
类图
核心内部类
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
在
CompletionService
的实现中,将任务
FutureTask
做了扩展,实现了
FutureTask
的
done
方法。当任务完成后会回调这个方法,这时我们在这个方法中将完成的任务放到队列中,就实现了按照异步任务完成的顺序,逐个处理任务的结果了。
核心属性
// 执行任务的线程池
private final Executor executor;
// 存放已完成的异步任务的阻塞队列,默认使用 LinkedBlockingQueue
private final BlockingQueue<Future<V>> completionQueue;
构造函数
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
在构造函数中我们至少需要传入一个
Executor
线程池的实现来执行异步任务,但是建议再传入一个阻塞队列,默认的
LinkedBlockingQueue
是一个无界队列,有内存溢出的风险。
submit 提交任务
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
我们可以看到,在提交任务给线程池之前,我们会将任务封装成
QueueingFuture
任务。当该任务执行完成后会回调执行
done
方法,将任务放到队列。
获取已完成的任务
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
-
:如果没有任务,一直阻塞,直到有新任务进来take
-
:如果没有任务返回NULLpoll
示例
public class CompletionServiceTest {
@Test
public void test() throws ExecutionException, InterruptedException {
Random random = new Random();
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor, new LinkedBlockingQueue<>(10));
for (int i = 0; i < 8; i++) {
completionService.submit(() -> {
int time = random.nextInt(1000);
sleep(time);
System.out.println(Thread.currentThread().getName() + " 执行异步任务执行耗时: " + time);
return time;
});
}
while (true) {
System.out.println(Thread.currentThread().getName() + " 主线程获取到任务结果 " + completionService.take().get());
}
}
public static void sleep(int probe) {
try {
Thread.sleep(probe);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
pool-1-thread-7 执行异步任务执行耗时: 153
main 主线程获取到任务结果 153
pool-1-thread-5 执行异步任务执行耗时: 208
main 主线程获取到任务结果 208
pool-1-thread-4 执行异步任务执行耗时: 242
main 主线程获取到任务结果 242
pool-1-thread-8 执行异步任务执行耗时: 456
main 主线程获取到任务结果 456
pool-1-thread-1 执行异步任务执行耗时: 567
main 主线程获取到任务结果 567
pool-1-thread-2 执行异步任务执行耗时: 782
main 主线程获取到任务结果 782
pool-1-thread-6 执行异步任务执行耗时: 796
main 主线程获取到任务结果 796
pool-1-thread-3 执行异步任务执行耗时: 976
main 主线程获取到任务结果 976
我的是8核机器,所以任务的结束时间一定会按照任务的结束时间排序。
源码
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases