天天看點

CAS與原子類

CAS 全稱 Compare And Swap 中文意思 比較并交換

跟中文意思一樣就是一個比較然後交換的思想 比如

if(value == expectedValue)

{value = newValue;}

就像上面的這個代碼 隻要使用了CAS 當一個線程去記憶體擷取值(稱為b)的時候 會把記憶體的這個值 放到别的變量(稱為a)中 當這個線程對b進行了指派操作 這時會讓b跟a做比較 看看b有沒有被别的線程進行更改 如果b被更改了那麼會傳回a的值也就是舊值 如果b沒有被更改那麼會給b進行指派然後傳回這個新值 這就是cas

cas的使用

在 Java 中,CAS 操作是由 Unsafe 類提供支援的,該類定義了三種針對不同類型變量的 CAS 操作

compareAndSwapObject

compareAndSwapInt

compareAndSwapLong
           

它們都是 native 方法,由 Java 虛拟機提供具體實作,這意味着不同的 Java 虛拟機對它們的實作可能會略有不同。

以 compareAndSwapInt 為例,Unsafe 的 compareAndSwapInt 方法接收 4 個參數,分别是:對象執行個體、記憶體偏移量、字段期望值、字段新值。該方法會針對指定對象執行個體中的相應偏移量的字段執行 CAS 操作。

public class CASTest {

public static void main(String[] args) {
    Entity entity = new Entity();

    Unsafe unsafe = UnsafeFactory.getUnsafe();

    long offset = UnsafeFactory.getFieldOffset(unsafe, Entity.class, "x");

    boolean successful;

    // 4個參數分别是:對象執行個體、字段的記憶體偏移量、字段期望值、字段新值
    successful = unsafe.compareAndSwapInt(entity, offset, 0, 3);
    System.out.println(successful + "\t" + entity.x);

    successful = unsafe.compareAndSwapInt(entity, offset, 3, 5);
    System.out.println(successful + "\t" + entity.x);

    successful = unsafe.compareAndSwapInt(entity, offset, 3, 8);
    System.out.println(successful + "\t" + entity.x);
}
}

public class UnsafeFactory {

/**
 * 擷取 Unsafe 對象
 * @return
 */
public static Unsafe getUnsafe() {
    try {
        Field field = Unsafe.class.getDeclaredField("theUnsafe");
        field.setAccessible(true);
        return (Unsafe) field.get(null);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}

/**
 * 擷取字段的記憶體偏移量
 * @param unsafe
 * @param clazz
 * @param fieldName
 * @return
 */
public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) {
    try {
        return unsafe.objectFieldOffset(clazz.getDeclaredField(fieldName));
    } catch (NoSuchFieldException e) {
        throw new Error(e);
    }
}
           

針對 entity.x 的 3 次 CAS 操作,分别試圖将它從 0 改成 3、從 3 改成 5、從 3 改成 8。

CAS源碼分析

Hotspot 虛拟機對compareAndSwapInt 方法的實作如下:

#unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  // 根據偏移量,計算value的位址
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  // Atomic::cmpxchg(x, addr, e) cas邏輯 x:要交換的值   e:要比較的值
  //cas成功,傳回期望值e,等于e,此方法傳回true 
  //cas失敗,傳回記憶體中的value值,不等于e,此方法傳回false
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
           

核心邏輯在Atomic::cmpxchg方法中,這個根據不同作業系統和不同CPU會有不同的實作。這裡我們以linux_64x的為例,檢視

Atomic::cmpxchg的實作
	#atomic_linux_x86.inline.hpp
	inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
	  //判斷目前執行環境是否為多處理器環境
	  int mp = os::is_MP();
	  //LOCK_IF_MP(%4) 在多處理器環境下,為 cmpxchgl 指令添加 lock 字首,以達到記憶體屏障的效果
	  //cmpxchgl 指令是包含在 x86 架構及 IA-64 架構中的一個原子條件指令,
	  //它會首先比較 dest 指針指向的記憶體值是否和 compare_value 的值相等,
	  //如果相等,則雙向交換 dest 與 exchange_value,否則就單方面地将 dest 指向的記憶體值交給exchange_value。
	  //這條指令完成了整個 CAS 操作,是以它也被稱為 CAS 指令。
	  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
	                    : "=a" (exchange_value)
	                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
	                    : "cc", "memory");
	  return exchange_value;
           

