天天看點

JUC基石 ---AQS

AQS

  • 1.什麼是AQS??
  • 2.以ReentrantLock為例,了解AQS
    • 2.1首先來看公平與非公平是咋實作的??
      • 2.1.1 加鎖
    • 2.2 可重入鎖實作原理
    • 2.3 打斷與不可打斷實作原理
      • 2.1.1 解鎖
    • 2.4 擷取鎖逾時實作
    • 2.5 條件變量的實作

1.什麼是AQS??

AQS: 抽象的隊列同步器:

是JUC的基石,是建構鎖(ReentrantLock和ReentrantReadWriteLock)和其他同步器元件(Semaphore,CountDownLatch,CyclicBarriar)的基礎架構;

  • 抽象: 我們要使用AQS,需要繼承AQS并重寫諸如

    tryAcquire

    tryRelease

    tryAcquireShared

    tryReleaseShared

    isHeldExclusively

    等抽象方法;

  • 隊列: AQS抽象類中維護了一個CLH的變種,即雙向的先進先出的隊列,用head,tail記錄隊首和隊尾元素,元素的類型為Node,将暫時擷取不到鎖的線程封裝為Node節點加入到隊列中;
  • 同步器: AQS中定義了一個int型的被volatile修飾的變量,可以通過getState,setState,compareAndSetState函數修改其值;線程同步的關鍵就是對state進行操作,根據state是否屬于一個線程,操作state的方式分為獨占式和共享式

    對于ReentrantLock來說,state可以用來表示擷取鎖的可重入次數

    對于ReentrantReadWriteLock,state的高16位表示擷取讀鎖的次數,低16位表示擷取到寫鎖的線程的可重入次數

    對于Semaphore來說,state用來表示目前可用信号的個數

    對于CountDownLatch來說,用來表示計數器目前的值

AQS的組成

(紅色線代表是内部類)

JUC基石 ---AQS
JUC基石 ---AQS

AQS中還有一個ConditionObject内部類

JUC基石 ---AQS

Node節點

JUC基石 ---AQS

AQS在

獨占方式

下擷取和釋放資源使用的方法是:

void acquire(int arg)

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
      }
           

void acquireInterruptibly(int arg)

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
           

boolean release(int arg)

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
           

其中tryAcquire和tryRelease方法需要由具體的子類來實作,這裡是模闆方法的展現

JUC基石 ---AQS

可以發現,AQS中該方法是直接抛出異常的

共享方式

下擷取和釋放資源使用的方法為:

void acquireShared(int arg)

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
           

public final void acquireSharedInterruptibly(int arg)

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
           

public final boolean releaseShared(int arg)

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
           

2.以ReentrantLock為例,了解AQS

我們知道ReentrantLock具有

可重入

可打斷

公平和非公平

條件變量

可逾時等特性

2.1首先來看公平與非公平是咋實作的??

可以看到ReentrantLock中維護了Sync,FairSync,NonFairSync3個内部類

以及他們的繼承關系Sync繼承AQS,FairSync,NonFairSync繼承Sync

JUC基石 ---AQS
JUC基石 ---AQS

預設建立的是非公平鎖

JUC基石 ---AQS

當調用Reentrant的ock方法時,實際上是調用sync的lock方法,sync進而調用fairSync或者NonFairSync的lock方法

public void lock() {
      sync.lock();
}
           
JUC基石 ---AQS
JUC基石 ---AQS

這裡可以發現公平鎖與非公平鎖的第一個差別:

非公平鎖: 調用lock時,首先就會執行一次CAS,如果失敗,才會進入acquire方法

公平鎖: 不會進行CAS搶占鎖,直接進入acquire方法

JUC基石 ---AQS

具體tryAcquire方法交給子類來實作

JUC基石 ---AQS

可以明顯看出公平鎖與非公平鎖的第二個差別在于tryAcquire方法**

就在于公平鎖在擷取同步鎖時多了一個限制條件:hasQueuedPredecessors()

hasQueuedPredecessors是判斷等待隊列中是否存在

