话不多说,下面通过流程图及源码介绍ReentrantLock的实现细节。
先看下逻辑流程图,总体统揽:
1. ReentrantLock的实现依赖于AQS-AbstractQueuedSynchronizer
ReentrantLock中有两种锁实现,分别是公平锁
fairSync
和非公平锁
NonfairSync
,他们都继承Sync,而 Sync又继承自AbstractQueuedSynchronizer 。
重点关注重写的
tryAcquire
与
tryRelease
方法,他俩分别是加锁和释放锁的重要逻辑。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
...
}
复制
2. AQS的模板模式
acquire方法作为模板方法,包含了未实现方法
tryAcquire
,相当于抽象类。(这里没有实现成抽象类的原因是AQS还可以提供共享锁的实现框架,而tryAcquire属于独占锁依赖的方法,实现共享锁没必要实现独占锁的抽象方法)
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 尝试获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 获取失败则进入等待队列
selfInterrupt();
}
复制
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
复制
3. 以公平锁来看获取锁逻辑
主要有三个关注点:
- lock(加锁)调用模板方法acquire
- tryAcquire(尝试获取锁 成功-true 失败-false)如果当前没有线程持有锁(有可能等待队列中有线程等待,但是锁刚释放,队首线程还未争抢)则CAS争抢锁,争抢成功设置当前线程为锁持有线程。
- tryAcquire如果有线程持有锁,判断是否是当前线程持有的,如果是则变成重入锁state+1,否则返回false获取失败。
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);// 加锁,直接调用AQS的模板方法acquire
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 没有线程持有锁
if (!hasQueuedPredecessors() && // 等待队列中没有等待线程
compareAndSetState(0, acquires)) { // CAS争抢锁
setExclusiveOwnerThread(current); // 争抢成功之后将锁持有线程变成当前持有线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 重入锁的关键,判断持有锁的线程是否是当前线程
int nextc = c + acquires;
if (nextc < 0) // 溢出
throw new Error("Maximum lock count exceeded");
setState(nextc); // 重入锁+1
return true;
}
return false;
}
}
复制
4. 尝试获取锁失败进入等待队列
- 将新进线程节点放置队尾
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); // 如果等待队列为空,初始化等待队列并将新进线程节点放置队尾
return node;
}
复制
- 如果等待队列为空,初始化等待队列并将新进线程节点放置队尾
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // 将新进线程节点放置队尾
t.next = node;
return t;
}
}
}
}
复制
- 将排队线程挂起,形成阻塞态,释放cpu,等待唤醒
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 对当前线程进行LockSupport.park(this)操作挂起线程,形成阻塞态,释放cpu
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制
- 对当前线程进行LockSupport.park(this)操作挂起线程,形成阻塞态,释放cpu,排队等待获取锁,获取锁时通过LockSupport.unpark唤醒线程。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
复制
5. unlock释放锁
- 调用AQS的release方法
public void unlock() {
sync.release(1); // AQS的release模板方法
}
复制
- release也是一个模板方法,最后调用unparkSuccessor唤醒线程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒线程
return true;
}
return false;
}
复制
- 调用LockSupport.unpark唤醒等待线程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒等待线程
}
复制