天天看點

Java線程池 ThreadPoolExecutor 深入解析 任務列隊,拒絕政策,自定義線程池工廠,線程池擴充,線程核心最大列隊數值大小定

目錄

​​相關文章​​

​​介紹     ​​

​​ThreadPoolExecutor 構造方法​​

​​構造函數的參數含義如下​​

​​各項介紹​​

​​workQueue任務隊列​​

​​1.直接送出隊列設定為SynchronousQueue隊列​​

​​2.有界的任務隊列ArrayBlockingQueue實作​​

​​3.無界的任務隊列 使用LinkedBlockingQueue實作​​

​​4.優先任務隊列:優先任務隊列通過PriorityBlockingQueue實作​​

​​拒絕政策​​

​​自己擴充RejectedExecutionHandler接口,定義自己的拒絕政策,​​

​​ThreadFactory自定義線程建立​​

​​ThreadPoolExecutor擴充​​

​​線程池線程數量​​

​​任務類型 cpu 密集型 io密集型   ​​

​​CPU 密集型任務​​

​​IO 密集型任務​​

​​相關文章​​

相關文章

上篇:java 線程鎖 可重入鎖 可中斷鎖 公平鎖 非公平鎖ReentrantLock synchronized,條件Condition,讀寫鎖 ReentrantReadWriteLock寫寫互斥讀讀共享

關聯上篇:java 多線程 實作 無傳回值 有傳回值 Runnable Thread Callable<T> Future<String> FutureTask<String> 線程

介紹     

jdk 中提供了 Executor 可以設定幾種線程池:newFixedThreadPool()newSingleThreadExecutor()、newCachedThreadPool()等 比較比較局限了,是以這裡介紹下 ThreadPoolExecutor

ThreadPoolExecutor 構造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }      

構造函數的參數含義如下

  • corePoolSize:指定了線程池中的線程數量,它的數量決定了添加的任務是開辟新的線程去執行,還是放到workQueue任務隊列中去;
  • maximumPoolSize:指定了線程池中的最大線程數量,這個參數會根據你使用的workQueue任務隊列的類型,決定線程池會開辟的最大線程數量;
  • keepAliveTime:當線程池中空閑線程數量超過corePoolSize時,多餘的線程會在多長時間内被銷毀;
  • unit:keepAliveTime的機關
  • workQueue:任務隊列,被添加到線程池中,但尚未被執行的任務;它一般分為直接送出隊列、有界任務隊列、無界任務隊列、優先任務隊列幾種;
  • threadFactory:線程工廠,用于建立線程,一般用預設即可;
  • handler:拒絕政策;當任務太多來不及處理時,如何拒絕任務;

各項介紹

workQueue任務隊列

1.直接送出隊列設定為SynchronousQueue隊列

它沒有容量,每執行一個插入操作就會阻塞,需要再執行一個删除操作才會被喚醒,反之每一個删除操作也都要等待對應的插入操作。

private static ExecutorService pool;      
//maximumPoolSize設定為2 ,拒絕政策為AbortPolic政策,直接抛出異常
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());      

2.有界的任務隊列ArrayBlockingQueue實作

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());      

有任務執行時候,線程池建立新線程,知道核心線程池大小,剩下的加入到等待列隊,等待列隊滿了,開始增加線程,直到最大線程池數

3.無界的任務隊列 使用LinkedBlockingQueue實作

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());      

當中最大線程數是無效的直到資源耗盡

4.優先任務隊列:優先任務隊列通過PriorityBlockingQueue實作

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());      

可以自定義優先級

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //優先任務隊列
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
          
        for(int i=0;i<20;i++) {
            pool.execute(new ThreadTask(i));
        }    
    }
}

public class ThreadTask implements Runnable,Comparable<ThreadTask>{
    
    private int priority;
    
    public int getPriority() {
        return priority;
    }

    public void setPriority(int priority) {
        this.priority = priority;
    }

    public ThreadTask() {
        
    }
    
    public ThreadTask(int priority) {
        this.priority = priority;
    }

    //目前對象和其他對象做比較,目前優先級大就傳回-1,優先級小就傳回1,值越小優先級越高
    public int compareTo(ThreadTask o) {
         return  this.priority>o.priority?-1:1;
    }
    
