天天看點

AQS 和 ReetrantLcok 特征和使用介紹

AbstractQueuedSynchronizer 簡介

AbstractQueuedSynchronizer 為實作依賴于先進先出 (FIFO) 等待隊列的阻塞鎖定和相關同步器(信号量、事件,等等)提供一個架構。此類的設計目标是成為依靠單個原子 int 值來表示狀态的大多數同步器的一個有用基礎。子類必須定義更改此狀态的受保護方法,并定義哪種狀态對于此對象意味着被擷取或被釋放。假定這些條件之後,此類中的其他方法就可以實作所有排隊和阻塞機制。但隻是為了獲得同步而隻追蹤使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法來操作以原子方式更新的 int 值。

AQS 的特征:

  1. 阻塞等待隊列
  2. 共享/獨占
  3. 公平/非公平
  4. 可重入
  5. 允許中斷

核心屬性 int sate

  1. state 表示共享屬性被 volatile 修飾
AQS 和 ReetrantLcok 特征和使用介紹

三個核心方法:

  1. getState()
  2. setState()
  3. compareAndSetState()
AQS 和 ReetrantLcok 特征和使用介紹

兩種資源共享方式:

  • Exclusive-獨占,隻有一個線程可以通路,如 ReetrantLock
  • Share 共享,多個線程可以同時執行,如:Semaphore/CountDownLatch

AQS 定義兩種隊列

  • 同步等待隊列:主要是用于維護擷取互斥失敗時入隊的線程
  • 條件等待隊列:調用 await() 的時候會釋放鎖,點燃後線程會加入到套件隊列,調用 signal() 喚醒的時候把條件隊列的節點移動到同步隊列中,等待再次擷取鎖。

AQS 隊列節點中的 5 種狀态

  1. 值為 0 表示初始化狀态,表示目前節點在 sync 隊列中,等待擷取鎖。
  2. CANCELLED , 值為1 , 表示目前的線程被取消;
  3. SIGNAL,值為 -1,表示目前的線程被取消;
  4. CONDITION,值為 -2,表示目前節點的後繼節點的線程需要運作,也就是 unpark;
  5. PROPAGAGTE 值為-3,表示目前場景下後續的 acquireShard 能夠繼續執行。

不同的自定義同步器競争共享資源的方式也不同,自定義同步器在實作時自需要實作共享資源 state 的擷取與實作方式即可,至于具體線程等待隊列的維護(如擷取資源失敗入隊、出隊等),AQS 已經實作好了,自定義同步器時主要實作一下幾個方法(AQS 其實是一個典型的模闆方法模式的運用):

  • isHeldExclusively() 該線程是否正在獨占資源。隻有使用到 condition 才需要去實作。
  • tryAcquire(int):獨占方式。嘗試擷取資源,成功傳回 true,失敗傳回 false。
AQS 和 ReetrantLcok 特征和使用介紹
  • tryRelease(int):獨占方式。嘗試釋放資源,成功傳回 true,失敗傳回 false。
  • tryAcquireShared(int): 共享方式。嘗試擷取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int): 共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待節點傳回 true , 否則傳回 false。

自定義獨占鎖

public class Liu666Lock extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int arg) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int arg) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock() {
        acquire(1);
    }

    public void unlock() {
        release(1);
    }
}      

測試一下:

AQS 和 ReetrantLcok 特征和使用介紹

輸出結果如下:

AQS 和 ReetrantLcok 特征和使用介紹

多執行幾次我們可以觀察,雖然執行的順序不一定有序,但是我們最終結果是始終 idx = 10

同步等待

AQS 當中的同步等待隊列也稱為 CLH 隊列,CLH隊列是 Craig、Landin、Hagersten 三人發明的一種基于雙向連結清單資料結構的隊列,是 FIFO 先進先出等待隊列,Java 的 CLH 隊列是原自 CLH 隊列的一個變種實作,線程由原自旋機制改為阻塞機制。

