天天看點

線程池學習筆記

線程

Callable

public class CallableDemo {

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(() -> {
            log.info("do something in callable");
            Thread.sleep(5000);
            return "Done";
        });
        log.info("do something in main");
        Thread.sleep(1000);
        String result = future.get();
        log.info("result:{}", result);
    }
}      

FutureTask

多線程執行任務時,有比較耗時操作,但又需要其傳回結果時,可以使用FutureTask

public class FutureTaskDemo {

    public static void main(String[] args) throws Exception {
        FutureTask<String> futureTask = new FutureTask<String>(() -> {
            log.info("do something in callable");
            Thread.sleep(5000);
            return "Done";
        });

        new Thread(futureTask).start();
        log.info("do something in main");
        Thread.sleep(1000);
        // 擷取耗時操作的傳回結果,這裡是堵塞操作
        String result = futureTask.get();
        log.info("result:{}", result);
    }
}      

Fork/Join

用于并行執行任務,将大任務分割成若幹個小任務,最終彙總每個小任務結果後得到大任務結果的架構。使用工作竊取算法,某個線程從其他隊列裡竊取任務來執行

  • fork:将大任務切割成若幹個子任務并行執行
  • join:合并子任務的執行結果得到大任務的結果

局限性

  1. 不能執行I/O操作(讀寫資料檔案)
  2. 不能抛出檢查異常,必須通過必要的代碼來處理他們
  3. 任務隻能使用fork和join操作來作為同步機制,如果使用其他同步機制,那麼執行任務時,工作線程就不能執行其他任務
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

/***
 * 繼承RecursiveTask,傳回值為Integer類型
 * 覆寫computer方法
 */
@Slf4j
public class ForkJoinTaskDemo extends RecursiveTask<Integer> {

    // 門檻值
    public static final int threshold = 2;
    private             int start;
    private             int end;

    public ForkJoinTaskDemo(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int retSum = 0;
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            // 如果任務足夠小就計算任務
            for (int i = start; i <= end; i++) {
                retSum += i;
            }
        } else {
            // 如果任務大于門檻值,就不斷遞歸分裂成兩個子任務計算
            int middle = (start + end) / 2;
            ForkJoinTaskDemo leftTask = new ForkJoinTaskDemo(start, middle);
            ForkJoinTaskDemo rightTask = new ForkJoinTaskDemo(middle + 1, end);

            // 執行子任務
            leftTask.fork();
            rightTask.fork();

            // 等待任務執行結束合并其結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合并子任務
            retSum = leftResult + rightResult;
        }
        return retSum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        // 生成一個計算任務,計算1+2+3+4
        ForkJoinTaskDemo task = new ForkJoinTaskDemo(1, 100);

        // 執行一個任務
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}      

歸并排序

package test.thread.pool.merge;
 
import java.util.Arrays;
import java.util.Random;
 
/**
 * 歸并排序
 * @author yinwenjie
 */
public class Merge1 {
 
    private static int MAX = 10000;
 
    private static int inits[] = new int[MAX];
 
    // 這是為了生成一個數量為MAX的随機整數集合,準備計算資料
    // 和算法本身并沒有什麼關系
    static {
        Random r = new Random();
        for(int index = 1 ; index <= MAX ; index++) {
            inits[index - 1] = r.nextInt(10000000);
        }
    }
 
    public static void main(String[] args) {
        long beginTime = System.currentTimeMillis();
        int results[] = forkits(inits); 
        long endTime = System.currentTimeMillis();
        // 如果參與排序的資料非常龐大,記得把這種列印方式去掉
        System.out.println("耗時=" + (endTime - beginTime) + " | " + Arrays.toString(results));       
    }
 
