天天看点

CompletionService 源码解析

​CompletionService​

​​的主要作用是:按照异步任务的完成顺序,逐个获取到已经完成的异步任务。主要实现是在​

​ExecutorCompletionService​

​中。

类图

CompletionService 源码解析

核心内部类

private class QueueingFuture extends FutureTask<Void> {
  QueueingFuture(RunnableFuture<V> task) {
    super(task, null);
    this.task = task;
  }
  protected void done() { completionQueue.add(task); }
  private final Future<V> task;
}      

在​

​CompletionService​

​​的实现中,将任务​

​FutureTask​

​​做了扩展,实现了​

​FutureTask​

​​的​

​done​

​方法。当任务完成后会回调这个方法,这时我们在这个方法中将完成的任务放到队列中,就实现了按照异步任务完成的顺序,逐个处理任务的结果了。

核心属性

// 执行任务的线程池
private final Executor executor;
// 存放已完成的异步任务的阻塞队列,默认使用 LinkedBlockingQueue
private final BlockingQueue<Future<V>> completionQueue;      

构造函数

public ExecutorCompletionService(Executor executor) {
  if (executor == null)
    throw new NullPointerException();
  this.executor = executor;
  this.aes = (executor instanceof AbstractExecutorService) ?
    (AbstractExecutorService) executor : null;
  this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

public ExecutorCompletionService(Executor executor,
                 BlockingQueue<Future<V>> completionQueue) {
  if (executor == null || completionQueue == null)
    throw new NullPointerException();
  this.executor = executor;
  this.aes = (executor instanceof AbstractExecutorService) ?
    (AbstractExecutorService) executor : null;
  this.completionQueue = completionQueue;
}      

在构造函数中我们至少需要传入一个​

​Executor​

​​线程池的实现来执行异步任务,但是建议再传入一个阻塞队列,默认的​

​LinkedBlockingQueue​

​是一个无界队列,有内存溢出的风险。

submit 提交任务

public Future<V> submit(Callable<V> task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<V> f = newTaskFor(task);
  executor.execute(new QueueingFuture(f));
  return f;
}      

我们可以看到,在提交任务给线程池之前,我们会将任务封装成​

​QueueingFuture​

​​任务。当该任务执行完成后会回调执行​

​done​

​方法,将任务放到队列。

获取已完成的任务

public Future<V> take() throws InterruptedException {
  return completionQueue.take();
}

public Future<V> poll() {
  return completionQueue.poll();
}      
  • ​take​

    ​:如果没有任务,一直阻塞,直到有新任务进来
  • ​poll​

    ​:如果没有任务返回NULL

示例

public class CompletionServiceTest {

    @Test
    public void test() throws ExecutionException, InterruptedException {
        Random random = new Random();
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor, new LinkedBlockingQueue<>(10));
        for (int i = 0; i < 8; i++) {
            completionService.submit(() -> {
                int time = random.nextInt(1000);
                sleep(time);
                System.out.println(Thread.currentThread().getName() + " 执行异步任务执行耗时: " + time);
                return time;
            });
        }

        while (true) {
            System.out.println(Thread.currentThread().getName() + " 主线程获取到任务结果 " + completionService.take().get());
        }
    }


    public static void sleep(int probe) {
        try {
            Thread.sleep(probe);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}      
pool-1-thread-7 执行异步任务执行耗时: 153
main 主线程获取到任务结果 153
pool-1-thread-5 执行异步任务执行耗时: 208
main 主线程获取到任务结果 208
pool-1-thread-4 执行异步任务执行耗时: 242
main 主线程获取到任务结果 242
pool-1-thread-8 执行异步任务执行耗时: 456
main 主线程获取到任务结果 456
pool-1-thread-1 执行异步任务执行耗时: 567
main 主线程获取到任务结果 567
pool-1-thread-2 执行异步任务执行耗时: 782
main 主线程获取到任务结果 782
pool-1-thread-6 执行异步任务执行耗时: 796
main 主线程获取到任务结果 796
pool-1-thread-3 执行异步任务执行耗时: 976
main 主线程获取到任务结果 976      

我的是8核机器,所以任务的结束时间一定会按照任务的结束时间排序。

源码

​​https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases​​

layering-cache

继续阅读