天天看點

ThreadPoolExecutor線程池的簡單應用

一、什麼是線程池?

線程池,其實就是一個容納多個線程的容器,其中的線程可以反複使用,省去了頻繁建立線程對象的操作,無需反複建立線程而消耗過多資源。

二、線程池的優勢

  • 第一:降低資源消耗。通過重複利用已建立的線程降低線程建立和銷毀造成的消耗。
  • 第二:提高響應速度。當任務到達時,任務可以不需要的等到線程建立就能立即執行。
  • 第三:提高線程的可管理性。線程是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的配置設定,調優和監控。但是要做到合理的利用線程池,必須對其原理了如指掌。

三、ThreadPoolExecutor參數認知

  1. corePoolSize : 線程池的基本大小,當送出一個任務到線程池時,線程池會建立一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會建立線程,等到需要執行的任務數大于線程池基本大小時就不再建立。如果調用了線程池的prestartAllCoreThreads方法,線程池會提前建立并啟動所有基本線程。
  2. runnableTaskQueue:任務對列,用于儲存等待執行的任務的阻塞隊列。可以選擇以下幾個阻塞隊列。
  • ArrayBlockingQueue:是一個基于數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
  • LinkedBlockingQueue:一個基于連結清單結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜态工廠方法Executors.newFixedThreadPool()使用了這個隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀态,吞吐量通常要高于LinkedBlockingQueue,靜态工廠方法Executors.newCachedThreadPool使用了這個隊列。
  • PriorityBlockingQueue:一個具有優先級得無限阻塞隊列。
  1. maximumPoolSize:線程池最大大小,線程池允許建立的最大線程數。如果隊列滿了,并且已建立的線程數小于最大線程數,則線程池會再建立新的線程執行任務。值得注意的是如果使用了無界的任務隊列這個參數就沒什麼效果。
  2. ThreadFactory:用于設定建立線程的工廠,可以通過線程工廠給每個建立出來的線程設定更有意義的名字,Debug和定位問題時非常又幫助。
  3. RejectedExecutionHandler(飽和政策):當隊列和線程池都滿了,說明線程池處于飽和狀态,那麼必須采取一種政策處理送出的新任務。這個政策預設情況下是AbortPolicy,表示無法處理新任務時抛出異常。
  • CallerRunsPolicy:隻用調用者所線上程來運作任務。
  • DiscardOldestPolicy:丢棄隊列裡最近的一個任務,并執行目前任務。
  • DiscardPolicy:不處理,丢棄掉。
  • 當然也可以根據應用場景需要來實作RejectedExecutionHandler接口自定義政策。如記錄日志或持久化不能處理的任務。
  1. keepAliveTime :線程活動保持時間,線程池的工作線程空閑後,保持存活的時間。是以如果任務很多,并且每個任務執行的時間比較短,可以調大這個時間,提高線程的使用率。
  2. TimeUnit:線程活動保持時間的機關,可選的機關有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

四、ThreadPoolExecutor使用demo

封裝Executors

