天天看點

CAS和ABA問題

CAS簡介

CAS 全稱是 compare and swap,是一種用于在多線程環境下實作同步功能的機制。

CAS 它是一條CPU并發原語。操作包含三個操作數 – 記憶體位置、預期數值和新值。CAS 的實作邏輯是将記憶體位置處的數值與預期數值想比較,若相等,則将記憶體位置處的值替換為新值。若不相等,則不做任何操作。這個過程是原子的。

CAS并發原語展現在java語言中的sun.misc.Unsafe類中的各個方法。調用Unsafe類中的CAS方法,JVM會幫我們實作彙編指令。這是一種完全依賴硬體的功能,通過它實作了原子操作。由于CAS是一種系統原語,原語屬于作業系統用語範疇,是由若幹條指令組成的,用于完成某個功能的一個過程,并且原語的執行必須是連續的,在執行過程中不允許被打斷,也就是說CAS是一條CPU的原子指令,不會造成所謂的資料不一緻問題。

Unsafe類

Unsafe類是CAS的核心類,由于Java方法無法直接通路底層系統,需要通過本地(native)方法來通路,基于該類可以直接操作特定記憶體的資料。Unsafe類存在與sum.misc包中,其内部方法操作可以像C的指針一樣直接操作記憶體,因為Java中CAS操作的執行依賴于Unsafe類的方法。

Unsafe類中的所有方法都是native修飾的,也就是說Unsafe類中的方法都直接調用作業系統底層資源執行相應任務。

代碼解析

public class CASDemo {
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(5);
        
        // 運作結果: true   2019
        System.out.println(atomicInteger.compareAndSet(5, 2019) + "\t" + atomicInteger.get());
        // 運作結果: false  2019
        System.out.println(atomicInteger.compareAndSet(5, 1024) + "\t" + atomicInteger.get());

        // 此方法可以解決多線程環境下i++問題,底層使用的是Unsafe類CAS和自旋鎖
        atomicInteger.getAndIncrement();
    }
}
           

源碼分析:

/**
     * Atomically increments by one the current value.
     *
     * @return the previous value
     */
    public final int getAndIncrement() {
        // this=目前對象 valueOffset=記憶體偏移量(記憶體位址) 1=固定值,每次調用+1
        // Unsafe就是根據記憶體偏移位址擷取資料的。
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }
    
    /**
     * 為了友善檢視和添加注釋,此方法是從Unsafe類中複制出來的
     */
    public final int getAndAddInt(Object var1, long var2, int var4) {
            int var5;
            do {
                // 擷取var1對象,記憶體位址在var2的值。
                // 相當于這個線程從主實體記憶體中取值copy到自己的工作記憶體中。
                var5 = this.getIntVolatile(var1, var2);
                
                // 比較并交換,如果var1對象,記憶體位址在var2的值和var5值一樣,那麼就+1
                // compareAndSwapInt如果傳回true,取反為false,說明更新成功,退出循環,則傳回。
                // compareAndSwapInt如果傳回false,取反為true,說明目前線程工作記憶體中的值和主實體記憶體中的值不一樣,被其他線程修改了,則繼續循環擷取比較,直到更新成功為止。
            } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    
            return var5;
        }
           

執行過程說明:

  • 假設線程A和線程B兩個線程同時執行 getAndAddInt操作(分别跑在不同CPU上):
  • AtomicInteger裡面的value原始值為3,即主記憶體中 AtomicInteger的value為3,根據JMM模型,線程A和線程B各自持有一

    份值為3的value的副本分别到各自的工作記憶體。

  • 線程A通過 getIntVolatile(var1,var2)拿到value值3,這時線程A被挂起。
  • 線程B也通過 getIntVolatile(var1,var2)方法擷取到value值3,此時剛好線程B沒有被挂起并執行 compareAndSwapInt方法

    比較記憶體值也為3,成功修改記憶體值為4,線程B改完收工,一切OK。

  • 這時線程A恢複,執行 compareAndSwapInt方法比較,發現自己手裡的值數字3和主記憶體的值數字4不一緻,說明該值已

    經被其它線程搶先一步修改過了,那A線程本次修改失敗,隻能重新讀取重新來一遍了。

  • 線程A重新擷取 value值,因為變量value被 volatile修飾,是以其它線程對它的修改,線程A總是能夠看到,線程A繼續執

    了 compareAndSwapInt進行比較替換,直到成功。

