天天看點

從源碼了解線程池

Executor接口

源碼非常簡單,隻有一個execute(Runnable command)回調接口 

public interface Executor {
    void execute(Runnable command);
}           

執行已送出的 

Runnable

 任務對象。此接口提供一種将任務送出與每個任務将如何運作的機制(包括線程使用的細節、排程等)分離開來的方法。通常使用 Executor 而不是顯式地建立線程。例如,可能會使用以下方法

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
 ...           

不過,Executor 接口并沒有嚴格地要求執行是異步的。在最簡單的情況下,執行程式可以在調用方的線程中立即運作已送出的任務:

class DirectExecutor implements Executor {
     public void execute(Runnable r) {
         r.run();
     }
 }           

更常見的是,任務是在某個不是調用方線程的線程中執行的。以下執行程式将為每個任務生成一個新線程。

class ThreadPerTaskExecutor implements Executor {
     public void execute(Runnable r) {
         new Thread(r).start();
     }
 }           

許多 Executor 實作都對排程任務的方式和時間強加了某種限制。以下執行程式使任務送出與第二個執行程式保持連續,這展示了一個複合執行程式。

class SerialExecutor implements Executor {
     final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
     final Executor executor;
     Runnable active;

     SerialExecutor(Executor executor) {
         this.executor = executor;
     }

     public synchronized void execute(final Runnable r) {
         tasks.offer(new Runnable() {
             public void run() {
                 try {
                     r.run();
                 } finally {
                     scheduleNext();
                 }
             }
         });
         if (active == null) {
             scheduleNext();
         }
     }

     protected synchronized void scheduleNext() {
         if ((active = tasks.poll()) != null) {
             executor.execute(active);
         }
     }
 }           

ExecutorService接口

該接口提供了管理終止的方法,以及可為跟蹤一個或多個異步任務執行狀況而生成 Future 的方法。先看下源碼

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}           

Executors類為建立ExecutorService提供了便捷的工廠方法。它隻有一個直接實作類ThreadPoolExecutor和間接實作類ScheduledThreadPoolExecutor。

ExecutorService在Executor的基礎上增加了一些方法,其中有兩個核心的方法:

1、Future<?> submit(Runnable task)

2、<T> Future<T> submit(Callable<T> task)

通過建立并傳回一個可用于取消執行和/或等待完成的 Future,方法submit擴充了基本方法

Executor.execute(java.lang.Runnable)。

下面對ExecutorService的函數進行一下簡單介紹:

  • void shutdown():啟動一個關閉指令,不再接受新任務,當所有已送出任務執行完後,就關閉。
  • List<Runnable> shutdownNow():試圖停止所有正在執行的活動任務,暫停處理正在等待的任務,并傳回等待執行的任務清單。它無法保證能夠停止正在處理的活動執行任務,但是會盡力嘗試。例如,通過 Thread.interrupt() 來取消典型的實作,是以任何任務無法響應中斷都可能永遠無法終止。應該關閉未使用的 ExecutorService以允許回收其資源。
  • boolean isShutdown():如果此執行程式已關閉,則傳回 true。
  • boolean isTerminated():如果關閉後所有任務都已完成,則傳回 true。注意,除非首先調用 shutdown 或 shutdownNow,否則 isTerminated 永不為 true。
  • boolean awaitTermination(long timeout,TimeUnit unit) :如果此執行程式終止,則傳回 true;如果終止前逾時期滿,則傳回 false 
  • <T> Future<T> submit(Callable<T> task):送出一個有傳回值的任務用于執行,傳回一個表示任務結果的 Future。該 Future 的 get 方法在成功完成時将會傳回該任務的結果。
  • <T> Future<T> submit(Runnable task,T result):送出一個 Runnable 任務用于執行,并傳回一個表示該任務的 Future。該 Future 的 get 方法在成功完成時将會傳回給定的結果。
  • Future<?> submit(Runnable task):送出一個 Runnable 任務用于執行,并傳回一個表示該任務的 Future。該 Future 的 get 方法在成功完成時将會傳回 null。
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks):執行給定的任務,當所有任務完成時,傳回保持任務狀态和結果的 Future 清單。傳回清單的所有元素的 Future.isDone() 為 true。注意,該方法會一直阻塞直到所有任務完成。可以正常地或通過抛出異常來終止已完成任務。如果正在進行此操作時修改了給定的collection,則此方法的結果是不确定的。

