AQS
- 1.什么是AQS??
- 2.以ReentrantLock为例,理解AQS
-
- 2.1首先来看公平与非公平是咋实现的??
-
- 2.1.1 加锁
- 2.2 可重入锁实现原理
- 2.3 打断与不可打断实现原理
-
- 2.1.1 解锁
- 2.4 获取锁超时实现
- 2.5 条件变量的实现
1.什么是AQS??
AQS: 抽象的队列同步器:
是JUC的基石,是构建锁(ReentrantLock和ReentrantReadWriteLock)和其他同步器组件(Semaphore,CountDownLatch,CyclicBarriar)的基础框架;
-
抽象: 我们要使用AQS,需要继承AQS并重写诸如
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
等抽象方法;
- 队列: AQS抽象类中维护了一个CLH的变种,即双向的先进先出的队列,用head,tail记录队首和队尾元素,元素的类型为Node,将暂时获取不到锁的线程封装为Node节点加入到队列中;
-
同步器: AQS中定义了一个int型的被volatile修饰的变量,可以通过getState,setState,compareAndSetState函数修改其值;线程同步的关键就是对state进行操作,根据state是否属于一个线程,操作state的方式分为独占式和共享式
对于ReentrantLock来说,state可以用来表示获取锁的可重入次数
对于ReentrantReadWriteLock,state的高16位表示获取读锁的次数,低16位表示获取到写锁的线程的可重入次数
对于Semaphore来说,state用来表示当前可用信号的个数
对于CountDownLatch来说,用来表示计数器当前的值
AQS的组成
(红色线代表是内部类)
AQS中还有一个ConditionObject内部类
Node节点
AQS在
独占方式
下获取和释放资源使用的方法是:
void acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
void acquireInterruptibly(int arg)
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
boolean release(int arg)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
其中tryAcquire和tryRelease方法需要由具体的子类来实现,这里是模板方法的体现
可以发现,AQS中该方法是直接抛出异常的
在
共享方式
下获取和释放资源使用的方法为:
void acquireShared(int arg)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final void acquireSharedInterruptibly(int arg)
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
public final boolean releaseShared(int arg)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
2.以ReentrantLock为例,理解AQS
我们知道ReentrantLock具有
可重入
可打断
公平和非公平
条件变量
可超时等特性
2.1首先来看公平与非公平是咋实现的??
可以看到ReentrantLock中维护了Sync,FairSync,NonFairSync3个内部类
以及他们的继承关系Sync继承AQS,FairSync,NonFairSync继承Sync
默认创建的是非公平锁
当调用Reentrant的ock方法时,实际上是调用sync的lock方法,sync进而调用fairSync或者NonFairSync的lock方法
public void lock() {
sync.lock();
}
这里可以发现公平锁与非公平锁的第一个区别:
非公平锁: 调用lock时,首先就会执行一次CAS,如果失败,才会进入acquire方法
公平锁: 不会进行CAS抢占锁,直接进入acquire方法
具体tryAcquire方法交给子类来实现
可以明显看出公平锁与非公平锁的第二个区别在于tryAcquire方法**
就在于公平锁在获取同步锁时多了一个限制条件:hasQueuedPredecessors()
hasQueuedPredecessors是判断等待队列中是否存在
有效节点
的方法
如果h == t,说明当前队列为空,就直接返回false,那么公平锁发现队列中没有人,那我就去获取锁
如果h != t, 也就是head != tail
------- > 如果h != t 并且 s ==null,说明有一个元素将要作为AQS的第一个节点,返回true
--------> 如果h != t 并且 s != null 以及 s.thread != Thread.currentThread(),则说明,队列里面已经有节点了,但是不是当前线程,则返回true
---------->如果h != t 并且 s != null 以及 s.thread == Thread.currentThread(),说明队列中的节点是当前线程,那么返回false(这代表着锁重入!!!)
所以总结一下: 只有
队列为空
或者
当前线程节点是AQS中的第一个节点
,则返回false,代表后续可以去获取锁;
否则其他情况都表示,已经有前驱节点,返回true代表后续不能获取锁!!
// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t 时表示队列中有 Node
return h != t &&
(
// (s = h.next) == null 表示队列中还有没有老二
(s = h.next) == null ||
// 或者队列中老二线程不是此线程
s.thread != Thread.currentThread()
);
}
2.1.1 加锁
2.2 可重入锁实现原理
**`从上面的分析可知:
- 如果cas设置state==0 成功,即获取锁成功,则返回true,
- 如果发现state != 0,但是持有锁的线程是当前线程,则表明当前线程要重入,所以给state + 1(
)所谓的可重入的特性,就是这里实现的
-
只有当state的状态不是为0(代表已经有线程持有锁)并且持有锁的线程不是当前线程,则返回false,即获取锁失败,
然后来到addWaiter()和acquireQueued()方法`**
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
先来看看addWaiter()
private Node addWaiter(Node mode) {
// 把当前线程封装为一个节点, mode传入的是EXCLUSIVE代表独占模式
Node node = new Node(Thread.currentThread(), mode);
//把tail赋值给pred
Node pred = tail;
//刚开始,taii == head == pred == null
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//把当前节点入队
enq(node);
return node;
}
enq方法
private Node enq(final Node node) {
//循环
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
第一次循环:
由于t == null 创建一个哨兵节点
注意: 这里没有只是new Node(),创建了一个哨兵节点!!!
,因此进入
compareAndSetHead(new Node()
,把head的引用指向哨兵节点,然后tail = head,tail也指向哨兵结点
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
示意图如下:
创建好哨兵节点后,进入第二次循环
此时才开始把node节点入队,上面只是设置哨兵节点
node.prev设置node节点的前驱节点为哨兵节点,然后改变tail的引用指向node节点
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
至此,addWaiter方法执行结束,返回node节点,进入acquireQueued()
acquireQueued()方法: acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
-
如果当前节点的前驱节点是head(代表当前队列中我是第一个要获取锁的节点) 那么再次tryAcquire()获取锁,
1.1获取锁成功,则通过setHead方法把head指向node,并把node节点中线程置位null,prev置位null,然后p.next置位null,即node节点变为了此时的哨兵节点.然后返回打断标记
1.2如果获取锁失败,进入shouldParkAfterFailedAcquire
-
如果当前节点的前驱节点不是head,那么就不会尝试获取锁了(我前面还有人等着呢!!!老实去排队!!),直接进入shouldParkAfterFailedAcquire方法
2.1shouldParkAfterFailedAcquire返回false,则继续循环
2.2 如果返回true,则执行parkAndCheckInterrupt()方法,真正阻塞自己
(1)如果线程因为interrupt被唤醒,parkAndCheckInterrupt()会返回true则interrupted置为true,继续循环去尝试获取锁
(2)如果线程是被unpark唤醒, parkAndCheckInterrupt()会返回false,然后继续循环
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//得到node的前驱节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 还是需要获得锁后, 才能返回打断状态
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//这里个人感觉不会执行,有大佬知道请评论区指点!!
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
shouldParkAfterFailedAcquire(p, node)方法: 该方法会尝试让当前的节点的前驱节点的waitStatus设置为-1,
该方法接收的参数: 是
node节点
和
node节点的前驱节点
这里会涉及到Node节点中waitStatus的状态值,列举出来,共有4种状态,默认是0
/** waitStatus value to indicate thread has cancelled */
//表示线程已经取消,就是线程在队列中等待过程中,取消等待(老子不等了!!!)
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
//把当前线程置位-1, 表示需要唤醒(unpark)后继节点(线程)
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
//当前线程因条件不满足,在条件变量(ConditionObject)的等待队列中
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//或得前驱节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
//跳过取消的前驱节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 如果前驱节点的状态已经是-1 .则返回true
- 如果前驱节点的状态为1 (>0),则表明前驱节点被取消,则跳过前驱节点,直到找到一个状态 <= 0的,然后把前驱的节点的next置位node,然后返回到外层循环
- 如果前驱节点的状态不是以上两种.,意味着只有waitStatus为0(默认值)和-3的能到这条分支,那么就执行compareAndSetWaitStatus方法,
;把前驱节点的waitStatus置位-1(代表前驱节点有义务唤醒后继节点)
- 如果CAS成功,然后进入parkAndCheckInterrupt方法,阻塞自己;
- 如果失败,返回false,返回到acquireQueued中继续循环尝试
- 如果以上3种情况都不走,返回false,返回到acquireQueued中继续循环尝试
private static final boolean compareAndSetWaitStatus(Node node, int expect,int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,expect, update);
}
2.3 打断与不可打断实现原理
注意: park()被打断时,不会清除打断标记,sleep被打断会清除打断标记
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
//打断时,清除打断标记
return Thread.interrupted();
}
因此这里使用Thread.interrupted()方法,除了判断线程是否被打断外,还会清除打断标记
好,峰回路转,来到最初的地方
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
执行完,acquireQueued方法后,如果返回true,代表线程被打断,则会执行selfInterrupt(),再次将自己中断!!!
所谓的不可打断就是在这里实现的!!!
在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了
(即如果使用lock方法加锁,在等待过程中被打断,那么他还是会去争抢锁,如果抢锁成功,然后返回打断标记(打断标记是只有抢锁成功才会一同返回!!),然后会自我中断
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
那么可打断是怎样实现的呢???
当我们调用Reentrant中的lockInterruptibly()方法时,会调用AQS中的acquireInterruptibly
如果其他线程调用了当前线程的interrupt方法,则当前线程会抛出InterruptedException,然后返回
如果没有被打断,那么tryAcquire()尝试获取锁,如果获取锁失败,然后进入doAcquireInterruptibly(arg)方法
在 park 过程中如果被 interrupt 会进入此, 这时候抛出异常, 而不会再次进入 for (;;)
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.1.1 解锁
public void unlock() {
sync.release(1);
}
会调用AQS中的release方法
1.首先tryRelease进行释放锁,由于可重入锁的缘故,只有当state-- 为0,才会返回true,那么此时进入if块
2.判断当前队列是否有结点,并且该节点的waitStatus是否为-1,如果两者都满足,则进入unparkSuccessor唤醒后继节点
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
// 队列头节点 unpark
Node h = head;
if (
// 队列不为 null
h != null &&
// waitStatus == Node.SIGNAL 才需要 unpark
h.waitStatus != 0
) {
// unpark AQS 中等待的线程,后继节点 进入 ㈡
unparkSuccessor(h);
}
return true;
}
return false;
}
tryRelease方法同样需要子类实现,但是
公平和非公平锁的释放锁是一样!
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
假设此时状态如下所示:释放锁的线程已经把state置位0,setExclusiveOwnerThread(null)为null,接下来需要唤醒阻塞队列中的线程!
首先获取头结点的waitStatus,尝试重置为0.允许失败
该方法找到队列中离 head
最近的一个 Node(没取消的)
,unpark 恢复其运行,
// ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
//参数是头结点
private void unparkSuccessor(Node node) {
// 如果状态为 Node.SIGNAL 尝试重置状态为 0
// 不成功也可以
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
Node s = node.next;
// 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
//找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
}
unpark该线程后,线程会回到当初acquireQueued中park的地方继续去获取锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//得到node的前驱节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 还是需要获得锁后, 才能返回打断状态
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//这里个人感觉不会执行,有大佬知道请评论区指点!!
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
//打断时,清除打断标记
return Thread.interrupted();
}
2.4 获取锁超时实现
先看一下tryLock方法
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
发现: tryLock不会引起当前线程阻塞,获取不到锁就立即返回返回false了
由于tryLock调用的是nonfairTryAcquire(1),所以使用的是非公平策略!!
下面是设置了超时时间的tryLock,如果超时时间到,没有获取到锁,则返回false
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/
static final long spinForTimeoutThreshold = 1000L;
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.5 条件变量的实现
先回忆一下条件变量的使用:
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) {
new Thread(() -> {
try {
lock.lock();
while (!hasCigrette) {
try {
waitCigaretteQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的烟");
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast) {
try {
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();
sleep(1);
sendBreakfast();
sleep(1);
sendCigarette();
}
private static void sendCigarette() {
lock.lock();
try {
log.debug("送烟来了");
hasCigrette = true;
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐来了");
hasBreakfast = true;
waitbreakfastQueue.signal();
} finally {
lock.unlock();
}
}
输出
18:52:27.680 [main] c.TestCondition - 送早餐来了
18:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐
18:52:28.683 [main] c.TestCondition - 送烟来了
18:52:28.683 [Thread-0] c.TestCondition - 等到了它的烟
一个锁对应一个AQS阻塞队列,可以对应多个条件变量,每个条件变量有自己的一个条件队列
开篇我们提到过AQS中有一个内部类ConditionObject
我们在lock.newCondition()的时候,其实是new了一个在AQS内部声明的ConditionObject对象;
需要注意的是,AQS只提供了ConditionObject的实现,并没有提供newCondition函数,该函数用来new一个ConditionObject对象,需要由AQS的子类来提供newConditionObject函数
await流程
假设此时Thread-0持有锁,调用await,进入ConditionObject的addConditionWaiter流程,创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
//所有已经取消的Node从队列删除
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个关联当前线程的新 Node, 添加至队列尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
接下来进入AQS的fullyRelease流程,释放同步器上的锁(如果是可重入的,需要都释放掉)
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
然后阻塞Thread-0
signal流程
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
什么情况transferForSignal(first)会不成功??
即该线程在等待过程中被打断或超时就会放弃对该锁的获取,那就没必要再添加到队列尾部
private void doSignal(Node first) {
do {
//已经是尾结点了
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 将等待队列中的 Node 转移至 AQS 队列, 不成功且还有节点则继续循环
} while (!transferForSignal(first) &&
// 队列还有节点
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
如果状态已经不是 Node.CONDITION, 说明被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
加入 AQS 队列尾部
Node p = enq(node);
int ws = p.waitStatus;
if (
// 上一个节点被取消
ws > 0 ||
// 上一个节点不能设置状态为 Node.SIGNAL
!compareAndSetWaitStatus(p, ws, Node.SIGNAL)
) {
// unpark 取消阻塞, 让线程重新同步状态
LockSupport.unpark(node.thread);
}
return true;
}