天天看點

Java —— Executors、ExecutorService、ThreadPoolExecutor線程池

Executors類

工具類、線程池的工廠類,用于建立并傳回不同類型的線程池。

  • Executors.newCachedThreadPool():建立一個可根據需要建立新線程的線程池
  • Executors.newFixedThreadPool(n); 建立一個可重用固定線程數的線程池
  • Executors.newSingleThreadExecutor() :建立一個隻有一個線程的線程池
  • Executors.newScheduledThreadPool(n):建立一個線程池,它可安排在給定延遲後運 行指令或者定期地執行。

 方法

參數和類型 方法和描述

static Callable<Object>

callable(PrivilegedAction<?> action)

傳回一個

Callable

對象,當被調用時,它運作給定的特權動作并傳回其結果。

static Callable<Object>

callable(PrivilegedExceptionAction<?> action)

Callable

對象,該對象在被調用時運作給定的特權異常操作并傳回其結果。

static Callable<Object>

callable(Runnable task)

Callable

對象,當被調用時,它運作給定的任務并傳回

null

static <T> Callable<T>

callable(Runnable task, T result)

Callable

對象,當被調用時,它運作給定的任務并傳回給定的結果。

static ThreadFactory

defaultThreadFactory()

傳回用于建立新線程的預設線程工廠。

static ExecutorService

newCachedThreadPool()

建立一個根據需要建立新線程的線程池,但在可用時将重新使用以前構造的線程。

static ExecutorService

newCachedThreadPool(ThreadFactory threadFactory)

建立一個根據需要建立新線程的線程池,但在可用時将重新使用以前構造的線程,并在需要時使用提供的ThreadFactory建立新線程。

static ExecutorService

newFixedThreadPool(int nThreads)

建立一個線程池,該線程池重用固定數量的從共享無界隊列中運作的線程。

static ExecutorService

newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

建立一個線程池,重用固定數量的線程,從共享無界隊列中運作,使用提供的ThreadFactory在需要時建立新線程。

static ScheduledExecutorService

newScheduledThreadPool(int corePoolSize)

建立一個線程池,可以排程指令在給定的延遲之後運作,或定期執行。

static ScheduledExecutorService

newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

static ExecutorService

newSingleThreadExecutor()

建立一個使用從無界隊列運作的單個工作線程的執行程式。

static ExecutorService

newSingleThreadExecutor(ThreadFactory threadFactory)

建立一個使用單個工作線程運作無界隊列的執行程式,并在需要時使用提供的ThreadFactory建立一個新線程。

static ScheduledExecutorService

newSingleThreadScheduledExecutor()

建立一個單線程執行器,可以排程指令在給定的延遲之後運作,或定期執行。

static ScheduledExecutorService

newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

static ExecutorService

newWorkStealingPool()

建立使用所有

available processors

作為其目标并行級别的工作竊取線程池。

static ExecutorService

newWorkStealingPool(int parallelism)

建立一個維護足夠的線程以支援給定的并行級别的線程池,并且可以使用多個隊列來減少争用。

static <T> Callable<T>

privilegedCallable(Callable<T> callable)

Callable

對象,當被調用時,将在目前通路控制上下文中執行給定的

callable

static <T> Callable<T>

privilegedCallableUsingCurrentClassLoader(Callable<T> callable)

Callable

callable

,目前上下文類加載器作為上下文類加載器。

static ThreadFactory

privilegedThreadFactory()

傳回一個用于建立與目前線程具有相同權限的新線程的線程工廠。

static ExecutorService

unconfigurableExecutorService(ExecutorService executor)

傳回一個将所有定義的

ExecutorService

方法委托給給定執行程式的對象,但不能以其他方式使用轉換方式通路。

static ScheduledExecutorService

unconfigurableScheduledExecutorService(ScheduledExecutorService executor)

ScheduledExecutorService

ExecutorService接口

一個

ExecutorService

可以關閉,這将導緻它拒絕新的任務。 提供了兩種不同的方法來關閉

ExecutorService

shutdown()

方法将允許先前送出的任務在終止之前執行,而

shutdownNow()

方法可以防止等待任務啟動并嘗試停止目前正在執行的任務。 一旦終止,執行者沒有任務正在執行,沒有任務正在等待執行,并且不能送出新的任務。 應關閉未使用的

ExecutorService

以允許資源的回收。

方法

submit

延伸的基方法

Executor.execute(Runnable)

通過建立并傳回一個

Future

可用于取消執行和/或等待完成。 方法

invokeAny

invokeAll

執行

invokeAll

執行最常用的形式,執行任務集合,然後等待至少一個或全部完成。 (類别

ExecutorCompletionService

可用于編寫這些方法的自定義變體。)

Executors

類為此包中提供的執行程式服務提供了工廠方法。

