一 AQS
1 AbstractQueuedSynchronizer
AbstractQueuedSynchronizer簡稱AQS,是一個用于建構鎖和相關同步器的架構,它依賴于FIFO的等待隊列實作。見AbstractQueuedSynchronizer的描述:
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
事實上concurrent包内許多類都是基于AQS建構,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解決了在實作同步容器時設計的大量細節問題。
AQS中維護了一個FIFO隊列,用于記錄等待的線程,上鎖和釋放鎖的過程可以認為是線程入隊列和出隊列的過程;擷取不到鎖,入隊列等待;出隊列時被喚醒。
AQS中有一個表示狀态的字段state,ReentrantLock用它表示線程重入鎖的次數,Semaphore用它表示剩餘的許可數量,FutureTask用它表示任務的狀态。對state變量值的更新都采用CAS操作保證更新操作的原子性。
AbstractQueuedSynchronizer繼承了AbstractOwnableSynchronizer,這個類隻有一個變量:exclusiveOwnerThread,表示目前占用該鎖的線程,并且提供了相應的get,set方法。AQS使用此字段記錄目前占有鎖的線程。
2 隊列
1) 隊列
AQS使用一個FIFO的等待隊清單示排隊等待鎖的線程,隊列結構如圖:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnL4AzN1UjZ4kTYlJTNzUjMzkDN4ITOxIGZiljM1MzYiNzYxImY1ATZk9CXt92Yu4GZjlGbh5SZslmZxl3Lc9CX6MHc0RHaiojIsJye.png)
隊列頭節點稱作“哨兵節點”或者“啞節點”,它不與任何線程關聯,其他的節點與一個等待線程關聯。
每次一個線程釋放鎖以後,從Head開始向後尋找,找到一個waitStatus是SIGNAL的節點,然後通過LockSupport.unpark喚醒被挂起的線程,被挂起的線程繼續執行嘗試上鎖邏輯。新的線程嘗試擷取鎖時,如果擷取不到鎖,将會建立一個Node并加入到隊尾,然後将自己挂起,等待挂起時間到或者等待被prev對應的節點喚醒。
說明:代碼的中是從tail向head周遊查找waitStatus=SIGNAL的節點,但結果與這裡說的是一樣的,即最終也是找Head後的第一個waitStatus=SIGNAL的節點。見unparkSuccessor方法
2) Node的主要屬性
類型
屬性名
描述
int
waitStatus
等待狀态
Node
prev
指向隊列中前一個節點
next
指向隊列中後一個節點
Thread
thread
Node的持有線程
nextWaiter
下一個等待condition的Node
3) Node的waitStatus取值
狀态
狀态值
CANCELLED
1
取消狀态,例如因為等待鎖逾時而取消的線程;處于這種狀态的Node會被踢出隊列,被GC回收
SIGNAL
-1
表示這個Node的繼任Node被阻塞了,到時需要通知它
CONDITION
-2
表示這個Node在條件隊列中,因為等待某個條件而被阻塞
PROPAGATE
-3
使用在共享模式頭Node有可能處于這種狀态, 表示鎖的下一次擷取可以無條件傳播
其他
初始狀态
4) 隊列示例
接下來我們以一個簡單的示例描述ReentrantLock的等待鎖、釋放鎖的原理。為了友善描述,以下示例中節點和線程一一對應,例如Node1與Thread1對應,依次類推。
假設初始狀态,如下圖所示:
Thread3申請鎖時,因為無法擷取鎖,是以建立一個Node,然後加入到等待隊列中;如下圖所示:
假設3s中到了以後,Thread1被喚醒(挂起的時候指定了時間,是以等待時間到後會被喚醒),此時會将自己的狀态改成CANCELLED,表示等待被從隊列中移除;如下圖所示:
如果此時有新線程申請鎖,那麼在入隊列過程中會順便将處于CANCLE狀态的節點移除。如下圖所示:
Thread0執行完畢後,釋放線程,将喚醒Head後面第一個狀态為等待SIGNAL的節點對應的線程。
注意:新Node入隊列時會檢查并删除被CANCELLED的節點;其實Node1在等待時間到後被喚醒後,在将自己狀态改為CANCELLED時,如果發現自己是最後一個節點也會将Node1删除。
二 流程
這一部分簡要的介紹加鎖、釋放鎖的流程,讓大家對ReentrantLock有一個整體的概念,後面将通過源碼詳細分析實作細節。
1 NonfairSync#lock
非公平鎖上鎖主要流程如下所示:
這裡僅給出流程圖,後面會通過代碼詳細講述這個流程。
2 FairSync#lock
如流程圖中所示,主要流程基本和非公平鎖一緻,有兩個差别:
NonfairSync執行lock的第一步就是嘗試通過compareAndSetState(0, 1)擷取鎖;而FairSync不會進行此操作。
NonfairSync通過nonfairTryAcquire擷取鎖;FairSync通過tryAcquire擷取鎖。這兩個方法的主要邏輯相同,唯一的差别在于FairSync在擷取鎖前會先檢查FIFO隊列中是否有Node存在,如果沒有,則目前線程執行compareAndSetState(0, 1)操作,嘗試擷取鎖。
3 unlock
公平鎖和非公平鎖的釋放鎖的過程是相同的,釋放鎖的流程圖如下。
三 lock & unlock 源碼分析
因為公平鎖、非公平鎖的大部分邏輯相同,是以這裡主要以非公平鎖的源碼來讨論。
1 構造器
以下是ReentrantLock的兩個構造器,如下所示:
我們可以看到,ReentrantLock預設構造器是非公平的。
2 擷取鎖
1) lock
如下代碼示例中,可以看到公平鎖和非公平鎖的第一個差别:非公平鎖lock的第一步就是嘗試通過compareAndSetState(0, 1)擷取鎖,因為他不要求公平,是以他上來就争搶鎖。
a) ReentrantLock#lock
b) NonfairSync#lock
exclusiveOwnerThread代表持有所得線程資訊,是以一旦上鎖成功,需要将exclusiveOwnerThread設定為目前線程。
c) FairSync#lock
2) compareAndSetState
compareAndSetState(0, 1)方法是基于CAS操作,嘗試将state從0改為1。這是基于硬體指令集的原子操作,當且僅當state=0時将其修改為1。
在AQS部分我們提到ReentrantLock中state記錄的鎖次數(當然也包括重入次數)。state=0代表目前沒有線程持有鎖。state>0代表加鎖次數,第一次lock成功state=1,重入一次state++。
Java中的CAS操作基本上都是通過unsafe實作的,代碼如下:
3) acquire
acquire屬于AbstractQueuedSynchronizer中的方法;此方法主要邏輯如下:
嘗試擷取鎖,如果成功,那麼直接傳回。
如果擷取鎖失敗,建立一個Node.EXCLUSIVE類型的Node,然後加入到隊列中。這裡的隊列就是在AQS中的FIFO隊列,其head為一個空節點,其餘的每個Node與一個線程相關聯。
已經入隊列的線程嘗試擷取鎖,如果擷取成功,那麼傳回,擷取失敗,則将被挂起(這裡有一定的條件:要求Node的prev節點的waitStatus=SIGNAL)
4) tryAcquire
此方法是嘗試擷取鎖。此時如果state=0,說明所已經被釋放了,是以可以重新嘗試上鎖操作,即進行compareAndSetState(0, 1)操作;如果state!=0,并且目前線程持有鎖,那麼重入,更新state,即state++。
當且僅當以下兩中情況,此方法傳回true。
如果鎖已經被釋放了,并且目前線程的compareAndSetState(0, 1)成功。
目前線程持有鎖,此時相當于重入,更新state
a) NonfairSync#tryAcquire
b) NonfairSync#nonfairTryAcquire
c) FairSync#tryAcquire
5) addWaiter
addWaiter是屬于AbstractQueuedSynchronizer中的方法。此方法主要邏輯是:建立一個Node,與目前線程關聯,然後嘗試加入到FIFO隊列的尾部。加入到FIFO隊列時,如果發現隊列沒有初始化過,即Head為空,那麼先建立一個空的不與任何線程相關聯的Node最為Head;然後将Node加入到隊列尾部。
a) addWaiter
b) enq
如果FIFO隊列尚未初始化,那麼先初始化;如果已經初始化了,那麼嘗試将node加入到隊尾,失敗則再試一次。
6) acquireQueued
addWaiter是屬于AbstractQueuedSynchronizer中的方法。此方法主要邏輯為:
擷取node的prev節點。
如果prev節點是head節點,則說明目前node是第二個節點,那麼可以嘗試讓目前線程擷取鎖。如果擷取成功,那麼重新設定head,并傳回interrupted标志位。
如果prev節點不是head,那麼認為不用嘗試擷取鎖,此時判斷判斷是否應該挂起(pack)目前線程,如果應該挂起,那麼通過LockSupport#park挂起線程。
注意,如果線程擷取鎖失敗,那麼會在parkAndCheckInterrupt()方法中被挂起。一個線程徹底釋放鎖資源以後,會喚醒head後的一個節點對應的線程,當某一個線程被喚醒以後,繼續執行parkAndCheckInterrupt()方法中LockSupport#park以後的代碼。
a) acquireQueued
b) shouldParkAfterFailedAcquire
此方法是用來判斷是否應該挂起目前節點對應的線程。允許挂起的條件是:prev節點的waitStatus=SIGNAL。這個狀态的意思是:prev節點對應的線程在釋放鎖以後應該喚醒(unpack)node節點對應的線程。
如果prev節點waitStatus>0,即為CANCELLED狀态時,需要将prev從隊列中移除。重試此操作直到找到一個prve節點的狀态不為CANCELLED。
如果prev節點waitStatus<=0(當然不包括SIGNAL狀态),那麼通過CAS操作設定waitStatus= SIGNAL
3 等待逾時則放棄擷取鎖
以下三個方法時擷取逾時放棄鎖的源碼,其中主要邏輯在doAcquireNanos中,和前面讨論的“擷取鎖”部分的(見acquireQueued方法)主要差別在于:在循環中會檢測等待時間是否已經超過指定時間,如果超過了,那麼将Node的waitStatus改為CANCELLED狀态。
1) tryLock
2) tryAcquireNanos
3) doAcquireNanos
4 釋放鎖
1) unlock
2) release
釋放鎖,主要邏輯是:
嘗試釋放鎖。
如果此次釋放完以後,state=0,即持有鎖的線程已經将鎖(包括重入)徹底釋放了,那麼嘗試喚醒(unpack)head後面Node對應的線程(這裡有一個檢查Node節點對應的線程是否已經被CANCELLED的過程,對于這種節點會将他們從隊列中移除)。
3) tryRelease
此方法主要邏輯是:
如果目前線程不是占用所得線程,不允許釋放。
設定state=state-releases。
如果state=0,那麼将鎖的占用線程清理掉。
釋放後如果state=0,那麼傳回true;即隻有在Sync不被任何線程占用,才傳回true。
4) unparkSuccessor
如果node的waitStatus!=CANCELLED狀态,那麼将node的waitStatus改成0。這其實是一個清理head節點狀态的過程,這裡的清理指的是改成初始狀态。
如果node的next節點處于CANCELLED狀态,那麼将此next節點移除隊列;重試此過程直到找到一個不處于CANCELLED狀态的節點或者隊列中沒有節點為止。
如果上一步找到這樣的一個節點,那麼通過LockSupport.unpark去喚醒這個節點對應的線程。
四 Condition
Condition提供了這樣一種能力:線程擷取鎖以後,如果條件不滿足,挂起線程釋放資源;如果條件滿足,則喚醒線程繼續處理。
ReentrantLock可以支援多條件等待,其實作原理如下:每次調用newCondition()方法,都會建立一個ConditionObject對象,每個ConditionObject對象都可以挂一個等待隊列;如果希望同時等待多個條件,隻需要簡單的多次調用newCondition建立多個條件對象就好了。
說明:先區分兩個概念,本節中同步等待隊列指的是指的是AQS中的FIFO的隊列。條件等待隊列指的是ConditionObject對象上的等待隊列。
1 源碼分析
1) newCondition
建立ConditionObject對象,一個ConditionObject對象就是一個普通的對象,沒有什麼特别的地方,隻是提供了await、signal等方法,這一部分将在後面詳細分析。
2) await
釋放鎖,挂起目前線程,等待其他線程發起喚醒信号(signal);被喚醒以後重新擷取鎖。此方法可以認為和Object的wait等價。
此方法主要邏輯如下:
建立一個條件等待Node(waitStatus= CONDITION),加入到條件等待隊列隊尾。
釋放ReentrantLock上的鎖。因為後面要休眠了,是以需要先将此ReentrantLock對象的鎖釋放掉,這樣其他線程才能擷取鎖。
判斷Node是否在同步等待隊列中,如果不在,那麼挂起目前線程。因為Node是await時新建立的,故不在同步隊列中,是以目前線程在這裡會被挂起。
線程被挂起,等待signal信号。
線程被喚醒以後,嘗試擷取鎖。acquireQueued方法在前面已經将結果了,是以這裡就不在累述。因為之前将鎖釋放了,是以線程被喚醒以後需要重新嘗試擷取鎖,擷取鎖的次數為之前釋放的次數。
清理掉已經被取消的節點。
3) addConditionWaiter
此方法的主要邏輯如下:
如果lastWaiter已經被cancel了,那麼直接清理掉他們。
接着建立一個Node,加入到條件等待隊列的尾部。
4) isOnSyncQueue
判斷目前Node的狀态,如果是CONDITION,表明是正常的沒有被喚醒的節點,傳回false。此時目前的Node還沒進入到同步等待隊列。
判斷Node的下一個節點是否為null,不為null表明在同步等待隊列中,傳回true。
周遊整個Node的同步等待隊列,如果存在傳回true ;注意:如果條件已經滿足,這裡有一個條件等待隊列向同步等待隊列的轉化過程。
5) signal
簡單來說是:喚醒線程。
具體來說是:周遊等待隊列,找出第一個等待被喚醒的節點Node,然後将它插入到同步等待隊列尾部,然後就是等待被喚醒(具體的過程與一個線程釋放鎖以後其他線程擷取所得過程相同)。
從條件等待隊列中取出第一個Node。
周遊條件等待隊列,清理已經不再CONDITION狀态的節點,接着将Node加入到同步等待隊列中,然後将Node加入之前的隊尾Node設定為SIGNAL狀态,最後如果狀态修改失敗,則喚醒Node對應的線程。
2 使用示例僞代碼
五 ReentrantLock與synchronized對比
1 Synchronized
Synchronized是通過同步互斥來實作線程安全的;即同一時間隻能有一個線程通路synchronized修飾的代碼塊或方法。其特性與功能如下:
synchronized是java的關鍵字,由jvm支援。
支援重入,即一個線程在擷取對象的鎖以後可以再次對此對象上鎖。
修飾普通方法,鎖是目前執行個體對象;修飾靜态方法,鎖是目前類的class對象;包裹代碼塊,鎖是括号中的對象。
不支援等待中斷。
synchronized是非公平的。
2 ReentrantLock
ReentrantLock是一種非常常見的臨界區處理手段,通過在執行代碼前上鎖保證同一時間隻有一個線程能執行指定的代碼塊。ReentrantLock的特性與功能如下:
ReentrantLock是java api層面的實作,有Unsafe支援。
支援重入。
支援公平鎖、非公平鎖,預設是非公平鎖。公平鎖指:多個線程在等待同一個線程的鎖時,必須按照申請所得時間順序來擷取鎖。非公平鎖指:在鎖被釋放時,任何一個等待鎖的線程都有機會擷取鎖。
支援等待中斷。例如:A線程擷取對象O的鎖, B線程等待擷取O的鎖,當B長時間無法擷取鎖時,B可以放棄擷取鎖。
鎖可以綁定多個條件。線程進入臨界區,卻發現在某一條件滿足之後才能執行,條件對象就是用來管理那些已經獲得了鎖,但是卻不能做有用工作的線程。一個ReentrantLock對象可以同時綁定多個Condition對象。
3 總結
ReentrantLock本質上是通過一個隊列來完成同步的。因為每個Node與一個線程關聯,隻需要做好對隊列節點的同步處理,既可以完成多線程的同步處理。
相比于synchronized關鍵字,ReentrantLock等多的類似于我們使用zookeeper實作的等待隊列。Zookeeper的隊列示例可以參考部落格《Zookeeper使用案例》位址為:https://yq.aliyun.com/articles/272103
六 部落格
關于線程、synchronized的更多内容,可以參考《線程-基礎知識》,位址為:https://yq.aliyun.com/articles/414908
關于synchronized的原理,可以參考《互斥同步-鎖》,位址為:https://yq.aliyun.com/articles/414939