AbstractQueuedSynchronizer 簡介
AbstractQueuedSynchronizer 為實作依賴于先進先出 (FIFO) 等待隊列的阻塞鎖定和相關同步器(信号量、事件,等等)提供一個架構。此類的設計目标是成為依靠單個原子 int 值來表示狀态的大多數同步器的一個有用基礎。子類必須定義更改此狀态的受保護方法,并定義哪種狀态對于此對象意味着被擷取或被釋放。假定這些條件之後,此類中的其他方法就可以實作所有排隊和阻塞機制。但隻是為了獲得同步而隻追蹤使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法來操作以原子方式更新的 int 值。
AQS 的特征:
- 阻塞等待隊列
- 共享/獨占
- 公平/非公平
- 可重入
- 允許中斷
核心屬性 int sate
- state 表示共享屬性被 volatile 修飾
三個核心方法:
- getState()
- setState()
- compareAndSetState()
兩種資源共享方式:
- Exclusive-獨占,隻有一個線程可以通路,如 ReetrantLock
- Share 共享,多個線程可以同時執行,如:Semaphore/CountDownLatch
AQS 定義兩種隊列
- 同步等待隊列:主要是用于維護擷取互斥失敗時入隊的線程
- 條件等待隊列:調用 await() 的時候會釋放鎖,點燃後線程會加入到套件隊列,調用 signal() 喚醒的時候把條件隊列的節點移動到同步隊列中,等待再次擷取鎖。
AQS 隊列節點中的 5 種狀态
- 值為 0 表示初始化狀态,表示目前節點在 sync 隊列中,等待擷取鎖。
- CANCELLED , 值為1 , 表示目前的線程被取消;
- SIGNAL,值為 -1,表示目前的線程被取消;
- CONDITION,值為 -2,表示目前節點的後繼節點的線程需要運作,也就是 unpark;
- PROPAGAGTE 值為-3,表示目前場景下後續的 acquireShard 能夠繼續執行。
不同的自定義同步器競争共享資源的方式也不同,自定義同步器在實作時自需要實作共享資源 state 的擷取與實作方式即可,至于具體線程等待隊列的維護(如擷取資源失敗入隊、出隊等),AQS 已經實作好了,自定義同步器時主要實作一下幾個方法(AQS 其實是一個典型的模闆方法模式的運用):
- isHeldExclusively() 該線程是否正在獨占資源。隻有使用到 condition 才需要去實作。
- tryAcquire(int):獨占方式。嘗試擷取資源,成功傳回 true,失敗傳回 false。
- tryRelease(int):獨占方式。嘗試釋放資源,成功傳回 true,失敗傳回 false。
- tryAcquireShared(int): 共享方式。嘗試擷取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
- tryReleaseShared(int): 共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待節點傳回 true , 否則傳回 false。
自定義獨占鎖
public class Liu666Lock extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() {
acquire(1);
}
public void unlock() {
release(1);
}
}
測試一下:
輸出結果如下:
多執行幾次我們可以觀察,雖然執行的順序不一定有序,但是我們最終結果是始終 idx = 10
同步等待
AQS 當中的同步等待隊列也稱為 CLH 隊列,CLH隊列是 Craig、Landin、Hagersten 三人發明的一種基于雙向連結清單資料結構的隊列,是 FIFO 先進先出等待隊列,Java 的 CLH 隊列是原自 CLH 隊列的一個變種實作,線程由原自旋機制改為阻塞機制。
AQS 依賴 CLH 同步隊列來完成同步狀态的管理:
- 目前線程如果擷取同步狀态失敗時,AQS 則會将目前線程已經等待狀态資訊構造成一個節點(Node)并将其加入到 CLH 同步隊列,同時會阻塞目前線程
- 當同步狀态釋放時,會把首節點喚醒(公平鎖),使其再次嘗試擷取同步狀态。
- 通過 signal 或者 signalAll 将條件隊列中的節點轉移到同步隊列。(由條件隊列轉換為同步隊列)
條件等待隊列
AQS 中條件隊列是使用單向連結清單儲存的,用 nextWaiter 屬性來連接配接
- 調用 await 方法阻塞線程;
- 目前線程存儲同步隊列頭節點,調用 await 方法進行阻塞(從同步隊列轉換到條件隊列)
Condition 接口
- 調用 Condition#await 方法會釋放目前持有的鎖,然後阻塞目前線程,同時像 Condition 隊列尾部添加一個節點,是以調用 Condition#await 方法的時候必須持有鎖。
- 調用 Condition#signal 方法會将 Condition 隊列的首節點移動到隊列尾部,然後喚醒調用 Condition#awite 方法而阻塞的線程(喚醒之後這個線程就可以去競争鎖了),是以調用 Condition#signal 方法必須持有鎖,持有鎖的線程喚醒被因調用 Condition#await 方法而阻塞的線程。
等待喚醒機制 await/signal 實驗
@Slf4j
public class ConditionTest {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
try {
log.info(Thread.currentThread().getName() + "開始執行任務");
condition.await();
log.info(Thread.currentThread().getName() + "任務執行結束");
} catch (Throwable e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t1").start();
new Thread(() -> {
lock.lock();
try {
log.info(Thread.currentThread().getName() + "開始執行任務");
TimeUnit.SECONDS.sleep(2);
condition.signal();
log.info(Thread.currentThread().getName() + "任務執行結束");
} catch (Throwable e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t2").start();
}
}
從下面的結果我們可以看到 t1 線程擷取鎖過後,調用
await
方法進入阻塞狀态并且釋放鎖,然後 t2 線程擷取鎖,并且去喚醒 t1 線程繼續執行,這就是一個簡單的條件隊列例子。
輸出結果如下:
ReetrantLock 與 synchroinzed 比較
- synchroinzed 是 JVM 層次的鎖實作,ReetrantLock 是 JDK 層次的鎖實作;
- synchroinzed 的鎖狀态是無法在 Java 代碼中直接判斷的,但是 ReetrantLock 可以通過 ReetrantLock#isLock 判斷;
- synchroinzed 是非公平鎖,ReetrantLock 是可以公平的也可以是非公平的;
- synchroinzed 是不可以被中斷的,而 ReetrantLock#lockInterruptibly 方法是可以中斷鎖的;
- 在發生異常的時候 synchroinzed 會自動釋放鎖,而 ReetrentLock 需要開發者在 finaly 代碼塊中顯示釋放鎖;
- ReetrantLock 擷取鎖的形式有很多中:如立即傳回是否成功的 tryLock(),以及等嗲指定時長的擷取,更加靈活;
- synchroinzed 在特定的情況下已經在等待的線程是後來的線程先獲得鎖(回顧一下 synchroinzed 喚醒政策),而 ReetranLock 對于已經正在等待的線程是先來的先擷取鎖。
ReetrantLcok 特征
reetrantlock 是一種基于 aqs 架構的應用實作。是基于 JDK 的一種線程同步手段,他的功能類似與 synchronized 是一種互斥鎖,相對于 synchronized 具備一下特點:
- 可中斷
- 可設定逾時時間
- 可設定公平鎖
- 支援多個條件變量
- 與 synchronized 一樣,都支援可重入
ReetrantLcok 部分源碼:
ReetrantLock 使用範式
使用方式:
// 1. 建立鎖(預設非公平鎖)
ReentrantLock lock = new ReentrantLock(false);
// 2. 加鎖
lock.lock();
try {
// 3. todo 原子操作
} finally {
// 4. 解鎖
lock.unlock();
}
可重入特征
下面我們來測試一下 ReetrantLock 的幾個特征:
- 可重入,就是說在一個線程内可以多次擷取鎖。下面是一個簡單的例子:
public static void lockReentrant() {
ReentrantLock lock = new ReentrantLock();
lock.lock();
log.info("main 線程擷取鎖 1 次");
lock.lock();
log.info("main 線程擷取鎖 1 次");
lock.unlock();
log.info("main 線程解鎖 1 次");
lock.unlock();
log.info("main 線程解鎖 1 次");
}
輸出結果如下:
可中斷特征
代碼如下:
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.info(Thread.currentThread().getName() + " 啟動。。。");
try {
lock.lockInterruptibly();
log.info(Thread.currentThread().getName() + " 成功擷取鎖。。。");
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
log.info(Thread.currentThread().getName() + " 等待鎖的過程中被中斷。。。");
}
}, "t1");
lock.lock();
try {
t1.start();
log.info(Thread.currentThread().getName() + " 成功擷取鎖。。。");
Thread.sleep(2000);
t1.interrupt();
log.info("t1 執行中斷。。。");
} catch (InterruptedException e) {
e.printStackTrace();
log.info(Thread.currentThread().getName() + " 等待鎖的過程中被中斷。。。");
} finally {
lock.unlock();
}
這個場景主要是模仿,對于線程中斷的場景,然後放棄鎖的擷取,減少鎖的無效競争者。
輸出結果如下:
- 設定擷取鎖的逾時時間,比如我們對于一些互斥操作, 隻能讓一個線程擷取成功,但是允許其他線程在允許的時候内重試,來保證最大的并發執行。
@Slf4j
public class ReentrantLockTest {
public static void main(String[] args) {
lockTimeOut();
}
public static void lockTimeOut() {
// 1. 建立一個 ReentrantLock 執行個體
ReentrantLock lock = new ReentrantLock();
// 2. 建立線程 t1
Thread t1 = new Thread(() -> {
log.debug("t1 線程啟動。。。。");
try {
// t1 嘗試擷取鎖,鎖擷取逾時時間 1s
if (lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("t1 線程等待 1s 後 擷取鎖失敗");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 如果是自己擷取鎖才去解鎖
if (lock.isHeldByCurrentThread()) { lock.unlock(); }
}
}, "t1");
// 3. 主線程擷取鎖
lock.lock();
try {
log.debug("main 線程擷取鎖成功");
// 4. 啟動 t1 線程
t1.start();
// 5. 休眠 2s
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 如果是自己擷取鎖才去解鎖
if (lock.isHeldByCurrentThread()) { lock.unlock(); }
}
}
}
公平與非公平特征
Condition 總結
參考資料
- baike.baidu.com/item/Abstra…