有效節點

的方法

如果h == t,說明目前隊列為空,就直接傳回false,那麼公平鎖發現隊列中沒有人,那我就去擷取鎖

如果h != t, 也就是head != tail

------- > 如果h != t 并且 s ==null,說明有一個元素将要作為AQS的第一個節點,傳回true

--------> 如果h != t 并且 s != null 以及 s.thread != Thread.currentThread(),則說明,隊列裡面已經有節點了,但是不是目前線程,則傳回true

---------->如果h != t 并且 s != null 以及 s.thread == Thread.currentThread(),說明隊列中的節點是目前線程,那麼傳回false(這代表着鎖重入!!!)

是以總結一下: 隻有

隊列為空

或者

目前線程節點是AQS中的第一個節點

,則傳回false,代表後續可以去擷取鎖;

否則其他情況都表示,已經有前驅節點,傳回true代表後續不能擷取鎖!!

// ㈠ AQS 繼承過來的方法, 友善閱讀, 放在此處
public final boolean hasQueuedPredecessors() {
	Node t = tail;
	Node h = head;
	Node s;
	// h != t 時表示隊列中有 Node
	return h != t &&
	(
	// (s = h.next) == null 表示隊列中還有沒有老二
	(s = h.next) == null ||
	// 或者隊列中老二線程不是此線程
	s.thread != Thread.currentThread()
	);
}
           

2.1.1 加鎖

2.2 可重入鎖實作原理

**`從上面的分析可知:

  1. 如果cas設定state==0 成功,即擷取鎖成功,則傳回true,
  2. 如果發現state != 0,但是持有鎖的線程是目前線程,則表明目前線程要重入,是以給state + 1(

    所謂的可重入的特性,就是這裡實作的

    )
  3. 隻有當state的狀态不是為0(代表已經有線程持有鎖)并且持有鎖的線程不是目前線程,則傳回false,即擷取鎖失敗,

    然後來到addWaiter()和acquireQueued()方法`**

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
 }
           

先來看看addWaiter()

private Node addWaiter(Node mode) {
 		// 把目前線程封裝為一個節點, mode傳入的是EXCLUSIVE代表獨占模式
        Node node = new Node(Thread.currentThread(), mode);
        //把tail指派給pred
        Node pred = tail;
		//剛開始,taii == head == pred == null
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //把目前節點入隊
        enq(node);
        return node;
    }
           

enq方法

private Node enq(final Node node) {
  		//循環
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
           

第一次循環:

由于t == null 建立一個哨兵節點

注意: 這裡沒有隻是new Node(),建立了一個哨兵節點!!!

,是以進入

compareAndSetHead(new Node()

,把head的引用指向哨兵節點,然後tail = head,tail也指向哨兵結點

private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
           

示意圖如下:

JUC基石 ---AQS

建立好哨兵節點後,進入第二次循環

此時才開始把node節點入隊,上面隻是設定哨兵節點

node.prev設定node節點的前驅節點為哨兵節點,然後改變tail的引用指向node節點

private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
           
JUC基石 ---AQS

至此,addWaiter方法執行結束,傳回node節點,進入acquireQueued()

acquireQueued()方法: acquireQueued 會在一個死循環中不斷嘗試獲得鎖,失敗後進入 park 阻塞

  1. 如果目前節點的前驅節點是head(代表目前隊列中我是第一個要擷取鎖的節點) 那麼再次tryAcquire()擷取鎖,

    1.1擷取鎖成功,則通過setHead方法把head指向node,并把node節點中線程置位null,prev置位null,然後p.next置位null,即node節點變為了此時的哨兵節點.然後傳回打斷标記

    1.2如果擷取鎖失敗,進入shouldParkAfterFailedAcquire

  2. 如果目前節點的前驅節點不是head,那麼就不會嘗試擷取鎖了(我前面還有人等着呢!!!老實去排隊!!),直接進入shouldParkAfterFailedAcquire方法

    2.1shouldParkAfterFailedAcquire傳回false,則繼續循環

    2.2 如果傳回true,則執行parkAndCheckInterrupt()方法,真正阻塞自己

    (1)如果線程因為interrupt被喚醒,parkAndCheckInterrupt()會傳回true則interrupted置為true,繼續循環去嘗試擷取鎖

    (2)如果線程是被unpark喚醒, parkAndCheckInterrupt()會傳回false,然後繼續循環

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	//得到node的前驅節點
                final Node p = node.predecessor();
				
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 還是需要獲得鎖後, 才能傳回打斷狀态
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
        
        //這裡個人感覺不會執行,有大佬知道請評論區指點!!
            if (failed)
                cancelAcquire(node);
        }
    }


  private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
           