boolean

awaitTermination(long timeout, TimeUnit unit)

阻止所有任務在關閉請求完成後執行,或發生逾時,或目前線程中斷,以先到者為準。

<T> List<Future<T>>

invokeAll(Collection<? extends Callable<T>> tasks)

執行給定的任務,傳回持有他們的狀态和結果的所有完成的期貨清單。

<T> List<Future<T>>

invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

執行給定的任務,傳回在所有完成或逾時到期時持有其狀态和結果的期貨清單,以先發生者為準。

<T> T

invokeAny(Collection<? extends Callable<T>> tasks)

執行給定的任務,傳回一個成功完成的結果(即沒有抛出異常),如果有的話。

<T> T

invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

執行給定的任務,傳回一個已經成功完成的結果(即,不抛出異常),如果有的話在給定的逾時之前過去。

boolean

isShutdown()

如果此執行者已關閉,則傳回

true

boolean

isTerminated()

如果所有任務在關閉後完成,則傳回

true

void

shutdown()

啟動有序關閉,其中先前送出的任務将被執行,但不會接受任何新任務。

List<Runnable>

shutdownNow()

嘗試停止所有主動執行的任務,停止等待任務的處理,并傳回正在等待執行的任務清單。

<T> Future<T>

submit(Callable<T> task)

送出值傳回任務以執行,并傳回代表任務待處理結果的Future。

Future<?>

submit(Runnable task)

送出一個可運作的任務執行,并傳回一個表示該任務的未來。

<T> Future<T>

submit(Runnable task, T result)

void execute(Runnable command) :執行任務/指令,沒有傳回值,一般用來執行 Runnable

例如: 

package com.test;

import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

public class Main8 {
    public static void main(String[] args) {
        //執行個體化線程池,并且指定固定連接配接數為15
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        //送出任務1
        executorService.submit(new Task());
        //送出任務2
        executorService.submit(new Task());
        //關閉任務執行
        executorService.shutdown();

    }
}
class Task implements Runnable{

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            if (i%20 == 0){
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "   "+i);
            }
        }
    }
}           

ThreadPoolExecutor線程池

