天天看点

Java并发编程之AbstractQueuedSychronizer(抽象队列同步器,简称AQS)

AbstractQueuedSychronizer(抽象队列同步器,简称AQS):

  1. JDK的并发包(包名:java.util.concurrent,以下简称JUC)下面提供了很多并发操作的工具类,如:ReentrantLock,CountDownLatch等。这些并发操作工具类的基础是AbstractQueuedSychronizer
  2. *AQS内部维护了一个共享资源和两个队列:*一个是同步队列;一个是条件队列。
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        //同步队列的头结点
        private transient volatile Node head;
    
        //同步队列的尾结点
        private transient volatile Node tail;
    
        //同步队列的共享资源,队列的同步状态
        private volatile int state;
    }
               
  3. Node类的主要信息;
    static final class Node {
        //静态变量,标识节点以共享模式等待资源
        static final Node SHARED = new Node();
        //标识节点以独占模式等待资源
        static final Node EXCLUSIVE = null;
        //等待状态,节点取消等待资源
        static final int CANCELLED =  1;
        //等待状态,标识后继节点需要唤醒
        static final int SIGNAL    = -1;
        //等待状态,标识线程处于条件等待状态
        static final int CONDITION = -2;
        //等待状态,标识线程以共享模式获取资源,释放锁的行为将传播到后续节点,该状态作用于头节点
        static final int PROPAGATE = -3;
        //节点等待状态,是上述4中状态之一,或者为0
        volatile int waitStatus;
    
        //节点的前驱节点
        volatile Node prev;
    
        //节点的后继节点
        volatile Node next;
    
        //节点对应的线程
        volatile Thread thread;
    
        //条件等待时标识下一个等待条件的节点,指向条件队列中的下一个节点
        //或者,标识共享模式
        Node nextWaiter;
    }
               
  4. 同步队列是用双向链表实现的,主要用于记录等待获取共享资源的线程
  5. 条件队列是一个单向链表的结构,链表中的元素也是Node,只不过条件队列中的元素使用Node的nextWaiter指向下一个元素。
  6. AQS对外提供的protected类型的方法入手分析一下AQS的工作原理:
    /**
        *尝试以独占模式获取共享资源
        *@param arg 表示需要获取资源的个数
        *@return true表示获取成功,false获取失败
        **/
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    
    /**
        *尝试以独占模式释放共享资源
        *@param arg 表示释放资源的个数
        *@return true表示释放成功,false释放失败
        **/
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    
    /**
        *尝试以共享模式获取共享资源
        *@param arg 表示获取资源的个数
        *@return true表示获取成功,false获取失败
        **/
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    
    /**
        *尝试以共享模式释放共享资源
        *@param arg 表示释放资源的个数
        *@return true表示释放成功,false释放失败
        **/
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    
    /**
        * 当前线程是否以独占模式获取了共享资源,该方法是在条件对象ConditionObject内部使用的,如果不需要条件等待,则无需实现该方法
        *@return true表示是,false表示否
        **/
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
               
    AQS以模板方法的模式,提供了多个线程对共享资源(state)操作的算法框架,上面的五个protected类型的方法主要用于尝试获取和释放共享资源,并不会阻塞当前线程,是AQS留给具体的业务操作类(如:ReentrantLock)来实现的。
  7. AQS获取资源,释放资源等方法的具体代码:
    /**
        *以独占模式获取共享资源,获取成功则返回,否则阻塞当前线程,并将当前线程放入同步队列等待获取资源
        *@param arg 表示需要获取资源的个数
        **/
    public final void acquire(int arg) {
        //首先调用子类实现的tryAcquire方法,如果该方法返回true则表示获取成功,不进行后续判断
        //否则,调用acquireQueued方法将当前线程放入同步队列排队等待获取资源
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    /**
        *将当前线程包装成Node,并放入同步队列
        *@param mode 模式,共享模式或者独占模式
        *@return 当前线程所在的节点
        **/
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 先尝试将节点放入队列的尾部,如果成功则返回,否则将节点入队
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //将当前节点放入同步队列,cas操作设置头和尾节点
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    /**
        *在队列中不断尝试获取资源
        *@param node 当前线程所在节点
        *@param arg 获取资源的个数
        *@return 等待资源过程中线程是否被中断
        **/
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取当前节点的前驱节点
                final Node p = node.predecessor();
                //只有前驱是头节点的情况下才尝试获取锁,因为头结点是当前持有资源的线程所在的节点,如果前驱不是头节点那么没有必要尝试获取
                if (p == head && tryAcquire(arg)) {
                    //获取成功后将当前节点设置为头结点
                    setHead(node);
                    //释放原来的头节点
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果节点前驱不是头结点或者获取资源失败则阻塞当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    /**
        *以独占模式释放共享资源
        *@param arg 表示释放资源的个数
        *@return true表示释放成功,false释放失败
        **/
    public final boolean release(int arg) {
        //尝试释放资源,如果失败则直接返回
        if (tryRelease(arg)) {
            //释放成功后,唤醒后继节点
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
               
    AQS中独占模式获取和释放资源的方法,这两个方法可以用于实现锁的功能,事实上ReentrantLock就是基于以上方法实现的。以共享模式获取和释放资源的方法,与独占模式类似
  8. 条件对象的等待和唤醒方法:
    public class ConditionObject implements Condition, java.io.Serializable {    
    	//第一个条件等待的节点
        private transient Node firstWaiter;
        //最后一个条件等待的节点
        private transient Node lastWaiter;
    
        /**
          * 唤醒条件队列中的第一个等待的线程,此时该线程将进入同步队列重新等待获取资源
          */
        public final void signal() {
            //判断当前线程是否以独占模式占有资源
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                //将条件队列中的第一个线程,重新放入同步队列
                doSignal(first);
        }
    
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
    
        final boolean transferForSignal(Node node) {
    
            //如果CAS操作失败,说明线程取消获取共享资源,此时返回false,doSignal会尝试将下一个节点放入同步队列
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            //将节点放入同步队列
            Node p = enq(node);
            int ws = p.waitStatus;
            //设置节点的前驱节点的状态为Node.SIGNAL
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    
        //使获取共享资源的线程等待并进入条件队列,如果当前线程被中断则退出
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //在条件队列中添加一个节点
            Node node = addConditionWaiter();
            //释放当前线程获取的共享资源
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //判断当前节点是否在同步队列中,如果不在则暂停当前线程
            while (!isOnSyncQueue(node)) {
                //暂停当前线程,该方法响应中断;当调用signal()方法的线程释放共享资源时,会从该处继续执行
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //重新等待获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
      }
               
  9. 总结:当线程获取共享资源成功时返回,否则进入同步队列等待前驱节点唤醒,此时当前线程处于阻塞状态(LockSupport.park方法使线程阻塞);前驱节点释放共享资源后会唤醒(LockSupport.unpark方法唤醒线程)后继节点,需要说明的是获取共享资源成功的线程必定是头节点所在的线程。当获取共享资源的线程,调用Condition.await()方法时,当前线程会进入条件队列等待;当其他线程调用Condition.signal()方法,并释放共享资源时当前线程会重新进入同步队列等待获取共享资源。

继续阅读