天天看点

ReentrantLock重入锁源码解析

ReentrantLock重入锁源码解析

ReentrantLock加锁释放锁,都是通过其一个对象属性Sync来实现的,Sync继承了AbstractQueuedSynchronizer,也就是大名鼎鼎的aqs了。而Sync本身有两个子类,一个是非公平实现,一个是公平实现,ReentrantLock中默认采用非公平锁。本文只探讨非公平实现逻辑

public ReentrantLock() {
        sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
           

lock方法

lock方法会调用sync的lock方法

final void lock() {
	//cas尝试修改state变量,从0变为1
	//这里其实就能体现所谓的非公平了,上来就直接尝试获取锁,不会老老实实去同步队列尾部等着
	if (compareAndSetState(0, 1))
		//修改成功,设置当前持有锁的线程
	    setExclusiveOwnerThread(Thread.currentThread());
   	else
   		//失败,这段代码的实现在父类AQS中
        acquire(1);
}
           

state是aqs中的一个属性,加锁时就是使其+1,释放锁就是-1

acquire方法

public final void acquire(int arg) {
		//tryAcquire方法会走子类实现,最终执行nonfairTryAcquire
		//如果tryAcquire返回了false,那么就会往下走
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        //获取state
        int c = getState();
        //等于0说明没有线程持有锁
        if (c == 0) {
        	//cas加锁
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //不等于0,锁被持有了,看看是否是自己持有的,是的话,state+1
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        //获取锁失败,返回false
        return false;
    }
           

tryAcquire方法就是尝试去获取锁了,lock加锁失败,到这里可能锁又被释放了,或者是自己加的锁,现在是重入,所以tryAcquire就是重新尝试一下,如果还是失败了,那么就需要加入到同步队列尾部了,也就是接下来的addWaiter方法。

这里先说说这个同步队列。这个同步队列的节点是一个Node类,是AQS的一个静态内部类,也是AQS的核心。里面包装了当前线程,以及waitStatus,如等待唤醒、取消等

static final class Node{
    static final Node SHARED = new Node();
    
    static final Node EXCLUSIVE = null;
    //已取消状态,要等阻塞队列里某个线程被唤醒抢锁失败抛异常才标记为这个
    static final int CANCELED = 1;
    //被加入阻塞队列后,等待唤醒的状态
    static final SIGNAL = -1;
	//等待状态,调用Condition.await方法,会加入到等待队列,状态就是这个
    static final CONDITION = -2;
    //这是用以读写重入锁的
    static final int PROPAGATE = -3;
    
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
}
           

acquireQueued与addWaiter 方法

//先看addWaiter方法
private Node addWaiter(Node mode) {
    //首先构造一个node,状态是EXCLUSIVE,意思就是排它锁了
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    //这里如果为null,说明链表还不存在,不为null,加入到尾部,这里使用cas操作进行抢占,因为可能有别的线程也在尝试
    //如果链表不存在,或者抢占失败,走enq
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //加入到链表尾部
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) { //自旋,不断尝试。
        Node t = tail;
        //如果链表不存在,那么先创建一个头结点,这个头结点不存储线程信息
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //否则的话,尝试加入尾部
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    	//这是一个中断标志,如果线程在自旋尝试获取锁的过程中被阻塞过,会返回true,然后再acquire方法中再次中断一下,目的是恢复中断标志位
    	//因为在下面parkAndCheckInterrupt中会调用interrupted方法,这个方法会清除中断标志
        boolean interrupted = false;
        for (;;) { //自旋
            //拿到这个node的前一个结点
            final Node p = node.predecessor();
            //如果p是头结点
            //前面tryAcquire方法我们知道,这里是去尝试抢占锁,或者重入锁,如果失败了,看下面
            //如果成功了呢,把这个节点变成头结点,原来的头结点置为null,这里会把这个node的thread=null,prev=null
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果p的waitStatus不是-1,返回的false,那么自旋重新来,如果是-1了,就走parkAndCheckInterrupt,阻塞线程
            //那么假设现在我们的这个线程被阻塞在这个地方了,不会往下走了,我们看看当它的前一个线程释放锁会怎样,unlock
            //!!!看完了吗!前一个节点unlock释放锁后,会唤醒后一个节点或是从后往前第一个可执行节点,这个线程被唤醒,那么它再次自旋去尝试抢占锁
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //报错了,就会进入这个方法了
        if (failed)
            cancelAcquire(node);
    }
}
           

可以看到,addWaiter方法实际上就构造了一个同步队列(阻塞队列),每一个tryAcquire尝试获取锁失败的线程,都加入到这个队列尾部去。之后在acquireQueued方法中,自旋,看看自己是不是head的下一个节点,是的话尝试获取锁,不是的话在shouldParkAfterFailedAcquire方法中会判断前一个节点的waitStatus是否是signal,也就是-1,是的话就阻塞自己,同时这个方法内也会清除该节点前面的状态为CANCELLED的节点

而finally中的cancelAcquire,主要就是清除被取消的节点

unlock方法

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    //如果state现在为0了,就是返回true了,看看怎么走
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
        	//唤醒之前被阻塞的线程,注意传入的节点是head节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    //释放锁就是-1嘛,如果c==0,说明现在没人持有锁了,就设置拥有线程为null,放回true。
    //否则说明之前是重入的,需要继续释放,返回false
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
private void unparkSuccessor(Node node) {
   
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    //如果下一个节点是null或者状态是已取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        //从后往前找到一个可执行节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //如果是一个可执行节点,唤醒它!
    if (s != null)
        LockSupport.unpark(s.thread);
}
           

总结

一段lock与unlock流程看下来,就是lock的时候尝试获取锁,获取锁是通过cas修改state从0到1,修改成功的即获取锁成功并且标识出当前拥有锁的线程,同一个线程多次lock,state会+1,最后释放锁也需要多次释放直到state变为0。

而获取锁失败的线程,就会被封装成一个node节点,加入到同步队列的尾部,期间会通过自旋反复尝试获取锁,最终当前一个节点状态为待唤醒时就会阻塞自己,等待唤醒。

而释放锁就是cas让state-1了,当state为0时就是锁完全释放,唤醒头结点的下一个节点,或者是尾部往前第一个可执行节点。