線程池解決兩個不同的問題:由于每個任務的調用開銷減少,它們通常在執行大量異步任務時提供改進的性能,并且它們提供了一種限制和管理資源(包括執行一個任務。 每個

ThreadPoolExecutor

還維護一些基本統計資訊,例如已完成任務的數量。

為了在廣泛的上下文中有用,此類提供了許多可調參數和可擴充性鈎子。 然而,程式員被敦促使用更友善的

Executors

工廠方法

Executors.newCachedThreadPool()

(無限線程池,具有自動線程回收),

Executors.newFixedThreadPool(int)

(固定大小的線程池)和

Executors.newSingleThreadExecutor()

(單個背景線程),可以預先配置最常用的使用場景設定。 否則,手動配置和調優此類時,請使用以下指南:

核心和最大池大小

ThreadPoolExecutor

将自動調整池大小和maximumPoolSize。 當方法

execute(Runnable)

中送出了新任務,并且運作的corePoolSize線程少于一個,即使其他工作線程處于空閑狀态,也會建立一個新的線程來處理該請求。 如果超過corePoolSize但小于maximumPoolSize線程運作,則僅當隊列已滿時才會建立一個新線程。 通過将corePoolSize和maximumPoolSize設定為相同,您将建立一個固定大小的線程池。 通過将maximumPoolSize設定為本質上無限制的值(如

Integer.MAX_VALUE

,您可以允許池容納任意數量的并發任務。 最典型的是,核心和最大池大小隻能在建構時進行設定,但也可以使用

setCorePoolSize(int)

setMaximumPoolSize(int)

進行動态 更改 。

按需施工

預設情況下,即使核心線程最初建立并且隻有在新任務到達時才啟動,但是可以使用方法

prestartCoreThread()

prestartAllCoreThreads()

動态地覆寫 。 如果您使用非空隊列建構池,則可能需要預先提供線程。

建立新線程

新線程使用

ThreadFactory

建立。 如果沒有另外指定,則使用

Executors.defaultThreadFactory()

,它建立所有線程與所有相同的

ThreadGroup

并且具有相同的優先級和非守護程序狀态

NORM_PRIORITY

。 通過提供不同的ThreadFactory,您可以更改線程的名稱,線程組,優先級,守護程序狀态等。如果

ThreadFactory

在從

newThread

傳回null請求時無法建立線程,則執行程式将繼續,但可能無法執行任務 線程應該擁有“modifyThread”

RuntimePermission

。 如果使用池的工作線程或其他線程不具有此權限,則服務可能會降級:配置更改可能不會及時生效,并且關閉池可能仍處于可能終止但未完成的狀态。

活着的時代

如果池目前具有多于corePoolSize線程,則如果空閑超過keepAliveTime(見

getKeepAliveTime(TimeUnit)

),則多餘的線程将被終止。 這提供了當池未被主動使用時減少資源消耗的方法。 如果稍後池變得更加活躍,将建構新的線程。 此參數也可以使用方法

setKeepAliveTime(long, TimeUnit)

動态更改 。 使用值

Long.MAX_VALUE

TimeUnit.NANOSECONDS

有效地禁用空閑線程在關閉之前終止。 預設情況下,僅當存在多于corePoolSize線程時,保持活動政策才适用。 但是方法

allowCoreThreadTimeOut(boolean)

也可以用于将這個逾時政策應用于核心線程,隻要keepAliveTime值不為零。

排隊

任何

BlockingQueue

可用于傳送和保留送出的任務。 這個隊列的使用與池大小互相作用:

  • 如果少于corePoolSize線程正在運作,Executor總是喜歡添加一個新線程,而不是排隊。
  • 如果corePoolSize或更多的線程正在運作,Executor總是喜歡排隊請求而不是添加一個新的線程。
  • 如果請求無法排隊,則會建立一個新線程,除非這将超出maximumPoolSize,否則任務将被拒絕。

排隊有三種一般政策:

  1. 直接切換 一個工作隊列的一個很好的預設選擇是一個

    SynchronousQueue

    ,将任務交給線程,無需另外控制。 在這裡,如果沒有線程可以立即運作,那麼嘗試排隊任務會失敗,是以将建構一個新的線程。 處理可能具有内部依賴關系的請求集時,此政策可避免鎖定。 直接切換通常需要無限制的maximumPoolSizes,以避免拒絕新送出的任務。 這反過來允許無限線程增長的可能性,當指令繼續以平均速度比他們可以處理的速度更快地到達時。
  2. 無界隊列 使用無界隊列(例如

    LinkedBlockingQueue

    沒有預定容量)會導緻新的任務,在隊列中等待,當所有corePoolSize線程都很忙。 是以,不會再建立corePoolSize線程。 (是以,最大值大小的值沒有任何影響。)每個任務完全獨立于其他任務時,這可能是适當的,是以任務不會影響其他執行; 例如,在網頁伺服器中。 雖然這種排隊風格可以有助于平滑瞬态突發的請求,但是當指令繼續達到的平均速度比可以處理的速度更快時,它承認無界工作隊列增長的可能性。
  3. 有邊界的隊列。 有限隊列(例如,

    ArrayBlockingQueue

    )有助于在使用有限maxPoolSizes時防止資源耗盡,但可能更難調整和控制。 隊列大小和最大池大小可能彼此交易:使用大隊列和小型池可以最大限度地減少CPU使用率,OS資源和上下文切換開銷,但可能導緻人為的低吞吐量。 如果任務頻繁阻塞(例如,如果它們是I / O綁定),則系統可能能夠安排比您允許的更多線程的時間。 使用小型隊列通常需要較大的池大小,這樣可以使CPU繁忙,但可能會遇到不可接受的排程開銷,這也降低了吞吐量。

被拒絕的任務

execute(Runnable)

中送出的新任務将在執行程式關閉時被拒絕 ,并且當執行程式對最大線程和工作隊列容量使用有限邊界并且飽和時。 在任一情況下,

execute

方法調用

RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)

其的方法

RejectedExecutionHandler

。 提供了四個預定義的處理程式政策:可以定義和使用其他類型的

RejectedExecutionHandler

類。 這樣做需要特别注意,特别是當政策被設計為僅在特定容量或排隊政策下工作時。

  1. 在預設

    ThreadPoolExecutor.AbortPolicy

    ,處理程式會引發運作

    RejectedExecutionException

    後排斥反應。
  2. ThreadPoolExecutor.CallerRunsPolicy

    中,調用

    execute

    本身的線程運作任務。 這提供了一個簡單的回報控制機制,将降低新任務送出的速度。
  3. ThreadPoolExecutor.DiscardPolicy

    中 ,簡單地删除無法執行的任務。
  4. ThreadPoolExecutor.DiscardOldestPolicy

    中 ,如果執行程式沒有關閉,則工作隊列頭部的任務被删除,然後重試執行(可能會再次失敗,導緻重複)。

鈎子方法

該類提供了在每個任務執行之前和之後調用的

protected

覆寫的

beforeExecute(Thread, Runnable)

afterExecute(Runnable, Throwable)

方法。 這些可以用來操縱執行環境; 例如,重新初始化ThreadLocals,收集統計資訊或添加日志條目。 另外,方法

terminated()

可以被覆寫,以執行執行程式完全終止後需要執行的任何特殊處理。

如果鈎子或回調方法抛出異常,内部工作線程可能會失敗并突然終止。

隊列維護

getQueue()

允許通路工作隊列以進行監視和調試。 強烈不鼓勵将此方法用于任何其他目的。 當提供大量排隊任務被取消時,兩種提供的方法

remove(Runnable)

purge()

可用于協助進行存儲回收。

定稿

即不再在程式中引用, 并沒有剩餘的線程将成為池

shutdown

自動。 如果您希望確定未引用的池被回收,即使使用者忘記調用

shutdown()

,則必須安排未使用的線程最終當機,通過設定适當的保持活動時間,使用零個核心線程的下限和/或設定

allowCoreThreadTimeOut(boolean)

構造方法

構造方法和描述

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

建立一個新的

ThreadPoolExecutor

與給定的初始參數和預設線程工廠和拒絕執行處理程式。

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

ThreadPoolExecutor

與給定的初始參數和預設線程工廠。

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)