package com.gm.thread.demo;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Executors {

	/**
	 * 可控最大并發數線程池: 建立一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待
	 * @param name 線程作用
	 * @param corePoolSize 線程池的基本大小
	 * @return
	 */
	public static ExecutorService newFixedThreadPool(String name, int corePoolSize) {
		return new ThreadPoolExecutor(corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue(),
				new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
	}
	
	/**
	 * 可回收緩存線程池: 建立一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則建立線程。
	 * @param name 線程作用 
	 * @param corePoolSize 線程池的基本大小
	 * @param maximumPoolSize 線程池最大大小
	 * @return
	 */
	public static ExecutorService newCachedThreadPool(String name, int corePoolSize, int maximumPoolSize) {
		return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 180L, TimeUnit.SECONDS, new SynchronousQueue(),
				new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
	}
	
	/**
	 * 有界線程池:此線程池一直增長,直到上限,增長後不收縮(因為池子裡面的線程是永生的)
	 * @param name 線程作用
	 * @param corePoolSize 線程池的基本大小
	 * @param maximumPoolSize 線程池最大大小
	 * @return
	 */
	public static ExecutorService newLimitedThreadPool(String name, int corePoolSize, int maximumPoolSize) {
		return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2147483647L, TimeUnit.SECONDS, new SynchronousQueue(),
				new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
	}
	
	/**
	 * 單線程化線程池:建立一個單線程化的線程池,它隻會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行
	 * @param name 線程作用
	 * @return
	 */
	public static ExecutorService newSingleThreadExecutor(String name) {
		return java.util.concurrent.Executors.newSingleThreadExecutor(new NamedThreadFactory(name, true));
	}
}
           

封裝自定義線程工廠

package com.gm.thread.demo;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
	private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
	
	private final AtomicInteger mThreadNum = new AtomicInteger(1);
	private final String mPrefix;
	private final boolean mDaemo;
	private final ThreadGroup mGroup;

	public NamedThreadFactory() {
		this("pool-" + POOL_SEQ.getAndIncrement(), false);
	}

	public NamedThreadFactory(String prefix) {
		this(prefix, false);
	}

	public NamedThreadFactory(String prefix, boolean daemo) {
		this.mPrefix = (prefix + "-thread-");
		this.mDaemo = daemo;
		
		/*
		 * SecurityManager(安全管理器)應用場景 當運作未知的Java程式的時候,該程式可能有惡意代碼(删除系統檔案、重新開機系統等),
		 * 為了防止運作惡意代碼對系統産生影響,需要對運作的代碼的權限進行控制, 這時候就要啟用Java安全管理器。
		 */
		SecurityManager s = System.getSecurityManager();
		this.mGroup = (s == null ? Thread.currentThread().getThreadGroup() : s.getThreadGroup());
	}

	public Thread newThread(Runnable runnable) {
		String name = this.mPrefix + this.mThreadNum.getAndIncrement();
		Thread ret = new Thread(this.mGroup, runnable, name, 0L);
		ret.setDaemon(this.mDaemo);
		return ret;
	}

	public ThreadGroup getThreadGroup() {
		return this.mGroup;
	}
}
           

封裝自定義RejectedExecutionHandler(飽和政策)

package com.gm.thread.demo;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
	protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
	private final String threadName;
	private static final String ERROR = "Thread pool is EXHAUSTED! Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d), Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)!";

	public AbortPolicyWithReport(String threadName) {
		this.threadName = threadName;
	}

	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
		String msg = String.format(
				ERROR,
				new Object[] { this.threadName, 
						Integer.valueOf(e.getPoolSize()), 
						Integer.valueOf(e.getActiveCount()),
						Integer.valueOf(e.getCorePoolSize()), 
						Integer.valueOf(e.getMaximumPoolSize()),
						Integer.valueOf(e.getLargestPoolSize()), 
						Long.valueOf(e.getTaskCount()),
						Long.valueOf(e.getCompletedTaskCount()), 
						Boolean.valueOf(e.isShutdown()),
						Boolean.valueOf(e.isTerminated()), 
						Boolean.valueOf(e.isTerminating())});

		logger.warn(msg);
		throw new RejectedExecutionException(msg);
	}
}
           

使用demo

package com.gm.thread.demo;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

public class ThreadPoolDemo {

	private int corePoolSize = 10;
	private int maximumPoolSize = 10;

	private ExecutorService exe = null;

	private Semaphore available = null;
	
	public void execute(){
		
		if(exe==null){
			exe = Executors.newCachedThreadPool("this thread is a demo", corePoolSize,
					maximumPoolSize);
		}
		
		if(available==null){
			available = new Semaphore(maximumPoolSize);
		}
		/*
		 * 模拟從資料庫中查詢出的資料,比如發生退款時,對接方沒有異步通知,
		 * 是以需要從第三方擷取退款狀态
		 */
		List<Integer> list = new ArrayList<>();
//		for (int i = 0; i < 10; i++) {
//			list.add(i++);
//		}
		if (list != null) {
			for (Integer integer : list) {
				//方法acquireUninterruptibly()的作用是使等待進入acquire()方法的線程,不允許被中斷。
				available.acquireUninterruptibly();
				exe.execute(new Runnable() {
					
					@Override
					public void run() {
						System.out.println("目前線程"+Thread.currentThread().getName()+"請求第三方接口");
					}
					
				});
			}
		}
	}
	
	public static void main(String[] args) {
		new ThreadPoolDemo().execute();
	}
}
           

如此,我們一個小小的ThreadPoolExecutor的例子完成了。

繼續閱讀