    // 拆分成較小的元素或者進行足夠小的元素集合的排序
    private static int[] forkits(int source[]) {
        int sourceLen = source.length;
        if(sourceLen > 2) {
            int midIndex = sourceLen / 2;
            int result1[] = forkits(Arrays.copyOf(source, midIndex));
            int result2[] = forkits(Arrays.copyOfRange(source, midIndex , sourceLen));
            // 将兩個有序的數組,合并成一個有序的數組
            int mer[] = joinInts(result1 , result2);
            return mer;
        } 
        // 否則說明集合中隻有一個或者兩個元素,可以進行這兩個元素的比較排序了
        else {
            // 如果條件成立,說明數組中隻有一個元素,或者是數組中的元素都已經排列好位置了
            if(sourceLen == 1
                || source[0] <= source[1]) {
                return source;
            } else {
                int targetp[] = new int[sourceLen];
                targetp[0] = source[1];
                targetp[1] = source[0];
                return targetp;
            }
        }
    }
 
    /**
     * 這個方法用于合并兩個有序集合
     * @param array1
     * @param array2
     */
    private static int[] joinInts(int array1[] , int array2[]) {
        int destInts[] = new int[array1.length + array2.length];
        int array1Len = array1.length;
        int array2Len = array2.length;
        int destLen = destInts.length;
 
        // 隻需要以新的集合destInts的長度為标準,周遊一次即可
        for(int index = 0 , array1Index = 0 , array2Index = 0 ; index < destLen ; index++) {
            int value1 = array1Index >= array1Len?Integer.MAX_VALUE:array1[array1Index];
            int value2 = array2Index >= array2Len?Integer.MAX_VALUE:array2[array2Index];
            // 如果條件成立,說明應該取數組array1中的值
            if(value1 < value2) {
                array1Index++;
                destInts[index] = value1;
            }
            // 否則取數組array2中的值
            else {
                array2Index++;
                destInts[index] = value2;
            }
        }
 
        return destInts;
    }
}      

歸并排序

/**
 * 使用Fork/Join架構的歸并排序算法
 * @author yinwenjie
 */
public class Merge2 {
 
    private static int MAX = 100000000;
 
    private static int inits[] = new int[MAX];
 
    // 同樣進行随機隊列初始化,這裡就不再贅述了
    static {
        ......
    }
 
    public static void main(String[] args) throws Exception {   
        // 正式開始
        long beginTime = System.currentTimeMillis();
        ForkJoinPool pool = new ForkJoinPool();
        MyTask task = new MyTask(inits);
        ForkJoinTask<int[]> taskResult = pool.submit(task);
        try {
            taskResult.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace(System.out);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("耗時=" + (endTime - beginTime));      
    }
 
    /**
     * 單個排序的子任務
     * @author yinwenjie
     */
    static class MyTask extends RecursiveTask<int[]> {
 
        private int source[];
 
        public MyTask(int source[]) {
            this.source = source;
        }
 
        /* (non-Javadoc)
         * @see java.util.concurrent.RecursiveTask#compute()
         */
        @Override
        protected int[] compute() {
            int sourceLen = source.length;
            // 如果條件成立,說明任務中要進行排序的集合還不夠小
            if(sourceLen > 2) {
                int midIndex = sourceLen / 2;
                // 拆分成兩個子任務
                MyTask task1 = new MyTask(Arrays.copyOf(source, midIndex));
                task1.fork();
                MyTask task2 = new MyTask(Arrays.copyOfRange(source, midIndex , sourceLen));
                task2.fork();
                // 将兩個有序的數組,合并成一個有序的數組
                int result1[] = task1.join();
                int result2[] = task2.join();
                int mer[] = joinInts(result1 , result2);
                return mer;
            } 
            // 否則說明集合中隻有一個或者兩個元素,可以進行這兩個元素的比較排序了
            else {
                // 如果條件成立,說明數組中隻有一個元素,或者是數組中的元素都已經排列好位置了
                if(sourceLen == 1
                    || source[0] <= source[1]) {
                    return source;
                } else {
                    int targetp[] = new int[sourceLen];
                    targetp[0] = source[1];
                    targetp[1] = source[0];
                    return targetp;
                }
            }
        }
 
        private int[] joinInts(int array1[] , int array2[]) {
            // 和上文中出現的代碼一緻
        }
    }
}      

BlockingQueue

堵塞隊列,有兩種情況會堵塞