CAS實作高效率鎖 自旋鎖

自旋鎖就是讓線程不斷的空循環 不讓線程被休眠 線程休眠就要調用作業系統的指令 進入核心态 這樣效率就大大降低 cas實作自旋鎖線程會一直空循環不會進入核心态 進而達到高效率

public class CASLock {

//加鎖标記
private volatile int state;
private static final Unsafe UNSAFE;
private static final long OFFSET;

static {
    try {
        UNSAFE = UnsafeFactory.getUnsafe();
        OFFSET = UnsafeFactory.getFieldOffset(
                UNSAFE, CASLock.class, "state");
    } catch (Exception e) {
        throw new Error(e);
    }
}

public boolean cas() {
    return UNSAFE.compareAndSwapInt(this, OFFSET, 0, 1); // cas操作判斷目前對象的state的值是不是0如果是0就更改成功如果不是就失敗  這裡在jvm源碼中傳回的是舊值 但是被jvm封裝了  這裡隻要傳回舊值就是false
}

public int getState() {
    return state;
}

public void setState(int state) {
    this.state = state;
}

}





public class Test {
    private volatile static int sum = 0;
    static Object object = "";
    static ReentrantLock lock = new ReentrantLock();

static CASLock casLock = new CASLock();

public static void main(String[] args) {

    for (int i = 0; i < 10; i++) {
        Thread thread = new Thread(() -> {
            //synchronized (object) {
            //lock.lock();
            //
            for(;;){   // 這個死循環就是所謂的自旋鎖
                //state=0
                if(casLock.getState()==0&&casLock.cas()) {
                    try {
                        for (int j = 0; j < 10000; j++) {
                            sum++;
                        }
                        //
                        System.out.println(casLock.getState());
                    } finally {
                        //lock.unlock();
                        // state=0
                        casLock.setState(0);
                    }
                    break;
                }
            }

            //}
        });
        thread.start();
    }

    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println(sum);

}


}
           

Atomic原子操作類

在并發程式設計中很容易出現并發安全的問題,有一個很簡單的例子就是多線程更新變量i=1,比如多個線程執行i++操作,就有可能擷取不到正确的值,而這個問題,最常用的方法是通過Synchronized進行控制來達到線程安全的目的。但是由于synchronized是采用的是悲觀鎖政策,并不是特别高效的一種解決方案。實際上,在J.U.C下的atomic包提供了一系列的操作簡單,性能高效,并能保證線程安全的類去更新基本類型變量,數組元素,引用類型以及更新對象中的字段類型。atomic包下的這些類都是采用的是樂觀鎖政策去原子更新資料,在java中則是使用CAS操作具體實作。

原子類可分為五種

基本類型:AtomicInteger、AtomicLong、AtomicBoolean;

引用類型:AtomicReference、AtomicStampedRerence、AtomicMarkableReference;

數組類型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

對象屬性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater

原子類型累加器(jdk1.8增加的類):DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder、Striped64

原子更新基本類型

以AtomicInteger為例總結常用的方法

//以原子的方式将執行個體中的原值加1,傳回的是自增前的舊值;

public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
 
//getAndSet(int newValue):将執行個體中的值更新為新值,并傳回舊值;

public final boolean getAndSet(boolean newValue) {
    boolean prev;
    do {
        prev = get();
    } while (!compareAndSet(prev, newValue));
    return prev;
}
 
//incrementAndGet() :以原子的方式将執行個體中的原值進行加1操作,并傳回最終相加後的結果;
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
 
//addAndGet(int delta) :以原子方式将輸入的數值與執行個體中原本的值相加,并傳回最後的結果;
public final int addAndGet(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta) + delta;

}
           

使用

public class AtomicIntegerTest {
    static AtomicInteger sum = new AtomicInteger(0);

public static void main(String[] args) {

    for (int i = 0; i < 10; i++) {
        Thread thread = new Thread(() -> {
            for (int j = 0; j < 10000; j++) {
                // 原子自增  CAS
                sum.incrementAndGet();
                //TODO
            }
        });
        thread.start();
    }

    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(sum.get());

}
           

incrementAndGet()方法通過CAS自增實作,如果CAS失敗,自旋直到成功+1。

incrementAndGet()方法最終回到這裡

public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
		// compareAndSwapInt(var1, var2, var5, var5 + var4) 利用CAS不斷的更新值直到成功為止 
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}
           