AQS 依賴 CLH 同步隊列來完成同步狀态的管理:

  • 目前線程如果擷取同步狀态失敗時,AQS 則會将目前線程已經等待狀态資訊構造成一個節點(Node)并将其加入到 CLH 同步隊列,同時會阻塞目前線程
  • 當同步狀态釋放時,會把首節點喚醒(公平鎖),使其再次嘗試擷取同步狀态。
  • 通過 signal 或者 signalAll 将條件隊列中的節點轉移到同步隊列。(由條件隊列轉換為同步隊列)
AQS 和 ReetrantLcok 特征和使用介紹

條件等待隊列

AQS 中條件隊列是使用單向連結清單儲存的,用 nextWaiter 屬性來連接配接

  • 調用 await 方法阻塞線程;
  • 目前線程存儲同步隊列頭節點,調用 await 方法進行阻塞(從同步隊列轉換到條件隊列)

Condition 接口

AQS 和 ReetrantLcok 特征和使用介紹
  1. 調用 Condition#await 方法會釋放目前持有的鎖,然後阻塞目前線程,同時像 Condition 隊列尾部添加一個節點,是以調用 Condition#await 方法的時候必須持有鎖。
  2. 調用 Condition#signal 方法會将 Condition 隊列的首節點移動到隊列尾部,然後喚醒調用 Condition#awite 方法而阻塞的線程(喚醒之後這個線程就可以去競争鎖了),是以調用 Condition#signal 方法必須持有鎖,持有鎖的線程喚醒被因調用 Condition#await 方法而阻塞的線程。

等待喚醒機制 await/signal 實驗