  1. 隊列滿時,入隊線程會被堵塞
  2. 隊列空時,出對線程會被堵塞
操作 Throws Exception Special Value Blocks Times Out
添加 add(o) offer(o) put(o) offer(0,timeout,timeunit)
移除 remove(o) poll() take() poll(timeout,timeunit)
檢查 element() peek()

線程池

ThreadPoolExecutor

Executors      

使用場景

想要頻繁的建立和銷毀線程的時候

線程池的概念

線程池就是提前建立若幹個線程,如果有任務需要處理,線程池裡的線程就會處理任務,處理完之後線程并不會被銷毀,而是等待下一個任務。由于建立和銷毀線程都是消耗系統資源的

線程池的優勢

  • 降低建立線程和銷毀線程的性能開銷
  • 提高響應速度,當有新任務需要執行是不需要等待線程建立就可以立馬執行
  • 合理的設定線程池大小(限流)可以避免因為線程數超過硬體資源瓶頸帶來的問題

Api Executors

newFixedThreadPool

該方法傳回一個固定數量的線程池,當有一個任務送出時,若線程池中空閑,則立即執行,若沒有,則會被暫緩在一個任務隊列中,等待有空閑的線程去執行,用途:FixedThreadPool 用于負載比較大的伺服器,為了資源的合理利用,需要限制目前線程數量。

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);

ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS,  new LinkedBlockingQueue<Runnable>());      

newSingleThreadExecutor

建立一個線程的線程池,若空閑則執行,若沒有空閑線程則暫緩在任務隊列中

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());      

newCachedThreadPool

根據實際情況調整線程個數,不限制最大線程數,若用空閑的線程則執行任務,若無任務則不建立線程。并且每一個空閑線程會在 60 秒後自動回收

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());      

newScheduledThreadPool

建立一個可以指定線程的數量的線程池,但是這個線程池還帶有延遲和周期性執行任務的功能,類似定時器

ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);      

線程池參數