shouldParkAfterFailedAcquire(p, node)方法: 該方法會嘗試讓目前的節點的前驅節點的waitStatus設定為-1,

該方法接收的參數: 是

node節點

node節點的前驅節點

這裡會涉及到Node節點中waitStatus的狀态值,列舉出來,共有4種狀态,預設是0

/** waitStatus value to indicate thread has cancelled */
		//表示線程已經取消,就是線程在隊列中等待過程中,取消等待(老子不等了!!!)
        static final int CANCELLED =  1;
        
         /** waitStatus value to indicate successor's thread needs unparking */
        //把目前線程置位-1, 表示需要喚醒(unpark)後繼節點(線程)
        static final int SIGNAL    = -1;
        
        /** waitStatus value to indicate thread is waiting on condition */
        //目前線程因條件不滿足,在條件變量(ConditionObject)的等待隊列中
        static final int CONDITION = -2;
        
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
         
        static final int PROPAGATE = -3;
           
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
		//或得前驅節點的狀态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
          		//跳過取消的前驅節點
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
           
  • 如果前驅節點的狀态已經是-1 .則傳回true
  • 如果前驅節點的狀态為1 (>0),則表明前驅節點被取消,則跳過前驅節點,直到找到一個狀态 <= 0的,然後把前驅的節點的next置位node,然後傳回到外層循環
  • 如果前驅節點的狀态不是以上兩種.,意味着隻有waitStatus為0(預設值)和-3的能到這條分支,那麼就執行compareAndSetWaitStatus方法,

    把前驅節點的waitStatus置位-1(代表前驅節點有義務喚醒後繼節點)

    ;
    • 如果CAS成功,然後進入parkAndCheckInterrupt方法,阻塞自己;
    • 如果失敗,傳回false,傳回到acquireQueued中繼續循環嘗試
  • 如果以上3種情況都不走,傳回false,傳回到acquireQueued中繼續循環嘗試
private static final boolean compareAndSetWaitStatus(Node node, int expect,int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,expect, update);
 }
           

2.3 打斷與不可打斷實作原理

注意: park()被打斷時,不會清除打斷标記,sleep被打斷會清除打斷标記

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        //打斷時,清除打斷标記
        return Thread.interrupted();
   }
           

是以這裡使用Thread.interrupted()方法,除了判斷線程是否被打斷外,還會清除打斷标記

好,峰回路轉,來到最初的地方

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
 }
           

執行完,acquireQueued方法後,如果傳回true,代表線程被打斷,則會執行selfInterrupt(),再次将自己中斷!!!

所謂的不可打斷就是在這裡實作的!!!

在此模式下,即使它被打斷,仍會駐留在 AQS 隊列中,一直要等到獲得鎖後方能得知自己被打斷了

(即如果使用lock方法加鎖,在等待過程中被打斷,那麼他還是會去争搶鎖,如果搶鎖成功,然後傳回打斷标記(打斷标記是隻有搶鎖成功才會一同傳回!!),然後會自我中斷

static void selfInterrupt() {
        Thread.currentThread().interrupt();
 }
           

那麼可打斷是怎樣實作的呢???

當我們調用Reentrant中的lockInterruptibly()方法時,會調用AQS中的acquireInterruptibly

如果其他線程調用了目前線程的interrupt方法,則目前線程會抛出InterruptedException,然後傳回

如果沒有被打斷,那麼tryAcquire()嘗試擷取鎖,如果擷取鎖失敗,然後進入doAcquireInterruptibly(arg)方法

在 park 過程中如果被 interrupt 會進入此, 這時候抛出異常, 而不會再次進入 for (;;)

public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
}


 public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
 }


 private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


           