ThreadPoolExecutor

與給定的初始參數和預設拒絕執行處理程式。

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

建立一個新

ThreadPoolExecutor

給定的初始參數。

protected void

afterExecute(Runnable r, Throwable t)

完成指定Runnable的執行後調用方法。

void

allowCoreThreadTimeOut(boolean value)

設定政策是否核心線程可能會逾時,如果任務沒有在活着的時間内到達,則在新任務到達時被替換。

boolean

allowsCoreThreadTimeOut()

如果此池允許核心線程逾時并終止,如果沒有任務在keepAlive時間内到達,則傳回true,如果新任務到達時需要更換。

boolean

awaitTermination(long timeout, TimeUnit unit)

protected void

beforeExecute(Thread t, Runnable r)

在給定的線程中執行給定的Runnable之前調用方法。

void

execute(Runnable command)

在将來某個時候執行給定的任務。

protected void

finalize()

當這個執行器不再被引用并且沒有線程時,調用

shutdown

int

getActiveCount()

傳回正在執行任務的線程的大概數量。

long

getCompletedTaskCount()

傳回完成執行的任務的大緻總數。

int

getCorePoolSize()

傳回核心線程數。

long

getKeepAliveTime(TimeUnit unit)

傳回線程保持活動時間,這是超過核心池大小的線程在終止之前可能保持空閑的時間量。

int

getLargestPoolSize()

傳回在池中同時進行的最大線程數。

int

getMaximumPoolSize()

傳回允許的最大線程數。

int

getPoolSize()

傳回池中目前的線程數。

BlockingQueue<Runnable>

getQueue()

傳回此執行程式使用的任務隊列。

RejectedExecutionHandler

getRejectedExecutionHandler()

傳回不可執行任務的目前處理程式。

long

getTaskCount()

傳回計劃執行的任務的大概總數。

ThreadFactory

getThreadFactory()

傳回用于建立新線程的線程工廠。

boolean

isShutdown()

true

boolean

isTerminated()

true

boolean

isTerminating()

如果此執行者在

shutdown()

shutdownNow()

之後 終止 ,但尚未完全終止,則傳回true。

int

prestartAllCoreThreads()

啟動所有核心線程,導緻他們等待工作。

boolean

prestartCoreThread()

啟動核心線程,使其無法等待工作。

void

purge()

嘗試從工作隊列中删除已取消的所有

Future

任務。

boolean

remove(Runnable task)

如果此任務存在,則從執行程式的内部隊列中删除此任務,進而導緻該任務尚未運作。

void

setCorePoolSize(int corePoolSize)

設定核心線程數。

void

setKeepAliveTime(long time, TimeUnit unit)

設定線程在終止之前可能保持空閑的時間限制。

void

setMaximumPoolSize(int maximumPoolSize)

設定允許的最大線程數。

void

setRejectedExecutionHandler(RejectedExecutionHandler handler)

為不可執行的任務設定一個新的處理程式。

void

setThreadFactory(ThreadFactory threadFactory)

設定用于建立新線程的線程工廠。

void

shutdown()

List<Runnable>

shutdownNow()

protected void

terminated()

執行程式已終止時調用方法。

String

toString()

package com.test;

import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

public class Main8 {
    public static void main(String[] args) {
        //建立一個可根據需要建立新線程的線程池
        ExecutorService executorService = Executors.newCachedThreadPool();

        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
        //設定核心線程數
        threadPoolExecutor.setCorePoolSize(10);

        //送出任務1
        executorService.submit(new Task());
        //送出任務2
        executorService.submit(new Task());
        //關閉任務執行
        executorService.shutdown();

        //關閉線程池
        //threadPoolExecutor.shutdown();

    }
}
class Task implements Runnable{

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            if (i%20 == 0){
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "   "+i);
            }
        }
    }
}           

繼續閱讀