重入鎖
重進入是指任意線程在擷取到鎖之後能夠再次擷取該鎖而不會被鎖所阻塞,該特性的實作需要解決以下兩個問題。
1)線程再次擷取鎖。鎖需要去識别擷取鎖的線程是否為目前占據鎖的線程,如果是,則再次成功擷取。
2)鎖的最終釋放。線程重複n次擷取了鎖,随後在第n次釋放該鎖後,其他線程能夠擷取到該鎖。鎖的最終釋放要求鎖對于擷取進行計數自增,計數表示目前鎖被重複擷取的次數,而鎖被釋放時,計數自減,當計數等于0時表示鎖已經成功釋放。
公平性鎖保證了鎖的擷取按照FIFO原則,而代價是進行大量的線程切換。非公平性鎖雖然可能造成線程“饑餓”,但極少的線程切換,保證了其更大的吞吐量。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
該方法與nonfairTryAcquire(int acquires)比較,唯一不同的位置為判斷條件多了hasQueuedPredecessors()方法,即加入了同步隊列中目前節點是否有前驅節點的判斷,如果該方法傳回true,則表示有線程比目前線程更早地請求擷取鎖,是以需要等待前驅線程擷取并釋放鎖之後才能繼續擷取鎖。
讀寫鎖ReentrantReadWriteLock
排他鎖,這些鎖在同一時刻隻允許一個線程進行通路,而讀寫鎖在同一時刻可以允許多個讀線程通路,但是在寫線程通路時,所有的讀線程和其他寫線程均被阻塞。讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖,通過分離讀鎖和寫鎖,使得并發性相比一般的排他鎖有了很大提升。
當寫鎖被擷取到時,後續(非目前寫操作線程)的讀寫操作都會被阻塞,寫鎖釋放之後,所有操作繼續執行,
public class Cache {
static Map<String, Object> map = new HashMap<String, Object>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();
// 擷取一個key對應的value
public static final Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
// 設定key對應的value,并傳回舊的value
public static final Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}// 清空所有的内容
public static final void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}
讀寫狀态的設計
在一個整型變量上維護多種狀态,就一定需要“按位切割使用”這個變量,讀寫鎖将變量切分成了兩個部分,高16位表示讀,低16位表示寫
寫鎖的擷取與釋放
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//存在讀鎖或者目前擷取線程不是已經擷取寫鎖的線程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
如果存在讀鎖,則寫鎖不能被擷取,原因在于:讀寫鎖要確定寫鎖的操作對讀鎖可見,如果允許讀鎖在已被擷取的情況下對寫鎖的擷取,那麼正在運作的其他讀線程就無法感覺到目前寫線程的操作。
寫鎖一旦被擷取,則其他讀寫線程的後續通路均被阻塞。
寫鎖的釋放與ReentrantLock的釋放過程基本類似,每次釋放均減少寫狀态,當寫狀态為0時表示寫鎖已被釋放,進而等待的讀寫線程能夠繼續通路讀寫鎖,同時前次寫線程的修改對後續讀寫線程可見。
讀鎖的擷取與釋放
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
如果其他線程已經擷取了寫鎖,則目前線程擷取讀鎖失敗,進入等待狀态。
如果目前線程擷取了寫鎖或者寫鎖未被擷取,則目前線程(線程安全,依靠CAS保證)增加讀狀态,成功擷取讀鎖。
讀鎖的每次釋放(線程安全的,可能有多個讀線程同時釋放讀鎖)均減少讀狀态,減少的值是(1<<16)。
鎖降級
鎖降級指的是寫鎖降級成為讀鎖。如果目前線程擁有寫鎖,然後将其釋放,最後再擷取讀鎖,這種分段完成的過程不能稱之為鎖降級。鎖降級是指把持住(目前擁有的)寫鎖,再擷取到讀鎖,随後釋放(先前擁有的)寫鎖的過程。
鎖降級是指把持住(目前擁有的)寫鎖,再擷取到讀鎖,随後釋放(先前擁有的)寫鎖的過程。
示例:
public void processData() {
readLock.lock();
if (!update) {
// 必須先釋放讀鎖
readLock.unlock();
// 鎖降級從寫鎖擷取到開始
writeLock.lock();
try {
if (!update) {
// 準備資料的流程(略)
update = true;
}
readLock.lock();
} finally {
writeLock.unlock();
}
// 鎖降級完成,寫鎖降級為讀鎖
}
try {
// 使用資料的流程(略)
} finally {
readLock.unlock();
}
}
隻有一個線程能夠擷取到寫鎖,其他線程會被阻塞在讀鎖和寫鎖的lock()方法上。目前線程擷取寫鎖完成資料準備之後,再擷取讀鎖,随後釋放寫鎖,完成鎖降級。
目前線程 | 其他線程 | 是否阻塞 |
---|---|---|
讀鎖 | 讀鎖 | 不阻塞 |
讀鎖 | 寫鎖 | 阻塞 |
寫鎖 | 讀鎖 | 阻塞 |
寫鎖 | 寫鎖 | 阻塞 |