天天看点

JDK源码解析之AbstractQueuedSynchronizer(上)1 整体感知2 用法3 使用案例

AbstractQueuedSynchronizer 抽象同步队列简称 AQS ,它是实现同步器的基础组件,

并发包中锁的底层就是使用 AQS 实现的.

大多数开发者可能永远不会直接使用AQS ,但是知道其原理对于架构设计还是很有帮助的,而且要理解ReentrantLock、CountDownLatch等高级锁我们必须搞懂 AQS.

1 整体感知

1.1 架构图

JDK源码解析之AbstractQueuedSynchronizer(上)1 整体感知2 用法3 使用案例

AQS框架大致分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据.

当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

AQS 本身就是一套锁的框架,它定义了获得锁和释放锁的代码结构,所以如果要新建锁,只要继承 AQS,并实现相应方法即可。

1.2 类设计

该类提供了一种框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关的同步器(信号量,事件等)。此类的设计旨在为大多数依赖单个原子int值表示 state 的同步器提供切实有用的基础。子类必须定义更改此 state 的 protected 方法,并定义该 state 对于 acquired 或 released 此对象而言意味着什么。鉴于这些,此类中的其他方法将执行全局的排队和阻塞机制。子类可以维护其他状态字段,但是就同步而言,仅跟踪使用方法 getState,setState 和 compareAndSetState 操作的原子更新的int值。

子类应定义为用于实现其所在类的同步属性的非公共内部帮助器类。

子类应定义为用于实现其所在类的同步属性的非 public 内部辅助类。类AbstractQueuedSynchronizer不实现任何同步接口。 相反,它定义了诸如acquireInterruptible之类的方法,可以通过具体的锁和相关的同步器适当地调用这些方法来实现其 public 方法。

此类支持默认的排他模式和共享模式:

当以独占方式进行获取时,其他线程尝试进行的获取将无法成功

由多个线程获取的共享模式可能(但不一定)成功

该类不理解这些差异,只是从机制的意义上说,当共享模式获取成功时,下一个等待线程(如果存在)也必须确定它是否也可以获取。在不同模式下等待的线程们共享相同的FIFO队列。 通常,实现的子类仅支持这些模式之一,但也可以同时出现,比如在ReadWriteLock.仅支持排他模式或共享模式的子类无需定义支持未使用模式的方法.

此类定义了一个内嵌的 ConditionObject 类,可由支持独占模式的子类用作Condition 的实现,该子类的 isHeldExclusively 方法报告相对于当前线程是否独占同步,使用当前 getState 值调用的方法 release 会完全释放此对象 ,并获得给定的此保存状态值,最终将该对象恢复为其先前的获取状态。否则,没有AbstractQueuedSynchronizer方***创建这样的条件,因此,如果无法满足此约束,请不要使用它。ConditionObject的行为当然取决于其同步器实现的语义。

此类提供了内部队列的检查,检测和监视方法,以及条件对象的类似方法。 可以根据需要使用 AQS 将它们导出到类中以实现其同步机制。

此类的序列化仅存储基础原子整数维护状态,因此反序列化的对象具有空线程队列。 需要序列化性的典型子类将定义一个readObject方法,该方法在反序列化时将其恢复为已知的初始状态。

2 用法

要将此类用作同步器的基础,使用getState setState和/或compareAndSetState检查和/或修改同步状态,以重新定义以下方法(如适用)

tryAcquire

tryRelease

tryAcquireShared

tryReleaseShared

isHeldExclusively

默认情况下,这些方法中的每一个都会抛 UnsupportedOperationException。

这些方法的实现必须在内部是线程安全的,并且通常应简短且不阻塞。 定义这些方法是使用此类的唯一受支持的方法。 所有其他方法都被声明为final,因为它们不能独立变化。

从 AQS 继承的方法对跟踪拥有排他同步器的线程很有用。 鼓励使用它们-这将启用监视和诊断工具,以帮助用户确定哪些线程持有锁。

虽然此类基于内部的FIFO队列,它也不会自动执行FIFO获取策略。 独占同步的核心采用以下形式:

  • Acquire
while (!tryAcquire(arg)) {
     如果线程尚未入队,则将其加入队列;
     可能阻塞当前线程;
}      
  • Release
if (tryRelease(arg))
    取消阻塞第一个入队的线程;      

共享模式与此相似,但可能涉及级联的signal。

