天天看點

AQS 源碼詳解

AQS 源碼詳解

1、可重入鎖

可重入鎖又名遞歸鎖,是指在同一個線程在外層方法擷取鎖的時候,再進入該線程的的内層方法會自動擷取鎖(前提是鎖對象得是同一個對象),不會因為之前已經擷取過還沒釋放而阻塞。

Java中ReentrantLock和synchronized都是可重入鎖,可重入鎖的一個優點是可一定程度避免死鎖。

可重入鎖的種類:

  • 隐式鎖:在使用的時候不需要去手動釋放鎖(即synchronized關鍵字使用的鎖)預設是可重入鎖。
  • 顯式鎖:使用的時候需要去釋放鎖(即Lock)也有ReentrantLock這樣的可重入鎖。

2、LockSupport

文檔位址

LockSupport是一個程式設計工具類,主要是為了阻塞和喚醒線程用的,它的内部其實兩類主要的方法:park(停車阻塞線程)和unpark(啟動喚醒線程)。

3種讓線程等待和喚醒的方法:

  • 方式1:使用Object中的wait()方法讓線程等待,使用object中的notify()方法喚醒線程
  • 方式2:使用JUC包中Condition的await()方法讓線程等待,使用signal()方法喚醒線程
  • 方式3:LockSupport類可以阻塞目前線程以及喚醒指定被阻塞的線程

park()/park(Object blocker) - 阻塞目前線程阻塞傳入的具體線程

public class LockSupport {

    ...
    
    public static void park() {
        UNSAFE.park(false, 0L);
    }

    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
    
    ...
    
}
           

permit預設是0,是以一開始調用park()方法,目前線程就會阻塞,直到别的線程将目前線程的permit設定為1時,park方法會被喚醒,然後會将permit再次設定為0并傳回。

unpark(Thread thread) - 喚醒處于阻塞狀态的指定線程

public class LockSupport {
 
    ...
    
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
    
    ...

}

           

調用unpark(thread)方法後,就會将thread線程的許可permit設定成1(注意多次調用unpark方法,不會累加,pemit值還是1)會自動喚醒thead線程,即之前阻塞中的LockSupport.park()方法會立即傳回。

3、同樣是阻塞、喚醒的方法,park/unpark 和 wait/notify,await/signal有什麼差別?

3.1、Object類中的wait和notify方法實作線程等待和喚醒
public class WaitNotifyDemo {

	static Object lock = new Object();
	
