天天看点

JUC基石 ---AQS

AQS

  • 1.什么是AQS??
  • 2.以ReentrantLock为例,理解AQS
    • 2.1首先来看公平与非公平是咋实现的??
      • 2.1.1 加锁
    • 2.2 可重入锁实现原理
    • 2.3 打断与不可打断实现原理
      • 2.1.1 解锁
    • 2.4 获取锁超时实现
    • 2.5 条件变量的实现

1.什么是AQS??

AQS: 抽象的队列同步器:

是JUC的基石,是构建锁(ReentrantLock和ReentrantReadWriteLock)和其他同步器组件(Semaphore,CountDownLatch,CyclicBarriar)的基础框架;

  • 抽象: 我们要使用AQS,需要继承AQS并重写诸如

    tryAcquire

    tryRelease

    tryAcquireShared

    tryReleaseShared

    isHeldExclusively

    等抽象方法;

  • 队列: AQS抽象类中维护了一个CLH的变种,即双向的先进先出的队列,用head,tail记录队首和队尾元素,元素的类型为Node,将暂时获取不到锁的线程封装为Node节点加入到队列中;
  • 同步器: AQS中定义了一个int型的被volatile修饰的变量,可以通过getState,setState,compareAndSetState函数修改其值;线程同步的关键就是对state进行操作,根据state是否属于一个线程,操作state的方式分为独占式和共享式

    对于ReentrantLock来说,state可以用来表示获取锁的可重入次数

    对于ReentrantReadWriteLock,state的高16位表示获取读锁的次数,低16位表示获取到写锁的线程的可重入次数

    对于Semaphore来说,state用来表示当前可用信号的个数

    对于CountDownLatch来说,用来表示计数器当前的值

AQS的组成

(红色线代表是内部类)

JUC基石 ---AQS
JUC基石 ---AQS

AQS中还有一个ConditionObject内部类

JUC基石 ---AQS

Node节点

JUC基石 ---AQS

AQS在

独占方式

下获取和释放资源使用的方法是:

void acquire(int arg)

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
      }
           

void acquireInterruptibly(int arg)

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
           

boolean release(int arg)

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
           

其中tryAcquire和tryRelease方法需要由具体的子类来实现,这里是模板方法的体现

JUC基石 ---AQS

可以发现,AQS中该方法是直接抛出异常的

共享方式

下获取和释放资源使用的方法为:

void acquireShared(int arg)

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
           

public final void acquireSharedInterruptibly(int arg)

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
           

public final boolean releaseShared(int arg)

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
           

2.以ReentrantLock为例,理解AQS

我们知道ReentrantLock具有

可重入

可打断

公平和非公平

条件变量

可超时等特性

2.1首先来看公平与非公平是咋实现的??

可以看到ReentrantLock中维护了Sync,FairSync,NonFairSync3个内部类

以及他们的继承关系Sync继承AQS,FairSync,NonFairSync继承Sync

JUC基石 ---AQS
JUC基石 ---AQS

默认创建的是非公平锁

JUC基石 ---AQS

当调用Reentrant的ock方法时,实际上是调用sync的lock方法,sync进而调用fairSync或者NonFairSync的lock方法

public void lock() {
      sync.lock();
}
           
JUC基石 ---AQS
JUC基石 ---AQS

这里可以发现公平锁与非公平锁的第一个区别:

非公平锁: 调用lock时,首先就会执行一次CAS,如果失败,才会进入acquire方法

公平锁: 不会进行CAS抢占锁,直接进入acquire方法

JUC基石 ---AQS

具体tryAcquire方法交给子类来实现

JUC基石 ---AQS

可以明显看出公平锁与非公平锁的第二个区别在于tryAcquire方法**

就在于公平锁在获取同步锁时多了一个限制条件:hasQueuedPredecessors()

hasQueuedPredecessors是判断等待队列中是否存在

有效节点

的方法

如果h == t,说明当前队列为空,就直接返回false,那么公平锁发现队列中没有人,那我就去获取锁

如果h != t, 也就是head != tail

------- > 如果h != t 并且 s ==null,说明有一个元素将要作为AQS的第一个节点,返回true

--------> 如果h != t 并且 s != null 以及 s.thread != Thread.currentThread(),则说明,队列里面已经有节点了,但是不是当前线程,则返回true

---------->如果h != t 并且 s != null 以及 s.thread == Thread.currentThread(),说明队列中的节点是当前线程,那么返回false(这代表着锁重入!!!)

所以总结一下: 只有

队列为空

或者

当前线程节点是AQS中的第一个节点

