天天看點

java進階筆記線程與并發之ExecutorCompletionService簡介使用示例源碼簡析

簡介

  • ExecutorCompletionService類是CompletionService接口的實作,ExecutorCompletionService内部管理者一個已完成任務的阻塞隊列(可以自定義),ExecutorCompletionService引用了一個Executor, 用來執行任務
  • submit()方法最終會委托給内部的executor去執行任務
  • take/poll方法的工作都委托給内部的已完成任務阻塞隊列
    • take 如果阻塞隊列中有已完成的任務, take方法就傳回任務的結果, 否則阻塞等待任務完成
    • poll方法不同
      • poll方法不會阻塞等待結果,擷取時如果是沒有結果,則傳回null,可以執行在指定時間内等待

使用場景與注意:

  • 在送出一個批次任務後,擷取并使用這些任務的結果時使用。
  • 和自己記錄所有送出的任務的future不同,ExecutorCompletionService傾向于按照任務的完成先後順序給予結果,而碼者一般按照任務的先後順序,是以理論上ExecutorCompletionService等待的時間更少,處理效率更高。
  • 但是: 因為其會記錄線程執行的結果,是以如果執行了但是沒有使用,則會造成線程結果的堆積,造成記憶體洩露。
  • 為什麼會洩露 ?
    • 因為結果不是強制處理的,是可以堆積在記憶體中持續增長的。

使用示例

@Test
    public void test(){
        ExecutorService executor = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService(executor);
        IntStream.range(0,10).forEach(it -> executorCompletionService.submit(() -> it));
        IntStream.range(0,10).forEach(it -> {
            try {
                int value = executorCompletionService.take().get();
                System.out.print(value);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
           
//結果輸出的值并不确定,和執行完成的順序有關
1203457689
           

源碼簡析

基于jdk1.8.

CompletionService

ps:// ExecutorCompletionService 完成的結果隊列如果自定義,可以考慮加入一個最大長度的限制。在生成環境中給予提示、警告、錯誤三種門檻值,并記錄相應日志或者提示,防止記憶體洩露并及時處理。

/**
* 将異步任務的生産和完成的結果處理做解耦,按照任務的完成順序
* 處理,例如來管理異步i/o.
* 記憶體一緻性: 任務送出先與線程中的操作,線程中操作閑魚take傳回
*/
public interface CompletionService<V> {
    /**
     * 送出一個任務
     */
    Future<V> submit(Callable<V> task);
 
    /**
     * 執行任務并傳回預置結果
     */
    Future<V> submit(Runnable task, V result);
 
    /**
     * 等待直到任務完成傳回結果,同時移除原存儲的結果
     * @throws InterruptedException if interrupted while waiting
     */
    Future<V> take() throws InterruptedException;
 
    /**
     * 等待并移出下一個Future,或者傳回null
     *
     * @return the Future representing the next completed task, or
     *         {@code null} if none are present
     */
    Future<V> poll();
 
    /**
     * 同poll,但是運作定義有最大等待時間
     */
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
           

ExecutorCompletionService

package java.util.concurrent;
 
/**
* 該類是輕量的,适合處理臨時任務
*/
public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    //ps:完成結果隊列,容易記憶體洩露
    private final BlockingQueue<Future<V>> completionQueue;
 
    /**
     * QueueingFuture 加入了完成任務時加入隊列的功能,見down()
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
 
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }
 
    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }
 
    /**
     * ps: 特别判斷了參數是否是 AbstractExecutorService 類型
     * @throws NullPointerException if executor is {@code null}
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
 
    /**
     * 自定義的完成隊列,但是可能在使用add加入是報錯(如果實作的不比對的話)
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }
    //ps: 統一原則,統一轉為RunnableFuture,統一處理
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
 
    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }
    //ps: 直接使用了隊列的take
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }
 
    public Future<V> poll() {
        return completionQueue.poll();
    }
 
    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }
 
}