天天看點

java aqs詳解_Java AQS詳解

一、概述

談到并發,不得不談ReentrantLock;而談到ReentrantLock,不得不談AbstractQueuedSynchronizer(AQS)!

類如其名,抽象的隊列式的同步器,AQS定義了一套多線程通路共享資源的同步器架構,許多同步類實作都依賴于它,如常用的ReentrantLock/Semaphore/CountDownLatch...。

以下是本文的目錄大綱:

概述

架構

源碼詳解

簡單應用

若有不正之處,請諒解和批評指正,不勝感激。

二、架構

java aqs詳解_Java AQS詳解

它維護了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程争用資源被阻塞時會進入此隊列)。這裡volatile是核心關鍵詞,具體volatile的語義,在此不述。state的通路方式有三種:

getState()

setState()

compareAndSetState()

AQS定義兩種資源共享方式:Exclusive(獨占,隻有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。

不同的自定義同步器争用共享資源的方式也不同。自定義同步器在實作時隻需要實作共享資源state的擷取與釋放方式即可,至于具體線程等待隊列的維護(如擷取資源失敗入隊/喚醒出隊等),AQS已經在頂層實作好了。自定義同步器實作時主要實作以下幾種方法:

isHeldExclusively():該線程是否正在獨占資源。隻有用到condition才需要去實作它。

tryAcquire(int):獨占方式。嘗試擷取資源,成功則傳回true,失敗則傳回false。

tryRelease(int):獨占方式。嘗試釋放資源,成功則傳回true,失敗則傳回false。

tryAcquireShared(int):共享方式。嘗試擷取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。

tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點傳回true,否則傳回false。

以ReentrantLock為例,state初始化為0,表示未鎖定狀态。A線程lock()時,會調用tryAcquire()獨占該鎖并将state+1。此後,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會擷取該鎖。當然,釋放鎖之前,A線程自己是可以重複擷取此鎖的(state會累加),這就是可重入的概念。但要注意,擷取多少次就要釋放多麼次,這樣才能保證state是能回到零态的。

再以CountDownLatch以例,任務分為N個子線程去執行,state也初始化為N(注意N要與線程個數一緻)。這N個子線程是并行執行的,每個子線程執行完後countDown()一次,state會CAS減1。等到所有子線程都執行完後(即state=0),會unpark()主調用線程,然後主調用線程就會從await()函數傳回,繼續後餘動作。

一般來說,自定義同步器要麼是獨占方法,要麼是共享方式,他們也隻需實作tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支援自定義同步器同時實作獨占和共享兩種方式,如ReentrantReadWriteLock。

三、源碼詳解

本節開始講解AQS的源碼實作。依照acquire-release、acquireShared-releaseShared的次序來。

3.1 acquire(int)

此方法是獨占模式下線程擷取共享資源的頂層入口。如果擷取到資源,線程直接傳回,否則進入等待隊列,直到擷取到資源為止,且整個過程忽略中斷的影響。這也正是lock()的語義,當然不僅僅隻限于lock()。擷取到資源後,線程就可以去執行其臨界區代碼了。下面是acquire()的源碼:

1 public final void acquire(int arg) {

2 if (!tryAcquire(arg) &&

3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

4 selfInterrupt();

5 }

函數流程如下:

tryAcquire()嘗試直接去擷取資源,如果成功則直接傳回;

addWaiter()将該線程加入等待隊列的尾部,并标記為獨占模式;

acquireQueued()使線程在等待隊列中擷取資源,一直擷取到資源後才傳回。如果在整個等待過程中被中斷過,則傳回true,否則傳回false。

如果線程在等待過程中被中斷過,它是不響應的。隻是擷取資源後才再進行自我中斷selfInterrupt(),将中斷補上。

這時單憑這4個抽象的函數來看流程還有點朦胧,不要緊,看完接下來的分析後,你就會明白了。就像《大話西遊》裡唐僧說的:等你明白了舍生取義的道理,你自然會回來和我唱這首歌的。

3.1.1 tryAcquire(int)

此方法嘗試去擷取獨占資源。如果擷取成功,則直接傳回true,否則直接傳回false。這也正是tryLock()的語義,還是那句話,當然不僅僅隻限于tryLock()。如下是tryAcquire()的源碼:

1 protected boolean tryAcquire(int arg) {

2 throw new UnsupportedOperationException();

3 }

什麼?直接throw異常?說好的功能呢?好吧,還記得概述裡講的AQS隻是一個架構,具體資源的擷取/釋放方式交由自定義同步器去實作嗎?就是這裡了!!!AQS這裡隻定義了一個接口,具體資源的擷取交由自定義同步器去實作了(通過state的get/set/CAS)!!!至于能不能重入,能不能加塞,那就看具體的自定義同步器怎麼去設計了!!!當然,自定義同步器在進行資源通路時要考慮線程安全的影響。

這裡之是以沒有定義成abstract,是因為獨占模式下隻用實作tryAcquire-tryRelease,而共享模式下隻用實作tryAcquireShared-tryReleaseShared。如果都定義成abstract,那麼每個模式也要去實作另一模式下的接口。說到底,Doug Lea還是站在咱們開發者的角度,盡量減少不必要的工作量。

3.1.2 addWaiter(Node)

此方法用于将目前線程加入到等待隊列的隊尾,并傳回目前線程所在的結點。還是上源碼吧:

java aqs詳解_Java AQS詳解

1 private Node addWaiter(Node mode) {

2 //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨占)和SHARED(共享)

3 Node node = new Node(Thread.currentThread(), mode);

4

5 //嘗試快速方式直接放到隊尾。

6 Node pred = tail;

7 if (pred != null) {

8 node.prev = pred;

9 if (compareAndSetTail(pred, node)) {

10 pred.next = node;

11 return node;

12 }

13 }

14

15 //上一步失敗則通過enq入隊。

16 enq(node);

17 return node;

18 }

java aqs詳解_Java AQS詳解

不用再說了,直接看注釋吧。這裡我們說下Node。Node結點是對每一個通路同步代碼的線程的封裝,其包含了需要同步的線程本身以及線程的狀态,如是否被阻塞,是否等待喚醒,是否已經被取消等。變量waitStatus則表示目前被封裝成Node結點的等待狀态,共有4種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。

CANCELLED:值為1,在同步隊列中等待的線程等待逾時或被中斷,需要從同步隊列中取消該Node的結點,其結點的waitStatus為CANCELLED,即結束狀态,進入該狀态後的結點将不會再變化。

SIGNAL:值為-1,被辨別為該等待喚醒狀态的後繼結點,當其前繼結點的線程釋放了同步鎖或被取消,将會通知該後繼結點的線程執行。說白了,就是處于喚醒狀态,隻要前繼結點釋放鎖,就會通知辨別為SIGNAL狀态的後繼結點的線程執行。

CONDITION:值為-2,與Condition相關,該辨別的結點處于等待隊列中,結點的線程等待在Condition上,當其他線程調用了Condition的signal()方法後,CONDITION狀态的結點将從等待隊列轉移到同步隊列中,等待擷取同步鎖。

PROPAGATE:值為-3,與共享模式相關,在共享模式中,該狀态辨別結點的線程處于可運作狀态。

0狀态:值為0,代表初始化狀态。

AQS在判斷狀态時,通過用waitStatus>0表示取消狀态,而waitStatus<0表示有效狀态。

3.1.2.1 enq(Node)

此方法用于将node加入隊尾。源碼如下:

java aqs詳解_Java AQS詳解

1 private Node enq(final Node node) {

2 //CAS"自旋",直到成功加入隊尾

3 for (;;) {

4 Node t = tail;

5 if (t == null) { // 隊列為空,建立一個空的标志結點作為head結點,并将tail也指向它。

6 if (compareAndSetHead(new Node()))

7 tail = head;

8 } else {//正常流程,放入隊尾

9 node.prev = t;

10 if (compareAndSetTail(t, node)) {

11 t.next = node;

12 return t;

13 }

14 }

15 }

16 }

java aqs詳解_Java AQS詳解

如果你看過AtomicInteger.getAndIncrement()函數源碼,那麼相信你一眼便看出這段代碼的精華。CAS自旋volatile變量,是一種很經典的用法。還不太了解的,自己去百度一下吧。

3.1.3 acquireQueued(Node, int)

OK,通過tryAcquire()和addWaiter(),該線程擷取資源失敗,已經被放入等待隊列尾部了。聰明的你立刻應該能想到該線程下一部該幹什麼了吧:進入等待狀态休息,直到其他線程徹底釋放資源後喚醒自己,自己再拿到資源,然後就可以去幹自己想幹的事了。沒錯,就是這樣!是不是跟醫院排隊拿号有點相似~~acquireQueued()就是幹這件事:在等待隊列中排隊拿号(中間沒其它事幹可以休息),直到拿到号後再傳回。這個函數非常關鍵,還是上源碼吧:

java aqs詳解_Java AQS詳解

1 final boolean acquireQueued(final Node node, int arg) {

2 boolean failed = true;//标記是否成功拿到資源

3 try {

4 boolean interrupted = false;//标記等待過程中是否被中斷過

5

6 //又是一個“自旋”!

7 for (;;) {

8 final Node p = node.predecessor();//拿到前驅

9 //如果前驅是head,即該結點已成老二,那麼便有資格去嘗試擷取資源(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。

10 if (p == head && tryAcquire(arg)) {

11 setHead(node);//拿到資源後,将head指向該結點。是以head所指的标杆結點,就是目前擷取到資源的那個結點或null。

12 p.next = null; // setHead中node.prev已置為null,此處再将head.next置為null,就是為了友善GC回收以前的head結點。也就意味着之前拿完資源的結點出隊了!

13 failed = false;

14 return interrupted;//傳回等待過程中是否被中斷過

15 }

16

17 //如果自己可以休息了,就進入waiting狀态,直到被unpark()

18 if (shouldParkAfterFailedAcquire(p, node) &&

19 parkAndCheckInterrupt())

20 interrupted = true;//如果等待過程中被中斷過,哪怕隻有那麼一次,就将interrupted标記為true

21 }

22 } finally {

23 if (failed)

24 cancelAcquire(node);

25 }

26 }

java aqs詳解_Java AQS詳解

到這裡了,我們先不急着總結acquireQueued()的函數流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具體幹些什麼。

3.1.3.1 shouldParkAfterFailedAcquire(Node, Node)

此方法主要用于檢查狀态,看看自己是否真的可以去休息了(進入waiting狀态,如果線程狀态轉換不熟,可以參考本人上一篇寫的Thread詳解),萬一隊列前邊的線程都放棄了隻是瞎站着,那也說不定,對吧!

java aqs詳解_Java AQS詳解

1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

2 int ws = pred.waitStatus;//拿到前驅的狀态

3 if (ws == Node.SIGNAL)

4 //如果已經告訴前驅拿完号後通知自己一下,那就可以安心休息了

5 return true;

6 if (ws > 0) {

7

11 do {

12 node.prev = pred = pred.prev;

13 } while (pred.waitStatus > 0);

14 pred.next = node;

15 } else {

16 //如果前驅正常,那就把前驅的狀态設定成SIGNAL,告訴它拿完号後通知自己一下。有可能失敗,人家說不定剛剛釋放完呢!

17 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

18 }

19 return false;

20 }

java aqs詳解_Java AQS詳解

整個流程中,如果前驅結點的狀态不是SIGNAL,那麼自己就不能安心去休息,需要去找個安心的休息點,同時可以再嘗試下看有沒有機會輪到自己拿号。

3.1.3.2 parkAndCheckInterrupt()

如果線程找好安全休息點後,那就可以安心去休息了。此方法就是讓線程去休息,真正進入等待狀态。

1 private final boolean parkAndCheckInterrupt() {

2 LockSupport.park(this);//調用park()使線程進入waiting狀态

3 return Thread.interrupted();//如果被喚醒,檢視自己是不是被中斷的。

4 }

park()會讓目前線程進入waiting狀态。在此狀态下,有兩種途徑可以喚醒該線程:1)被unpark();2)被interrupt()。(再說一句,如果線程狀态轉換不熟,可以參考本人寫的Thread詳解)。需要注意的是,Thread.interrupted()會清除目前線程的中斷标記位。

