天天看點

并發程式設計面試必備:AQS 原理以及 AQS 同步元件總結

先來回顧一下上一篇文章:并發程式設計面試必備:JUC中的Atomic原子類總結

本節面試中常見問題:AQS 原理?;CountDownLatch和CyclicBarrier了解嗎,兩者的差別是什麼?用過Semaphore嗎?

本節思維導圖:

并發程式設計面試必備:AQS 原理以及 AQS 同步元件總結

并發程式設計面試必備:AQS 原理以及 AQS 同步元件總結

1 AQS 簡單介紹

AQS的全稱為(AbstractQueuedSynchronizer),這個類在java.util.concurrent.locks包下面。

并發程式設計面試必備:AQS 原理以及 AQS 同步元件總結

enter image description here

AQS是一個用來建構鎖和同步器的架構,使用AQS能簡單且高效地構造出應用廣泛的大量的同步器,比如我們提到的ReentrantLock,Semaphore,其他的諸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。當然,我們自己也能利用AQS非常輕松容易地構造出符合我們自己需求的同步器。

2 AQS 原理

在面試中被問到并發知識的時候,大多都會被問到“請你說一下自己對于AQS原理的了解”。下面給大家一個示例供大家參加,面試不是背題,大家一定要假如自己的思想,即使加入不了自己的思想也要保證自己能夠通俗的講出來而不是背出來。

下面大部分内容其實在AQS類注釋上已經給出了,不過是英語看着比較吃力一點,感興趣的話可以看看源碼。

2.1 AQS 原理概覽

AQS核心思想是,如果被請求的共享資源空閑,則将目前請求資源的線程設定為有效的工作線程,并且将共享資源設定為鎖定狀态。如果被請求的共享資源被占用,那麼就需要一套線程阻塞等待以及被喚醒時鎖配置設定的機制,這個機制AQS是用CLH隊列鎖實作的,即将暫時擷取不到鎖的線程加入到隊列中。

CLH(Craig,Landin,and Hagersten)隊列是一個虛拟的雙向隊列(虛拟的雙向隊列即不存在隊列執行個體,僅存在結點之間的關聯關系)。AQS是将每條請求共享資源的線程封裝成一個CLH鎖隊列的一個結點(Node)來實作鎖的配置設定。

看個AQS(AbstractQueuedSynchronizer)原理圖:

并發程式設計面試必備:AQS 原理以及 AQS 同步元件總結

enter image description here

AQS使用一個int成員變量來表示同步狀态,通過内置的FIFO隊列來完成擷取資源線程的排隊工作。AQS使用CAS對該同步狀态進行原子操作實作對其值的修改。

private volatile int state;//共享變量,使用volatile修飾保證線程可見性
           

複制

狀态資訊通過procted類型的getState,setState,compareAndSetState進行操作

//傳回同步狀态的目前值
protected final int getState() {  
        return state;
}
 // 設定同步狀态的值
protected final void setState(int newState) { 
        state = newState;
}
//原子地(CAS操作)将同步狀态值設定為給定值update如果目前同步狀态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
           

複制

2.2 AQS 對資源的共享方式

AQS定義兩種資源共享方式

  • Exclusive(獨占):隻有一個線程能執行,如ReentrantLock。又可分為公平鎖和非公平鎖:
    • 公平鎖:按照線程在隊列中的排隊順序,先到者先拿到鎖
    • 非公平鎖:當線程要擷取鎖時,無視隊列順序直接去搶鎖,誰搶到就是誰的
  • Share(共享):多個線程可同時執行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我們都會在後面講到。

ReentrantReadWriteLock 可以看成是組合式,因為ReentrantReadWriteLock也就是讀寫鎖允許多個線程同時對某一資源進行讀。

不同的自定義同步器争用共享資源的方式也不同。自定義同步器在實作時隻需要實作共享資源 state 的擷取與釋放方式即可,至于具體線程等待隊列的維護(如擷取資源失敗入隊/喚醒出隊等),AQS已經在上層已經幫我們實作好了。

2.3 AQS底層使用了模闆方法模式

同步器的設計是基于模闆方法模式的,如果需要自定義同步器一般的方式是這樣(模闆方法模式很經典的一個應用):

  1. 使用者繼承AbstractQueuedSynchronizer并重寫指定的方法。(這些重寫方法很簡單,無非是對于共享資源state的擷取和釋放)
  2. 将AQS組合在自定義同步元件的實作中,并調用其模闆方法,而這些模闆方法會調用使用者重寫的方法。