    public void run() {
        try {
            //讓線程阻塞,使後續任務進入緩存隊列
            Thread.sleep(1000);
            System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    
    }
}      

日志

priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1      

拒絕政策

一般我們建立線程池時,為防止資源被耗盡,任務隊列都會選擇建立有界任務隊列,但種模式下如果出現任務隊列已滿且線程池建立的線程數達到你設定的最大線程數時,這時就需要你指定ThreadPoolExecutor的RejectedExecutionHandler參數即合理的拒絕政策,來處理線程池"超載"的情況。ThreadPoolExecutor自帶的拒絕政策如下:

  • 1、AbortPolicy政策:該政策會直接抛出異常,阻止系統正常工作;
  • 2、CallerRunsPolicy政策:如果線程池的線程數量達到上限,該政策會把任務隊列中的任務放在調用者線程當中運作;
  • 3、DiscardOledestPolicy政策:該政策會丢棄任務隊列中最老的一個任務,也就是目前任務隊列中最先被添加進去的,馬上要被執行的那個任務,并嘗試再次送出;
  • 4、DiscardPolicy政策:該政策會默默丢棄無法處理的任務,不予任何處理。當然使用此政策,業務場景中需允許任務的丢失;

自己擴充RejectedExecutionHandler接口,定義自己的拒絕政策,

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定義拒絕政策
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString()+"執行了拒絕政策");
                
            }
        });
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        try {
            //讓線程阻塞,使後續任務進入緩存隊列
            Thread.sleep(1000);
            System.out.println("ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    
    }
}
      
  • 日志
supply.chain.bp.upload.service.sliceupload.init.ThreadTask@64616ca2執行了拒絕政策
supply.chain.bp.upload.service.sliceupload.init.ThreadTask@13fee20c執行了拒絕政策
supply.chain.bp.upload.service.sliceupload.init.ThreadTask@4e04a765執行了拒絕政策
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2      

上邊 設定的時間延遲 是以多了後執行了拒絕政策 ,我們自己定義的拒絕政策

ThreadFactory自定義線程建立

線程産生是通過線程工廠實作的,我們這裡自定義線程工廠ThreadFactory,可以按需要對線程池中建立的線程進行一些特殊的設定,如命名、優先級等,

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定義線程工廠
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("線程"+r.hashCode()+"建立");
                //線程命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy());
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        //輸出執行線程的名稱
        System.out.println("ThreadName:"+Thread.currentThread().getName());
    }
}      

日志

線程234698513建立
線程1595953398建立
ThreadName:threadPool234698513
線程998351292建立
ThreadName:threadPool234698513
ThreadName:threadPool1595953398
ThreadName:threadPool234698513
ThreadName:threadPool1595953398
ThreadName:threadPool998351292
ThreadName:threadPool234698513
ThreadName:threadPool998351292
ThreadName:threadPool1595953398
ThreadName:threadPool234698513      

線程池中,每個線程的建立我們都進行了記錄輸出與命名。

ThreadPoolExecutor擴充

ThreadPoolExecutor擴充主要是圍繞beforeExecute()、afterExecute()和terminated()三個接口實作的,

  • 1、beforeExecute:線程池中任務運作前執行
  • 2、afterExecute:線程池中任務運作完畢後執行
  • 3、terminated:線程池退出後執行

通過這三個接口我們可以監控每個任務的開始和結束時間,或者其他一些功能。

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args ) throws InterruptedException
    {
        //實作自定義接口
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("線程"+r.hashCode()+"建立");
                //線程命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy()) {
    
            protected void beforeExecute(Thread t,Runnable r) {
                System.out.println("準備執行:"+ ((ThreadTask)r).getTaskName());
            }
            
            protected void afterExecute(Runnable r,Throwable t) {
                System.out.println("執行完畢:"+((ThreadTask)r).getTaskName());
            }
            
            protected void terminated() {
                System.out.println("線程池退出");
            }
        };
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask("Task"+i));
        }    
        pool.shutdown();
    }
}

public class ThreadTask implements Runnable{    
    private String taskName;
    public String getTaskName() {
        return taskName;
    }
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }
    public ThreadTask(String name) {
        this.setTaskName(name);
    }
    public void run() {
        //輸出執行線程的名稱
        System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
    }
}      