原子更新數組類型

AtomicIntegerArray為例總結常用的方法

//addAndGet(int i, int delta):以原子更新的方式将數組中索引為i的元素與輸入值相加;
public final int addAndGet(int i, int delta) {
    return getAndAdd(i, delta) + delta;
}
 
//getAndIncrement(int i):以原子更新的方式将數組中索引為i的元素自增加1;
public final int getAndIncrement(int i) {
    return getAndAdd(i, 1);
}
 
//compareAndSet(int i, int expect, int update):将數組中索引為i的位置的元素進行更新
public final boolean compareAndSet(int i, int expect, int update) {
    return compareAndSetRaw(checkedByteOffset(i), expect, update);

}
           

測試

public class AtomicIntegerArrayTest {

static int[] value = new int[]{ 1, 2, 3, 4, 5 };
static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(value);


public static void main(String[] args) throws InterruptedException {

    //設定索引0的元素為100
    atomicIntegerArray.set(0, 100);
    System.out.println(atomicIntegerArray.get(0));
    //以原子更新的方式将數組中索引為1的元素與輸入值相加
    atomicIntegerArray.getAndAdd(1,5);

    System.out.println(atomicIntegerArray);
}
           

LongAdder/DoubleAdder

之前有很多空循環也大大的浪廢cpu的資源LongAdder/DoubleAdder解決了這個問題

AtomicLong是利用了底層的CAS操作來提供并發性的,比如addAndGet方法:

性能測試

public class LongAdderTest {

    public static void main(String[] args) {
    testAtomicLongVSLongAdder(10, 10000);
    System.out.println("==================");
    testAtomicLongVSLongAdder(10, 200000);
    System.out.println("==================");
    testAtomicLongVSLongAdder(100, 200000);
}

static void testAtomicLongVSLongAdder(final int threadCount, final int times) {
    try {
        long start = System.currentTimeMillis();
        testLongAdder(threadCount, times);
        long end = System.currentTimeMillis() - start;
        System.out.println("條件>>>>>>線程數:" + threadCount + ", 單線程操作計數" + times);
        System.out.println("結果>>>>>>LongAdder方式增加計數" + (threadCount * times) + "次,共計耗時:" + end);

        long start2 = System.currentTimeMillis();
        testAtomicLong(threadCount, times);
        long end2 = System.currentTimeMillis() - start2;
        System.out.println("條件>>>>>>線程數:" + threadCount + ", 單線程操作計數" + times);
        System.out.println("結果>>>>>>AtomicLong方式增加計數" + (threadCount * times) + "次,共計耗時:" + end2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    AtomicLong atomicLong = new AtomicLong();
    for (int i = 0; i < threadCount; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < times; j++) {
                    atomicLong.incrementAndGet();
                }
                countDownLatch.countDown();
            }
        }, "my-thread" + i).start();
    }
    countDownLatch.await();
}

static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    LongAdder longAdder = new LongAdder();
    for (int i = 0; i < threadCount; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < times; j++) {
                    longAdder.add(1);
                }
                countDownLatch.countDown();
            }
        }, "my-thread" + i).start();
    }

    countDownLatch.await();
}
           

LongAdder原理

10個線程各循環10萬次對i+1 使用了自旋鎖拿不到鎖的線程得到了cpu時間片會空循環 LongAdder解決這個問題的原理 它會建立一個數組 擷取目前線程的哈希值 然後對這個數組抹除 這樣這個線程會進入數組中其中的一個 這時把我們要加的數記下來也就是1 第一次把1指派給 為了好了解 這個建立一個數組 A[] a = new A[10] 假設來了一個線程進入了a[4] 第一次進入的時候把1指派給A類中的屬性(稱為aa)上 第二次進去會對aa屬性進行加1操作 然後所有線程執行完之後 把a數組中的每個A對象裡的aa屬性進行相加 最後的結果就是正确的 把一個屬性拆分成多個屬性最後把拆分的屬性進行相加 得到結果這樣的思想 大大的提升線程的使用率了