在JDK1.5之前共享對象的協調機制隻有synchronized和volatile,在JDK1.5中增加了新的機制ReentrantLock,該機制的誕生并不是為了替代synchronized,而是在 synchronized 不适用的情況下,提供一種可以選擇的進階功能。
synchronized 和 ReentrantLock 是如何實作的?它們有什麼差別?
synchronized 屬于獨占式悲觀鎖,是通過 JVM 隐式實作的,synchronized 隻允許同一時刻隻有一個線程操作資源。
在 Java 中每個對象都隐式包含一個 monitor(螢幕)對象,加鎖的過程其實就是競争 monitor 的過程,當線程進入位元組碼 monitorenter 指令之後,線程将持有 monitor 對象,執行 monitorexit 時釋放 monitor 對象,當其他線程沒有拿到 monitor 對象時,則需要阻塞等待擷取該對象。
ReentrantLock 是 Lock 的預設實作方式之一,它是基于 AQS(Abstract Queued Synchronizer,隊列同步器)實作的,它預設是通過非公平鎖實作的,在它的内部有一個 state 的狀态字段用于表示鎖是否被占用,如果是 0 則表示鎖未被占用,此時線程就可以把 state 改為 1,并成功獲得鎖,而其他未獲得鎖的線程隻能去排隊等待擷取鎖資源。
synchronized 和 ReentrantLock 都提供了鎖的功能,具備互斥性和不可見性。在 JDK 1.5 中 synchronized 的性能遠遠低于 ReentrantLock,但在 JDK 1.6 之後 synchronized 的性能略低于 ReentrantLock,它的差別如下:
synchronized是JVM隐式實作的,而ReentrantLock是Java語言提供的API;
ReentrantLock可設定為公平鎖,而synchronized卻不行;
ReentrantLock 隻能修飾代碼塊,而 synchronized 可以用于修飾方法、修飾代碼塊等;
ReentrantLock 需要手動加鎖和釋放鎖,如果忘記釋放鎖,則會造成資源被永久占用,而 synchronized 無需手動釋放鎖;
ReentrantLock 可以知道是否成功獲得了鎖,而 synchronized 卻不行。
ReentrantLock 源碼分析
構造函數:
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();// 無參構造方法建立的鎖為非公平鎖
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
無參的構造函數建立了一個非公平鎖,使用者也可以根據第二個構造函數,設定一個 boolean 類型的值,來決定是否使用公平鎖來實作線程的排程。
公平鎖和非公平鎖的差別:
公平鎖的含義是線程需要按照請求的順序來獲得鎖;而非公平鎖則允許“插隊”的情況存在,所謂的“插隊”指的是,線程在發送請求的同時該鎖的狀态恰好變成了可用,那麼此線程就可以跳過隊列中所有排隊的線程直接擁有該鎖。
而公平鎖由于有挂起和恢複是以存在一定的開銷,是以性能不如非公平鎖,是以 ReentrantLock 和 synchronized 預設都是非公平鎖的實作方式。
ReentrantLock 是通過 lock() 來擷取鎖,并通過 unlock() 釋放鎖,使用代碼如下:
Lock lock = new ReentrantLock();
try {
// 加鎖
lock.lock();
// 業務處理
} finally {
// 釋放鎖
lock.unlock();
}
}
ReentrantLock 中的 lock() 是通過 sync.lock() 實作的,但 Sync 類中的 lock() 是一個抽象方法,需要子類 NonfairSync 或 FairSync 去實作
sync的源碼如下:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
NonfairSync 中的源碼如下:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
FairSync 中的源碼如下:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 公平鎖比非公平鎖多了一行代碼 !hasQueuedPredecessors()
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) { // 嘗試擷取鎖
setExclusiveOwnerThread(current); // 擷取成功,标記被搶占
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); // set state = state + 1
return true;
}
return false;
}
}
從lock()方法中可以看出非公平鎖比公平鎖隻是多了一行compareAndSetState方法,該方法是嘗試将state值由0置換為1,如果設定成功的話,則說明目前沒有其他線程持有該鎖,不用再去排隊了,可直接占用該鎖,否則,則需要通過 acquire 方法去排隊。
tryacquire()方法嘗試擷取鎖,如果擷取鎖失敗,則把它加入到阻塞隊列中。
對于tryacquire()方法,公平鎖比非公平鎖隻多一行代碼 !hasQueuedPredecessors(),它用來檢視隊列中是否有比它等待時間更久的線程,如果沒有,就嘗試一下是否能擷取到鎖,如果擷取成功,則标記為已經被占用。
如果擷取鎖失敗,則調用 addWaiter 方法把線程包裝成 Node 對象,同時放入到隊列中,但 addWaiter 方法并不會嘗試擷取鎖,acquireQueued 方法才會嘗試擷取鎖,如果擷取失敗,則此節點會被挂起。
AbstractQueuedSynchronizer類源碼如下:
addWaiter() 方法
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
acquireQueued()方法源碼:
/**
* 隊列中的線程嘗試擷取鎖,失敗則會被挂起
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 擷取鎖是否成功的辨別
try {
boolean interrupted = false; // 線程是否被中斷
for (;;) {
// 擷取前一個節點(前驅節點)
final Node p = node.predecessor();
// 目前節點為頭節點的下一個節點時,有權嘗試擷取鎖
if (p == head && tryAcquire(arg)) {
setHead(node); // 擷取成功,将目前節點設定為 head 節點
p.next = null; // help GC 原 head 節點出隊,等待被 GC
failed = false; // 擷取成功
return interrupted;
}
// 判斷擷取鎖失敗後是否可以挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 線程若被中斷,傳回 true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
該方法會使用 for(;;)無限循環的方式來嘗試擷取鎖,若擷取失敗,則調用 shouldParkAfterFailedAcquire 方法,嘗試挂起目前線程
shouldParkAfterFailedAcquire ()方法源碼:
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
* 判斷線程是否可以被挂起
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲得前驅節點的狀态
int ws = pred.waitStatus;
// 前驅節點的狀态為SIGNAL,目前線程可以被挂起(阻塞)
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
// 若前驅節點狀态為 CANCELLED,那就一直往前找,直到找到一個正常等待的狀态為止
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 并将目前節點排在它後邊
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
* 把前驅節點的狀态修改為 SIGNA
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
線程入列被挂起的前提條件是,前驅節點的狀态為 SIGNAL,SIGNAL 狀态的含義是後繼節點處于等待狀态,目前節點釋放鎖後将會喚醒後繼節點。是以在上面這段代碼中,會先判斷前驅節點的狀态,如果為 SIGNAL,則目前線程可以被挂起并傳回 true;如果前驅節點的狀态 >0,則表示前驅節點取消了,這時候需要一直往前找,直到找到最近一個正常等待的前驅節點,然後把它作為自己的前驅節點;如果前驅節點正常(未取消),則修改前驅節點狀态為 SIGNAL。
到這裡整個加鎖的流程就已經走完了,最後的情況是,沒有拿到鎖的線程會在隊列中被挂起,直到擁有鎖的線程釋放鎖之後,才會去喚醒其他的線程去擷取鎖資源,整個運作流程如下圖所示:
unlock 相比于 lock 來說就簡單很多了,源碼如下:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 嘗試釋放鎖
if (tryRelease(arg)) {
// 釋放成功
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
鎖的釋放流程為,先調用 tryRelease 方法嘗試釋放鎖,如果釋放成功,則檢視頭結點的狀态是否為SIGNAL,如果是,則喚醒頭結點的下個節點關聯的線程;如果釋放鎖失敗,則傳回 false。
tryRelease 源碼如下:
/**
* 嘗試釋放目前線程占有的鎖
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 釋放鎖後的狀态,0 表示釋放鎖成功
// 如果擁有鎖的線程不是目前線程的話抛出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 鎖被成功釋放
free = true;
setExclusiveOwnerThread(null); // 清空獨占線程
}
setState(c); // 更新 state 值,0 表示為釋放鎖成功
return free;
}
在 tryRelease 方法中,會先判斷目前的線程是不是占用鎖的線程,如果不是的話,則會抛出異常;如果是的話,則先計算鎖的狀态值 getState() - releases 是否為 0,如果為 0,則表示可以正常的釋放鎖,然後清空獨占的線程,最後會更新鎖的狀态并傳回執行結果。
JDK 1.6 鎖優化
自适應自旋鎖
JDK 1.5 在更新為 JDK 1.6 時,HotSpot 虛拟機團隊在鎖的優化上下了很大功夫,比如實作了自适應式自旋鎖、鎖更新等。
JDK 1.6 引入了自适應式自旋鎖意味着自旋的時間不再是固定的時間了,比如在同一個鎖對象上,如果通過自旋等待成功擷取了鎖,那麼虛拟機就會認為,它下一次很有可能也會成功 (通過自旋擷取到鎖),是以允許自旋等待的時間會相對的比較長,而當某個鎖通過自旋很少成功獲得過鎖,那麼以後在擷取該鎖時,可能會直接忽略掉自旋的過程,以避免浪費 CPU 的資源,這就是自适應自旋鎖的功能。
鎖更新
鎖更新其實就是從偏向鎖到輕量級鎖再到重量級鎖更新的過程,這是 JDK 1.6 提供的優化功能,也稱之為鎖膨脹。
偏向鎖是指在無競争的情況下設定的一種鎖狀态。偏向鎖的意思是它會偏向于第一個擷取它的線程,當鎖對象第一次被擷取到之後,會在此對象頭中設定标示為“01”,表示偏向鎖的模式,并且在對象頭中記錄此線程的 ID,這種情況下,如果是持有偏向鎖的線程每次在進入的話,不再進行任何同步操作,如 Locking、Unlocking 等,直到另一個線程嘗試擷取此鎖的時候,偏向鎖模式才會結束,偏向鎖可以提高帶有同步但無競争的程式性能。但如果在多數鎖總會被不同的線程通路時,偏向鎖模式就比較多餘了,此時可以通過 -XX:-UseBiasedLocking 來禁用偏向鎖以提高性能。
輕量鎖是相對于重量鎖而言的,在 JDK 1.6 之前,synchronized 是通過作業系統的互斥量(mutex lock)來實作的,這種實作方式需要在使用者态和核心态之間做轉換,有很大的性能消耗,這種傳統實作鎖的方式被稱之為重量鎖。
而輕量鎖是通過比較并交換(CAS,Compare and Swap)來實作的,它對比的是線程和對象的 Mark Word(對象頭中的一個區域),如果更新成功則表示目前線程成功擁有此鎖;如果失敗,虛拟機會先檢查對象的 Mark Word 是否指向目前線程的棧幀,如果是,則說明目前線程已經擁有此鎖,否則,則說明此鎖已經被其他線程占用了。當兩個以上的線程争搶此鎖時,輕量級鎖就膨脹為重量級鎖,這就是鎖更新的過程,也是 JDK 1.6 鎖優化的内容。