天天看点

吃透AQS的核心原理,有这一篇文章就够啦

目录

ReentrantLock 的实现原理

AQS是什么?

AQS的两种功能

AQS的内部实现

lock源码分析

cas的实现原理

unsafe

stateOffset

NonfairSync.tryAcquire

ReentrantLock.nofairTryAcquire

AQS.addWaiter

enq

图解分析

AQS.acquireQueued

shouldParkAfterFailedAcquire

parkAndCheckInterrupt

selfInterrupt

图形解释

释放锁过程

ReentrantLock.unlock

ReentrantLock.tryRelease

unparkSuccessor

为什么在释放锁的时候是从 tail 进行扫描

图形解释

ReentrantLock 的实现原理

       我们知道锁的基本原理是,基于将多线程并行任务通过某一种机制实现线程的串 行执行,从而达到线程安全性的目的。在 synchronized 中,偏向锁、 轻量级锁、乐观锁。基于乐观锁以及自旋锁来优化了 synchronized 的加锁开销, 同时在重量级锁阶段,通过线程的阻塞以及唤醒来达到线程竞争和同步的目的。 那么在ReentrantLock中,也一定会存在这样的需要去解决的问题。就是在多线程 竞争重入锁时,竞争失败的线程是如何实现阻塞以及被唤醒的呢?

AQS是什么?

在 Lock 中,用到了一个同步队列 AQS,全称 AbstractQueuedSynchronizer,它 是一个同步工具也是Lock用来实现线程同步的核心组件。

AQS的两种功能

从使用层面来说,AQS的功能分为两种:独占和共享 独占锁,每次只能有一个线程持有锁,比如前面给大家演示的ReentrantLock就是 以独占方式实现的互斥锁 共 享 锁 , 允 许 多 个 线 程 同 时 获 取 锁 , 并 发 访 问 共 享 资 源 , 比 如 ReentrantReadWriteLock

AQS的内部实现

AQS 队列内部维护的是一个 FIFO 的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任 意一个节点开始很方便的访问前驱和后继。每个 Node 其实是由线程封装,当线 程争抢锁失败后会封装成Node加入到ASQ队列中去;当获取锁的线程释放锁以 后,会从队列中唤醒一个阻塞的节点(线程)。

    ···                                                                 ·

吃透AQS的核心原理,有这一篇文章就够啦
吃透AQS的核心原理,有这一篇文章就够啦

lock源码分析

吃透AQS的核心原理,有这一篇文章就够啦

ReentrantLock.lock()实质调用的是sync的lock方法,而sync的lock方法其实是一个抽象方法

吃透AQS的核心原理,有这一篇文章就够啦

而sync有两个内部类公平锁FairSync和非公平锁NonfairSync

而ReentrantLock是非公平锁的实现,所以真正调用的是非公平锁中的lock方法

吃透AQS的核心原理,有这一篇文章就够啦

在非公平锁中首先要进行CAS(乐观锁)

  • cas成功,线程加锁
  • cas失败,锁竞争

cas的实现原理

吃透AQS的核心原理,有这一篇文章就够啦

通过 cas 乐观锁的方式来做比较并替换,这段代码的意思是,如果当前内存中的 state的值和预期值expect相等,则替换为update。更新成功返回true,否则返 回false. 这个操作是原子的,不会出现线程安全问题,这里面涉及到Unsafe这个类的操作, 以及涉及到state这个属性的意义。 state 是 AQS 中的一个属性,它在不同的实现中所表达的含义不一样,对于重入 锁的实现来说,表示一个同步状态。它有两个含义的表示 1. 当state=0时,表示无锁状态 2. 当 state>0 时,表示已经有线程获得了锁,也就是 state=1,但是因为 ReentrantLock允许重入,所以同一个线程多次获得同步锁的时候,state会递增, 比如重入5次,那么state=5。 而在释放锁的时候,同样需要释放5次直到state=0 其他线程才有资格获得锁

unsafe

unsafe是什么?我们可以点开他看一下。

吃透AQS的核心原理,有这一篇文章就够啦

看到 native 关键字,我们就可以知道, unsafe并不属于Java的开发标准了,他其实是Java开的后门,底层其实是C++开发的部分,包括我们AQS中用到的CAS,还有直接操作内存,线程的挂起和恢复,内存屏障等等相关方法。