這和我們以往通過實作接口的方式有很大差別,這是模闆方法模式很經典的一個運用。下面簡單的給大家介紹一下模闆方法模式,模闆方法模式是一個很容易了解的設計模式之一。

模闆方法模式是基于”繼承“的,主要是為了在不改變模闆結構的前提下在子類中重新定義模闆中的内容以實作複用代碼。舉個很簡單的例子假如我們要去一個地方的步驟是:購票

buyTicket()

->安檢

securityCheck()

->乘坐某某工具回家

ride()

->到達目的地

arrive()

。我們可能乘坐不同的交通工具回家比如飛機或者火車,是以除了

ride()

方法,其他方法的實作幾乎相同。我們可以定義一個包含了這些方法的抽象類,然後使用者根據自己的需要繼承該抽象類然後修改

ride()

方法。

AQS使用了模闆方法模式,自定義同步器時需要重寫下面幾個AQS提供的模闆方法:

isHeldExclusively()//該線程是否正在獨占資源。隻有用到condition才需要去實作它。
tryAcquire(int)//獨占方式。嘗試擷取資源,成功則傳回true,失敗則傳回false。
tryRelease(int)//獨占方式。嘗試釋放資源,成功則傳回true,失敗則傳回false。
tryAcquireShared(int)//共享方式。嘗試擷取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
tryReleaseShared(int)//共享方式。嘗試釋放資源,成功則傳回true,失敗則傳回false。
           

複制

預設情況下,每個方法都抛出

UnsupportedOperationException

。 這些方法的實作必須是内部線程安全的,并且通常應該簡短而不是阻塞。AQS類中的其他方法都是final ,是以無法被其他類使用,隻有這幾個方法可以被其他類使用。

以ReentrantLock為例,state初始化為0,表示未鎖定狀态。A線程lock()時,會調用tryAcquire()獨占該鎖并将state+1。此後,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會擷取該鎖。當然,釋放鎖之前,A線程自己是可以重複擷取此鎖的(state會累加),這就是可重入的概念。但要注意,擷取多少次就要釋放多麼次,這樣才能保證state是能回到零态的。

再以CountDownLatch以例,任務分為N個子線程去執行,state也初始化為N(注意N要與線程個數一緻)。這N個子線程是并行執行的,每個子線程執行完後countDown()一次,state會CAS(Compare and Swap)減1。等到所有子線程都執行完後(即state=0),會unpark()主調用線程,然後主調用線程就會從await()函數傳回,繼續後餘動作。

一般來說,自定義同步器要麼是獨占方法,要麼是共享方式,他們也隻需實作

tryAcquire-tryRelease

tryAcquireShared-tryReleaseShared

中的一種即可。但AQS也支援自定義同步器同時實作獨占和共享兩種方式,如

ReentrantReadWriteLock

推薦兩篇 AQS 原理和相關源碼分析的文章:

  • http://www.cnblogs.com/waterystone/p/4920797.html
  • https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html

3 Semaphore(信号量)-允許多個線程同時通路

synchronized 和 ReentrantLock 都是一次隻允許一個線程通路某個資源,Semaphore(信号量)可以指定多個線程同時通路某個資源。示例代碼如下:

/**
 * 
 * @author Snailclimb
 * @date 2018年9月30日
 * @Description: 需要一次性拿一個許可的情況
 */
public class SemaphoreExample1 {
    // 請求的數量
    private static final int threadCount = 550;

    public static void main(String[] args) throws InterruptedException {
        // 建立一個具有固定線程數量的線程池對象(如果這裡線程池的線程數量給太少的話你會發現執行的很慢)
        ExecutorService threadPool = Executors.newFixedThreadPool(300);
        // 一次隻能允許執行的線程數量。
        final Semaphore semaphore = new Semaphore(20);

        for (int i = 0; i < threadCount; i++) {
            final int threadnum = i;
            threadPool.execute(() -> {// Lambda 表達式的運用
                try {
                    semaphore.acquire();// 擷取一個許可,是以可運作線程數量為20/1=20
                    test(threadnum);
                    semaphore.release();// 釋放一個許可
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            });
        }
        threadPool.shutdown();
        System.out.println("finish");
    }

    public static void test(int threadnum) throws InterruptedException {
        Thread.sleep(1000);// 模拟請求的耗時操作
        System.out.println("threadnum:" + threadnum);
        Thread.sleep(1000);// 模拟請求的耗時操作
    }
}
           

複制

執行

acquire

方法阻塞,直到有一個許可證可以獲得然後拿走一個許可證;每個

release

方法增加一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實并沒有實際的許可證這個對象,Semaphore隻是維持了一個可獲得許可證的數量。 Semaphore經常用于限制擷取某種資源的線程數量。

當然一次也可以一次拿取和釋放多個許可,不過一般沒有必要這樣做:

semaphore.acquire(5);// 擷取5個許可,是以可運作線程數量為20/5=4
                    test(threadnum);
                    semaphore.release(5);// 擷取5個許可,是以可運作線程數量為20/5=4
           

複制

除了

acquire

方法之外,另一個比較常用的與之對應的方法是

tryAcquire

方法,該方法如果擷取不到許可就立即傳回false。

Semaphore 有兩種模式,公平模式和非公平模式。

