ReentrantLock重入锁源码解析
ReentrantLock加锁释放锁,都是通过其一个对象属性Sync来实现的,Sync继承了AbstractQueuedSynchronizer,也就是大名鼎鼎的aqs了。而Sync本身有两个子类,一个是非公平实现,一个是公平实现,ReentrantLock中默认采用非公平锁。本文只探讨非公平实现逻辑
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
lock方法
lock方法会调用sync的lock方法
final void lock() {
//cas尝试修改state变量,从0变为1
//这里其实就能体现所谓的非公平了,上来就直接尝试获取锁,不会老老实实去同步队列尾部等着
if (compareAndSetState(0, 1))
//修改成功,设置当前持有锁的线程
setExclusiveOwnerThread(Thread.currentThread());
else
//失败,这段代码的实现在父类AQS中
acquire(1);
}
state是aqs中的一个属性,加锁时就是使其+1,释放锁就是-1
acquire方法
public final void acquire(int arg) {
//tryAcquire方法会走子类实现,最终执行nonfairTryAcquire
//如果tryAcquire返回了false,那么就会往下走
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取state
int c = getState();
//等于0说明没有线程持有锁
if (c == 0) {
//cas加锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//不等于0,锁被持有了,看看是否是自己持有的,是的话,state+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//获取锁失败,返回false
return false;
}
tryAcquire方法就是尝试去获取锁了,lock加锁失败,到这里可能锁又被释放了,或者是自己加的锁,现在是重入,所以tryAcquire就是重新尝试一下,如果还是失败了,那么就需要加入到同步队列尾部了,也就是接下来的addWaiter方法。
这里先说说这个同步队列。这个同步队列的节点是一个Node类,是AQS的一个静态内部类,也是AQS的核心。里面包装了当前线程,以及waitStatus,如等待唤醒、取消等
static final class Node{
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//已取消状态,要等阻塞队列里某个线程被唤醒抢锁失败抛异常才标记为这个
static final int CANCELED = 1;
//被加入阻塞队列后,等待唤醒的状态
static final SIGNAL = -1;
//等待状态,调用Condition.await方法,会加入到等待队列,状态就是这个
static final CONDITION = -2;
//这是用以读写重入锁的
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
acquireQueued与addWaiter 方法
//先看addWaiter方法
private Node addWaiter(Node mode) {
//首先构造一个node,状态是EXCLUSIVE,意思就是排它锁了
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//这里如果为null,说明链表还不存在,不为null,加入到尾部,这里使用cas操作进行抢占,因为可能有别的线程也在尝试
//如果链表不存在,或者抢占失败,走enq
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//加入到链表尾部
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) { //自旋,不断尝试。
Node t = tail;
//如果链表不存在,那么先创建一个头结点,这个头结点不存储线程信息
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//否则的话,尝试加入尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//这是一个中断标志,如果线程在自旋尝试获取锁的过程中被阻塞过,会返回true,然后再acquire方法中再次中断一下,目的是恢复中断标志位
//因为在下面parkAndCheckInterrupt中会调用interrupted方法,这个方法会清除中断标志
boolean interrupted = false;
for (;;) { //自旋
//拿到这个node的前一个结点
final Node p = node.predecessor();
//如果p是头结点
//前面tryAcquire方法我们知道,这里是去尝试抢占锁,或者重入锁,如果失败了,看下面
//如果成功了呢,把这个节点变成头结点,原来的头结点置为null,这里会把这个node的thread=null,prev=null
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果p的waitStatus不是-1,返回的false,那么自旋重新来,如果是-1了,就走parkAndCheckInterrupt,阻塞线程
//那么假设现在我们的这个线程被阻塞在这个地方了,不会往下走了,我们看看当它的前一个线程释放锁会怎样,unlock
//!!!看完了吗!前一个节点unlock释放锁后,会唤醒后一个节点或是从后往前第一个可执行节点,这个线程被唤醒,那么它再次自旋去尝试抢占锁
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//报错了,就会进入这个方法了
if (failed)
cancelAcquire(node);
}
}
可以看到,addWaiter方法实际上就构造了一个同步队列(阻塞队列),每一个tryAcquire尝试获取锁失败的线程,都加入到这个队列尾部去。之后在acquireQueued方法中,自旋,看看自己是不是head的下一个节点,是的话尝试获取锁,不是的话在shouldParkAfterFailedAcquire方法中会判断前一个节点的waitStatus是否是signal,也就是-1,是的话就阻塞自己,同时这个方法内也会清除该节点前面的状态为CANCELLED的节点
而finally中的cancelAcquire,主要就是清除被取消的节点
unlock方法
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//如果state现在为0了,就是返回true了,看看怎么走
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒之前被阻塞的线程,注意传入的节点是head节点
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//释放锁就是-1嘛,如果c==0,说明现在没人持有锁了,就设置拥有线程为null,放回true。
//否则说明之前是重入的,需要继续释放,返回false
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//如果下一个节点是null或者状态是已取消
if (s == null || s.waitStatus > 0) {
s = null;
//从后往前找到一个可执行节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果是一个可执行节点,唤醒它!
if (s != null)
LockSupport.unpark(s.thread);
}
总结
一段lock与unlock流程看下来,就是lock的时候尝试获取锁,获取锁是通过cas修改state从0到1,修改成功的即获取锁成功并且标识出当前拥有锁的线程,同一个线程多次lock,state会+1,最后释放锁也需要多次释放直到state变为0。
而获取锁失败的线程,就会被封装成一个node节点,加入到同步队列的尾部,期间会通过自旋反复尝试获取锁,最终当前一个节点状态为待唤醒时就会阻塞自己,等待唤醒。
而释放锁就是cas让state-1了,当state为0时就是锁完全释放,唤醒头结点的下一个节点,或者是尾部往前第一个可执行节点。