stateOffset

一个Java对象可以看做是一段内存,而对象中的字段会按照一定的顺序放在这段内存中,而stateOffset就代表的state这个字段在AQS类中在内存中的偏移量。

吃透AQS的核心原理,有这一篇文章就够啦

所以,当A,B,C三个线程同时竞争锁的时候,假设A线程首先获取到锁,这时候通过CAS,state改为1,当前锁线程为A,那么B,C两个线程怎么办呢?这时候就要走 acquire(1);

吃透AQS的核心原理,有这一篇文章就够啦

acquire 是 AQS 中的方法,如果 CAS 操作未能成功,说明 state 已经不为 0,此 时继续acquire(1)操作 , 大家思考一下,acquire方法中的1的参数是用来做什么呢?

这个方法的主要逻辑是

  • 1. 通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false
  • 2. 如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加 到AQS队列尾部
  • 3. acquireQueued,将Node作为参数,通过自旋去尝试获取锁

NonfairSync.tryAcquire

吃透AQS的核心原理,有这一篇文章就够啦

ReentrantLock.nofairTryAcquire

  • 1. 获取当前线程,判断当前的锁的状态
  • 2. 如果state=0表示当前是无锁状态,通过cas更新state状态的值
  • 3. 当前线程是属于重入,则增加重入次数
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();// 获取当前线程
    int c = getState(); // 获取当前state
    if (c == 0) {
        // 1. 如果状态等于0 证明上一个线程已经释放锁
        // 2. 当前线程重新走CAS进行锁竞争 得到锁
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 1. 证明没有释放锁
        // 2. 判断当前线程和锁线程是否为同一线线程 (锁重入)
        // 3. 如果是同一锁,则 state 进行 +1 
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 1. 没有竞争到锁
    // 2. 线程也非统一同一线程
    // 3. 返回 false
    return false;
}
           

AQS.addWaiter

当 tryAcquire 方法获取锁失败以后,则会先调用 addWaiter 将当前线程封装成 Node. 入参 mode 表示当前节点的状态,传递的参数是 Node.EXCLUSIVE,表示独占状 态。意味着重入锁用到了AQS的独占锁功能

  • 1. 将当前线程封装成Node
  • 2. 当前链表中的 tail 节点是否为空,如果不为空,则通过 cas 操作把当前线程的 node添加到AQS队列
  • 3. 如果为空或者cas失败,调用enq将节点添加到AQS队列
static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    /**
     * Status field, taking on only the values:
     *   SIGNAL:     The successor of this node is (or will soon be)
     *               blocked (via park), so the current node must
     *               unpark its successor when it releases or
     *               cancels. To avoid races, acquire methods must
     *               first indicate they need a signal,
     *               then retry the atomic acquire, and then,
     *               on failure, block.
     *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     *               Nodes never leave this state. In particular,
     *               a thread with cancelled node never again blocks.
     *   CONDITION:  This node is currently on a condition queue.
     *               It will not be used as a sync queue node
     *               until transferred, at which time the status
     *               will be set to 0. (Use of this value here has
     *               nothing to do with the other uses of the
     *               field, but simplifies mechanics.)
     *   PROPAGATE:  A releaseShared should be propagated to other
     *               nodes. This is set (for head node only) in
     *               doReleaseShared to ensure propagation
     *               continues, even if other operations have
     *               since intervened.
     *   0:          None of the above
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign.
     *
     * The field is initialized to 0 for normal sync nodes, and
     * CONDITION for condition nodes.  It is modified using CAS
     * (or when possible, unconditional volatile writes).
     */
    volatile int waitStatus;

    /**
     * Link to predecessor node that current node/thread relies on
     * for checking waitStatus. Assigned during enqueuing, and nulled
     * out (for sake of GC) only upon dequeuing.  Also, upon
     * cancellation of a predecessor, we short-circuit while
     * finding a non-cancelled one, which will always exist
     * because the head node is never cancelled: A node becomes
     * head only as a result of successful acquire. A
     * cancelled thread never succeeds in acquiring, and a thread only
     * cancels itself, not any other node.
     */
    volatile Node prev;

    /**
     * Link to the successor node that the current node/thread
     * unparks upon release. Assigned during enqueuing, adjusted
     * when bypassing cancelled predecessors, and nulled out (for
     * sake of GC) when dequeued.  The enq operation does not
     * assign next field of a predecessor until after attachment,
     * so seeing a null next field does not necessarily mean that
     * node is at end of queue. However, if a next field appears
     * to be null, we can scan prev's from the tail to
     * double-check.  The next field of cancelled nodes is set to
     * point to the node itself instead of null, to make life
     * easier for isOnSyncQueue.
     */
    volatile Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.  Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null.  The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
           
