天天看點

java并發原理實戰(8)-- lock接口使用和認識

文章目錄

  • ​​lock接口​​
  • ​​lock的使用​​
  • ​​lock的本質​​
  • ​​ReentrantLock的調用過程​​
  • ​​鎖實作(加鎖)​​
  • ​​Sync.nonfairTryAcquire​​
  • ​​AbstractQueuedSynchronizer.addWaiter​​
  • ​​AbstractQueuedSynchronizer.acquireQueued​​
  • ​​解鎖​​
  • ​​Lock VS Synchronized​​

lock接口

lock的使用

  • lock的方法:
java并發原理實戰(8)-- lock接口使用和認識
  • lock的常用實作類
  • java并發原理實戰(8)-- lock接口使用和認識
  • 例子:
public class Sequence {
    private  int value;
    ReentrantLock lock = new ReentrantLock();

    public  int getNext() {
        lock.lock();
        int a = value++;
        lock.unlock();
        return a;
    }

    public static void main(String[] args) {
        Sequence sequence = new Sequence();
        new Thread(() ->{
                while (true) {
                    System.out.println(Thread.currentThread().getName() + " " + sequence.getNext());
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        ).start();
        new Thread(() -> {
                while (true) {
                    System.out.println(Thread.currentThread().getName() + " " + sequence.getNext());
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        ).start();
        new Thread(() -> {
                while (true) {
                    System.out.println(Thread.currentThread().getName() + " " + sequence.getNext());
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        ).start();
    }

}      

運作結果:

java并發原理實戰(8)-- lock接口使用和認識

結果顯示:lock解決了線程安全問題。

lock的本質

java并發原理實戰(8)-- lock接口使用和認識

轉載文章:

Lock完全用Java寫成,在java這個層面是無關JVM實作的。

在java.util.concurrent.locks包中有很多Lock的實作類,常用的有ReentrantLock、ReadWriteLock(實作類ReentrantReadWriteLock),其實作都依賴java.util.concurrent.AbstractQueuedSynchronizer類,實作思路都大同小異,是以我們以ReentrantLock作為講解切入點。

ReentrantLock的調用過程

經過觀察ReentrantLock把所有Lock接口的操作都委派到一個Sync類上,該類繼承了AbstractQueuedSynchronizer:

static abstract class Sync extends AbstractQueuedSynchronizer        

Sync又有兩個子類:

final static class NonfairSync extends Sync  

final static class FairSync extends Sync  
      
java并發原理實戰(8)-- lock接口使用和認識

顯然是為了支援公平鎖和非公平鎖而定義,預設情況下為非公平鎖。

先理一下Reentrant.lock()方法的調用過程(預設非公平鎖):

java并發原理實戰(8)-- lock接口使用和認識

這些讨厭的Template模式導緻很難直覺的看到整個調用過程,其實通過上面調用過程及AbstractQueuedSynchronizer的注釋可以發現,AbstractQueuedSynchronizer中抽象了絕大多數Lock的功能,而隻把tryAcquire方法延遲到子類中實作。tryAcquire方法的語義在于用具體子類判斷請求線程是否可以獲得鎖,無論成功與否AbstractQueuedSynchronizer都将處理後面的流程。

鎖實作(加鎖)

簡單說來,AbstractQueuedSynchronizer會把所有的請求線程構成一個CLH隊列,當一個線程執行完畢(lock.unlock())時會激活自己的後繼節點,但正在執行的線程并不在隊列中,而那些等待執行的線程全部處于阻塞狀态,經過調查線程的顯式阻塞是通過調用LockSupport.park()完成,而LockSupport.park()則調用sun.misc.Unsafe.park()本地方法,再進一步,HotSpot在Linux中中通過調用pthread_mutex_lock函數把線程交給系統核心進行阻塞。

該隊列如圖:

java并發原理實戰(8)-- lock接口使用和認識

與synchronized相同的是,這也是一個虛拟隊列,不存在隊列執行個體,僅存在節點之間的前後關系。令人疑惑的是為什麼采用CLH隊列呢?原生的CLH隊列是用于自旋鎖,但Doug Lea把其改造為阻塞鎖。

當有線程競争鎖時,該線程會首先嘗試獲得鎖,這對于那些已經在隊列中排隊的線程來說顯得不公平,這也是非公平鎖的由來,與synchronized實作類似,這樣會極大提高吞吐量。

如果已經存在Running線程,則新的競争線程會被追加到隊尾,具體是采用基于CAS的Lock-Free算法,因為線程并發對Tail調用CAS可能會導緻其他線程CAS失敗,解決辦法是循環CAS直至成功。AbstractQueuedSynchronizer的實作非常精巧,令人歎為觀止,不入細節難以完全領會其精髓,下面詳細說明實作過程:

Sync.nonfairTryAcquire

nonfairTryAcquire方法将是lock方法間接調用的第一個方法,每次請求鎖時都會首先調用該方法。

final boolean nonfairTryAcquire(int acquires) {  
    final Thread current = Thread.currentThread();  
    int c = getState();  
    if (c == 0) {  
        if (compareAndSetState(0, acquires)) {  
            setExclusiveOwnerThread(current);  
            return true;  
        }  
    }  
    else if (current == getExclusiveOwnerThread()) {  
        int nextc = c + acquires;  
        if (nextc < 0) // overflow  
            throw new Error("Maximum lock count exceeded");  
        setState(nextc);  
        return true;  
    }  
    return false;  
}  
123456789101112131415161718      

該方法會首先判斷目前狀态,如果c=0說明沒有線程正在競争該鎖,如果不c !=0 說明有線程正擁有了該鎖。

如果發現c=0,則通過CAS設定該狀态值為acquires,acquires的初始調用值為1,每次線程重入該鎖都會+1,每次unlock都會-1,但為0時釋放鎖。如果CAS設定成功,則可以預計其他任何線程調用CAS都不會再成功,也就認為目前線程得到了該鎖,也作為Running線程,很顯然這個Running線程并未進入等待隊列。

如果c !=0 但發現自己已經擁有鎖,隻是簡單地++acquires,并修改status值,但因為沒有競争,是以通過setStatus修改,而非CAS,也就是說這段代碼實作了偏向鎖的功能,并且實作的非常漂亮。

AbstractQueuedSynchronizer.addWaiter

addWaiter方法負責把目前無法獲得鎖的線程包裝為一個Node添加到隊尾:

private Node addWaiter(Node mode) {  
    Node node = new Node(Thread.currentThread(), mode);  
    // Try the fast path of enq; backup to full enq on failure  
    Node pred = tail;  
    if (pred != null) {  
        node.prev = pred;  
        if (compareAndSetTail(pred, node)) {  
            pred.next = node;  
            return node;  
        }  
    }  
    enq(node);  
    return node;  
}  
1234567891011121314      

其中參數mode是獨占鎖還是共享鎖,預設為null,獨占鎖。追加到隊尾的動作分兩步:

如果目前隊尾已經存在(tail!=null),則使用CAS把目前線程更新為Tail

如果目前Tail為null或則線程調用CAS設定隊尾失敗,則通過enq方法繼續設定Tail

下面是enq方法:

private Node enq(final Node node) {  
    for (;;) {  
        Node t = tail;  
        if (t == null) { // Must initialize  
            Node h = new Node(); // Dummy header  
            h.next = node;  
            node.prev = h;  
            if (compareAndSetHead(h)) {  
                tail = node;  
                return h;  
            }  
        }  
        else {  
            node.prev = t;  
            if (compareAndSetTail(t, node)) {  
                t.next = node;  
                return t;  
            }  
        }  
    }  
}  
123456789101112131415161718192021      

該方法就是循環調用CAS,即使有高并發的場景,無限循環将會最終成功把目前線程追加到隊尾(或設定隊頭)。總而言之,addWaiter的目的就是通過CAS把目前線程追加到隊尾,并傳回包裝後的Node執行個體。

把線程要包裝為Node對象的主要原因,除了用Node構造供虛拟隊列外,還用Node包裝了各種線程狀态,這些狀态被精心設計為一些數字值:

  • SIGNAL(-1) :線程的後繼線程正/已被阻塞,當該線程release或cancel時要重新這個後繼線程(unpark)
  • CANCELLED(1):因為逾時或中斷,該線程已經被取消
  • CONDITION(-2):表明該線程被處于條件隊列,就是因為調用了Condition.await而被阻塞
  • PROPAGATE(-3):傳播共享鎖
  • 0:0代表無狀态

AbstractQueuedSynchronizer.acquireQueued

acquireQueued的主要作用是把已經追加到隊列的線程節點(addWaiter方法傳回值)進行阻塞,但阻塞前又通過tryAccquire重試是否能獲得鎖,如果重試成功能則無需阻塞,直接傳回

final boolean acquireQueued(final Node node, int arg) {  
    try {  
        boolean interrupted = false;  
        for (;;) {  
            final Node p = node.predecessor();  
            if (p == head && tryAcquire(arg)) {  
                setHead(node);  
                p.next = null; // help GC  
                return interrupted;  
            }  
            if (shouldParkAfterFailedAcquire(p, node) &&  
                parkAndCheckInterrupt())  
                interrupted = true;  
        }  
    } catch (RuntimeException ex) {  
        cancelAcquire(node);  
        throw ex;  
    }  
}  
12345678910111213141516171819      

仔細看看這個方法是個無限循環,感覺如果p == head && tryAcquire(arg)條件不滿足循環将永遠無法結束,當然不會出現死循環,奧秘在于第12行的parkAndCheckInterrupt會把目前線程挂起,進而阻塞住線程的調用棧。

private final boolean parkAndCheckInterrupt() {  
    LockSupport.park(this);  
    return Thread.interrupted();  
}  
1234      

如前面所述,LockSupport.park最終把線程交給系統(Linux)核心進行阻塞。當然也不是馬上把請求不到鎖的線程進行阻塞,還要檢查該線程的狀态,比如如果該線程處于Cancel狀态則沒有必要,具體的檢查在shouldParkAfterFailedAcquire中:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  
      int ws = pred.waitStatus;  
      if (ws == Node.SIGNAL)  
          /* 
           * This node has already set status asking a release 
           * to signal it, so it can safely park 
           */  
          return true;  
      if (ws > 0) {  
          /* 
           * Predecessor was cancelled. Skip over predecessors and 
           * indicate retry. 
           */  
   do {  
            node.prev = pred = pred.prev;  
    } while (pred.waitStatus > 0);  
            pred.next = node;  
    } else {  
          /* 
           * waitStatus must be 0 or PROPAGATE. Indicate that we 
           * need a signal, but don't park yet. Caller will need to 
           * retry to make sure it cannot acquire before parking.  
           */  
          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  
      }   
      return false;  
  }  
123456789101112131415161718192021222324252627      

檢查原則在于:

  • 規則1:如果前繼的節點狀态為SIGNAL,表明目前節點需要unpark,則傳回成功,此時acquireQueued方法的第12行(parkAndCheckInterrupt)将導緻線程阻塞
  • 規則2:如果前繼節點狀态為CANCELLED(ws>0),說明前置節點已經被放棄,則回溯到一個非取消的前繼節點,傳回false,acquireQueued方法的無限循環将遞歸調用該方法,直至規則1傳回true,導緻線程阻塞
  • 規則3:如果前繼節點狀态為非SIGNAL、非CANCELLED,則設定前繼的狀态為SIGNAL,傳回false後進入acquireQueued的無限循環,與規則2同

總體看來,shouldParkAfterFailedAcquire就是靠前繼節點判斷目前線程是否應該被阻塞,如果前繼節點處于CANCELLED狀态,則順便删除這些節點重新構造隊列。

至此,鎖住線程的邏輯已經完成,下面讨論解鎖的過程。

解鎖

請求鎖不成功的線程會被挂起在acquireQueued方法的第12行,12行以後的代碼必須等線程被解鎖鎖才能執行,假如被阻塞的線程得到解鎖,則執行第13行,即設定interrupted = true,之後又進入無限循環。

從無限循環的代碼可以看出,并不是得到解鎖的線程一定能獲得鎖,必須在第6行中調用tryAccquire重新競争,因為鎖是非公平的,有可能被新加入的線程獲得,進而導緻剛被喚醒的線程再次被阻塞,這個細節充分展現了“非公平”的精髓。通過之後将要介紹的解鎖機制會看到,第一個被解鎖的線程就是Head,是以p == head的判斷基本都會成功。

至此可以看到,把tryAcquire方法延遲到子類中實作的做法非常精妙并具有極強的可擴充性,令人歎為觀止!當然精妙的不是這個Template設計模式,而是Doug Lea對鎖結構的精心布局。

解鎖代碼相對簡單,主要展現在AbstractQueuedSynchronizer.release和Sync.tryRelease方法中:

class AbstractQueuedSynchronizer

public final boolean release(int arg) {  
    if (tryRelease(arg)) {  
        Node h = head;  
        if (h != null && h.waitStatus != 0)  
            unparkSuccessor(h);  
        return true;  
    }  
    return false;  
}  
123456789      

class Sync

protected final boolean tryRelease(int releases) {  
    int c = getState() - releases;  
    if (Thread.currentThread() != getExclusiveOwnerThread())  
        throw new IllegalMonitorStateException();  
    boolean free = false;  
    if (c == 0) {  
        free = true;  
        setExclusiveOwnerThread(null);  
    }  
    setState(c);  
    return free;  
}  
123456789101112      

tryRelease與tryAcquire語義相同,把如何釋放的邏輯延遲到子類中。

tryRelease語義很明确:如果線程多次鎖定,則進行多次釋放,直至status==0則真正釋放鎖,所謂釋放鎖即設定status為0,因為無競争是以沒有使用CAS。

release的語義在于:如果可以釋放鎖,則喚醒隊列第一個線程(Head),具體喚醒代碼如下:

private void unparkSuccessor(Node node) {  
    /* 
     * If status is negative (i.e., possibly needing signal) try 
     * to clear in anticipation of signalling. It is OK if this 
     * fails or if status is changed by waiting thread. 
     */  
    int ws = node.waitStatus;  
    if (ws < 0)  
        compareAndSetWaitStatus(node, ws, 0);   

    /* 
     * Thread to unpark is held in successor, which is normally 
     * just the next node.  But if cancelled or apparently null, 
     * traverse backwards from tail to find the actual 
     * non-cancelled successor. 
     */  
    Node s = node.next;  
    if (s == null || s.waitStatus > 0) {  
        s = null;  
        for (Node t = tail; t != null && t != node; t = t.prev)  
            if (t.waitStatus <= 0)  
                s = t;  
    }  
    if (s != null)  
        LockSupport.unpark(s.thread);  
}  
1234567891011121314151617181920212223242526      

這段代碼的意思在于找出第一個可以unpark的線程,一般說來head.next == head,Head就是第一個線程,但Head.next可能被取消或被置為null,是以比較穩妥的辦法是從後往前找第一個可用線程。貌似回溯會導緻性能降低,其實這個發生的幾率很小,是以不會有性能影響。之後便是通知系統核心繼續該線程,在Linux下是通過pthread_mutex_unlock完成。之後,被解鎖的線程進入上面所說的重新競争狀态。

Lock VS Synchronized

AbstractQueuedSynchronizer通過構造一個基于阻塞的CLH隊列容納所有的阻塞線程,而對該隊列的操作均通過Lock-Free(CAS)操作,但對已經獲得鎖的線程而言,ReentrantLock實作了偏向鎖的功能。

synchronized的底層也是一個基于CAS操作的等待隊列,但JVM實作的更精細,把等待隊列分為ContentionList和EntryList,目的是為了降低線程的出列速度;當然也實作了偏向鎖,從資料結構來說二者設計沒有本質差別。但synchronized還實作了自旋鎖,并針對不同的系統和硬體體系進行了優化,而Lock則完全依靠系統阻塞挂起等待線程。

繼續閱讀