JUC基石 ---AQS

2.1.1 解鎖

public void unlock() {
        sync.release(1);
 }
           

會調用AQS中的release方法

1.首先tryRelease進行釋放鎖,由于可重入鎖的緣故,隻有當state-- 為0,才會傳回true,那麼此時進入if塊

2.判斷目前隊列是否有結點,并且該節點的waitStatus是否為-1,如果兩者都滿足,則進入unparkSuccessor喚醒後繼節點

public final boolean release(int arg) {
		// 嘗試釋放鎖
		if (tryRelease(arg)) {
		// 隊列頭節點 unpark
		Node h = head;
		if (
			// 隊列不為 null
			h != null &&
			// waitStatus == Node.SIGNAL 才需要 unpark
			h.waitStatus != 0
		) {
		// unpark AQS 中等待的線程,後繼節點 進入 ㈡
			unparkSuccessor(h);
		}
		return true;
	}	
	return false;
}
           

tryRelease方法同樣需要子類實作,但是

公平和非公平鎖的釋放鎖是一樣!

JUC基石 ---AQS
protected final boolean tryRelease(int releases) {
 			// state--
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
			// 支援鎖重入, 隻有 state 減為 0, 才釋放成功
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
}
           

假設此時狀态如下所示:釋放鎖的線程已經把state置位0,setExclusiveOwnerThread(null)為null,接下來需要喚醒阻塞隊列中的線程!

JUC基石 ---AQS

首先擷取頭結點的waitStatus,嘗試重置為0.允許失敗

該方法找到隊列中離 head

最近的一個 Node(沒取消的)

,unpark 恢複其運作,

// ㈡ AQS 繼承過來的方法, 友善閱讀, 放在此處
	//參數是頭結點
	private void unparkSuccessor(Node node) {
	// 如果狀态為 Node.SIGNAL 嘗試重置狀态為 0
	// 不成功也可以
	int ws = node.waitStatus;
	if (ws < 0) {
		compareAndSetWaitStatus(node, ws, 0);
	}
	// 找到需要 unpark 的節點, 但本節點從 AQS 隊列中脫離, 是由喚醒節點完成的
	Node s = node.next;
	// 不考慮已取消的節點, 從 AQS 隊列從後至前找到隊列最前面需要 unpark 的節點
	//找到隊列中離 head 最近的一個 Node(沒取消的),unpark 恢複其運作,
	if (s == null || s.waitStatus > 0) {
		s = null;
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
			s = t;
	}
	if (s != null)
		LockSupport.unpark(s.thread);
	}
}
	
           

unpark該線程後,線程會回到當初acquireQueued中park的地方繼續去擷取鎖

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	//得到node的前驅節點
                final Node p = node.predecessor();
				
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 還是需要獲得鎖後, 才能傳回打斷狀态
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
        
        //這裡個人感覺不會執行,有大佬知道請評論區指點!!
            if (failed)
                cancelAcquire(node);
        }
    }

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        //打斷時,清除打斷标記
        return Thread.interrupted();
   }
           

2.4 擷取鎖逾時實作

先看一下tryLock方法

public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
 }


final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
}

           

發現: tryLock不會引起目前線程阻塞,擷取不到鎖就立即傳回傳回false了

由于tryLock調用的是nonfairTryAcquire(1),是以使用的是非公平政策!!

下面是設定了逾時時間的tryLock,如果逾時時間到,沒有擷取到鎖,則傳回false

public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}


public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
 }


    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
 static final long spinForTimeoutThreshold = 1000L;


private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}
           

2.5 條件變量的實作

先回憶一下條件變量的使用:

static ReentrantLock lock = new ReentrantLock();
    static Condition waitCigaretteQueue = lock.newCondition();
    static Condition waitbreakfastQueue = lock.newCondition();
    static volatile boolean hasCigrette = false;
    static volatile boolean hasBreakfast = false;

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                lock.lock();
                while (!hasCigrette) {
                    try {
                        waitCigaretteQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("等到了它的煙");
            } finally {
                lock.unlock();
            }
        }).start();

        new Thread(() -> {
            try {
                lock.lock();
                while (!hasBreakfast) {
                    try {
                        waitbreakfastQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("等到了它的早餐");
            } finally {
                lock.unlock();
            }
        }).start();
        sleep(1);
        sendBreakfast();
        sleep(1);
        sendCigarette();
    }

    private static void sendCigarette() {
        lock.lock();
        try {
            log.debug("送煙來了");
            hasCigrette = true;
            waitCigaretteQueue.signal();
        } finally {
            lock.unlock();
        }
    }

    private static void sendBreakfast() {
        lock.lock();
        try {
            log.debug("送早餐來了");
            hasBreakfast = true;
            waitbreakfastQueue.signal();
        } finally {
            lock.unlock();
        }
    }

輸出
18:52:27.680 [main] c.TestCondition - 送早餐來了
18:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐
18:52:28.683 [main] c.TestCondition - 送煙來了
18:52:28.683 [Thread-0] c.TestCondition - 等到了它的煙
           

一個鎖對應一個AQS阻塞隊列,可以對應多個條件變量,每個條件變量有自己的一個條件隊列

JUC基石 ---AQS

開篇我們提到過AQS中有一個内部類ConditionObject

JUC基石 ---AQS
JUC基石 ---AQS

我們在lock.newCondition()的時候,其實是new了一個在AQS内部聲明的ConditionObject對象;

需要注意的是,AQS隻提供了ConditionObject的實作,并沒有提供newCondition函數,該函數用來new一個ConditionObject對象,需要由AQS的子類來提供newConditionObject函數

await流程

假設此時Thread-0持有鎖,調用await,進入ConditionObject的addConditionWaiter流程,建立新的 Node 狀态為 -2(Node.CONDITION),關聯 Thread-0,加入等待隊列尾部

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
 }
 
 private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
            //所有已經取消的Node從隊列删除
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 建立一個關聯目前線程的新 Node, 添加至隊列尾部
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
}


private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
 }
           
JUC基石 ---AQS

接下來進入AQS的fullyRelease流程,釋放同步器上的鎖(如果是可重入的,需要都釋放掉)

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
 }


 public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
 }
 
 protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
}

 private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
 }
           
JUC基石 ---AQS

unpark AQS 隊列中的下一個節點,競争鎖,假設沒有其他競争線程,那麼 Thread-1 競争成功

JUC基石 ---AQS

然後阻塞Thread-0

JUC基石 ---AQS

signal流程

public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
}

什麼情況transferForSignal(first)會不成功??
即該線程在等待過程中被打斷或逾時就會放棄對該鎖的擷取,那就沒必要再添加到隊列尾部

  private void doSignal(Node first) {
            do {
            	//已經是尾結點了
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
                // 将等待隊列中的 Node 轉移至 AQS 隊列, 不成功且還有節點則繼續循環
            } while (!transferForSignal(first) &&
            			// 隊列還有節點
                     (first = firstWaiter) != null);
        }

           
final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
         如果狀态已經不是 Node.CONDITION, 說明被取消了
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        加入 AQS 隊列尾部
        Node p = enq(node);
        int ws = p.waitStatus;
        if (
			// 上一個節點被取消
			ws > 0 ||
			// 上一個節點不能設定狀态為 Node.SIGNAL
			!compareAndSetWaitStatus(p, ws, Node.SIGNAL)
		) {
			// unpark 取消阻塞, 讓線程重新同步狀态
			LockSupport.unpark(node.thread);
		}
        return true;
    }
           

繼續閱讀