天天看點

【Java源碼】AQS 解析一 背景二 基本資訊三 AQS 詳解四 基于 AQS 的實作

系列文章:

【Java 試題】從一道題目再看 Java 繼承

一 背景

AQS 即 AbstractQueuedSynchronizer,是 java.util.concurrent.locks 包的一個重要概念。Java 中鎖實作/同步的幾種方式:synchronized,ReentrantLock,CAS。其中,可重入鎖 ReentrantLock 就是基于 AbstractQueuedSynchronizer(AQS)的。是以,了解 AQS 的實作原理,對 Java 鎖了解非常重要。本篇将結合 JDK1.8 源碼,對 AQS 進行分析。

二 基本資訊

2.1 官方注解

/**
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues.  This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state. Subclasses
* must define the protected methods that change this state, and which
* define what that state means in terms of this object being acquired
* or released.  Given these, the other methods in this class carry
* out all queuing and blocking mechanics. Subclasses can maintain
* other state fields, but only the atomically updated {@code int}
* value manipulated using methods {@link #getState}, {@link
* #setState} and {@link #compareAndSetState} is tracked with respect
* to synchronization.           

複制

複制代碼

核心部分翻譯過來,AQS 依賴于先進先出(FIFO)阻塞隊列,提供了一個實作阻塞鎖和同步器(semaphores-信号量,events-事件等)的架構。AbstractQueuedSynchronizer 這個類的設計是作為大多數類型依賴一個單原子值來表示狀态的同步器的一個有用的基礎。

2.2 AQS 核心思想

AQS 的核心思想是,如果被請求的共享資源空閑,則将目前請求資源的線程設定為有效的工作線程,并将共享資源設定為鎖定狀态,如果被請求的共享資源被占用,那麼就需要一套線程阻塞等待以及被喚醒時鎖配置設定的機制,這個機制 AQS 是用 CLH 隊列鎖實作的,即将暫時擷取不到鎖的線程加入到隊列中。

CLH(Craig,Landin,and Hagersten)隊列是一個虛拟的雙向隊列,虛拟的雙向隊列即不存在隊列執行個體,僅存在節點之間的關聯關系。AQS 是将每一條請求共享資源的線程封裝成一個 CLH 鎖隊列的一個結點(Node),來實作鎖的配置設定。

CLH 隊列:

【Java源碼】AQS 解析一 背景二 基本資訊三 AQS 詳解四 基于 AQS 的實作

簡單來說,AQS 是基于 CLH 隊列,用 volatile 修飾共享變量 state,線程通過 CAS 去改變狀态符,成功則擷取鎖成功,失敗則進入等待隊列,等待被喚醒。

有一點需要注意:AQS 是自旋鎖。也就是說,在等待喚醒的時候,經常會使用自旋(while(!cas()))的方式,不停地嘗試擷取鎖,直到被其他線程擷取成功。

2.3 AQS 繼承關系

先看一下類定義:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable            

複制

複制代碼

可見 AbstractQueuedSynchronizer 繼承了 AbstractOwnableSynchronizer,并且實作了序列化接口。

AbstractOwnableSynchronizer 是什麼?通過源碼可見一斑:

/**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * Sets the thread that currently owns exclusive access.
     * A {@code null} argument indicates that no thread owns access.
     * This method does not otherwise impose any synchronization or
     * {@code volatile} field accesses.
     * @param thread the owner thread
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    /**
     * Returns the thread last set by {@code setExclusiveOwnerThread},
     * or {@code null} if never set.  This method does not otherwise
     * impose any synchronization or {@code volatile} field accesses.
     * @return the owner thread
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }           

複制

代碼中可知,關鍵屬性隻有一個:Thread exclusiveOwnerThread,表示獨占模式同步器的目前所有者(線程)。可以由某個線程獨占,AbstractOwnableSynchronizer 為建立鎖和相關同步器(伴随着所有權的概念)提供了基礎。

三 AQS 詳解

3.1 隊列節點

AQS 所使用的 CLH 隊列結構示意:

【Java源碼】AQS 解析一 背景二 基本資訊三 AQS 詳解四 基于 AQS 的實作

3.1.1 隊列節點定義

隊列中節點的結構,在 AQS 的 Node 中定義,這是一個 static final 類型。除了雙向連結清單的相關定義:前驅節點、後繼節點、節點值(線程 Thread):

volatile Node prev;
volatile Node next;
volatile Thread thread;           

複制

複制代碼

之外,還有很多附加資訊:

3.1.1.1 節點等待模式

标記節點等待模式的标記,類型也是 Node,SHARED 值不為 null 時,表示是共享模式;

EXCLUSIVE 不為 null 時,表示是獨占模式

/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;           

複制

3.1.1.2 等待狀态值

waitStatus,用于表示線程已取消:

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate
 */