3.1.3.3 小結

OK,看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),現在讓我們再回到acquireQueued(),總結下該函數的具體流程:

結點進入隊尾後,檢查狀态,找到安全休息點;

調用park()進入waiting狀态,等待unpark()或interrupt()喚醒自己;

被喚醒後,看自己是不是有資格能拿到号。如果拿到,head指向目前結點,并傳回從入隊到拿到号的整個過程中是否被中斷過;如果沒拿到,繼續流程1。

3.1.4 小結

OKOK,acquireQueued()分析完之後,我們接下來再回到acquire()!再貼上它的源碼吧:

1 public final void acquire(int arg) {

2 if (!tryAcquire(arg) &&

3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

4 selfInterrupt();

5 }

再來總結下它的流程吧:

調用自定義同步器的tryAcquire()嘗試直接去擷取資源,如果成功則直接傳回;

沒成功,則addWaiter()将該線程加入等待隊列的尾部,并标記為獨占模式;

acquireQueued()使線程在等待隊列中休息,有機會時(輪到自己,會被unpark())會去嘗試擷取資源。擷取到資源後才傳回。如果在整個等待過程中被中斷過,則傳回true,否則傳回false。

如果線程在等待過程中被中斷過,它是不響應的。隻是擷取資源後才再進行自我中斷selfInterrupt(),将中斷補上。

由于此函數是重中之重,我再用流程圖總結一下:

java aqs詳解_Java AQS詳解

至此,acquire()的流程終于算是告一段落了。這也就是ReentrantLock.lock()的流程,不信你去看其lock()源碼吧,整個函數就是一條acquire(1)!!!

3.2 release(int)

上一小節已經把acquire()說完了,這一小節就來講講它的反操作release()吧。此方法是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列裡的其他線程來擷取資源。這也正是unlock()的語義,當然不僅僅隻限于unlock()。下面是release()的源碼:

java aqs詳解_Java AQS詳解

1 public final boolean release(int arg) {

2 if (tryRelease(arg)) {

3 Node h = head;//找到頭結點

4 if (h != null && h.waitStatus != 0)

5 unparkSuccessor(h);//喚醒等待隊列裡的下一個線程

6 return true;

7 }

8 return false;

9 }

java aqs詳解_Java AQS詳解

邏輯并不複雜。它調用tryRelease()來釋放資源。有一點需要注意的是,它是根據tryRelease()的傳回值來判斷該線程是否已經完成釋放掉資源了!是以自定義同步器在設計tryRelease()的時候要明确這一點!!

3.2.1 tryRelease(int)

此方法嘗試去釋放指定量的資源。下面是tryRelease()的源碼:

1 protected boolean tryRelease(int arg) {

2 throw new UnsupportedOperationException();

3 }

跟tryAcquire()一樣,這個方法是需要獨占模式的自定義同步器去實作的。正常來說,tryRelease()都會成功的,因為這是獨占模式,該線程來釋放資源,那麼它肯定已經拿到獨占資源了,直接減掉相應量的資源即可(state-=arg),也不需要考慮線程安全的問題。但要注意它的傳回值,上面已經提到了,release()是根據tryRelease()的傳回值來判斷該線程是否已經完成釋放掉資源了!是以自義定同步器在實作時,如果已經徹底釋放資源(state=0),要傳回true,否則傳回false。

3.2.2 unparkSuccessor(Node)

此方法用于喚醒等待隊列中下一個線程。下面是源碼:

java aqs詳解_Java AQS詳解

1 private void unparkSuccessor(Node node) {

2 //這裡,node一般為目前線程所在的結點。

3 int ws = node.waitStatus;

4 if (ws < 0)//置零目前線程所在的結點狀态,允許失敗。

5 compareAndSetWaitStatus(node, ws, 0);

6

7 Node s = node.next;//找到下一個需要喚醒的結點s

8 if (s == null || s.waitStatus > 0) {//如果為空或已取消

9 s = null;

10 for (Node t = tail; t != null && t != node; t = t.prev)

11 if (t.waitStatus <= 0)//從這裡可以看出,<=0的結點,都是還有效的結點。

12 s = t;

13 }

14 if (s != null)

15 LockSupport.unpark(s.thread);//喚醒

16 }

java aqs詳解_Java AQS詳解

這個函數并不複雜。一句話概括:用unpark()喚醒等待隊列中最前邊的那個未放棄線程,這裡我們也用s來表示吧。此時,再和acquireQueued()聯系起來,s被喚醒後,進入if (p == head && tryAcquire(arg))的判斷(即使p!=head也沒關系,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這裡既然s已經是等待隊列中最前邊的那個未放棄線程了,那麼通過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立啦),然後s把自己設定成head标杆結點,表示自己已經擷取到資源了,acquire()也傳回了!!And then, DO what you WANT!

3.2.3 小結

release()是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列裡的其他線程來擷取資源。

3.3 acquireShared(int)

此方法是共享模式下線程擷取共享資源的頂層入口。它會擷取指定量的資源,擷取成功則直接傳回,擷取失敗則進入等待隊列,直到擷取到資源為止,整個過程忽略中斷。下面是acquireShared()的源碼:

1 public final void acquireShared(int arg) {

2 if (tryAcquireShared(arg) < 0)

3 doAcquireShared(arg);

4 }

這裡tryAcquireShared()依然需要自定義同步器去實作。但是AQS已經把其傳回值的語義定義好了:負值代表擷取失敗;0代表擷取成功,但沒有剩餘資源;正數表示擷取成功,還有剩餘資源,其他線程還可以去擷取。是以這裡acquireShared()的流程就是:

tryAcquireShared()嘗試擷取資源,成功則直接傳回;

失敗則通過doAcquireShared()進入等待隊列,直到擷取到資源為止才傳回。

3.3.1 doAcquireShared(int)

此方法用于将目前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應量的資源後才傳回。下面是doAcquireShared()的源碼:

java aqs詳解_Java AQS詳解

1 private void doAcquireShared(int arg) {

2 final Node node = addWaiter(Node.SHARED);//加入隊列尾部

3 boolean failed = true;//是否成功标志

4 try {

5 boolean interrupted = false;//等待過程中是否被中斷過的标志

6 for (;;) {

7 final Node p = node.predecessor();//前驅

8 if (p == head) {//如果到head的下一個,因為head是拿到資源的線程,此時node被喚醒,很可能是head用完資源來喚醒自己的

9 int r = tryAcquireShared(arg);//嘗試擷取資源

10 if (r >= 0) {//成功

11 setHeadAndPropagate(node, r);//将head指向自己,還有剩餘資源可以再喚醒之後的線程

12 p.next = null; // help GC

13 if (interrupted)//如果等待過程中被打斷過,此時将中斷補上。

14 selfInterrupt();

15 failed = false;

16 return;

17 }

18 }

19

20 //判斷狀态,尋找安全點,進入waiting狀态,等着被unpark()或interrupt()

21 if (shouldParkAfterFailedAcquire(p, node) &&

22 parkAndCheckInterrupt())

23 interrupted = true;

24 }

25 } finally {

26 if (failed)

27 cancelAcquire(node);

28 }

29 }

java aqs詳解_Java AQS詳解

有木有覺得跟acquireQueued()很相似?對,其實流程并沒有太大差別。隻不過這裡将補中斷的selfInterrupt()放到doAcquireShared()裡了,而獨占模式是放到acquireQueued()之外,其實都一樣,不知道Doug Lea是怎麼想的。

跟獨占模式比,還有一點需要注意的是,這裡隻有線程是head.next時(“老二”),才會去嘗試擷取資源,有剩餘的話還會喚醒之後的隊友。那麼問題就來了,假如老大用完後釋放了5個資源,而老二需要6個,老三需要1個,老四需要2個。老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢,還是不讓?答案是否定的!老二會繼續park()等待其他線程釋放資源,也更不會去喚醒老三和老四了。獨占模式,同一時刻隻有一個線程去執行,這樣做未嘗不可;但共享模式下,多個線程是可以同時執行的,現在因為老二的資源需求量大,而把後面量小的老三和老四也都卡住了。當然,這并不是問題,隻是AQS保證嚴格按照入隊順序喚醒罷了(保證公平,但降低了并發)。

3.3.1.1 setHeadAndPropagate(Node, int)

java aqs詳解_Java AQS詳解

1 private void setHeadAndPropagate(Node node, int propagate) {

2 Node h = head;

3 setHead(node);//head指向自己

4 //如果還有剩餘量,繼續喚醒下一個鄰居線程

5 if (propagate > 0 || h == null || h.waitStatus < 0) {

6 Node s = node.next;

7 if (s == null || s.isShared())

8 doReleaseShared();

9 }

10 }

java aqs詳解_Java AQS詳解

此方法在setHead()的基礎上多了一步,就是自己蘇醒的同時,如果條件符合(比如還有剩餘資源),還會去喚醒後繼結點,畢竟是共享模式!

doReleaseShared()我們留着下一小節的releaseShared()裡來講。

3.3.2 小結

OK,至此,acquireShared()也要告一段落了。讓我們再梳理一下它的流程:

tryAcquireShared()嘗試擷取資源,成功則直接傳回;

失敗則通過doAcquireShared()進入等待隊列park(),直到被unpark()/interrupt()并成功擷取到資源才傳回。整個等待過程也是忽略中斷的。

其實跟acquire()的流程大同小異,隻不過多了個自己拿到資源後,還會去喚醒後繼隊友的操作(這才是共享嘛)。

3.4 releaseShared()

上一小節已經把acquireShared()說完了,這一小節就來講講它的反操作releaseShared()吧。此方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列裡的其他線程來擷取資源。下面是releaseShared()的源碼:

java aqs詳解_Java AQS詳解

1 public final boolean releaseShared(int arg) {

2 if (tryReleaseShared(arg)) {//嘗試釋放資源

3 doReleaseShared();//喚醒後繼結點

4 return true;

5 }

6 return false;

7 }

java aqs詳解_Java AQS詳解

此方法的流程也比較簡單,一句話:釋放掉資源後,喚醒後繼。跟獨占模式下的release()相似,但有一點稍微需要注意:獨占模式下的tryRelease()在完全釋放掉資源(state=0)後,才會傳回true去喚醒其他線程,這主要是基于獨占下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制一定量的線程并發執行,那麼擁有資源的線程在釋放掉部分資源時就可以喚醒後繼等待結點。例如,資源總量是13,A(5)和B(7)分别擷取到資源并發運作,C(4)來時隻剩1個資源就需要等待。A在運作過程中釋放掉2個資源量,然後tryReleaseShared(2)傳回true喚醒C,C一看隻有3個仍不夠繼續等待;随後B又釋放2個,tryReleaseShared(2)傳回true喚醒C,C一看有5個夠自己用了,然後C就可以跟A和B一起運作。而ReentrantReadWriteLock讀鎖的tryReleaseShared()隻有在完全釋放掉資源(state=0)才傳回true,是以自定義同步器可以根據需要決定tryReleaseShared()的傳回值。

3.4.1 doReleaseShared()

此方法主要用于喚醒後繼。下面是它的源碼:

java aqs詳解_Java AQS詳解

1 private void doReleaseShared() {

2 for (;;) {

3 Node h = head;

4 if (h != null && h != tail) {

5 int ws = h.waitStatus;

6 if (ws == Node.SIGNAL) {

7 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

8 continue;

9 unparkSuccessor(h);//喚醒後繼

10 }

11 else if (ws == 0 &&

12 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

13 continue;

14 }

15 if (h == head)// head發生變化

16 break;

17 }

18 }

java aqs詳解_Java AQS詳解

3.5 小結

本節我們詳解了獨占和共享兩種模式下擷取-釋放資源(acquire-release、acquireShared-releaseShared)的源碼,相信大家都有一定認識了。值得注意的是,acquire()和acquireShared()兩種方法下,線程在等待隊列中都是忽略中斷的。AQS也支援響應中斷的,acquireInterruptibly()/acquireSharedInterruptibly()即是,這裡相應的源碼跟acquire()和acquireShared()差不多,這裡就不再詳解了。

四、簡單應用

通過前邊幾個章節的學習,相信大家已經基本了解AQS的原理了。這裡再将“架構”一節中的一段話複制過來:

不同的自定義同步器争用共享資源的方式也不同。自定義同步器在實作時隻需要實作共享資源state的擷取與釋放方式即可,至于具體線程等待隊列的維護(如擷取資源失敗入隊/喚醒出隊等),AQS已經在頂層實作好了。自定義同步器實作時主要實作以下幾種方法:

isHeldExclusively():該線程是否正在獨占資源。隻有用到condition才需要去實作它。

tryAcquire(int):獨占方式。嘗試擷取資源,成功則傳回true,失敗則傳回false。

tryRelease(int):獨占方式。嘗試釋放資源,成功則傳回true,失敗則傳回false。

tryAcquireShared(int):共享方式。嘗試擷取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。

tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點傳回true,否則傳回false。

OK,下面我們就以AQS源碼裡的Mutex為例,講一下AQS的簡單應用。

4.1 Mutex(互斥鎖)

Mutex是一個不可重入的互斥鎖實作。鎖資源(AQS裡的state)隻有兩種狀态:0表示未鎖定,1表示鎖定。下邊是Mutex的核心源碼:

java aqs詳解_Java AQS詳解

1 class Mutex implements Lock, java.io.Serializable {

2 // 自定義同步器

3 private static class Sync extends AbstractQueuedSynchronizer {

4 // 判斷是否鎖定狀态

5 protected boolean isHeldExclusively() {

6 return getState() == 1;

7 }

8

9 // 嘗試擷取資源,立即傳回。成功則傳回true,否則false。

10 public boolean tryAcquire(int acquires) {

11 assert acquires == 1; // 這裡限定隻能為1個量

12 if (compareAndSetState(0, 1)) {//state為0才設定為1,不可重入!

13 setExclusiveOwnerThread(Thread.currentThread());//設定為目前線程獨占資源

14 return true;

15 }

16 return false;

17 }

18

19 // 嘗試釋放資源,立即傳回。成功則為true,否則false。

20 protected boolean tryRelease(int releases) {

21 assert releases == 1; // 限定為1個量

22 if (getState() == 0)//既然來釋放,那肯定就是已占有狀态了。隻是為了保險,多層判斷!

23 throw new IllegalMonitorStateException();

24 setExclusiveOwnerThread(null);

25 setState(0);//釋放資源,放棄占有狀态

26 return true;

27 }

28 }

29

30 // 真正同步類的實作都依賴繼承于AQS的自定義同步器!

31 private final Sync sync = new Sync();

32

33 //lockacquire。兩者語義一樣:擷取資源,即便等待,直到成功才傳回。

34 public void lock() {

35 sync.acquire(1);

36 }

37

38 //tryLocktryAcquire。兩者語義一樣:嘗試擷取資源,要求立即傳回。成功則為true,失敗則為false。

39 public boolean tryLock() {

40 return sync.tryAcquire(1);

41 }

42

43 //unlockrelease。兩者國文一樣:釋放資源。

44 public void unlock() {

45 sync.release(1);

46 }

47

48 //鎖是否占有狀态

49 public boolean isLocked() {

50 return sync.isHeldExclusively();

51 }

52 }

java aqs詳解_Java AQS詳解

同步類在實作時一般都将自定義同步器(sync)定義為内部類,供自己使用;而同步類自己(Mutex)則實作某個接口,對外服務。當然,接口的實作要直接依賴sync,它們在語義上也存在某種對應關系!!而sync隻用實作資源state的擷取-釋放方式tryAcquire-tryRelelase,至于線程的排隊、等待、喚醒等,上層的AQS都已經實作好了,我們不用關心。

除了Mutex,ReentrantLock/CountDownLatch/Semphore這些同步類的實作方式都差不多,不同的地方就在擷取-釋放資源的方式tryAcquire-tryRelelase。掌握了這點,AQS的核心便被攻破了!

OK,至此,整個AQS的講解也要落下帷幕了。希望本文能夠對學習Java并發程式設計的同學有所借鑒,中間寫的有不對的地方,也歡迎讨論和指正~