,则返回false,代表后续可以去获取锁;

否则其他情况都表示,已经有前驱节点,返回true代表后续不能获取锁!!

// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean hasQueuedPredecessors() {
	Node t = tail;
	Node h = head;
	Node s;
	// h != t 时表示队列中有 Node
	return h != t &&
	(
	// (s = h.next) == null 表示队列中还有没有老二
	(s = h.next) == null ||
	// 或者队列中老二线程不是此线程
	s.thread != Thread.currentThread()
	);
}
           

2.1.1 加锁

2.2 可重入锁实现原理

**`从上面的分析可知:

  1. 如果cas设置state==0 成功,即获取锁成功,则返回true,
  2. 如果发现state != 0,但是持有锁的线程是当前线程,则表明当前线程要重入,所以给state + 1(

    所谓的可重入的特性,就是这里实现的

    )
  3. 只有当state的状态不是为0(代表已经有线程持有锁)并且持有锁的线程不是当前线程,则返回false,即获取锁失败,

    然后来到addWaiter()和acquireQueued()方法`**

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
 }
           

先来看看addWaiter()

private Node addWaiter(Node mode) {
 		// 把当前线程封装为一个节点, mode传入的是EXCLUSIVE代表独占模式
        Node node = new Node(Thread.currentThread(), mode);
        //把tail赋值给pred
        Node pred = tail;
		//刚开始,taii == head == pred == null
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //把当前节点入队
        enq(node);
        return node;
    }
           

enq方法

private Node enq(final Node node) {
  		//循环
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
           

第一次循环:

由于t == null 创建一个哨兵节点

注意: 这里没有只是new Node(),创建了一个哨兵节点!!!

,因此进入

compareAndSetHead(new Node()

,把head的引用指向哨兵节点,然后tail = head,tail也指向哨兵结点

private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
           

示意图如下:

JUC基石 ---AQS

创建好哨兵节点后,进入第二次循环

此时才开始把node节点入队,上面只是设置哨兵节点

node.prev设置node节点的前驱节点为哨兵节点,然后改变tail的引用指向node节点

private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
           
JUC基石 ---AQS

至此,addWaiter方法执行结束,返回node节点,进入acquireQueued()

acquireQueued()方法: acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞

  1. 如果当前节点的前驱节点是head(代表当前队列中我是第一个要获取锁的节点) 那么再次tryAcquire()获取锁,

    1.1获取锁成功,则通过setHead方法把head指向node,并把node节点中线程置位null,prev置位null,然后p.next置位null,即node节点变为了此时的哨兵节点.然后返回打断标记

    1.2如果获取锁失败,进入shouldParkAfterFailedAcquire

  2. 如果当前节点的前驱节点不是head,那么就不会尝试获取锁了(我前面还有人等着呢!!!老实去排队!!),直接进入shouldParkAfterFailedAcquire方法

    2.1shouldParkAfterFailedAcquire返回false,则继续循环

    2.2 如果返回true,则执行parkAndCheckInterrupt()方法,真正阻塞自己

    (1)如果线程因为interrupt被唤醒,parkAndCheckInterrupt()会返回true则interrupted置为true,继续循环去尝试获取锁

    (2)如果线程是被unpark唤醒, parkAndCheckInterrupt()会返回false,然后继续循环

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	//得到node的前驱节点
                final Node p = node.predecessor();
				
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 还是需要获得锁后, 才能返回打断状态
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
        
        //这里个人感觉不会执行,有大佬知道请评论区指点!!
            if (failed)
                cancelAcquire(node);
        }
    }


  private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
           

shouldParkAfterFailedAcquire(p, node)方法: 该方法会尝试让当前的节点的前驱节点的waitStatus设置为-1,

该方法接收的参数: 是

node节点

node节点的前驱节点

这里会涉及到Node节点中waitStatus的状态值,列举出来,共有4种状态,默认是0

