天天看點

Java并發工具合集JUC大爆發

并發工具類

通常我們所說的并發包也就是java.util.concurrent(JUC),集中了Java并發的各種工具類, 合理地使用它們能幫忙我們快速地完成功能 。

1. CountDownLatch

CountDownLatch是一個同步計數器,初始化的時候 傳入需要計數的線程等待數,可以是需要等待執行完成的線程數,或者大于 ,一般稱為發令槍。\

countdownlatch 是一個同步類工具,不涉及鎖定,當count的值為零時目前線程繼續運作,不涉及同步,隻涉及線程通信的時候,使用它較為合适

Java并發工具合集JUC大爆發

1.1 作用

用來協調多個線程之間的同步,或者說起到線程之間的通信(而不是用作互斥的作用),是一組線程等待其他的線程完成工作以後在執行,相當于加強版join。

注意:這是一個一次性操作 - 計數無法重置。 如果你需要一個重置的版本計數,考慮使用CyclicBarrier。

1.2 舉例

​ 我們去組團遊玩一樣,總共30個人,來的人要等待還沒有到的人,一直等到第30個人到了,我們才開始出發,在等待過程中,其他人(線程)是等待狀态不做任何事情的,一直等所有人(線程)到齊了(準備完成)才開始執行。

1.3 概念

  • countDownLatch這個類使一個線程等待其他線程各自執行完畢後再執行。
  • 是通過一個計數器來實作的,計數器的初始值是線程的數量。每當一個線程執行完畢後,計數器的值就-1,當計數器的值為0時,表示所有線程都執行完畢,然後在閉鎖上等待的線程就可以恢複工作了。

我們打開CountDownLatch的源代碼分析,我們發現最重要的方法就是一下這兩個方法:

//阻塞目前線程,等待其他線程執行完成,直到計數器計數值減到0。public void await() throws InterruptedException;//阻塞目前線程指定的時間,如果達到時間就放行,等待其他線程執行完成,直到計數器計數值減到0。public boolean await(long timeout, TimeUnit unit) throws InterruptedException//負責計數器的減一。public void countDown():           

1.4 應用場景

1.4.1 多線程壓測

有時我們想同時啟動多個線程,實作最大程度的并行性。

​ 例如,我們想測試一個單例類。如果我們建立一個初始計數為1的CountDownLatch,并讓所有線程都在這個鎖上等待,那麼我們可以很輕松地完成測試。我們隻需調用 一次countDown()方法就可以讓所有的等待線程同時恢複執行。

1.4.2 等待其他線程

​ 例如應用程式啟動類要確定在處理使用者請求前,所有N個外部系統已經啟動和運作了,例如處理excel中多個表單,如果一個一個出來很耗IO和性能,我們可以等100或者1000個線程都完成了表單的操作後一下子寫進excel表單中。

Java并發工具合集JUC大爆發

注意:一個線程不一定隻能做countDown一次,也可以countDown多次

1.5 示例

1.5.1 準備完成後執行