/**
 * Head of the wait queue, lazily initialized.  Except for
 * initialization, it is modified only via method setHead.  Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized.  Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;

/**
 * The synchronization state.
 */
private volatile int state;


private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); //  将当前线程封装成Node
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail; // 将AQS当前的 tail (尾节点赋值给 pred) 相当于中介节点 
    if (pred != null) {
        // 1. 当前尾节点不为 null 证明已经存在队列
        // 2. 将当前节点的前置节点设置为 原队列的尾节点
        node.prev = pred;
        if (compareAndSetTail(pred, node)) // 将当前节点设置为尾部节点 {
            // 原尾节点的下一节点为当前节点  也就是现在的尾结点  
            pred.next = node;
            // 这样就给当前节点添加到了已有的 AQS 队列中 
            return node;
        }
    }
    // 如果为Null 证明当前队列不存在 所以调用 enq 进行队列初始化
    enq(node);
    return node;
}
           

enq

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    // 相当于 自旋 进行AQS 队列初始化
    for (;;) { 
         //假设当前 B线程进来  当前队列需要初始化
        Node t = tail;  // 当前尾结点是 null 
        if (t == null) { // Must initialize
            // 1. 初始一个 空节点 作为 头节点 
            if (compareAndSetHead(new Node()))
                // 将 2. 空节点 也设置为 尾结点
                tail = head;
                //  这样的话就是当前的空节点 既是头结点 也是 尾结点
        } else {
            // 第二次自旋进来
            // 将B线程生成的节点的 前置节点 设为 之前设置的空节点
            node.prev = t;
            if (compareAndSetTail(t, node) // 并将当前的B线程设为 尾结点) {
                // 之前生成的空节点的后置节点设为 B线程的节点
                t.next = node;
                // 这样队列就初始成功了
                return t;
            }
        }
    }
}
           

图解分析

吃透AQS的核心原理,有这一篇文章就够啦

AQS.acquireQueued