@Slf4j
public class ConditionTest {

    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        new Thread(() -> {
            lock.lock();
            try {
                log.info(Thread.currentThread().getName() + "開始執行任務");
                condition.await();
                log.info(Thread.currentThread().getName() + "任務執行結束");
            } catch (Throwable e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "t1").start();
        new Thread(() -> {
            lock.lock();
            try {
                log.info(Thread.currentThread().getName() + "開始執行任務");
                TimeUnit.SECONDS.sleep(2);
                condition.signal();
                log.info(Thread.currentThread().getName() + "任務執行結束");
            } catch (Throwable e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "t2").start();
    }
}      

從下面的結果我們可以看到 t1 線程擷取鎖過後,調用 ​

​await​

​方法進入阻塞狀态并且釋放鎖,然後 t2 線程擷取鎖,并且去喚醒 t1 線程繼續執行,這就是一個簡單的條件隊列例子。

輸出結果如下:

AQS 和 ReetrantLcok 特征和使用介紹

ReetrantLock 與 synchroinzed 比較

  • synchroinzed 是 JVM 層次的鎖實作,ReetrantLock 是 JDK 層次的鎖實作;
  • synchroinzed 的鎖狀态是無法在 Java 代碼中直接判斷的,但是 ReetrantLock 可以通過 ReetrantLock#isLock 判斷;
  • synchroinzed 是非公平鎖,ReetrantLock 是可以公平的也可以是非公平的;
  • synchroinzed 是不可以被中斷的,而 ReetrantLock#lockInterruptibly 方法是可以中斷鎖的;
  • 在發生異常的時候 synchroinzed 會自動釋放鎖,而 ReetrentLock 需要開發者在 finaly 代碼塊中顯示釋放鎖;
  • ReetrantLock 擷取鎖的形式有很多中:如立即傳回是否成功的 tryLock(),以及等嗲指定時長的擷取,更加靈活;
  • synchroinzed 在特定的情況下已經在等待的線程是後來的線程先獲得鎖(回顧一下 synchroinzed 喚醒政策),而 ReetranLock 對于已經正在等待的線程是先來的先擷取鎖。

ReetrantLcok 特征

reetrantlock 是一種基于 aqs 架構的應用實作。是基于 JDK 的一種線程同步手段,他的功能類似與 synchronized 是一種互斥鎖,相對于 synchronized 具備一下特點:

  • 可中斷
  • 可設定逾時時間
  • 可設定公平鎖
  • 支援多個條件變量
  • 與 synchronized 一樣,都支援可重入

ReetrantLcok 部分源碼:

AQS 和 ReetrantLcok 特征和使用介紹

ReetrantLock 使用範式

使用方式:

// 1. 建立鎖(預設非公平鎖)
ReentrantLock lock = new ReentrantLock(false);
// 2. 加鎖
lock.lock();
try {
   // 3. todo 原子操作
} finally {
    // 4. 解鎖
    lock.unlock();
}      

可重入特征

下面我們來測試一下 ReetrantLock 的幾個特征:

  • 可重入,就是說在一個線程内可以多次擷取鎖。下面是一個簡單的例子:
public static void lockReentrant() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        log.info("main 線程擷取鎖 1 次");
        lock.lock();
        log.info("main 線程擷取鎖 1 次");
        lock.unlock();
        log.info("main 線程解鎖 1 次");
        lock.unlock();
        log.info("main 線程解鎖 1 次");
    }      

輸出結果如下:

AQS 和 ReetrantLcok 特征和使用介紹

可中斷特征

代碼如下:

ReentrantLock lock = new ReentrantLock();

Thread t1 = new Thread(() -> {
    log.info(Thread.currentThread().getName() + " 啟動。。。");
    try {
        lock.lockInterruptibly();
        log.info(Thread.currentThread().getName() + " 成功擷取鎖。。。");
        lock.unlock();
    } catch (InterruptedException e) {
        e.printStackTrace();
        log.info(Thread.currentThread().getName() + " 等待鎖的過程中被中斷。。。");
    }
}, "t1");

lock.lock();
try {
    t1.start();

    log.info(Thread.currentThread().getName() + " 成功擷取鎖。。。");
    Thread.sleep(2000);

    t1.interrupt();
    log.info("t1 執行中斷。。。");
} catch (InterruptedException e) {
    e.printStackTrace();
    log.info(Thread.currentThread().getName() + " 等待鎖的過程中被中斷。。。");
} finally {
    lock.unlock();
}      

這個場景主要是模仿,對于線程中斷的場景,然後放棄鎖的擷取,減少鎖的無效競争者。

輸出結果如下:

AQS 和 ReetrantLcok 特征和使用介紹
  • 設定擷取鎖的逾時時間,比如我們對于一些互斥操作, 隻能讓一個線程擷取成功,但是允許其他線程在允許的時候内重試,來保證最大的并發執行。
@Slf4j
public class ReentrantLockTest {


    public static void main(String[] args) {
        lockTimeOut();
    }

    public static void lockTimeOut() {
        // 1. 建立一個 ReentrantLock 執行個體
        ReentrantLock lock = new ReentrantLock();
        // 2. 建立線程 t1
        Thread t1 = new Thread(() -> {
            log.debug("t1 線程啟動。。。。");
            try {
                // t1 嘗試擷取鎖,鎖擷取逾時時間 1s
                if (lock.tryLock(1, TimeUnit.SECONDS)) {
                    log.debug("t1 線程等待 1s 後 擷取鎖失敗");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 如果是自己擷取鎖才去解鎖
                if (lock.isHeldByCurrentThread()) { lock.unlock(); }
            }
        }, "t1");

        // 3. 主線程擷取鎖
        lock.lock();
        try {
            log.debug("main 線程擷取鎖成功");
            // 4. 啟動 t1 線程
            t1.start();
            // 5. 休眠 2s 
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 如果是自己擷取鎖才去解鎖
            if (lock.isHeldByCurrentThread()) { lock.unlock(); }
        }
    }
}      

公平與非公平特征

Condition 總結

參考資料

  1. ​​baike.baidu.com/item/Abstra…​​

繼續閱讀