/** waitStatus value to indicate thread has cancelled */
		//表示线程已经取消,就是线程在队列中等待过程中,取消等待(老子不等了!!!)
        static final int CANCELLED =  1;
        
         /** waitStatus value to indicate successor's thread needs unparking */
        //把当前线程置位-1, 表示需要唤醒(unpark)后继节点(线程)
        static final int SIGNAL    = -1;
        
        /** waitStatus value to indicate thread is waiting on condition */
        //当前线程因条件不满足,在条件变量(ConditionObject)的等待队列中
        static final int CONDITION = -2;
        
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
         
        static final int PROPAGATE = -3;
           
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
		//或得前驱节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
          		//跳过取消的前驱节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
           
  • 如果前驱节点的状态已经是-1 .则返回true
  • 如果前驱节点的状态为1 (>0),则表明前驱节点被取消,则跳过前驱节点,直到找到一个状态 <= 0的,然后把前驱的节点的next置位node,然后返回到外层循环
  • 如果前驱节点的状态不是以上两种.,意味着只有waitStatus为0(默认值)和-3的能到这条分支,那么就执行compareAndSetWaitStatus方法,

    把前驱节点的waitStatus置位-1(代表前驱节点有义务唤醒后继节点)

    ;
    • 如果CAS成功,然后进入parkAndCheckInterrupt方法,阻塞自己;
    • 如果失败,返回false,返回到acquireQueued中继续循环尝试
  • 如果以上3种情况都不走,返回false,返回到acquireQueued中继续循环尝试
private static final boolean compareAndSetWaitStatus(Node node, int expect,int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,expect, update);
 }
           

2.3 打断与不可打断实现原理

注意: park()被打断时,不会清除打断标记,sleep被打断会清除打断标记

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        //打断时,清除打断标记
        return Thread.interrupted();
   }
           

因此这里使用Thread.interrupted()方法,除了判断线程是否被打断外,还会清除打断标记

好,峰回路转,来到最初的地方

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
 }
           

执行完,acquireQueued方法后,如果返回true,代表线程被打断,则会执行selfInterrupt(),再次将自己中断!!!

所谓的不可打断就是在这里实现的!!!

在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了

(即如果使用lock方法加锁,在等待过程中被打断,那么他还是会去争抢锁,如果抢锁成功,然后返回打断标记(打断标记是只有抢锁成功才会一同返回!!),然后会自我中断

static void selfInterrupt() {
        Thread.currentThread().interrupt();
 }
           

那么可打断是怎样实现的呢???

当我们调用Reentrant中的lockInterruptibly()方法时,会调用AQS中的acquireInterruptibly

如果其他线程调用了当前线程的interrupt方法,则当前线程会抛出InterruptedException,然后返回

如果没有被打断,那么tryAcquire()尝试获取锁,如果获取锁失败,然后进入doAcquireInterruptibly(arg)方法

在 park 过程中如果被 interrupt 会进入此, 这时候抛出异常, 而不会再次进入 for (;;)

public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
}


 public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
 }


 private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


           
JUC基石 ---AQS

2.1.1 解锁

public void unlock() {
        sync.release(1);
 }
           

会调用AQS中的release方法

1.首先tryRelease进行释放锁,由于可重入锁的缘故,只有当state-- 为0,才会返回true,那么此时进入if块

2.判断当前队列是否有结点,并且该节点的waitStatus是否为-1,如果两者都满足,则进入unparkSuccessor唤醒后继节点

public final boolean release(int arg) {
		// 尝试释放锁
		if (tryRelease(arg)) {
		// 队列头节点 unpark
		Node h = head;
		if (
			// 队列不为 null
			h != null &&
			// waitStatus == Node.SIGNAL 才需要 unpark
			h.waitStatus != 0
		) {
		// unpark AQS 中等待的线程,后继节点 进入 ㈡
			unparkSuccessor(h);
		}
		return true;
	}	
	return false;
}
           

tryRelease方法同样需要子类实现,但是

公平和非公平锁的释放锁是一样!

JUC基石 ---AQS
protected final boolean tryRelease(int releases) {
 			// state--
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
			// 支持锁重入, 只有 state 减为 0, 才释放成功
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
}
           

假设此时状态如下所示:释放锁的线程已经把state置位0,setExclusiveOwnerThread(null)为null,接下来需要唤醒阻塞队列中的线程!

JUC基石 ---AQS

首先获取头结点的waitStatus,尝试重置为0.允许失败

该方法找到队列中离 head

最近的一个 Node(没取消的)

,unpark 恢复其运行,

// ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
	//参数是头结点
	private void unparkSuccessor(Node node) {
	// 如果状态为 Node.SIGNAL 尝试重置状态为 0
	// 不成功也可以
	int ws = node.waitStatus;
	if (ws < 0) {
		compareAndSetWaitStatus(node, ws, 0);
	}
	// 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
	Node s = node.next;
	// 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
	//找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,
	if (s == null || s.waitStatus > 0) {
		s = null;
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
			s = t;
	}
	if (s != null)
		LockSupport.unpark(s.thread);
	}
}
	
           