通过 addWaiter 方法把线程添加到链表后,会接着把 Node 作为参数传递给 acquireQueued方法,去竞争锁

  • 1. 获取当前节点的prev节点
  • 2. 如果prev节点为head节点,那么它就有资格去争抢锁,调用tryAcquire抢占 锁
  • 3. 抢占锁成功以后,把获得锁的节点设置为 head,并且移除原来的初始化 head 节点
  • 4. 如果获得锁失败,则根据waitStatus决定是否需要挂起线程
  • 5. 最后,通过cancelAcquire取消获得锁的操作
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取当前Node的 prev 节点
            final Node p = node.predecessor();
            // 1. 如果当前Node的 prev 前置节点是头结点 证明当前 Node 有资格 进行锁竞争
            // 2. 当前Node进行锁竞争
            if (p == head && tryAcquire(arg)) {
                // 如果竞争成功 将当前 Node 升级为 头结点
                setHead(node);
                // 原头结点的 next  设置为 NULL  
                // 这样原头节点就彻底作废掉 并且帮助 GC
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 判断当前线程是否需要挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

shouldParkAfterFailedAcquire

如果ThreadA的锁还没有释放的情况下,ThreadB和ThreadC来争抢锁肯定是会 失败,那么失败以后会调用shouldParkAfterFailedAcquire方法 Node 有 5 中状态,分别是:

  • CANCELLED(1),
  • SIGNAL(-1) 、
  • CONDITION(2)、
  • PROPAGATE(-3)、
  • 默认状态(0)
  1. CANCELLED: 在同步队列中等待的线程等待超时或被中断,需要从同步队列中取 消该Node的结点, 其结点的waitStatus为CANCELLED,即结束状态,进入该状 态后的结点将不会再变化
  2. SIGNAL: 只要前置节点释放锁,就会通知标识为SIGNAL状态的后续节点的线程
  3. CONDITION: 和Condition有关系
  4. PROPAGATE:共享模式下,PROPAGATE状态的线程处于可运行状态
  5. 0:初始状态

这个方法的主要作用是,通过 Node 的状态来判断,ThreadB 竞争锁失败以后是 否应该被挂起

  • 1. 如果ThreadB的pred节点状态为SIGNAL,那就表示可以放心挂起当前线程
  • 2. 通过循环扫描链表把CANCELLED状态的节点移除
  • 3. 修改pred节点的状态为SIGNAL,返回false.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取当前节点的等待标识
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
         // 证明当前为等待 可以放心的挂起线程
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
         //修改当前的状态  将状态从初始状态修改为 -1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
           

parkAndCheckInterrupt

使用LockSupport.park挂起当前线程编程WATING状态

Thread.interrupted,返回当前线程是否被其他线程触发过中断请求,也就是 thread.interrupt(); 如果有触发过中断请求,那么这个方法会返回当前的中断标识 true,并且对中断标识进行复位标识已经响应过了中断请求。如果返回true,意味 着在acquire方法中会执行selfInterrupt()。

吃透AQS的核心原理,有这一篇文章就够啦

selfInterrupt

标识如果当前线程在acquireQueued中被中断过,则需要产生一 个中断请求,原因是线程在调用acquireQueued方法的时候是不会响应中断请求 的

吃透AQS的核心原理,有这一篇文章就够啦

图形解释

吃透AQS的核心原理,有这一篇文章就够啦

释放锁过程

ReentrantLock.unlock

在unlock中,会调用release方法来释放

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // 释放锁成功
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 获取当前头结点不为NULL 并且状态不是初始状态
            // 恢复挂起的线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}
           

ReentrantLock.tryRelease

这个方法可以认为是一个设置锁状态的操作,通过将state状态减掉传入的参数值 (参数是1),如果结果状态为0,就将排它锁的Owner设置为null,以使得其它的线程有机会进行执行。 在排它锁中,加锁的时候状态会增加 1(当然可以自己修改这个值),在解锁的时 候减掉1,同一个锁,在可以重入后,可能会被叠加为2、 3、 4这些值,只有unlock() 的次数与 lock()的次数对应才会将 Owner 线程设置为空,而且也只有这种情况下 才会返回true。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // 当前状态 - 1
    if (Thread.currentThread() != getExclusiveOwnerThread())
        // 如果当前线程不是锁线程  抛出异常
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
         // 如果当前状态修改为 0 锁释放成功
         free = true;
        // 将锁线程设置为 NULL 
        setExclusiveOwnerThread(null);
    }
    // 设置状态 
    setState(c);
    return free;
}
           

unparkSuccessor

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
     // 获取当前头结点的状态标识
    int ws = node.waitStatus;
    if (ws < 0)
    // 挂起状态 
    // 修改当前头结点的状态标识
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    // 当前头结点的后续节点 也就是真正需要恢复的线程
    if (s == null || s.waitStatus > 0) {
        // 如果当前线程为销毁线程 从尾部进行寻找 寻找一个为 挂起状态的 线程 
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 恢复当前线程   
        LockSupport.unpark(s.thread);
}
           

为什么在释放锁的时候是从 tail 进行扫描

回到 enq 那个方法、。在标注为红色部分的代码来看一个新的节点是如何加入到链表中的

1. 将新的节点的prev指向tail

2. 通过cas将tail设置为新的节点,因为cas是原子操作所以能够保证线程安全性

3. t.next=node;设置原tail的next节点指向新的节点

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
             // ------ 注意 ------
            if (compareAndSetTail(t, node)) {
                // 这里并不是一个原子操作
                t.next = node;
                return t;
            }
        }
    }
}
           

在cas操作之后, t.next=node操作之前。 存在其他线程调用unlock方法从head 开始往后遍历,由于 t.next=node 还没执行意味着链表的关系还没有建立完整。 就会导致遍历到 t 节点的时候被中断。所以从后往前遍历,一定不会存在这个问题

图形解释

吃透AQS的核心原理,有这一篇文章就够啦

继续阅读