	public static void main(String[] args) {
		new Thread(()->{
			synchronized (lock) {
				System.out.println(Thread.currentThread().getName()+" come in.");
				try {
					lock.wait();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			System.out.println(Thread.currentThread().getName()+" 換醒.");
		}, "Thread A").start();
		
		new Thread(()->{
			synchronized (lock) {
				lock.notify();
				System.out.println(Thread.currentThread().getName()+" 通知.");
			}
		}, "Thread B").start();
	}
}

           

結論:

  • wait和notify方法必須要在同步塊或者方法裡面且成對出現使用,否則會抛出java.lang.IllegalMonitorStateException。
  • 調用順序要先wait後notify才OK。
3.2、Lock鎖Condation中的await和singnal方法實作線程等待和喚醒
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionAwaitSignalDemo {
		
	public static void main(String[] args) {
		
		ReentrantLock lock = new ReentrantLock();
		Condition condition = lock.newCondition();
		
		new Thread(()->{
			
			try {
				System.out.println(Thread.currentThread().getName()+" come in.");
				lock.lock();
				condition.await();				
			} catch (InterruptedException e) {
				e.printStackTrace();
			} finally {
				lock.unlock();
			}
			
			System.out.println(Thread.currentThread().getName()+" 換醒.");
		},"Thread A").start();
		
		new Thread(()->{
			try {
				lock.lock();
				condition.signal();
				System.out.println(Thread.currentThread().getName()+" 通知.");
			}finally {
				lock.unlock();
			}
		},"Thread B").start();
	}
	
}

           

結論:

  • await和singnal必須要在同步塊或者方法裡面且成對出現使用,否則會抛出java.lang.IllegalMonitorStateException。
  • 調用順序要先await後singnal才可以。
3.3、LockSupport的park/unpark方法實作線程等待和喚醒

LockSupport是用來建立鎖和其他同步類的基本線程阻塞原語。

LockSupport類使用了一種名為Permit(許可)的概念來做到阻塞和喚醒線程的功能,每個線程都有一個許可(permit),permit隻有兩個值1和零,預設是零。可以把許可看成是一種(0.1)信号量(Semaphore),但與Semaphore不同的是,許可的累加上限是1。

也就是說,調用LockSupport的park方法時,需要消耗一張許可證,如果沒有擷取到許可證(也就是permit的值為0)時,就會一直阻塞,等待許可證來進行消耗,如果有許可證可消耗,就會停止阻塞。調用LockSupport的unpark方法,就是生産一份許可證(permint的值加一,由原來的0變為1)。而且許可證的的累加上限是1。

線程阻塞需要消耗憑證(permit),這個憑證最多隻有1個。當調用park方法時,如果有憑證,則會直接消耗掉這個憑證然後正常退出。如果無憑證,就必須阻塞等待憑證可用。而unpark則相反,它會增加一個憑證,但憑證最多隻能有1個,累加無效。

public class LockSupportDemo {

	public static void main(String[] args) {
		Thread a = new Thread(()->{
			System.out.println(Thread.currentThread().getName() + " come in. " + System.currentTimeMillis());
			LockSupport.park();
			System.out.println(Thread.currentThread().getName() + " 換醒. " + System.currentTimeMillis());
		}, "Thread A");
		a.start();
		
		Thread b = new Thread(()->{
			try {
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			LockSupport.unpark(a);
			System.out.println(Thread.currentThread().getName()+" 通知.");
		}, "Thread B");
		b.start();
	}
}

======================結果=======================
Thread A come in.
Thread B 通知.
Thread A 換醒.

           

結論:

  • 正常 + 無鎖塊要求。
  • 先前錯誤的先喚醒後等待順序,LockSupport可無視這順序。
3.4、為什麼可以先喚醒線程後阻塞線程?

因為unpark獲得了一個憑證,之後再調用park方法,就可以名正言順的憑證消費,故不會阻塞。

3.5、為什麼喚醒兩次後阻塞兩次,但最終結果還會阻塞線程?

因為憑證的數量最多為1(不能累加),連續調用兩次 unpark和調用一次 unpark效果一樣,隻會增加一個憑證;而調用兩次park卻需要消費兩個憑證,證不夠,不能放行。

4、AQS

AQS指的是AbstractQueuedSynchronizer 抽象隊列同步器。通過内置的FIFO隊列來完成資源擷取線程的排隊工作,以及一個int類型變量表示持有鎖的狀态。是建構鎖或者其它同步器元件的重量級基礎架構及整個JUC體系的基石。

AQS 源碼詳解

CLH:Craig、Landin and Hagersten隊列,是一個單向連結清單,AQS中的隊列是CLH變體的虛拟雙向隊列FIFO。

4.1、AQS的作用
AQS 源碼詳解
AQS 源碼詳解

進一步了解鎖和同步器的關系

  • 鎖,面向鎖的使用者 - 定義了程式員和鎖互動的使用層APl,隐藏了實作細節,你調用即可
  • 同步器,面向鎖的實作者 - 比如Java并發大神DougLee,提出統一規範并簡化了鎖的實作,屏蔽了同步狀态管理、阻塞線程排隊和通知、喚醒機制等。

實作了AQS的鎖有:自旋鎖、互斥鎖、讀鎖寫鎖、條件産量、信号量、栅欄都是AQS的衍生物。

多個線程共享同一個資源時,同一時刻隻有一個線程擷取到鎖,搶占到資源,搶到資源的線程直接使用處理業務邏輯,搶不到資源的必然涉及一種排隊等候機制,搶占資源失敗的線程繼續去等待但等候線程仍然保留擷取鎖的可能且擷取鎖流程仍在繼續。

如果共享資源被占用,就需要一定的阻塞等待喚醒機制來保證鎖配置設定。這個機制主要用的是CLH隊列的變體實作的,将暫時擷取不到鎖的線程加入到隊列中,這個隊列就是AQS的抽象表現。它将請求共享資源的線程封裝成隊列的結點(Node),通過CAS、自旋以及LockSupportpark)的方式,維護state變量的狀态,使并發達到同步的控制效果。

以Lock lock = new ReentrantLock();

/** Synchronizer providing all implementation mechanics */
    private final Sync sync;
 /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }
           

ReentrantLock預設是非公平鎖,定義了一個最終類Sync來建立非公平鎖。NonfairSync繼承與Sync類,Sync類又繼承于AbstractQueuedSynchronizer。

/**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
           
/**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }
           

AQS使用一個volatile的int類型的成員變量來表示同步狀态,通過内置的FIFo隊列來完成資源擷取的排隊工作将每條要去搶占資源的線程封裝成一個Node,節點來實作鎖的配置設定,通過CAS完成對State值的修改。

static final class Node{
 	......
 }
/**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;
           

Lock接口的實作類,基本都是通過聚合了一個隊列同步器的子類完成線程通路控制的。

AQS 源碼詳解

NonfairSync ,FairSync繼承于 Sync類, ReentrantLock 是基于 NonfairSync ,FairSync來實作的,Sync類又繼承于AbstractQueuedSynchronizer 。

公平鎖與非公平鎖搶占鎖的差別:

非公平鎖:

/**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
           

公平鎖:

/**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
           

公平鎖與非公平鎖的lock()方法唯一的差別就在于公平鎖在擷取同步狀态時多了一個限制條件:hasQueuedPredecessors()

hasQueuedPredecessors是公平鎖加鎖時判斷等待隊列中是否存在有效節點的方法。

hasQueuedPredecessors() 在AbstractQueuedSynchronizer中

public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
           

公平鎖:公平鎖講究先來先到,線程在擷取鎖時,如果這個鎖的等待隊列中已經有線程在等待,那麼目前線程就會進入等待隊列中;

非公平鎖:不管是否有等待隊列,如果可以擷取鎖,則立刻占有鎖對象。如果擷取不到,再去隊列中排隊。

AQS 源碼詳解

整個ReentrantLock 的加鎖過程,可以分為三個階段:

  • 嘗試加鎖;
  • 加鎖失敗,線程入隊列;
  • 線程入隊列後,進入阻賽狀态。

帶入一個銀行辦理業務的案例來模拟我們的AQS 如何進行線程的管理和通知喚醒機制,3個線程模拟3個來銀行網點,受理視窗辦理業務的顧客。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class AQSDemo {
	
	public static void main(String[] args) {
		ReentrantLock lock = new ReentrantLock();
		
		//帶入一個銀行辦理業務的案例來模拟我們的AQs 如何進行線程的管理和通知喚醒機制
		//3個線程模拟3個來銀行網點,受理視窗辦理業務的顧客

		//A顧客就是第一個顧客,此時受理視窗沒有任何人,A可以直接去辦理
		new Thread(()->{
			lock.lock();
			try {
				System.out.println(Thread.currentThread().getName() + " come in.");
				
				try {
					TimeUnit.SECONDS.sleep(5);//模拟辦理業務時間
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			} finally {
				lock.unlock();
			}
		}, "Thread A").start();
		
		//第2個顧客,第2個線程---->,由于受理業務的視窗隻有一個(隻能一個線程持有鎖),此代B隻能等待,
		//進入候客區
		new Thread(()->{
			lock.lock();
			try {
				System.out.println(Thread.currentThread().getName() + " come in.");
				
			} finally {
				lock.unlock();
			}
		}, "Thread B").start();
		
		
		//第3個顧客,第3個線程---->,由于受理業務的視窗隻有一個(隻能一個線程持有鎖),此代C隻能等待,
		//進入候客區
		new Thread(()->{
			lock.lock();
			try {
				System.out.println(Thread.currentThread().getName() + " come in.");
				
			} finally {
				lock.unlock();
			}
		}, "Thread C").start();
	}
}
           
AQS 源碼詳解

非公平鎖為例進入源碼: 首先是運作線程A,

public void lock() {
        sync.lock();
    }

/**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

           

線程A進入,先調用lock()方法,擷取鎖,此時state(鎖的同步狀态)處于初始狀态,即為0,然後進行判斷比較,使用的是compareAndSetState()方法,CAS的思想,底層實作是unsafe類,比較鎖狀态的期望值和預計值是否相同,相同則更新狀态1,接着設定目前線程為獨占的所有者線程。線程A占用鎖成功,更改鎖同步的狀态為1,開始處理自己的業務。

AQS 源碼詳解

接着線程B進入,也調用lock方法,進行狀态比較的時候,發現線程A正在占用,是以就會走acquire(1);方法

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
           

先調用tryAcquire(arg),嘗試再次擷取鎖

/**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();   // 線程A已經占用,狀态更改為1
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {  //  擷取目前擷取獨占的的所有者的線程  線程A
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;  // 傳回false
        }
           

接着進入addWaiter(Node.EXCLUSIVE)方法

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
    	 // 該node節點是封裝線程B的節點
        Node node = new Node(Thread.currentThread(), mode);  // 自定義一個節點根據目前的線程以及占用模式    擷取目前的線程,已經占用模式   獨占 
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;  // 初始狀态是null  将null指派值新建立節點 
        if (pred != null) {
            node.prev = pred; //
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);  // 将節點插入隊列中
        return node;  // 傳回線程B的節點
    }
           

線程B将建立的節點插入隊列

/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) { // 該node節點是封裝線程B的節點
        for (;;) { // 自旋  通過自旋 進入下一輪判斷 
            Node t = tail;  // 此時tail為null 
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))  // 插入一個哨兵節點
                    tail = head; // 此時head為傳過來的node哨兵節點  指派給尾節點
            } else {  // 第二次進來 t的值為哨兵節點
                node.prev = t;  //線程B的前置節點是哨兵節點
                if (compareAndSetTail(t, node)) {  
                // t是哨兵節點  node是線程B封裝的節點    期望是哨兵節點 實際也是哨兵節點  尾節點更改為 線程B封裝的節點												                  	
                    t.next = node;  // 設定哨兵的後置節點是 線程B封裝的節點	
                    return t; // 傳回哨兵節點
                }
            }
        }
    }

    /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update); // 将輸入的節點和預期的節點進行比較 如果原來的為null 則設定為傳過來的節點
    }
  /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {  // 比較期望的節點和實際的節點是否相同,相同則更新值
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
           

然後線程B進入acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {   // 該節點是線程B的節點
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); // 擷取線程B節點的前置節點
                if (p == head && tryAcquire(arg)) { // 判斷是否是頭節點并嘗試擷取鎖   
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 線程B傳回flase 再次此自旋
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev; // 擷取哨兵節點
            if (p == null)
                throw new NullPointerException();
            else
                return p; // 傳回
        }


/**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // pred  哨兵節點 node 線程B節點
        int ws = pred.waitStatus; // 擷取節點的狀态值
        if (ws == Node.SIGNAL) 
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  // CAS比較
        }
        return false; 
    }

 /**
     * CAS waitStatus field of a node.
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

 /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);  // 消耗許可證,未擷取進入阻塞狀态
        return Thread.interrupted();
    }
           
AQS 源碼詳解

線程B加入等待隊列。

AQS 源碼詳解

線程A依然工作,線程C如線程B那樣炮制加入等待隊列。

雙向連結清單中,第一個節點為虛節點(也叫哨兵節點),其實并不存儲任何資訊,隻是占位。真正的第一個有資料的節點,是從第二個節點開始的。

假設線程A工作結束,調用unLock(),釋放鎖占用。

AQS 源碼詳解

線程A調用unlock()方法

public void unlock() {
        sync.release(1);
    }
 /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;  // 擷取到線程B
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);   // 鎖的狀态更改為原始的  釋放獨占所有者的線程
            return free;
        }

 /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;  // 擷取線程B的下個節點
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); // 生産 許可證 
    }
    
 /**
     * CAS waitStatus field of a node.
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }
           

擷取到許可證 線程B不在阻塞,再次去嘗試擷取鎖成功,線程B執行業務。線程C也是如此。

繼續閱讀