unpark该线程后,线程会回到当初acquireQueued中park的地方继续去获取锁

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	//得到node的前驱节点
                final Node p = node.predecessor();
				
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 还是需要获得锁后, 才能返回打断状态
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
        
        //这里个人感觉不会执行,有大佬知道请评论区指点!!
            if (failed)
                cancelAcquire(node);
        }
    }

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        //打断时,清除打断标记
        return Thread.interrupted();
   }
           

2.4 获取锁超时实现

先看一下tryLock方法

public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
 }


final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
}

           

发现: tryLock不会引起当前线程阻塞,获取不到锁就立即返回返回false了

由于tryLock调用的是nonfairTryAcquire(1),所以使用的是非公平策略!!

下面是设置了超时时间的tryLock,如果超时时间到,没有获取到锁,则返回false

public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}


public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
 }


    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
 static final long spinForTimeoutThreshold = 1000L;


private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}
           

2.5 条件变量的实现

先回忆一下条件变量的使用:

static ReentrantLock lock = new ReentrantLock();
    static Condition waitCigaretteQueue = lock.newCondition();
    static Condition waitbreakfastQueue = lock.newCondition();
    static volatile boolean hasCigrette = false;
    static volatile boolean hasBreakfast = false;

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                lock.lock();
                while (!hasCigrette) {
                    try {
                        waitCigaretteQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("等到了它的烟");
            } finally {
                lock.unlock();
            }
        }).start();

        new Thread(() -> {
            try {
                lock.lock();
                while (!hasBreakfast) {
                    try {
                        waitbreakfastQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("等到了它的早餐");
            } finally {
                lock.unlock();
            }
        }).start();
        sleep(1);
        sendBreakfast();
        sleep(1);
        sendCigarette();
    }

    private static void sendCigarette() {
        lock.lock();
        try {
            log.debug("送烟来了");
            hasCigrette = true;
            waitCigaretteQueue.signal();
        } finally {
            lock.unlock();
        }
    }

    private static void sendBreakfast() {
        lock.lock();
        try {
            log.debug("送早餐来了");
            hasBreakfast = true;
            waitbreakfastQueue.signal();
        } finally {
            lock.unlock();
        }
    }

输出
18:52:27.680 [main] c.TestCondition - 送早餐来了
18:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐
18:52:28.683 [main] c.TestCondition - 送烟来了
18:52:28.683 [Thread-0] c.TestCondition - 等到了它的烟
           

一个锁对应一个AQS阻塞队列,可以对应多个条件变量,每个条件变量有自己的一个条件队列

JUC基石 ---AQS

开篇我们提到过AQS中有一个内部类ConditionObject

JUC基石 ---AQS
JUC基石 ---AQS

我们在lock.newCondition()的时候,其实是new了一个在AQS内部声明的ConditionObject对象;

需要注意的是,AQS只提供了ConditionObject的实现,并没有提供newCondition函数,该函数用来new一个ConditionObject对象,需要由AQS的子类来提供newConditionObject函数

await流程

假设此时Thread-0持有锁,调用await,进入ConditionObject的addConditionWaiter流程,创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
 }
 
 private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
            //所有已经取消的Node从队列删除
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 创建一个关联当前线程的新 Node, 添加至队列尾部
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
}


private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
 }
           
JUC基石 ---AQS

接下来进入AQS的fullyRelease流程,释放同步器上的锁(如果是可重入的,需要都释放掉)

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
 }


 public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
 }
 
 protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
}

 private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
 }
           
JUC基石 ---AQS

unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功

JUC基石 ---AQS

然后阻塞Thread-0

JUC基石 ---AQS

signal流程

public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
}

什么情况transferForSignal(first)会不成功??
即该线程在等待过程中被打断或超时就会放弃对该锁的获取,那就没必要再添加到队列尾部

  private void doSignal(Node first) {
            do {
            	//已经是尾结点了
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
                // 将等待队列中的 Node 转移至 AQS 队列, 不成功且还有节点则继续循环
            } while (!transferForSignal(first) &&
            			// 队列还有节点
                     (first = firstWaiter) != null);
        }

           
final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
         如果状态已经不是 Node.CONDITION, 说明被取消了
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        加入 AQS 队列尾部
        Node p = enq(node);
        int ws = p.waitStatus;
        if (
			// 上一个节点被取消
			ws > 0 ||
			// 上一个节点不能设置状态为 Node.SIGNAL
			!compareAndSetWaitStatus(p, ws, Node.SIGNAL)
		) {
			// unpark 取消阻塞, 让线程重新同步状态
			LockSupport.unpark(node.thread);
		}
        return true;
    }
           

继续阅读