static final int PROPAGATE = -3;

volatile int waitStatus;           

複制

3.1.1.3 nextWaiter

這個屬性比較特殊,因為前面已經有指向後繼節點的 next,又增加了一個指向下一個條件等待節點,或特殊 SHARED 值的指針。從下面的官方注釋中來看,因為條件隊列隻有在獨占模式下保持時才被通路,是以我們隻需要一個簡單的連結隊列來保持等待條件的節點。

/**
 * Link to next node waiting on condition, or the special
 * value SHARED.  Because condition queues are accessed only
 * when holding in exclusive mode, we just need a simple
 * linked queue to hold nodes while they are waiting on
 * conditions. They are then transferred to the queue to
 * re-acquire. And because conditions can only be exclusive,
 * we save a field by using special value to indicate shared
 * mode.
 */
Node nextWaiter;           

複制

AQS 的阻塞隊列是以雙向的連結清單的形式儲存的,是通過 prev 和 next 建立起關系的,但是 AQS 中的條件隊列是以單向連結清單的形式儲存的,是通過 nextWaiter 建立起關系的,也就是 AQS 的阻塞隊列和 AQS 中的條件隊列并非同一個隊列。

3.1.2 Node 的構造方法

除了預設的無參構造方法外,還有兩個有參數的構造方法:

Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
    this.waitStatus = waitStatus;
    this.thread = thread;
}           

複制

3.2 AQS 的隊列結構

3.2.1 隊列核心屬性

隊列的頭尾節點、核心資源(state):

private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;           

複制

可見隊列中為了處理同步,這幾個都是 volatile 修飾的。

3.2.2 核心方法

AQS 提供了 get/set 方法來讀和設定 state 字段;另外還有 compareAndSetState 來解決争用設定狀态的方法,使用了 CAS 機制(unsafe 的 compareAndSwapInt()方法):

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}           

複制

3.3 隊列操作

有了隊列,下一步就是如何操作隊列了。因為大部分都是線上程争用場景下進行,是以,如何保證對隊列的操作是正确同步的,這點至關重要。

3.3.1 隊尾添加節點

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}           

複制

可見操作是通過 compareAndSetTail()方法來操作的,依然是 CAS 保障同步。但在 return 之前,還有 enq(node)方法調用,我們再看這段代碼:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}           

複制

操作是在一個死循環中,根據 tail 節點是否為空分為兩塊大的邏輯:tail == null,cas 設定 head;否則,節點的前驅指向 tail 節點,然後 cas 設定 tail。

3.3.2 操作隊列頭結點

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}           

複制

這裡隻是一個簡單的設定方法,node 的線程值和前驅設定為 null。

四 基于 AQS 的實作

包括:Reentrant、Semaphore,CountDownLatch、ReentrantReadWriteLock、SynchronousQueue 和 FutureTask。

4.1 關于資源擷取與釋放

擷取是一種依賴狀态的操作,并且通常會阻塞,直到達到了理想狀态為止。以前面提到的幾種 AQS 的實作為例:

  • ReentrantLock,“擷取”操作意味着“等待直到鎖可被擷取”。
  • Semaphore,“擷取”操作意味着“等待直到許可可被擷取”。
  • CountDownLatch,“擷取”操作意味着“等待直到閉鎖達到結束狀态”。
  • FutureTask,“擷取”操作意味着“等待直到任務完成”。

釋放并不是一個可阻塞的操作,當執行“釋放”操作時,所有在請求時被阻塞的線程都會開始執行。

4.2 狀态機

AQS 使用 state(int)以表示狀态,提供了 getState、setState 及 compareAndSetState 等 protected 類型方法進行狀态轉換。通過巧妙地使用 state,表示各種關鍵狀态:

  • ReentrantLock 用 state 表示所有者線程已經重複擷取該鎖的次數。
  • Semaphore 用 state 表示剩餘的許可數量。
  • CountDownLatch 用 state 表示閉鎖的狀态,如關閉、打開。
  • FutureTask 用 state 表示任務的狀态,如尚未開始、正在運作、已完成、已取消。

除了 state,在同步器類中還可以自行管理一些額外的狀态變量。如:

  • ReentrantLock 儲存了鎖的目前所有者的資訊,這樣就能區分某個擷取操作是重入的還是競争的。
  • FutureTask 用 result 表示任務的結果,該結果可能是計算得到的答案,也可能是抛出的異常。