  • 公平模式: 調用acquire的順序就是擷取許可證的順序,遵循FIFO;
  • 非公平模式: 搶占式的。

Semaphore 對應的兩個構造方法如下:

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
           

複制

這兩個構造方法,都必須提供許可的數量,第二個構造方法可以指定是公平模式還是非公平模式,預設非公平模式。

由于篇幅問題,如果對 Semaphore 源碼感興趣的朋友可以看下面這篇文章:

  • https://blog.csdn.net/qq_19431333/article/details/70212663

4 CountDownLatch (倒計時器)

CountDownLatch是一個同步工具類,用來協調多個線程之間的同步。這個工具通常用來控制線程等待,它可以讓某一個線程等待直到倒計時結束,再開始執行。

4.1 CountDownLatch 的兩種典型用法

①某一線程在開始運作前等待n個線程執行完畢。将 CountDownLatch 的計數器初始化為n :

new CountDownLatch(n)

,每當一個任務線程執行完畢,就将計數器減1

countdownlatch.countDown()

,當計數器的值變為0時,在

CountDownLatch上 await()

的線程就會被喚醒。一個典型應用場景就是啟動一個服務時,主線程需要等待多個元件加載完畢,之後再繼續執行。

②實作多個線程開始執行任務的最大并行性。注意是并行性,不是并發,強調的是多個線程在某一時刻同時開始執行。類似于賽跑,将多個線程放到起點,等待發令槍響,然後同時開跑。做法是初始化一個共享的

CountDownLatch

對象,将其計數器初始化為 1 :

new CountDownLatch(1)

,多個線程在開始執行任務前首先

coundownlatch.await()

,當主線程調用 countDown() 時,計數器變為0,多個線程同時被喚醒。

4.2 CountDownLatch 的使用示例

/**
 * 
 * @author SnailClimb
 * @date 2018年10月1日
 * @Description: CountDownLatch 使用方法示例
 */
public class CountDownLatchExample1 {
    // 請求的數量
    private static final int threadCount = 550;

    public static void main(String[] args) throws InterruptedException {
        // 建立一個具有固定線程數量的線程池對象(如果這裡線程池的線程數量給太少的話你會發現執行的很慢)
        ExecutorService threadPool = Executors.newFixedThreadPool(300);
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadnum = i;
            threadPool.execute(() -> {// Lambda 表達式的運用
                try {
                    test(threadnum);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();// 表示一個請求已經被完成
                }

            });
        }
        countDownLatch.await();
        threadPool.shutdown();
        System.out.println("finish");
    }

    public static void test(int threadnum) throws InterruptedException {
        Thread.sleep(1000);// 模拟請求的耗時操作
        System.out.println("threadnum:" + threadnum);
        Thread.sleep(1000);// 模拟請求的耗時操作
    }
}
           

複制

上面的代碼中,我們定義了請求的數量為550,當這550個請求被處理完成之後,才會執行

System.out.println("finish");

4.3 CountDownLatch 的不足

CountDownLatch是一次性的,計數器的值隻能在構造方法中初始化一次,之後沒有任何機制再次對其設定值,當CountDownLatch使用完畢後,它不能再次被使用。

5 CyclicBarrier(循環栅欄)

CyclicBarrier 和 CountDownLatch 非常類似,它也可以實作線程間的技術等待,但是它的功能比 CountDownLatch 更加複雜和強大。主要應用場景和 CountDownLatch 類似。

CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續幹活。CyclicBarrier預設的構造方法是

CyclicBarrier(int parties)

,其參數表示屏障攔截的線程數量,每個線程調用

await

方法告訴 CyclicBarrier 我已經到達了屏障,然後目前線程被阻塞。

5.1 CyclicBarrier 的應用場景

CyclicBarrier 可以用于多線程計算資料,最後合并計算結果的應用場景。比如我們用一個Excel儲存了使用者所有銀行流水,每個Sheet儲存一個帳戶近一年的每筆銀行流水,現在需要統計使用者的日均銀行流水,先用多線程處理每個sheet裡的銀行流水,都執行完之後,得到每個sheet的日均銀行流水,最後,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水。