ThreadPoolExecutor采取上述步驟的總體設計思路,是為了在執行execute()方法時,盡可能地避免擷取全局鎖(那将會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之後 (目前運作的線程數大于等于corePoolSize),幾乎所有的execute()方法調用都是執行步驟2,而 步驟2不需要擷取全局鎖。

線程池中線程總數、運作線程數、空閑線程數、任務隊列等之間的關系

  1. 當【運作的線程數 < corePoolSize】,則直接建立新線程來處理任務,即使線程池中的其他線程是空閑的
  2. 當【corePoolSize <= 線程池中線程數 < maximumPoolSize】,則隻有當workQueue滿時,才建立新線程處理
  3. 當【corePoolSize = maximumPoolSize】,在workQueue沒滿時,那麼請求會放入workQueue,等待空閑線程去除任務處理
  4. 當【運作的線程數 > maximumPoolSize】,如果workQueue已滿,那麼會根據指定政策來處理送出過來的任務
ThreadPoolExecutor(int corePoolSize,                  //核心線程數
                     int maximumPoolSize,               //最大線程數
                     long keepAliveTime,                //超出核心線程數量以外的線程空餘存活時間
                     TimeUnit unit,                     //存活時間機關
                     BlockingQueue<Runnable> workQueue, //儲存執行任務的隊列
              ThreadFactory threadFactory,       //建立新線程使用的工廠
                     RejectedExecutionHandler handler)  //當任務無法執行的時候的處理方式      

飽和政策

RejectedExecutionHandler(飽和政策):當隊列和線程池都滿了,說明線程池處于飽和狀 态,那麼必須采取一種政策處理送出的新任務。這個政策預設情況下是AbortPolicy,表示無法 處理新任務時抛出異常。在JDK 1.5中Java線程池架構提供了以下4種政策。

  1. AbortPolicy:直接抛出異常。
  2. CallerRunsPolicy:隻用調用者所線上程來運作任務。
  3. DiscardOldestPolicy:丢棄隊列裡最近的一個任務,并執行目前任務。
  4. DiscardPolicy:不處理,丢棄掉。

當然,也可以根據應用場景需要來實作RejectedExecutionHandler接口自定義政策。如記錄日志或持久化存儲不能處理的任務。

任務送出

  1. execute();//任務送出
  2. submit(); //帶有傳回值的任務送出

最佳線程數

最佳線程數目 = (線程等待時間+任務執行時間)/任務執行時間 * CPU數目

備注:這個公式也是前輩們分享的,當然之前看了淘寶前台系統優化實踐的文章,和上面的公式很類似,不過在CPU數目那邊,他們更細化了,上面的公式隻是參考。不過不管什麼公式,最終還是在生産環境中運作後,再優化調整。

例如伺服器CPU核數為4核,一個任務線程cpu耗時為20ms,線程等待(網絡IO、磁盤IO)耗時80ms,那最佳線程數目:( 80 + 20 )/20 * 4 = 20。也就是設定20個線程數最佳。

合理地配置線程池

CPU密集型任務應配置盡可能小的線程,如配置Ncpu+1個線程的線程池。由于IO密集型任務線程并不是一直在執行任務,則應配置盡可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,将其拆分成一個CPU密集型任務和一個IO密集型任務,隻要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量 将高于串行執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必要進行分解。可以通過 ​

​Runtime.getRuntime().availableProcessors()​

​方法獲得目前裝置的CPU個數。

依賴資料庫連接配接池的任務,因為線程送出SQL後需要等待資料庫傳回結果,等待的時間越長,則CPU空閑時間就越長,那麼線程數應該設定得越大,這樣才能更好地利用CPU。

建議使用有界隊列。有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點 兒,比如幾千。有一次,我們系統裡背景任務線程池的隊列和線程池全滿了,不斷抛出抛棄任 務的異常,通過排查發現是資料庫出現了問題,導緻執行SQL變得非常緩慢,因為背景任務線 程池裡的任務全是需要向資料庫查詢和插入資料的,是以導緻線程池裡的工作線程全部阻 塞,任務積壓線上程池裡。如果當時我們設定成無界隊列,那麼線程池的隊列就會越來越多, 有可能會撐滿記憶體,導緻整個系統不可用,而不隻是背景任務出現問題。當然,我們的系統所 有的任務是用單獨的伺服器部署的,我們使用不同規模的線程池完成不同類型的任務,但是 出現這樣問題時也會影響到其他任務。

任務的性質

  1. CPU密集型任務、
  2. IO密集型任務和混合型任務

多線程最佳實踐

  1. 使用本變量地
  2. 使用不可變類
  3. 最小化鎖的作用域範圍:S=1/(1-a+a/n)
  4. 使用線程池,而不是直接new Thread()
  5. 使用同步也不要使用wait和notify
  6. 使用BlockingQueue實作生産消費模式
  7. 使用并發集合而不是加鎖的同步集合
  8. 使用Semaphore建立有界通路
  9. 使用同步塊而不是同步方法
  10. 避免使用靜态變量,否則用final等不可變類

使用案例

package com.insightfullogic.java8.concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * @description: 線程測試類
 * @author: tiger
 * @create: 2022-10-07 11:33
 */
public class MutilThread {

    // 建立一個線程池,注意要放在外面,不要每次執行代碼就建立一個,具體線程池的使用就不展開了
    public static ExecutorService commonThreadPool = new ThreadPoolExecutor(5, 5, 300L,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) {
        // 開始多線程調用
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < 12; i++) {
            int finalI = i;
            Future<String> future = (Future<String>) commonThreadPool.submit(() -> {
                System.out.println(finalI);
            });
            futures.add(future);
        }

        // 擷取結果
        List<String> list = new ArrayList<>();
        try {
            for (int i = 0; i < futures.size(); i++) {
                list.add(futures.get(i).get());
            }
        } catch (Exception e) {
//            LOGGER.error("出現錯誤:", e);
        }
    }
}      

順序調用

CompletableFuture<A> futureA = CompletableFuture.supplyAsync(() -> doA());
CompletableFuture<B> futureB = CompletableFuture.supplyAsync(() -> doB());
CompletableFuture.allOf(futureA,futureB) // 等a b 兩個任務都執行完成

C c = doC(futureA.join(), futureB.join());

CompletableFuture<D> futureD = CompletableFuture.supplyAsync(() -> doD(c));
CompletableFuture<E> futureE = CompletableFuture.supplyAsync(() -> doE(c));
CompletableFuture.allOf(futureD,futureE) // 等d e兩個任務都執行完成