acquire 中的检查是在入队前被调用,所以新获取的线程可能会在被阻塞和排队的其他线程之前插入。但若需要,可以定义tryAcquire、tryAcquireShared以通过内部调用一或多种检查方法来禁用插入,从而提供公平的FIFO获取顺序。

特别是,若 hasQueuedPredecessors()(公平同步器专门设计的一种方法)返回true,则大多数公平同步器都可以定义tryAcquire返回false.

公平与否取决于如下一行代码:

if (c == 0) {
    if (!hasQueuedPredecessors() &&
        compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
    }
}      

hasQueuedPredecessors

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    // s代表等待队列的第一个节点
    Node s;
    // (s = h.next) == null 说明此时有另一个线程正在尝试成为头节点,详见AQS的acquireQueued方法
    // s.thread != Thread.currentThread():此线程不是等待的头节点
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}      

对于默认的插入(也称为贪婪,放弃和convoey-avoidance)策略,吞吐量和可伸缩性通常最高。 尽管不能保证这是公平的或避免饥饿,但允许较早排队的线程在较晚排队的线程之前进行重新竞争,并且每个重新争用都有一次机会可以毫无偏向地成功竞争过进入的线程。

同样,尽管获取通常无需自旋,但在阻塞前,它们可能会执行tryAcquire的多次调用,并插入其他任务。 如果仅短暂地保持排他同步,则这将带来自旋的大部分好处,而如果不进行排他同步,则不会带来很多负担。 如果需要的话,可以通过在调用之前使用“fast-path”检查来获取方法来增强此功能,并可能预先检查hasContended()和/或hasQueuedThreads(),以便仅在同步器可能不存在争用的情况下这样做。

此类为同步提供了有效且可扩展的基础,部分是通过将其使用范围规范化到可以依赖于int状态,acquire 和 release 参数以及内部的FIFO等待队列的同步器。 当这还不够时,可以使用原子类、自定义队列类和锁支持阻塞支持从较低级别构建同步器。

3 使用案例

这里是一个不可重入的排他锁,它使用值0表示解锁状态,使用值1表示锁定状态。虽然不可重入锁并不严格要求记录当前所有者线程,但是这个类这样做是为了更容易监视使用情况。它还支持条件,并暴露其中一个检测方法:

class Mutex implements Lock, java.io.Serializable {

  // 我们内部的辅助类
  private static class Sync extends AbstractQueuedSynchronizer {
    // 报告是否处于锁定状态
    protected boolean isHeldExclusively() {
      return getState() == 1;
    }

    // 如果 state 是 0,获取锁
    public boolean tryAcquire(int acquires) {
      assert acquires == 1; // Otherwise unused
      if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      }
      return false;
    }

    // 通过将 state 置 0 来释放锁
    protected boolean tryRelease(int releases) {
      assert releases == 1; // Otherwise unused
      if (getState() == 0) throw new IllegalMonitorStateException();
      setExclusiveOwnerThread(null);
      setState(0);
      return true;
    }

    //  提供一个 Condition
    Condition newCondition() { return new ConditionObject(); }

    // 反序列化属性
    private void readObject(ObjectInputStream s)
        throws IOException, ClassNotFoundException {
      s.defaultReadObject();
      setState(0); // 重置到解锁状态
    }
  }

  // 同步对象完成所有的工作。我们只是期待它.
  private final Sync sync = new Sync();

  public void lock()                { sync.acquire(1); }
  public boolean tryLock()          { return sync.tryAcquire(1); }
  public void unlock()              { sync.release(1); }
  public Condition newCondition()   { return sync.newCondition(); }
  public boolean isLocked()         { return sync.isHeldExclusively(); }
  public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
  public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
  }
  public boolean tryLock(long timeout, TimeUnit unit)
      throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  }
}      

这是一个闩锁类,它类似于CountDownLatch,只是它只需要一个单信号就可以触发。因为锁存器是非独占的,所以它使用共享的获取和释放方法。

class BooleanLatch {

   private static class Sync extends AbstractQueuedSynchronizer {
     boolean isSignalled() { return getState() != 0; }

     protected int tryAcquireShared(int ignore) {
       return isSignalled() ? 1 : -1;
     }

     protected boolean tryReleaseShared(int ignore) {
       setState(1);
       return true;
     }
   }

   private final Sync sync = new Sync();
   public boolean isSignalled() { return sync.isSignalled(); }
   public void signal()         { sync.releaseShared(1); }
   public void await() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
   }
 }