在實際項目中可能有些線程需要資源準備完成後才能進行執行,這個時候就可以使用countDownLatch
package chapter02.countdownlatch; import java.util.Random;import java.util.concurrent.*; /** * countdownlatch 示例 */public class CountDownLatchTest {    private static ExecutorService executorService = Executors.newFixedThreadPool(10);    private static Random random = new Random();      public static void execute(CountDownLatch countDownLatch) {        //擷取一個随機數        long sleepTime = random.nextInt(10);        //擷取線程ID        long threadId = Thread.currentThread().getId();        System.out.println("線程ID" + threadId + ",開始執行--countDown");         try {            //睡眠随機秒            Thread.sleep(sleepTime * 1000);        } catch (InterruptedException e) {            e.printStackTrace();        }        //計數器減1        countDownLatch.countDown();        System.out.println("線程ID" + threadId + ",準備任務完成耗時:" + sleepTime + "目前時間" + System.currentTimeMillis());        try {            //線程等待其他任務完成後喚醒            countDownLatch.await();        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println("線程ID" + threadId + ",開始執行任務,目前時間:" + System.currentTimeMillis());    }     public static void main(String[] args) throws InterruptedException {        CountDownLatch countDownLatch = new CountDownLatch(5);        for (int i = 0; i < 5; i++) {            executorService.submit(() -> {                execute(countDownLatch);            });        }        //線程等待其他任務完成後喚醒        countDownLatch.await();        Thread.sleep(1000);        executorService.shutdown();        System.out.println("全部任務執行完成");    }}           

1.5.2 多線程壓測

在實戰項目中,我們除了使用 jemter 等工具進行壓測外,還可以自己動手使用 CountDownLatch 類編寫壓測代碼。

​ 可以說 jemter 的并發壓測背後也是使用的 CountDownLatch,可見掌握 CountDownLatch 類的使用是有多麼的重要, CountDownLatch是Java多線程同步器的四大金剛之一,CountDownLatch能夠使一個線程等待其他線程完成各自的工作後再執行。

package chapter02.countdownlatch; import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors; /** * countDownLatch 壓測 */public class CountDownLatchPressure {     /**     * 壓測業務代碼     */    public void testLoad() {        System.out.println("壓測:" + Thread.currentThread().getId() + ":" + System.currentTimeMillis());    }     /**     * 壓測啟動     * 主線程負責壓測線程準備工作     * 壓測線程準備完成後 調用 start.countDown(); 啟動線程執行     * @throws InterruptedException     */    private void latchTest() throws InterruptedException {        //壓測線程數        int testThreads = 300;        final CountDownLatch start = new CountDownLatch(1);        final CountDownLatch end = new CountDownLatch(testThreads);        //建立線程池        ExecutorService exce = Executors.newFixedThreadPool(testThreads);        //準備線程準備        for (int i = 0; i < testThreads; i++) {            //添加到線程池            exce.submit(() -> {                try {                    //啟動後等待 喚醒                    start.await();                    testLoad();                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    //壓測完成                    end.countDown();                }            });         }         //連接配接池線程初始化完成 開始壓測        start.countDown();        //壓測完成後結束        end.await();        //關閉線程池        exce.shutdown();    }     public static void main(String[] args) throws InterruptedException {        CountDownLatchPressure countDownLatchPressure = new CountDownLatchPressure();        //開始壓測        countDownLatchPressure.latchTest();    }}            

2. CyclicBarrier

Java并發工具合集JUC大爆發

2.1 簡介

CyclicBarrier,是JDK1.5的java.util.concurrent(JUC)并發包中提供的一個并發工具類

C yclicBarrier可以使一定數量的線程反複地在栅欄位置處彙集,當線程到達栅欄位置時将調用await方法,這個方法将阻塞直到所有線程都到達栅欄位置,如果所有線程都到達栅欄位置,那麼栅欄将打開,此時所有的線程都将被釋放,而栅欄将被重置以便下次使用。

2.2 舉例

就像生活中我們會約朋友們到某個餐廳一起吃飯,有些朋友可能會早到,有些朋友可能會晚到,但是這個餐廳規定必須等到所有人到齊之後才會讓我們進去。

​ 這裡的朋友們就是各個線程,餐廳就是CyclicBarrier,感覺和 CountDownLatch是一樣的,但是他們是有差別的,吃完飯之後可以選擇去玩一會,去處理任務,然後等待第二次聚餐,重複循環。

2.3 功能

CyclicBarrier和CountDownLatch是非常類似的,CyclicBarrier核心的概念是在于設定一個等待線程的數量邊界,到達了此邊界之後進行執行。

​ CyclicBarrier類是一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點(Common Barrier Point)。

​ CyclicBarrier類是一種同步機制,它能夠對處理一些算法的線程實作同。換句話講,它就是一個所有線程必須等待的一個栅欄,直到所有線程都到達這裡,然後所有線程才可以繼續做其他事情。

​ 通過調用CyclicBarrier對象的await()方法,兩個線程可以實作互相等待,一旦N個線程在等待CyclicBarrier達成,所有線程将被釋放掉去繼續執行。

2.4 構造方法

我們可以看下 CyclicBarrier源碼的構造方法
public CyclicBarrier(int parties)public CyclicBarrier(int parties, Runnable barrierAction)           

2.4.1 參數介紹

  • parties : 是參與線程的個數 , 其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然後目前線程被阻塞。
  • barrierAction : 優先執行線程 ,用于線上程到達屏障時,優先執行barrierAction,友善處理更複雜的業務場景,一般用于資料的整理以及彙總,例如excel插入一樣,等所有線程都插入完了,到達了屏障後,barrierAction線程開始進行儲存操作,完成後,接下來由其他線程開始進行插入,然後到達屏障接着就是儲存,不斷循環。
Java并發工具合集JUC大爆發

CyclicBarrier可以用于多線程計算資料,最後合并計算結果的場景。

2.5 重要方法

我們上面介紹了構造方法,下面我們介紹下CyclicBarrier中重要的方法
//阻塞目前線程,等待其他線程執行完成。public int await() throws InterruptedException, BrokenBarrierException//阻塞目前線程指定的時間,如果達到時間就放行,等待其他線程執行完成,public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException           
  • 線程調用 await() 表示自己已經到達栅欄
  • BrokenBarrierException 表示栅欄已經被破壞,破壞的原因可能是其中一個線程 await() 時被中斷或者逾時

2.6 基本使用

一個線程組的線程需要等待所有線程完成任務後再繼續執行下一次任務
package chapter02.cyclicbarrier; import java.util.Random;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors; public class CyclicBarrierTest {     private static Random random = new Random();     /**     * 執行任務     *     * @param barrier     */    public static void execute(CyclicBarrier barrier) {        //擷取一個随機數        long sleepTime = random.nextInt(10);        //擷取線程id        long threadId = Thread.currentThread().getId();        try {            //睡眠随機秒            Thread.sleep(sleepTime * 1000);        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println("線程ID" + threadId + ",準備任務完成耗時:" + sleepTime + "目前時間" + System.currentTimeMillis());         //線程等待其他任務完成後喚醒        try {            barrier.await();        } catch (InterruptedException e) {            e.printStackTrace();        } catch (BrokenBarrierException e) {            e.printStackTrace();        }        System.out.println("線程ID" + threadId + ",開始執行任務,目前時間:" + System.currentTimeMillis());    }      public static void main(String[] args) {        //初始化線程數量        int threadNum = 5;        //初始化一般的線程        CyclicBarrier barrier = new CyclicBarrier(5, () -> System.out.println("整理任務開始..."));        ExecutorService executor = Executors.newFixedThreadPool(threadNum);        for (int i = 0; i < threadNum; i++) {            executor.submit(() -> {                execute(barrier);            });        }    }}           

2.7 CyclicBarrier 與 CountDownLatch 差別

  • CountDownLatch 是一次性的,CyclicBarrier 是可循環利用的
  • CountDownLatch.await一般阻塞工作線程,所有的進行預備工作的線程執行countDown,而CyclicBarrier通過工作線程調用await進而自行阻塞,直到所有工作線程達到指定屏障,再大家一起往下走。
  • CountDownLatch 參與的線程的職責是不一樣的,有的在倒計時,有的在等待倒計時結束。CyclicBarrier 參與的線程職責是一樣的。
  • 在控制多個線程同時運作上,CountDownLatch可以不限線程數量,而CyclicBarrier是固定線程數。
  • CyclicBarrier還可以提供一個barrierAction,合并多線程計算結果。

3. Semaphore

Java并發工具合集JUC大爆發

3.1 簡介

Semaphore也叫信号量,在JDK1.5被引入,可以用來控制同時通路特定資源的線程數量,通過協調各個線程,以保證合理的使用資源。

Semaphore内部維護了一組虛拟的許可,許可的數量可以通過構造函數的參數指定。

  • 通路特定資源前,必須使用acquire方法獲得許可,如果許可數量為0,該線程則一直阻塞,直到有可用許可。
  • 通路資源後,使用release釋放許可。

​ Semaphore是一種在多線程環境下使用的設施,該設施負責協調各個線程,以保證它們能夠正确、合理的使用公共資源的設施,也是作業系統中用于控制程序同步互斥的量。Semaphore是一種計數信号量,用于管理一組資源,内部是基于AQS的共享模式。它相當于給線程規定一個量進而控制允許活動的線程數。

​ 可以用來控制同時通路特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源

3.2 舉例

​ 這裡面令牌就像停車位一樣,來了十輛車,停車位隻有三個,隻有三輛車能夠進行,隻有等其他車開走後,其他車才能開進去,和鎖的不一樣的地方是,鎖一次隻能進入一輛車,但是Semaphore允許一次進入很多車,這個令牌是可以調整的,随時可以增減令牌。

3.3 應用場景

Semaphore 是 synchronized 的加強版,作用是控制線程的并發數量。就這一點而言,單純的synchronized 關鍵字是實作不了的。

​ Semaphore可以用于做流量控制,特别是公用資源有限的應用場景,比如資料庫連接配接。假如有一個需求,要讀取幾萬個檔案的資料,因為都是IO密集型任務,我們可以啟動幾十個線程并發地讀取,但是如果讀到記憶體後,還需要存儲到資料庫中,而資料庫的連接配接數隻有10個,這時我們必須控制隻有10個線程同時擷取資料庫連接配接儲存資料,否則會報錯無法擷取資料庫連接配接。這個時候,就可以使用Semaphore來做流量控制

3.4 工作原理

以一個停車場是運作為例,為了簡單起見,假設停車場隻有三個車位,一開始三個車位都是空的。

​ 這時如果同時來了五輛車,看門人允許其中三輛不受阻礙的進入,然後放下車攔,剩下的車則必須在入口等待,此後來的車也都不得不在入口處等待。

​ 這時,有一輛車離開停車場,看門人得知後,打開車攔,放入一輛,如果又離開兩輛,則又可以放入兩輛,如此往複。

​ 這個停車系統中,每輛車就好比一個線程,看門人就好比一個信号量,看門人限制了可以活動的線程,假如裡面依然是三個車位,但是看門人改變了規則,要求每次隻能停兩輛車,那麼一開始進入兩輛車,後面得等到有車離開才能有車進入,但是得保證最多停兩輛車。

​ 對于Semaphore類而言,就如同一個看門人,限制了可活動的線程數。

3.5 構造方法

建立具有給定許可數的計數信号量并設定為非公平信号量

檢視Semaphore源碼發現他有這兩個構造方法

public Semaphore(int permits)public Semaphore(int permits, boolean fair)           

3.5.1 參數介紹

  • permits 是設定同時允許通過的線程數
  • fair 等于true時,建立具有給定許可數的計數信号量并設定為公平信号量。

3.6 其他方法

Semaphore類裡面還有一些重要的方法
//從此信号量擷取一個許可前線程将一直阻塞。相當于一輛車占了一個車位public void acquire() throws InterruptedException    //從此信号量擷取給定數目許可,在提供這些許可前一直将線程阻塞。比如n=2,就相當于一輛車占了兩個車位。public void acquire(int permits) throws InterruptedException    //釋放一個許可,将其傳回給信号量。就如同車開走傳回一個車位。    public void release()       //擷取目前可用許可數    public void release(int permits)        //擷取目前可用許可數public int availablePermits()           

3.7 示例代碼

共有5個車位但是有100個線程進行占用,車停幾秒後會離開,釋放車位給其他線程。
package chapter02.semaphore; import java.util.Random;import java.util.concurrent.*; public class SemaphoreTest {    private static ExecutorService executorService = Executors.newCachedThreadPool();     private static Random random = new Random();     //阻塞隊列    private static BlockingQueue<String> parks = new LinkedBlockingQueue<>(5);      public static void execute(Semaphore semaphore) {        //擷取一個随機數        long sleepTime = random.nextInt(10);        long threadId = Thread.currentThread().getId();        String park = null;        try {            /**             * 擷取許可,首先判斷semaphore内部的數字是否大于0,如果大于0,             * 才能獲得許可,然後将初始值5減去1,線程才會接着去執行;如果沒有             * 獲得許可(原因是因為已經有5個線程獲得到許可,semaphore内部的數字為0),             * 線程會阻塞直到已經獲得到許可的線程,調用release()方法,釋放掉許可,             * 也就是将semaphore内部的數字加1,該線程才有可能獲得許可。             */            semaphore.acquire();            /**             *  對應的線程會到阻塞對,對應車輛去擷取到車位,如果沒有拿到一緻阻塞,             *  直到其他車輛歸還車位。             */            park = parks.take();            System.out.println("線程ID" + threadId + ",開始占用車位:" + park + ",目前剩餘車位" + semaphore.availablePermits());         } catch (InterruptedException e) {            e.printStackTrace();        }        try {            //睡眠随機秒            Thread.sleep(sleepTime * 1000);        } catch (InterruptedException e) {            e.printStackTrace();        }        //歸還車位        parks.offer(park);        System.out.println("線程ID" + threadId + ",開始歸還車位:" + park + ",共占用" + sleepTime + "秒");        //線程釋放掉許可,通俗來将就是将semaphore内部的數字加1        semaphore.release();    }     public static void main(String[] args) {        //初始化線程數量        int threadNum = 100;        parks.offer("車位一");        parks.offer("車位二");        parks.offer("車位三");        parks.offer("車位四");        parks.offer("車位五");          // 初始化5個許可證        Semaphore semaphore = new Semaphore(5);        //可以提前釋放但是車位就會被多個線程同時占用        //semaphore.release(5);        for (int i = 0; i < threadNum; i++) {            executorService.submit(() -> {                execute(semaphore);            });        }    }}           

3.8 注意事項

即使建立信号量的時候,指定了信号量的大小 ,但是在通過 release()操作釋放信号量仍然能釋放超過配置的大小,也就有可能同時執行的線程數量比最開始設定的要大,沒有任何線程擷取信号量的時候,依然能夠釋放并且釋放的有效。

​ 推薦的做法是一個線程先 acquire 然後 release,如果釋放線程和擷取線程不是同一個,那麼最好保證這種對應關系。不要釋放過多的許可證。

4. Fork/Join

Java并發工具合集JUC大爆發

4.1 簡介

java下多線程的開發可以我們自己啟用多線程,線程池,還可以使用forkjoin,forkjoin可以讓我們不去了解諸如Thread,Runnable等相關的知識,隻要遵循forkjoin的開發模式,就可以寫出很好的多線程并發程式

​ Fork/Join架構是Java7提供了的一個用于并行執行任務的架構, 是一個把大任務分割成若幹個小任務,最終彙總每個小任務結果後得到大任務結果的架構。

​ Fork/Join架構是一個實作了ExecutorService接口的多線程處理器。它可以把一個大的任務劃分為若幹個小的任務并發執行,充分利用可用的資源,進而提高應用的執行效率。

Fork/Join架構簡化了并行程式的原因有 :

  • 它簡化了線程的建立,在架構中線程是自動被建立和管理。
  • 它自動使用多個處理器,是以程式可以擴充到使用可用處理器。

4.2 舉例

​ 就像我需要處理一百萬行的excel,普通的處理是一個一個的excel進行處理,但是使用Fork/Join架構後的處理方式呢,加入我們定義100條資料為一個批次,那麼Fork/Join就會拆分這個excel先到中間拆分成各有50萬的資料,然後還比100大就繼續拆分,不斷的細分,最後分到了每一個線程分得到了100條然後才開始執行。

4.3 分而治之

“分而治之” 一直是一個有效的處理大量資料的方法。著名的 MapReduce 也是采取了分而治之的思想。

​ 簡單來說,就是如果你要處理1000個資料,但是你并不具備處理1000個資料的能力,那麼你可以隻處理其中的10個,然後,分階段處理100次,将100次的結果進行合成,那就是最終想要的對原始的1000個資料的處理結果。

​ 同時forkjoin在處理某一類問題時非常的有用,哪一類問題?分而治之的問題。十大計算機經典算法:快速排序、堆排序、歸并排序、二分查找、線性查找、深度優先、廣度優先、Dijkstra、動态規劃、樸素貝葉斯分類,有幾個屬于分而治之?3個,快速排序、歸并排序、二分查找,還有大資料中M/R都是。

4.3.1 分治法的設計思想

​ 将一個難以直接解決的大問題,分割成一些規模較小的相同問題,以便各個擊破,分而治之。

4.3.2 分治政策

​ 對于一個規模為n的問題,若該問題可以容易地解決(比如說規模n較小)則直接解決,否則将其分解為k個規模較小的子問題,這些子問題互相獨立且與原問題形式相同(子問題互相之間有聯系就會變為動态規範算法),遞歸地解這些子問題,然後将各子問題的解合并得到原問題的解。這種算法設計政策叫做分治法。

4.4 Fork-Join原理

​ Fork/Join實作了ExecutorService,是以它的任務也需要放線上程池中執行。它的不同在于它使用了工作竊取算法,空閑的線程可以從滿負荷的線程中竊取任務來幫忙執行。

​ 由于線程池中的每個線程都有一個隊列,而且線程間互不影響,那麼線程每次都從自己的任務隊列的頭部擷取一個任務出來執行。如果某個時候一個線程的任務隊列空了,而其餘的線程任務隊列中還有任務,那麼這個線程就會從其他線程的任務隊列中取一個任務出來幫忙執行。就像偷取了其他人的工作一樣

4.4.1 任務分割和合并

Fork/Join架構的基本思想就是将一個大任務分解(Fork)成一系列子任務,子任務可以繼續往下分解,當多個不同的子任務都執行完成後,可以将它們各自的結果合并(Join)成一個大結果,最終合并成大任務的結果
Java并發工具合集JUC大爆發

我們看下面這個圖

Java并發工具合集JUC大爆發

​ 首先main Task 先fork成 0,1兩個任務 接着,因為還是太大,繼續fork成 0-0,0-1,1-0,1-1 然後進行計算計算完成後進行join操作,0-0,1-1 join到0, 1-0,1-1 join到1 然後 0和1繼續join到mainTask,完成計算任務。

4.4.2 工作密取

即目前線程的Task已經全被執行完畢,則自動取到其他線程的Task池中取出Task繼續執行即如果一個工作線程沒有事情要做,它可以從其他仍然忙碌的線程竊取任務。

​ ForkJoinPool中維護着多個線程(一般為CPU核數)在不斷地執行Task,每個線程除了執行自己職務内的Task之外,還會根據自己工作線程的閑置情況去擷取其他繁忙的工作線程的Task,如此一來就能能夠減少線程阻塞或是閑置的時間,提高CPU使用率。

Java并發工具合集JUC大爆發

4.5 相關子類

​ 我們已經很清楚 Fork/Join 架構的需求了,那麼我們可以思考一下,如果讓我們來設計一個 Fork/Join 架構,該如何設計?這個思考有助于你了解 Fork/Join 架構的設計。

​ 第一步分割任務。首先我們需要有一個 fork 類來把大任務分割成子任務,有可能子任務還是很大,是以還需要不停的分割,直到分割出的子任務足夠小。

​ 第二步執行任務并合并結果。分割的子任務分别放在雙端隊列裡,然後幾個啟動線程分别從雙端隊列裡擷取任務執行。子任務執行完的結果都統一放在一個隊列裡,啟動一個線程從隊列裡拿資料,然後合并這些資料。

Fork/Join 使用兩個類來完成以上兩件事情:

4.5.1 ForkJoinTask

​ 我們要使用 ForkJoin 架構,必須首先建立一個 ForkJoin 任務。它提供在任務中執行 fork() 和 join() 操作的機制,通常情況下我們不需要直接繼承 ForkJoinTask 類,而隻需要繼承它的子類,Fork/Join 架構提供了以下兩個子類:

4.5.1.1 RecursiveAction

用于沒有傳回結果的任務

4.5.1.2 RecursiveTask

用于有傳回結果的任務。

4.5.2 ForkJoinPool

​ ForkJoinTask 需要通過 ForkJoinPool 來執行,任務分割出的子任務會添加到目前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列裡暫時沒有任務時,它會随機從其他工作線程的隊列的尾部擷取一個任務

4.6 Fork/Join使用

​ Task要通過ForkJoinPool來執行,使用submit 或 invoke 送出,兩者的差別是:invoke是同步執行,調用之後需要等待任務完成,才能執行後面的代碼;submit是異步執行,join()和get方法當任務完成的時候傳回計算結果

Java并發工具合集JUC大爆發

​ 在我們自己實作的compute方法裡,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在調用invokeAll方法時,又會進入compute方法,看看目前子任務是否需要繼續分割成孫任務,如果不需要繼續分割,則執行目前子任務并傳回結果。使用join方法會等待子任務執行完并得到其結果。

4.6.1 任務的送出邏輯

fork/join其實大部分邏輯處理操作都集中在送出任務和處理任務這兩塊,了解任務的送出基本上後面就很容易了解了, fork/join送出任務主要分為兩種:

4.6.1.1 第一次送出到forkJoinPool

//建立初始化任務SubmitTask submitTask = new SubmitTask(start, end);//将初始任務扔進連接配接池中執行forkJoinPool.invoke(submitTask);           

4.6.1.2 任務切分之後的送出

//沒有達到門檻值 計算一個中間值long mid = (start + end) / 2;//拆分 左邊的SubmitTask left = new SubmitTask(start, mid);//拆分右邊的SubmitTask right = new SubmitTask(mid + 1, end);//添加到任務清單invokeAll(left, right);           

4.6.1.3 合并任務

//合并結果并傳回return left.join() + right.join();           

4.6.1.4 代碼案例

package chapter02.forkjoin; import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask; /** * 計算 0-10000 階乘 */public class SubmitTask extends RecursiveTask<Long> {    /**     * 起始值     */    private long start;    /**     * 結束值     */    private long end;    /**     * 門檻值     */    private long threshold = 10L;     public SubmitTask(long start, long end) {        this.start = start;        this.end = end;    }     /**     * 計算邏輯     * 進行任務的拆分 以及 達到門檻值的計算     *     * @return     */    @Override    protected Long compute() {        //校驗是否達到了門檻值        if (isLessThanThreshold()) {            //處理并傳回結果            return handle();        } else {            //沒有達到門檻值 計算一個中間值            long mid = (start + end) / 2;            //拆分 左邊的            SubmitTask left = new SubmitTask(start, mid);            //拆分右邊的            SubmitTask right = new SubmitTask(mid + 1, end);            //添加到任務清單            invokeAll(left, right);            //合并結果并傳回            return left.join() + right.join();        }    }     /**     * 處理的任務     *     * @return     */    public Long handle() {        long sum = 0;        for (long i = start; i <= end; i++) {            sum += i;            try {                Thread.sleep(1);            } catch (InterruptedException e) {                e.printStackTrace();            }        }        return sum;    }     /*是否達到了門檻值*/    private boolean isLessThanThreshold() {        return end - start <= threshold;    }     /**     * forkJoin 方式調用     *     * @param start     * @param end     */    public static void forkJoinInvok(long start, long end) {        long sum = 0;        long currentTime = System.currentTimeMillis();        //建立ForkJoinPool 連接配接池        ForkJoinPool forkJoinPool = new ForkJoinPool();        //建立初始化任務        SubmitTask submitTask = new SubmitTask(start, end);        //将初始任務扔進連接配接池中執行        forkJoinPool.invoke(submitTask);         //forkJoinPool.submit(submitTask);        // System.out.println("異步方式,任務結束才會調用該方法,目前耗時"+(System.currentTimeMillis() - currentTime));        //等待傳回結果        sum = submitTask.join();        //forkjoin調用方式耗時        System.out.println("forkJoin調用:result:" + sum);        System.out.println("forkJoin調用耗時:" + (System.currentTimeMillis() - currentTime));    }     /**     * 普通方式調用     *     * @param start     * @param end     */    public static void normalInvok(long start, long end) {        long sum = 0;        long currentTime = System.currentTimeMillis();        for (long i = start; i <= end; i++) {            sum += i;            try {                Thread.sleep(1);            } catch (InterruptedException e) {                e.printStackTrace();            }        }        //普通調動方式耗時        System.out.println("普通調用:result:" + sum);        System.out.println("普通調用耗時:" + (System.currentTimeMillis() - currentTime));    }     public static void main(String[] args) {        //起始值的大小        long start = 0;        //結束值的大小        long end = 10000;        //forkJoin 調用        forkJoinInvok(start, end);        System.out.println("========================");        //普通調用        normalInvok(start, end);    }}