并發基礎裡主要明白下CAS和AQS吧 CAS:Compare And Swap 比較然後交換 AQS:AbstractqueuedSynchronizer抽象的隊列式同步器 一、CAS CAS在很多無鎖的并發裡使用。無鎖并發的意思就是,用不加鎖的方式實作并發操作,這一波操作是不是很666啊 通常加鎖操作(無論是synchronized還是Lock)都是比較悲觀的認為被鎖住的部分(無論是對象、方法還是代碼塊)每一次操作都會引起資源競争。 而CAS就比較樂觀了,他認為對資源的通路是沒有沖突的,既然沒有沖突就應該被友好的通路。所有通路的線程走到這一塊的時候都不需要停下來等待其它線程釋放資源。 那CAS是哪兒來的樂觀呢?就是通過比較然後交換的思想。 CAS算法:包含三個參數(V,E,N),其中,V表示要更新的變量,E表示預期的變量,N表示新值。 當且僅當V值等于E值時,将V值更新為N值。并傳回V值。 對一個線程L來講,他要去更新一個值的時候(V),他首先知道這個值應該等于E,更新之前他先去比較一下,當确認V值等于E了,他就認為這個值沒有被其他線程修改過,藍後,再對V進行更新,更新為N值。 CAS的應用之一是jdk的atomic包。比如AtomicInteger compareAndSet是主要使用的方法 public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } unsafe的 compareAndSwapInt方法是native的,會調用底層作業系統完成對值的修改 CAS會遇到一個無法解決的問題,即ABA問題。即V值被多次修改後值仍為最初值。 意思是說對于之前的線程L來說,他去更新V值的時候。确認V值等于E不能完全确認這個值沒有被其它線程修改過。 比如線程L1将V值改成V1,線程L2将V值又改回了V值。 V值經曆了兩次修改,但是對于L線程來說,這個值沒有修改過。But,V值其實進行了多次更新。 我個人沒有想到ABA可能會引發的問題,猜對于狀态比較敏感的情景下應該不被允許吧。 對于ABA問題,解決方案是設定一個時間戳,每次修改成功後更新時間戳。 對于V值的預期值E和預期時間戳都滿足時才對V值進行更新。 jdk有個 AtomicStampedReference類,其内部實作了一個Pair類,用來封裝值和時間戳
1 private static class Pair<T> {
2 final T reference;
3 final int stamp;
4 private Pair(T reference, int stamp) {
5 this.reference = reference;
6 this.stamp = stamp;
7 }
8 static <T> Pair<T> of(T reference, int stamp) {
9 return new Pair<T>(reference, stamp);
10 }
11 }
Pair
二、AQS AQS是java.util.concurrent.locks包下的AbstractQueuedSynchronizer類。 AQS核心是通過共享變量同步狀态,變量的狀态由子類維護,AQS架構主要做的是: 線程阻塞隊列的維護 線程阻塞和喚醒 * <pre> * +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+ * </pre> 上圖摘自jdk源碼說明,是LCH隊列鎖的說明。LCH隊列鎖是一個FIFO隊列,每個節點是一個等待線程,其前繼節點(pre)釋放鎖之後就可以獲得鎖了。 AbstractQueuedSynchronizer通過内部類Node來維護這個隊列。 AbstractQueuedSynchronizer成員變量維護一個Node類型的head,一個Node類型的tail以及狀态state。維護對LCH隊列的入隊出隊操作以及對state狀态的修改。 其中,共享變量state的修改是通過unsafe的CAS來操作的。 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node. prev = pred; if (compareAndSetTail(pred, node)) { pred. next = node; return node; } } enq(node) ; return node; } 其中,入隊的操作如下 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node. prev = t; if (compareAndSetTail(t, node)) { t. next = node; return t; } } } } AbstractQueuedSynchronizer主要有兩個方法:acquire()和release() acquire方法用來擷取鎖,傳回true說明線程擷取成功繼續執行,一旦傳回false則線程加入到等待隊列中,等待被喚醒,release方法用來釋放鎖。 一般來說實作的時候這兩個方法被封裝為lock和unlock方法。 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0)//喚醒後繼節點 unparkSuccessor(h) ; return true; } return false; } 以下四個方法由子類去實作 protected boolean tryAcquire(int arg) protected boolean tryRelease(int arg) protected int tryAcquireShared(int arg) protected boolean tryReleaseShared(int arg)
轉載于:https://www.cnblogs.com/liumumu2014/p/9168252.html