天天看點

CompletionService 源碼解析

​CompletionService​

​​的主要作用是:按照異步任務的完成順序,逐個擷取到已經完成的異步任務。主要實作是在​

​ExecutorCompletionService​

​中。

類圖

CompletionService 源碼解析

核心内部類

private class QueueingFuture extends FutureTask<Void> {
  QueueingFuture(RunnableFuture<V> task) {
    super(task, null);
    this.task = task;
  }
  protected void done() { completionQueue.add(task); }
  private final Future<V> task;
}      

在​

​CompletionService​

​​的實作中,将任務​

​FutureTask​

​​做了擴充,實作了​

​FutureTask​

​​的​

​done​

​方法。當任務完成後會回調這個方法,這時我們在這個方法中将完成的任務放到隊列中,就實作了按照異步任務完成的順序,逐個處理任務的結果了。

核心屬性

// 執行任務的線程池
private final Executor executor;
// 存放已完成的異步任務的阻塞隊列,預設使用 LinkedBlockingQueue
private final BlockingQueue<Future<V>> completionQueue;      

構造函數

public ExecutorCompletionService(Executor executor) {
  if (executor == null)
    throw new NullPointerException();
  this.executor = executor;
  this.aes = (executor instanceof AbstractExecutorService) ?
    (AbstractExecutorService) executor : null;
  this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

public ExecutorCompletionService(Executor executor,
                 BlockingQueue<Future<V>> completionQueue) {
  if (executor == null || completionQueue == null)
    throw new NullPointerException();
  this.executor = executor;
  this.aes = (executor instanceof AbstractExecutorService) ?
    (AbstractExecutorService) executor : null;
  this.completionQueue = completionQueue;
}      

在構造函數中我們至少需要傳入一個​

​Executor​

​​線程池的實作來執行異步任務,但是建議再傳入一個阻塞隊列,預設的​

​LinkedBlockingQueue​

​是一個無界隊列,有記憶體溢出的風險。

submit 送出任務

public Future<V> submit(Callable<V> task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<V> f = newTaskFor(task);
  executor.execute(new QueueingFuture(f));
  return f;
}      

我們可以看到,在送出任務給線程池之前,我們會将任務封裝成​

​QueueingFuture​

​​任務。當該任務執行完成後會回調執行​

​done​

​方法,将任務放到隊列。

擷取已完成的任務

public Future<V> take() throws InterruptedException {
  return completionQueue.take();
}

public Future<V> poll() {
  return completionQueue.poll();
}      
  • ​take​

    ​:如果沒有任務,一直阻塞,直到有新任務進來
  • ​poll​

    ​:如果沒有任務傳回NULL

示例

public class CompletionServiceTest {

    @Test
    public void test() throws ExecutionException, InterruptedException {
        Random random = new Random();
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor, new LinkedBlockingQueue<>(10));
        for (int i = 0; i < 8; i++) {
            completionService.submit(() -> {
                int time = random.nextInt(1000);
                sleep(time);
                System.out.println(Thread.currentThread().getName() + " 執行異步任務執行耗時: " + time);
                return time;
            });
        }

        while (true) {
            System.out.println(Thread.currentThread().getName() + " 主線程擷取到任務結果 " + completionService.take().get());
        }
    }


    public static void sleep(int probe) {
        try {
            Thread.sleep(probe);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}      
pool-1-thread-7 執行異步任務執行耗時: 153
main 主線程擷取到任務結果 153
pool-1-thread-5 執行異步任務執行耗時: 208
main 主線程擷取到任務結果 208
pool-1-thread-4 執行異步任務執行耗時: 242
main 主線程擷取到任務結果 242
pool-1-thread-8 執行異步任務執行耗時: 456
main 主線程擷取到任務結果 456
pool-1-thread-1 執行異步任務執行耗時: 567
main 主線程擷取到任務結果 567
pool-1-thread-2 執行異步任務執行耗時: 782
main 主線程擷取到任務結果 782
pool-1-thread-6 執行異步任務執行耗時: 796
main 主線程擷取到任務結果 796
pool-1-thread-3 執行異步任務執行耗時: 976
main 主線程擷取到任務結果 976      

我的是8核機器,是以任務的結束時間一定會按照任務的結束時間排序。

源碼

​​https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases​​

layering-cache

繼續閱讀