線程
Callable
public class CallableDemo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(() -> {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
});
log.info("do something in main");
Thread.sleep(1000);
String result = future.get();
log.info("result:{}", result);
}
}
FutureTask
多線程執行任務時,有比較耗時操作,但又需要其傳回結果時,可以使用FutureTask
public class FutureTaskDemo {
public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<String>(() -> {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
});
new Thread(futureTask).start();
log.info("do something in main");
Thread.sleep(1000);
// 擷取耗時操作的傳回結果,這裡是堵塞操作
String result = futureTask.get();
log.info("result:{}", result);
}
}
Fork/Join
用于并行執行任務,将大任務分割成若幹個小任務,最終彙總每個小任務結果後得到大任務結果的架構。使用工作竊取算法,某個線程從其他隊列裡竊取任務來執行
- fork:将大任務切割成若幹個子任務并行執行
- join:合并子任務的執行結果得到大任務的結果
- …
局限性
- 不能執行I/O操作(讀寫資料檔案)
- 不能抛出檢查異常,必須通過必要的代碼來處理他們
- 任務隻能使用fork和join操作來作為同步機制,如果使用其他同步機制,那麼執行任務時,工作線程就不能執行其他任務
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
/***
* 繼承RecursiveTask,傳回值為Integer類型
* 覆寫computer方法
*/
@Slf4j
public class ForkJoinTaskDemo extends RecursiveTask<Integer> {
// 門檻值
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskDemo(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int retSum = 0;
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
// 如果任務足夠小就計算任務
for (int i = start; i <= end; i++) {
retSum += i;
}
} else {
// 如果任務大于門檻值,就不斷遞歸分裂成兩個子任務計算
int middle = (start + end) / 2;
ForkJoinTaskDemo leftTask = new ForkJoinTaskDemo(start, middle);
ForkJoinTaskDemo rightTask = new ForkJoinTaskDemo(middle + 1, end);
// 執行子任務
leftTask.fork();
rightTask.fork();
// 等待任務執行結束合并其結果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任務
retSum = leftResult + rightResult;
}
return retSum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
// 生成一個計算任務,計算1+2+3+4
ForkJoinTaskDemo task = new ForkJoinTaskDemo(1, 100);
// 執行一個任務
Future<Integer> result = forkjoinPool.submit(task);
try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
}
歸并排序
package test.thread.pool.merge;
import java.util.Arrays;
import java.util.Random;
/**
* 歸并排序
* @author yinwenjie
*/
public class Merge1 {
private static int MAX = 10000;
private static int inits[] = new int[MAX];
// 這是為了生成一個數量為MAX的随機整數集合,準備計算資料
// 和算法本身并沒有什麼關系
static {
Random r = new Random();
for(int index = 1 ; index <= MAX ; index++) {
inits[index - 1] = r.nextInt(10000000);
}
}
public static void main(String[] args) {
long beginTime = System.currentTimeMillis();
int results[] = forkits(inits);
long endTime = System.currentTimeMillis();
// 如果參與排序的資料非常龐大,記得把這種列印方式去掉
System.out.println("耗時=" + (endTime - beginTime) + " | " + Arrays.toString(results));
}
// 拆分成較小的元素或者進行足夠小的元素集合的排序
private static int[] forkits(int source[]) {
int sourceLen = source.length;
if(sourceLen > 2) {
int midIndex = sourceLen / 2;
int result1[] = forkits(Arrays.copyOf(source, midIndex));
int result2[] = forkits(Arrays.copyOfRange(source, midIndex , sourceLen));
// 将兩個有序的數組,合并成一個有序的數組
int mer[] = joinInts(result1 , result2);
return mer;
}
// 否則說明集合中隻有一個或者兩個元素,可以進行這兩個元素的比較排序了
else {
// 如果條件成立,說明數組中隻有一個元素,或者是數組中的元素都已經排列好位置了
if(sourceLen == 1
|| source[0] <= source[1]) {
return source;
} else {
int targetp[] = new int[sourceLen];
targetp[0] = source[1];
targetp[1] = source[0];
return targetp;
}
}
}
/**
* 這個方法用于合并兩個有序集合
* @param array1
* @param array2
*/
private static int[] joinInts(int array1[] , int array2[]) {
int destInts[] = new int[array1.length + array2.length];
int array1Len = array1.length;
int array2Len = array2.length;
int destLen = destInts.length;
// 隻需要以新的集合destInts的長度為标準,周遊一次即可
for(int index = 0 , array1Index = 0 , array2Index = 0 ; index < destLen ; index++) {
int value1 = array1Index >= array1Len?Integer.MAX_VALUE:array1[array1Index];
int value2 = array2Index >= array2Len?Integer.MAX_VALUE:array2[array2Index];
// 如果條件成立,說明應該取數組array1中的值
if(value1 < value2) {
array1Index++;
destInts[index] = value1;
}
// 否則取數組array2中的值
else {
array2Index++;
destInts[index] = value2;
}
}
return destInts;
}
}
歸并排序
/**
* 使用Fork/Join架構的歸并排序算法
* @author yinwenjie
*/
public class Merge2 {
private static int MAX = 100000000;
private static int inits[] = new int[MAX];
// 同樣進行随機隊列初始化,這裡就不再贅述了
static {
......
}
public static void main(String[] args) throws Exception {
// 正式開始
long beginTime = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
MyTask task = new MyTask(inits);
ForkJoinTask<int[]> taskResult = pool.submit(task);
try {
taskResult.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace(System.out);
}
long endTime = System.currentTimeMillis();
System.out.println("耗時=" + (endTime - beginTime));
}
/**
* 單個排序的子任務
* @author yinwenjie
*/
static class MyTask extends RecursiveTask<int[]> {
private int source[];
public MyTask(int source[]) {
this.source = source;
}
/* (non-Javadoc)
* @see java.util.concurrent.RecursiveTask#compute()
*/
@Override
protected int[] compute() {
int sourceLen = source.length;
// 如果條件成立,說明任務中要進行排序的集合還不夠小
if(sourceLen > 2) {
int midIndex = sourceLen / 2;
// 拆分成兩個子任務
MyTask task1 = new MyTask(Arrays.copyOf(source, midIndex));
task1.fork();
MyTask task2 = new MyTask(Arrays.copyOfRange(source, midIndex , sourceLen));
task2.fork();
// 将兩個有序的數組,合并成一個有序的數組
int result1[] = task1.join();
int result2[] = task2.join();
int mer[] = joinInts(result1 , result2);
return mer;
}
// 否則說明集合中隻有一個或者兩個元素,可以進行這兩個元素的比較排序了
else {
// 如果條件成立,說明數組中隻有一個元素,或者是數組中的元素都已經排列好位置了
if(sourceLen == 1
|| source[0] <= source[1]) {
return source;
} else {
int targetp[] = new int[sourceLen];
targetp[0] = source[1];
targetp[1] = source[0];
return targetp;
}
}
}
private int[] joinInts(int array1[] , int array2[]) {
// 和上文中出現的代碼一緻
}
}
}
BlockingQueue
堵塞隊列,有兩種情況會堵塞
- 隊列滿時,入隊線程會被堵塞
- 隊列空時,出對線程會被堵塞
操作 | Throws Exception | Special Value | Blocks | Times Out |
添加 | add(o) | offer(o) | put(o) | offer(0,timeout,timeunit) |
移除 | remove(o) | poll() | take() | poll(timeout,timeunit) |
檢查 | element() | peek() |
線程池
ThreadPoolExecutor
Executors
使用場景
想要頻繁的建立和銷毀線程的時候
線程池的概念
線程池就是提前建立若幹個線程,如果有任務需要處理,線程池裡的線程就會處理任務,處理完之後線程并不會被銷毀,而是等待下一個任務。由于建立和銷毀線程都是消耗系統資源的
線程池的優勢
- 降低建立線程和銷毀線程的性能開銷
- 提高響應速度,當有新任務需要執行是不需要等待線程建立就可以立馬執行
- 合理的設定線程池大小(限流)可以避免因為線程數超過硬體資源瓶頸帶來的問題
Api Executors
newFixedThreadPool
該方法傳回一個固定數量的線程池,當有一個任務送出時,若線程池中空閑,則立即執行,若沒有,則會被暫緩在一個任務隊列中,等待有空閑的線程去執行,用途:FixedThreadPool 用于負載比較大的伺服器,為了資源的合理利用,需要限制目前線程數量。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
newSingleThreadExecutor
建立一個線程的線程池,若空閑則執行,若沒有空閑線程則暫緩在任務隊列中
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
newCachedThreadPool
根據實際情況調整線程個數,不限制最大線程數,若用空閑的線程則執行任務,若無任務則不建立線程。并且每一個空閑線程會在 60 秒後自動回收
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
newScheduledThreadPool
建立一個可以指定線程的數量的線程池,但是這個線程池還帶有延遲和周期性執行任務的功能,類似定時器
ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
線程池參數
ThreadPoolExecutor采取上述步驟的總體設計思路,是為了在執行execute()方法時,盡可能地避免擷取全局鎖(那将會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之後 (目前運作的線程數大于等于corePoolSize),幾乎所有的execute()方法調用都是執行步驟2,而 步驟2不需要擷取全局鎖。
線程池中線程總數、運作線程數、空閑線程數、任務隊列等之間的關系
- 當【運作的線程數 < corePoolSize】,則直接建立新線程來處理任務,即使線程池中的其他線程是空閑的
- 當【corePoolSize <= 線程池中線程數 < maximumPoolSize】,則隻有當workQueue滿時,才建立新線程處理
- 當【corePoolSize = maximumPoolSize】,在workQueue沒滿時,那麼請求會放入workQueue,等待空閑線程去除任務處理
- 當【運作的線程數 > maximumPoolSize】,如果workQueue已滿,那麼會根據指定政策來處理送出過來的任務
ThreadPoolExecutor(int corePoolSize, //核心線程數
int maximumPoolSize, //最大線程數
long keepAliveTime, //超出核心線程數量以外的線程空餘存活時間
TimeUnit unit, //存活時間機關
BlockingQueue<Runnable> workQueue, //儲存執行任務的隊列
ThreadFactory threadFactory, //建立新線程使用的工廠
RejectedExecutionHandler handler) //當任務無法執行的時候的處理方式
飽和政策
RejectedExecutionHandler(飽和政策):當隊列和線程池都滿了,說明線程池處于飽和狀 态,那麼必須采取一種政策處理送出的新任務。這個政策預設情況下是AbortPolicy,表示無法 處理新任務時抛出異常。在JDK 1.5中Java線程池架構提供了以下4種政策。
- AbortPolicy:直接抛出異常。
- CallerRunsPolicy:隻用調用者所線上程來運作任務。
- DiscardOldestPolicy:丢棄隊列裡最近的一個任務,并執行目前任務。
- DiscardPolicy:不處理,丢棄掉。
當然,也可以根據應用場景需要來實作RejectedExecutionHandler接口自定義政策。如記錄日志或持久化存儲不能處理的任務。
任務送出
- execute();//任務送出
- submit(); //帶有傳回值的任務送出
- …
最佳線程數
最佳線程數目 = (線程等待時間+任務執行時間)/任務執行時間 * CPU數目
備注:這個公式也是前輩們分享的,當然之前看了淘寶前台系統優化實踐的文章,和上面的公式很類似,不過在CPU數目那邊,他們更細化了,上面的公式隻是參考。不過不管什麼公式,最終還是在生産環境中運作後,再優化調整。
例如伺服器CPU核數為4核,一個任務線程cpu耗時為20ms,線程等待(網絡IO、磁盤IO)耗時80ms,那最佳線程數目:( 80 + 20 )/20 * 4 = 20。也就是設定20個線程數最佳。
合理地配置線程池
CPU密集型任務應配置盡可能小的線程,如配置Ncpu+1個線程的線程池。由于IO密集型任務線程并不是一直在執行任務,則應配置盡可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,将其拆分成一個CPU密集型任務和一個IO密集型任務,隻要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量 将高于串行執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必要進行分解。可以通過
Runtime.getRuntime().availableProcessors()
方法獲得目前裝置的CPU個數。
依賴資料庫連接配接池的任務,因為線程送出SQL後需要等待資料庫傳回結果,等待的時間越長,則CPU空閑時間就越長,那麼線程數應該設定得越大,這樣才能更好地利用CPU。
建議使用有界隊列。有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點 兒,比如幾千。有一次,我們系統裡背景任務線程池的隊列和線程池全滿了,不斷抛出抛棄任 務的異常,通過排查發現是資料庫出現了問題,導緻執行SQL變得非常緩慢,因為背景任務線 程池裡的任務全是需要向資料庫查詢和插入資料的,是以導緻線程池裡的工作線程全部阻 塞,任務積壓線上程池裡。如果當時我們設定成無界隊列,那麼線程池的隊列就會越來越多, 有可能會撐滿記憶體,導緻整個系統不可用,而不隻是背景任務出現問題。當然,我們的系統所 有的任務是用單獨的伺服器部署的,我們使用不同規模的線程池完成不同類型的任務,但是 出現這樣問題時也會影響到其他任務。
任務的性質
- CPU密集型任務、
- IO密集型任務和混合型任務
- …
多線程最佳實踐
- 使用本變量地
- 使用不可變類
- 最小化鎖的作用域範圍:S=1/(1-a+a/n)
- 使用線程池,而不是直接new Thread()
- 使用同步也不要使用wait和notify
- 使用BlockingQueue實作生産消費模式
- 使用并發集合而不是加鎖的同步集合
- 使用Semaphore建立有界通路
- 使用同步塊而不是同步方法
- 避免使用靜态變量,否則用final等不可變類
使用案例
package com.insightfullogic.java8.concurrent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @description: 線程測試類
* @author: tiger
* @create: 2022-10-07 11:33
*/
public class MutilThread {
// 建立一個線程池,注意要放在外面,不要每次執行代碼就建立一個,具體線程池的使用就不展開了
public static ExecutorService commonThreadPool = new ThreadPoolExecutor(5, 5, 300L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
public static void main(String[] args) {
// 開始多線程調用
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 12; i++) {
int finalI = i;
Future<String> future = (Future<String>) commonThreadPool.submit(() -> {
System.out.println(finalI);
});
futures.add(future);
}
// 擷取結果
List<String> list = new ArrayList<>();
try {
for (int i = 0; i < futures.size(); i++) {
list.add(futures.get(i).get());
}
} catch (Exception e) {
// LOGGER.error("出現錯誤:", e);
}
}
}
順序調用
CompletableFuture<A> futureA = CompletableFuture.supplyAsync(() -> doA());
CompletableFuture<B> futureB = CompletableFuture.supplyAsync(() -> doB());
CompletableFuture.allOf(futureA,futureB) // 等a b 兩個任務都執行完成
C c = doC(futureA.join(), futureB.join());
CompletableFuture<D> futureD = CompletableFuture.supplyAsync(() -> doD(c));
CompletableFuture<E> futureE = CompletableFuture.supplyAsync(() -> doE(c));
CompletableFuture.allOf(futureD,futureE) // 等d e兩個任務都執行完成
return doResult(futureD.join(),futureE.join());
線程池的監控
如果在系統中大量使用線程池,則有必要對線程池進行監控,友善在出現問題時,可以根 據線程池的使用狀況快速定位問題。可以通過線程池提供的參數進行監控,在監控線程池的 時候可以使用以下屬性。
- taskCount:線程池需要執行的任務數量。
- completedTaskCount:線程池在運作過程中已完成的任務數量,小于或等于taskCount。
- largestPoolSize:線程池裡曾經建立過的最大線程數量。通過這個資料可以知道線程池是否曾經滿過。如該數值等于線程池的最大大小,則表示線程池曾經滿過。
- getPoolSize:線程池的線程數量。如果線程池不銷毀的話,線程池裡的線程不會自動銷 毀,是以這個大小隻增不減。
- getActiveCount:擷取活動的線程數。
死鎖
産生死鎖的原因
- 因為系統資源不足
- 程序運作推進的順序不合适
- 資源配置設定不當等
産生死鎖的必要條件
- 互斥條件
- 請求和保持條件
- 不剝奪條件
- 環路等待條件
package com.tiger.deadLock;
import java.util.concurrent.TimeUnit;
/**
* 死鎖
* @author tiger
* @Date 2017年7月27日
*/
public class DeadLock {
public static void main(String[] args) {
Park task = new Park();
// 章魚線程
Thread th1 = new Thread(task,"章魚");
// 光頭線程
Thread th2 = new Thread(task,"光頭");
th1.start();
th2.start();
}
}
class Park implements Runnable{
//兩人共同擁有相同的兩把鎖
String[] locks = {"0","1"};
@Override
public void run() {
String name = Thread.currentThread().getName();
switch( name ){
//光頭用 0 号卡進尖叫地帶"。
case "光頭":尖叫地帶( locks[0] ); break;
//章魚用 1 号卡進海底世界"。
case "章魚":海底世界( locks[1] );break;
}
}
/**
* 光頭:持0号卡進外圍的尖叫地帶,玩一陣子後,想持另外一張卡(1号卡)進恐怖森林,但此時0号卡被占用。
* @param card
*/
public void 尖叫地帶(String card){
String name = Thread.currentThread().getName();
//card 1 先進尖叫地帶
synchronized (card) {
System.out.println(name+" 進到尖叫地帶");
//進去玩耍2秒
try {TimeUnit.SECONDS.sleep(3);}
catch (InterruptedException e) {}
/*在外圍玩耍完後,想進一步進到恐怖森林時,
手頭隻有1号卡可以使用,此時1号卡被其他人(線程)持有還沒有釋放,
是以進不了,隻能在外頭幹等,此時另外一個人也是這種情況,是以造成死鎖。*/
System.out.println(name+" 準備進到恐怖盛林");
synchronized (locks[1]) {
System.out.println(name+" 進到恐怖盛林");
}
}
}
/**
* 章魚:持1号卡進外圍的海底世界,玩一陣子後,想持另外一張卡(0号卡)進東海龍宮,但此時0号卡被占用。
* @param card
*/
public void 海底世界(String card){
String name = Thread.currentThread().getName();
// 持1号卡先進海底世界
synchronized (card) {
System.out.println(name+" 進到海底世界");
// 進去玩耍2秒
try {TimeUnit.SECONDS.sleep(3);}
catch (InterruptedException e) {}
/*在外圍玩耍完後,想進一步進到東海龍宮時,
手頭隻有0号卡可以使用,此時0号卡被其他人(線程)持有還沒有釋放,
是以進不了,隻能在外頭幹等,此時另外一個人也是這種情況,是以造成死鎖。*/
System.out.println(name+" 準備進到東海龍宮");
synchronized (locks[0]) {
System.out.println(name+" 進到東海龍宮");
}
}
}
}