ThreadPoolExecutor

ThreadPoolExecutor是ExecutorService的一個實作類,它使用可能的幾個線程池之一執行每個送出的任務,通常使用 Executors 工廠方法配置。

線程池可以解決兩個不同問題:由于減少了每個任務調用的開銷,它們通常可以在執行大量異步任務時提供增強的性能,并且還可以提供綁定和管理資源(包括執行任務集時使用的線程)的方法。

每個 ThreadPoolExecutor 還維護着一些基本的統計資料,如完成的任務數。為了便于跨大量上下文使用,此類提供了很多可調整的參數和擴充鈎子 (hook)。

對于核心的幾個線程池,無論是 newFixedThreadPool()方法、 newSingleThreadExecutor()還是 newCachedThreadPool()方法, 雖然看起來建立的線程有着完全不同的功能特點, 但其内部實作均使用了 ThreadPoolExecutor實作。 下面給出了這三個線程池的實作方式:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
//使用一個基于FIFO排序的阻塞隊列,在所有corePoolSize線程都忙時新任務将在隊列中等待
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}           

由以上線程池的實作代碼可以看到, 它們都隻是 ThreadPoolExecutor 類的封裝 。 為何ThreadPoolExecutor有如此強大的功能呢? 來看一下 ThreadPoolExecutor最重要的構造函數:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) //後兩個參數為可選參數           

參數說明:

  • corePoolSize:指定了線程池中的核心線程數
  • maximumPoolSize:最大可允許建立的線程數,corePoolSize和maximumPoolSize設定的邊界自動調整池大小:corePoolSize=運作的線程數= maximumPoolSize:建立固定大小的線程池
  • keepAliveTime:如果線程數多于corePoolSize,則這些多餘的線程的空閑時間超過keepAliveTime時将被終止
  • unit:keepAliveTime參數的時間機關
  • workQueue:儲存任務的阻塞隊列,被送出但尚未執行的任務,與線程池的大小有關:當運作的線程數少于corePoolSize時,在有新任務時直接建立新線程來執行任務而無需再進隊列;  當運作的線程數等于或多于corePoolSize,在有新任務添加時則先加入隊列,不直接建立線程;  當隊列滿時,在有新任務時就建立新線程
  • threadFactory:線程工廠,用于建立新線程,預設使用defaultThreadFactory建立線程
  • handle:定義處理被拒絕任務的政策,預設使用ThreadPoolExecutor.AbortPolicy,任務被拒絕時将抛出RejectExecutorException

ThreadPoolExecutor将根據corePoolSize和 maximumPoolSize設定的邊界自動調整線程池大小。當新任務在方法 execute(java.lang.Runnable) 中送出時,如果運作的線程少于corePoolSize,則建立新線程來執行新任務,即使線程池中的其他線程是空閑的; 如果運作的線程多于corePoolSize 而少于 maximumPoolSize,則僅當隊列滿時才建立新線程;如果設定的corePoolSize 和 maximumPoolSize 相同,則建立了固定大小的線程池;如果将 maximumPoolSize 設定為基本的無界值(如 Integer.MAX_VALUE),則允許線程池适應任意數量的并發任務。

在大多數情況下,核心和最大池大小僅基于構造來設定,不過也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 進行動态更改。 

參考:

https://www.cnblogs.com/MOBIN/p/5436482.html http://cmsblogs.com/?p=2444 https://blog.csdn.net/linghu_java/article/details/17123057