return doResult(futureD.join(),futureE.join());      

線程池的監控

如果在系統中大量使用線程池,則有必要對線程池進行監控,友善在出現問題時,可以根 據線程池的使用狀況快速定位問題。可以通過線程池提供的參數進行監控,在監控線程池的 時候可以使用以下屬性。

  1. taskCount:線程池需要執行的任務數量。
  2. completedTaskCount:線程池在運作過程中已完成的任務數量,小于或等于taskCount。
  3. largestPoolSize:線程池裡曾經建立過的最大線程數量。通過這個資料可以知道線程池是否曾經滿過。如該數值等于線程池的最大大小,則表示線程池曾經滿過。
  4. getPoolSize:線程池的線程數量。如果線程池不銷毀的話,線程池裡的線程不會自動銷 毀,是以這個大小隻增不減。
  5. getActiveCount:擷取活動的線程數。

死鎖

産生死鎖的原因

  1. 因為系統資源不足
  2. 程序運作推進的順序不合适
  3. 資源配置設定不當等

産生死鎖的必要條件

  1. 互斥條件
  2. 請求和保持條件
  3. 不剝奪條件
  4. 環路等待條件
package com.tiger.deadLock;
 
import java.util.concurrent.TimeUnit;
/**
 * 死鎖 
 * @author tiger
 * @Date 2017年7月27日
 */
public class DeadLock {
 
  public static void main(String[] args) {
 
    Park task = new Park();
         // 章魚線程
    Thread th1 = new Thread(task,"章魚");
         // 光頭線程
    Thread th2 = new Thread(task,"光頭");
    th1.start();
    th2.start();
  }
}
class Park implements Runnable{ 
  //兩人共同擁有相同的兩把鎖
  String[] locks = {"0","1"};
  @Override
  public void run() {
    String name = Thread.currentThread().getName();
    switch( name ){
    //光頭用 0 号卡進尖叫地帶"。
    case "光頭":尖叫地帶( locks[0] ); break;
    //章魚用 1 号卡進海底世界"。
    case "章魚":海底世界( locks[1] );break;
    }
  }
  /**
   * 光頭:持0号卡進外圍的尖叫地帶,玩一陣子後,想持另外一張卡(1号卡)進恐怖森林,但此時0号卡被占用。
   * @param card
   */
  public void 尖叫地帶(String card){
    String name = Thread.currentThread().getName();
    //card 1 先進尖叫地帶
    synchronized (card) {
      System.out.println(name+" 進到尖叫地帶");
      //進去玩耍2秒
      try {TimeUnit.SECONDS.sleep(3);} 
      catch (InterruptedException e) {}
      /*在外圍玩耍完後,想進一步進到恐怖森林時,
      手頭隻有1号卡可以使用,此時1号卡被其他人(線程)持有還沒有釋放,
      是以進不了,隻能在外頭幹等,此時另外一個人也是這種情況,是以造成死鎖。*/
      System.out.println(name+" 準備進到恐怖盛林");
      synchronized (locks[1]) {
        System.out.println(name+" 進到恐怖盛林");
      }
    }
  }
  /**
   * 章魚:持1号卡進外圍的海底世界,玩一陣子後,想持另外一張卡(0号卡)進東海龍宮,但此時0号卡被占用。
   * @param card
   */
  public void 海底世界(String card){
    String name = Thread.currentThread().getName();
    // 持1号卡先進海底世界
    synchronized (card) {
      System.out.println(name+" 進到海底世界");
      // 進去玩耍2秒
      try {TimeUnit.SECONDS.sleep(3);} 
      catch (InterruptedException e) {}
      /*在外圍玩耍完後,想進一步進到東海龍宮時,
      手頭隻有0号卡可以使用,此時0号卡被其他人(線程)持有還沒有釋放,
      是以進不了,隻能在外頭幹等,此時另外一個人也是這種情況,是以造成死鎖。*/
      System.out.println(name+" 準備進到東海龍宮");
      synchronized (locks[0]) {
        System.out.println(name+" 進到東海龍宮");
      }
    }
  }
}