天天看點

Java線程池詳解及常用方法

Java線程池詳解及常用方法

前言

最近被問到了線程池的相關問題。于是準備開始寫一些多線程相關的文章。這篇将介紹一下線程池的基本使用。

Executors

Executors是concurrent包下的一個類,為我們提供了建立線程池的簡便方法。

Executors可以建立我們常用的四種線程池:

(1)newCachedThreadPool 建立一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則建立線程。不設上限,送出的任務将立即執行。

(2)newFixedThreadPool 建立一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待。

(3)newScheduledThreadPool 建立一個定長線程池,支援定時及周期性任務執行。

(4)newSingleThreadExecutor 建立一個單線程化的線程池執行任務。

Executors的壞處

正常來說,我們不應該使用這種方式建立線程池,應該使用ThreadPoolExecutor來建立線程池。Executors建立的線程池也是調用的ThreadPoolExcutor的構造函數。通過原來可以看出。

我們也看到了這裡面的LinkedBlockingQueue并沒有指定隊列的大小是一個無界隊列,這樣可能會造成oom。是以我們要使用ThreadPoolExecutor這種方式。

ThreadPoolExecutor

通過源碼看到ThreadPoolExecutor比較全的構造函數如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}           

分别解釋一下參數的意義

corePoolSize:線程池長期維持的線程數,即使線程處于Idle狀态,也不會回收。

maximumPoolSize:線程數的上限

keepAliveTime:空閑的時間,超過這個空閑時間,線程将被回收

unit:空閑時間的時間機關

workQueue:任務的排隊隊列,當線程都運作的時候,有空的線程将從隊列彙總進行拿取

threadFactroy:當核心線程小于滿線程的時候,又需要多加線程,則需要從工廠中擷取線程

handler:拒絕政策,當線程過多的時候的政策

線程池針對于任務的執行順序

首先任務過來之後,看看corePoolSize是否有空閑的,有的話就執行。沒有的話,放入任務隊列裡面。然後任務隊列會通知線程工廠,趕緊造幾個線程,來執行。當任務超過了最大的線程數,就執行拒絕政策,拒絕執行。

submit方法

線程池建立完畢之後,我們就需要往線程池送出任務。通過線程池的submit方法即可。

submit方法接收兩種Runable和Callable。

差別如下:

Runable是實作該接口的run方法,callable是實作接口的call方法。

callable允許使用傳回值。

callable允許抛出異常。

送出任務的方式

Future submit(Callable task):這種方式可以拿到傳回的結果。

void execute(Runnable command):這種方式拿不到。

Future<?> submit(Runnable task):這種方式可以get,但是永遠是null。

blockqueue的限制

我們在建立線程池的時候,如果使用Executors。建立的是無界隊列,容易造成oom。是以我們要自己執行queue的大小。

BlockingQueue queue = new ArrayBlockingQueue<>(512)

拒絕政策

當任務隊列的queue滿了的時候,在送出任務,就要觸發拒絕政策。隊列中預設的拒絕政策是 AbortPolicy。是直接抛出異常的一種政策。

如果是想實作自定義的政策,可以實作RejectedExecutionHandler 接口。

線程池提供了如下的幾種政策供選擇。

AbortPolicy:預設政策,抛出RejectedExecutionException

DiscardPolicy:忽略目前送出的任務

DiscardOldestPolicy:丢棄任務隊列中最老的任務,給新任務騰出地方

CallerRunsPolicy:由送出任務者執行這個任務

ExecutorService executorService = new ThreadPoolExecutor(2, 2,

0, TimeUnit.SECONDS, 
            new ArrayBlockingQueue<>(512), 
            new ThreadPoolExecutor.DiscardPolicy());           

捕捉異常

如之前所說Callable接口的實作,可以擷取到結果和異常。通過傳回的Future的get方法即可拿到。

ExecutorService executorService = Executors.newFixedThreadPool(5);

Future

@Override
    public Object call() throws Exception {
        throw new RuntimeException("exception");// 該異常會在調用Future.get()時傳遞給調用者
    }
});
           

try {

Object result = future.get();

} catch (InterruptedException e) {

} catch (ExecutionException e) {

e.printStackTrace();

}

正确構造線程池的方式

int poolSize = Runtime.getRuntime().availableProcessors() * 2;

BlockingQueue queue = new ArrayBlockingQueue<>(512);

RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy();

executorService = new ThreadPoolExecutor(poolSize, poolSize,

0, TimeUnit.SECONDS,
        queue,
        policy);           

擷取單個結果

通過submit送出一個任務後,可以擷取到一個future,調用get方法會阻塞并等待執行結果。get(long timeout, TimeUnit unit)可以指定等待的逾時時間。

擷取多個結果

可以使用循環依次調用,也可以使用ExecutorCompletionService。該類的take方式,會阻塞等待某一任務完成。向CompletionService批量送出任務後,隻需調用相同次數的CompletionService.take()方法,就能擷取所有任務的執行結果,擷取順序是任意的,取決于任務的完成順序。

void solve(Executor executor, Collection> solvers)

throws InterruptedException, ExecutionException {

CompletionService ecs = new ExecutorCompletionService(executor);// 構造器

for (Callable s : solvers)// 送出所有任務

ecs.submit(s);
              

int n = solvers.size();

for (int i = 0; i < n; ++i) {// 擷取每一個完成的任務

Result r = ecs.take().get();
   if (r != null)
       use(r);           

這個類是對線程池的一個包裝,包裝完後,聽過他進行submit和take。

單個任務逾時

Future.get(long timeout, TimeUnit unit)。方法可以指定等待的逾時時間,逾時未完成會抛出TimeoutException。

多個任務逾時

等待多個任務完成,并設定最大等待時間,可以通過CountDownLatch完成:

public void testLatch(ExecutorService executorService, List tasks)

throws InterruptedException{
  
CountDownLatch latch = new CountDownLatch(tasks.size());
  for(Runnable r : tasks){
      executorService.submit(new Runnable() {
          @Override
          public void run() {
              try{
                  r.run();
              }finally {
                  latch.countDown();// countDown
              }
          }
      });
  }
  latch.await(10, TimeUnit.SECONDS); // 指定逾時時間           

await是總的時間,即使100個任務,需要跑20分鐘。我10s逾時了 也停止了。

原文位址

https://www.cnblogs.com/jichi/p/12560235.html