日志

線程1121172875建立
線程998351292建立
線程1684106402建立
準備執行:Task1
TaskNameTask1---ThreadName:threadPool998351292
執行完畢:Task1
準備執行:Task0
線程335471116建立
TaskNameTask0---ThreadName:threadPool1121172875
準備執行:Task2
執行完畢:Task0
TaskNameTask2---ThreadName:threadPool998351292
準備執行:Task3
TaskNameTask3---ThreadName:threadPool1121172875
執行完畢:Task2
執行完畢:Task3
準備執行:Task7
準備執行:Task5
TaskNameTask5---ThreadName:threadPool1121172875
執行完畢:Task5
準備執行:Task4
TaskNameTask4---ThreadName:threadPool998351292
執行完畢:Task4
準備執行:Task6
TaskNameTask6---ThreadName:threadPool1121172875
準備執行:Task8
TaskNameTask8---ThreadName:threadPool335471116
TaskNameTask7---ThreadName:threadPool1684106402
執行完畢:Task8
執行完畢:Task6
準備執行:Task9
TaskNameTask9---ThreadName:threadPool998351292
執行完畢:Task9
執行完畢:Task7
線程池退出

程序已結束,退出代碼為 0      

線程池線程數量

任務類型 cpu 密集型 io密集型   

    CPU 密集型任務:加解密,壓縮、計算等一系列需要大量耗費 CPU 資源的任務,大部分場景下都是純 CPU 計算。     

    IO 密集型任務:比如像 MySQL 資料庫、檔案的讀寫、網絡通信等任務,這類任務不會特别消耗 CPU 資源,但是 IO 操作比較耗時,會占用比較多時間。     

CPU 密集型任務

對于 CPU 密集型計算,多線程本質上是提升多核 CPU 的使用率,是以對于一個 8 核的 CPU,每個核一個線程,理論上建立 8 個線程就可以了。

如果設定過多的線程數,實際上并不會起到很好的效果。此時假設我們設定的線程數量是 CPU 核心數的 2 倍,因為計算任務非常重,會占用大量的 CPU 資源,是以這時 CPU 的每個核心工作基本都是滿負荷的,

而我們又設定了過多的線程,每個線程都想去利用 CPU 資源來執行自己的任務,這就會造成不必要的上下文切換,此時線程數的增多并沒有讓性能提升,反而由于線程數量過多會導緻性能下降。

是以,對于 CPU 密集型的計算場景,理論上線程的數量 = CPU 核數就是最合适的,不過通常把線程的數量設定為CPU 核數 +1,會實作最優的使用率。

即使當密集型的線程由于偶爾的記憶體頁失效或其他原因導緻阻塞時,這個額外的線程也能確定 CPU 的時鐘周期不會被浪費,進而保證 CPU 的使用率。         

IO 密集型任務

對于 IO 密集型任務最大線程數一般會大于 CPU 核心數很多倍,因為 IO 讀寫速度相比于 CPU 的速度而言是比較慢的,如果我們設定過少的線程數,就可能導緻 CPU 資源的浪費。而如果我們設定更多的線程數,那麼當一部分線程正在等待 IO 的時候,它們此時并不需要 CPU 來計算,那麼另外的線程便可以利用 CPU 去執行其他的任務,互不影響,這樣的話在任務隊列中等待的任務就會減少,可以更好地利用資源。         

計算方法:         

線程數 = CPU 核心數 * (1 + IO 耗時/ CPU 耗時)       

通過這個公式,我們可以計算出一個合理的線程數量,如果任務的平均等待時間長,線程數就随之增加,而如果平均工作時間長,也就是對于我們上面的 CPU 密集型任務,線程數就随之減少。

可以采用 APM 工具統計到每個方法的耗時,便于計算 IO 耗時和 CPU 耗時。         

相關文章

上篇:java 線程鎖 可重入鎖 可中斷鎖 公平鎖 非公平鎖ReentrantLock synchronized,條件Condition,讀寫鎖 ReentrantReadWriteLock寫寫互斥讀讀共享

關聯上篇:java 多線程 實作 無傳回值 有傳回值 Runnable Thread Callable<T> Future<String> FutureTask<String> 線程