volatile簡單說明:

volatile是一個輕量級的同步機制, 三大特性: 保證可見性, 不保證原子性, 禁止指令重排。

  • 可見性: 多個線程從主記憶體中copy一份資料,修改後,需要将自己的資料重新寫入主記憶體,并通知其他線程資料已更新,保證資料可見性,和多線程資料一緻性。
  • 禁止指令重排: 由于指令重排,會對代碼的執行順序進行優化,可能會導緻最後的結果和期望的結果不一緻,是以需要禁止重排。

CAS的優缺點

優點:

  • 不需要加鎖,保持了一緻性和并發性。

缺點:

  • 循環時間長開銷很大:我們可以看到getAndAddInt方法執行時,如果CAS失敗,會一直進行嘗試。如果CAS長時間一直不成功,可能會給CPU帶來很大的開銷。
  • 隻能保證一個共享變量的原子操作:當對一個共享變量執行操作時,我們可以使用循環CAS的方式來保證原子操作,但是對多個共享變量操作時,循環CAS就無法保證操作的原子性,這個時候就可以用鎖來保證原子性。
  • ABA問題:下面會提供詳細案例

ABA問題

舉個栗子說明:

主記憶體有個資料值:A,兩個線程A和B分别copy主記憶體資料到自己的工作區,A執行比較慢,需要10秒, B執行比較快,需要2秒, 此時B線程将主記憶體中的資料更改為B,過了一會又更改為A,然後A線程執行比較,發現結果是A,以為别人沒有動過,然後執行更改操作。其實中間已經被更改過了,這就是ABA問題。

也就是ABA問題隻要開始時的資料和結束時的資料一緻,我就認為沒改過,不管過程。

盡管A線程的CAS操作是成功的,但是不代表這個過程就是沒問題的。

ABA問題說簡單點就是,預判值還是和當初抓取的一樣,但是這個“ 值 ”的版本可能不一樣了,在某些不僅要考慮資料值是否一緻,還要考慮版本是否一緻的場景下需要注意.

Java并發包為了解決這個問題,提供了一個帶有标記的原子引用類“AtomicStampedReference”,它可以通過控制變量值的版本來保證CAS的正确性。

解決ABA問題的代碼示例

/**
 * 解決CAS的ABA問題
 */
public class SolveABAOfCAS {

    static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
    static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);

    public static void main(String[] args) throws InterruptedException {
        System.out.println("==========以下是ABA問題的産生==========");
        new Thread(() -> {
            atomicReference.compareAndSet(100, 101);
            atomicReference.compareAndSet(101, 100);
        }, "t1").start();

        new Thread(() -> {
            try {
                // 暫停1秒鐘,保證上面完成一次ABA操作
                Thread.sleep(1000);
                System.out.println(atomicReference.compareAndSet(100, 2019) + "\t" + atomicReference.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2").start();

        Thread.sleep(2000);
        System.out.println("==========以下是ABA問題的解決==========");
        new Thread(() -> {
            int stamp = atomicStampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + "\t第1次版本号" + stamp);
            try {
                // 暫停一秒鐘t3線程
                Thread.sleep(1000);
                atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
                System.out.println(Thread.currentThread().getName() + "\t第2次版本号" + atomicStampedReference.getStamp());
                atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
                System.out.println(Thread.currentThread().getName() + "\t第3次版本号" + atomicStampedReference.getStamp());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t3").start();

        new Thread(() -> {
            int stamp = atomicStampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + "\t第1次版本号" + stamp);
            try {
                // 暫停3秒鐘t4線程,保證上面的t3線程完成一次ABA操作
                Thread.sleep(3000);
                boolean result = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
                System.out.println(Thread.currentThread().getName() + "\t修改成功否: " + result + "\t目前最新實際版本号: " + atomicStampedReference.getStamp());
                System.out.println(Thread.currentThread().getName() + "\t目前實際最新值: " + atomicStampedReference.getReference());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t4").start();

    }
}
           

如果覺得對你有幫助,歡迎來訪我的部落格:http://jianjieming.com