5.2 CyclicBarrier 的使用示例

示例1:

/**
 * 
 * @author Snailclimb
 * @date 2018年10月1日
 * @Description: 測試 CyclicBarrier 類中帶參數的 await() 方法
 */
public class CyclicBarrierExample2 {
    // 請求的數量
    private static final int threadCount = 550;
    // 需要同步的線程數量
    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

    public static void main(String[] args) throws InterruptedException {
        // 建立線程池
        ExecutorService threadPool = Executors.newFixedThreadPool(10);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            threadPool.execute(() -> {
                try {
                    test(threadNum);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            });
        }
        threadPool.shutdown();
    }

    public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
        System.out.println("threadnum:" + threadnum + "is ready");
        try {
            cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            System.out.println("-----CyclicBarrierException------");
        }
        System.out.println("threadnum:" + threadnum + "is finish");
    }

}
           

複制

運作結果,如下:

threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......
           

複制

可以看到當線程數量也就是請求數量達到我們定義的 5 個的時候,

await

方法之後的方法才被執行。

另外,CyclicBarrier還提供一個更進階的構造函數

CyclicBarrier(int parties, Runnable barrierAction)

,用于線上程到達屏障時,優先執行

barrierAction

,友善處理更複雜的業務場景。示例代碼如下:

/**
 * 
 * @author SnailClimb
 * @date 2018年10月1日
 * @Description: 建立 CyclicBarrier 的時候指定一個 Runnable
 */
public class CyclicBarrierExample3 {
    // 請求的數量
    private static final int threadCount = 550;
    // 需要同步的線程數量
    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
        System.out.println("------當線程數達到之後,優先執行------");
    });

    public static void main(String[] args) throws InterruptedException {
        // 建立線程池
        ExecutorService threadPool = Executors.newFixedThreadPool(10);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            threadPool.execute(() -> {
                try {
                    test(threadNum);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            });
        }
        threadPool.shutdown();
    }

    public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
        System.out.println("threadnum:" + threadnum + "is ready");
        cyclicBarrier.await();
        System.out.println("threadnum:" + threadnum + "is finish");
    }

}
           

複制

運作結果,如下:

threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
------當線程數達到之後,優先執行------
threadnum:4is finish
threadnum:0is finish
threadnum:2is finish
threadnum:1is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
------當線程數達到之後,優先執行------
threadnum:9is finish
threadnum:5is finish
threadnum:6is finish
threadnum:8is finish
threadnum:7is finish
......
           

複制

5.3 CyclicBarrier和CountDownLatch的差別

CountDownLatch是計數器,隻能使用一次,而CyclicBarrier的計數器提供reset功能,可以多次使用。但是我不那麼認為它們之間的差別僅僅就是這麼簡單的一點。我們來從jdk作者設計的目的來看,javadoc是這麼描述它們的:

CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一個或者多個線程,等待其他多個線程完成某件事情之後才能執行;)

CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多個線程互相等待,直到到達同一個同步點,再繼續一起執行。)

對于CountDownLatch來說,重點是“一個線程(多個線程)等待”,而其他的N個線程在完成“某件事情”之後,可以終止,也可以等待。而對于CyclicBarrier,重點是多個線程,在任意一個線程沒有完成,所有的線程都必須等待。

CountDownLatch是計數器,線程完成一個記錄一個,隻不過計數不是遞增而是遞減,而CyclicBarrier更像是一個閥門,需要所有線程都到達,閥門才能打開,然後繼續執行。

并發程式設計面試必備:AQS 原理以及 AQS 同步元件總結

CyclicBarrier和CountDownLatch的差別

CyclicBarrier和CountDownLatch的差別這部分内容參考了如下兩篇文章:

  • https://blog.csdn.net/u010185262/article/details/54692886
  • https://blog.csdn.net/tolcf/article/details/50925145?utm_source=blogxgwz0

6 ReentrantLock 和 ReentrantReadWriteLock

ReentrantLock 和 synchronized 的差別在上面已經講過了這裡就不多做講解。另外,需要注意的是:讀寫鎖 ReentrantReadWriteLock 可以保證多個線程可以同時讀,是以在讀操作遠大于寫操作的時候,讀寫鎖就非常有用了。

由于篇幅問題,關于 ReentrantLock 和 ReentrantReadWriteLock 詳細内容可以檢視我的這篇原創文章。

  • ReentrantLock 和 ReentrantReadWriteLock