一、什麼是線程池?
線程池,其實就是一個容納多個線程的容器,其中的線程可以反複使用,省去了頻繁建立線程對象的操作,無需反複建立線程而消耗過多資源。
二、線程池的優勢
- 第一:降低資源消耗。通過重複利用已建立的線程降低線程建立和銷毀造成的消耗。
- 第二:提高響應速度。當任務到達時,任務可以不需要的等到線程建立就能立即執行。
- 第三:提高線程的可管理性。線程是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的配置設定,調優和監控。但是要做到合理的利用線程池,必須對其原理了如指掌。
三、ThreadPoolExecutor參數認知
- corePoolSize : 線程池的基本大小,當送出一個任務到線程池時,線程池會建立一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會建立線程,等到需要執行的任務數大于線程池基本大小時就不再建立。如果調用了線程池的prestartAllCoreThreads方法,線程池會提前建立并啟動所有基本線程。
- runnableTaskQueue:任務對列,用于儲存等待執行的任務的阻塞隊列。可以選擇以下幾個阻塞隊列。
- ArrayBlockingQueue:是一個基于數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
- LinkedBlockingQueue:一個基于連結清單結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜态工廠方法Executors.newFixedThreadPool()使用了這個隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀态,吞吐量通常要高于LinkedBlockingQueue,靜态工廠方法Executors.newCachedThreadPool使用了這個隊列。
- PriorityBlockingQueue:一個具有優先級得無限阻塞隊列。
- maximumPoolSize:線程池最大大小,線程池允許建立的最大線程數。如果隊列滿了,并且已建立的線程數小于最大線程數,則線程池會再建立新的線程執行任務。值得注意的是如果使用了無界的任務隊列這個參數就沒什麼效果。
- ThreadFactory:用于設定建立線程的工廠,可以通過線程工廠給每個建立出來的線程設定更有意義的名字,Debug和定位問題時非常又幫助。
- RejectedExecutionHandler(飽和政策):當隊列和線程池都滿了,說明線程池處于飽和狀态,那麼必須采取一種政策處理送出的新任務。這個政策預設情況下是AbortPolicy,表示無法處理新任務時抛出異常。
- CallerRunsPolicy:隻用調用者所線上程來運作任務。
- DiscardOldestPolicy:丢棄隊列裡最近的一個任務,并執行目前任務。
- DiscardPolicy:不處理,丢棄掉。
- 當然也可以根據應用場景需要來實作RejectedExecutionHandler接口自定義政策。如記錄日志或持久化不能處理的任務。
- keepAliveTime :線程活動保持時間,線程池的工作線程空閑後,保持存活的時間。是以如果任務很多,并且每個任務執行的時間比較短,可以調大這個時間,提高線程的使用率。
- TimeUnit:線程活動保持時間的機關,可選的機關有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
四、ThreadPoolExecutor使用demo
封裝Executors
package com.gm.thread.demo;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Executors {
/**
* 可控最大并發數線程池: 建立一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待
* @param name 線程作用
* @param corePoolSize 線程池的基本大小
* @return
*/
public static ExecutorService newFixedThreadPool(String name, int corePoolSize) {
return new ThreadPoolExecutor(corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue(),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
}
/**
* 可回收緩存線程池: 建立一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則建立線程。
* @param name 線程作用
* @param corePoolSize 線程池的基本大小
* @param maximumPoolSize 線程池最大大小
* @return
*/
public static ExecutorService newCachedThreadPool(String name, int corePoolSize, int maximumPoolSize) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 180L, TimeUnit.SECONDS, new SynchronousQueue(),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
}
/**
* 有界線程池:此線程池一直增長,直到上限,增長後不收縮(因為池子裡面的線程是永生的)
* @param name 線程作用
* @param corePoolSize 線程池的基本大小
* @param maximumPoolSize 線程池最大大小
* @return
*/
public static ExecutorService newLimitedThreadPool(String name, int corePoolSize, int maximumPoolSize) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2147483647L, TimeUnit.SECONDS, new SynchronousQueue(),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
}
/**
* 單線程化線程池:建立一個單線程化的線程池,它隻會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行
* @param name 線程作用
* @return
*/
public static ExecutorService newSingleThreadExecutor(String name) {
return java.util.concurrent.Executors.newSingleThreadExecutor(new NamedThreadFactory(name, true));
}
}
封裝自定義線程工廠
package com.gm.thread.demo;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
private final AtomicInteger mThreadNum = new AtomicInteger(1);
private final String mPrefix;
private final boolean mDaemo;
private final ThreadGroup mGroup;
public NamedThreadFactory() {
this("pool-" + POOL_SEQ.getAndIncrement(), false);
}
public NamedThreadFactory(String prefix) {
this(prefix, false);
}
public NamedThreadFactory(String prefix, boolean daemo) {
this.mPrefix = (prefix + "-thread-");
this.mDaemo = daemo;
/*
* SecurityManager(安全管理器)應用場景 當運作未知的Java程式的時候,該程式可能有惡意代碼(删除系統檔案、重新開機系統等),
* 為了防止運作惡意代碼對系統産生影響,需要對運作的代碼的權限進行控制, 這時候就要啟用Java安全管理器。
*/
SecurityManager s = System.getSecurityManager();
this.mGroup = (s == null ? Thread.currentThread().getThreadGroup() : s.getThreadGroup());
}
public Thread newThread(Runnable runnable) {
String name = this.mPrefix + this.mThreadNum.getAndIncrement();
Thread ret = new Thread(this.mGroup, runnable, name, 0L);
ret.setDaemon(this.mDaemo);
return ret;
}
public ThreadGroup getThreadGroup() {
return this.mGroup;
}
}
封裝自定義RejectedExecutionHandler(飽和政策)
package com.gm.thread.demo;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
private final String threadName;
private static final String ERROR = "Thread pool is EXHAUSTED! Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d), Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)!";
public AbortPolicyWithReport(String threadName) {
this.threadName = threadName;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format(
ERROR,
new Object[] { this.threadName,
Integer.valueOf(e.getPoolSize()),
Integer.valueOf(e.getActiveCount()),
Integer.valueOf(e.getCorePoolSize()),
Integer.valueOf(e.getMaximumPoolSize()),
Integer.valueOf(e.getLargestPoolSize()),
Long.valueOf(e.getTaskCount()),
Long.valueOf(e.getCompletedTaskCount()),
Boolean.valueOf(e.isShutdown()),
Boolean.valueOf(e.isTerminated()),
Boolean.valueOf(e.isTerminating())});
logger.warn(msg);
throw new RejectedExecutionException(msg);
}
}
使用demo
package com.gm.thread.demo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
public class ThreadPoolDemo {
private int corePoolSize = 10;
private int maximumPoolSize = 10;
private ExecutorService exe = null;
private Semaphore available = null;
public void execute(){
if(exe==null){
exe = Executors.newCachedThreadPool("this thread is a demo", corePoolSize,
maximumPoolSize);
}
if(available==null){
available = new Semaphore(maximumPoolSize);
}
/*
* 模拟從資料庫中查詢出的資料,比如發生退款時,對接方沒有異步通知,
* 是以需要從第三方擷取退款狀态
*/
List<Integer> list = new ArrayList<>();
// for (int i = 0; i < 10; i++) {
// list.add(i++);
// }
if (list != null) {
for (Integer integer : list) {
//方法acquireUninterruptibly()的作用是使等待進入acquire()方法的線程,不允許被中斷。
available.acquireUninterruptibly();
exe.execute(new Runnable() {
@Override
public void run() {
System.out.println("目前線程"+Thread.currentThread().getName()+"請求第三方接口");
}
});
}
}
}
public static void main(String[] args) {
new ThreadPoolDemo().execute();
}
}
如此,我們一個小小的ThreadPoolExecutor的例子完成了。