CAS 全稱 Compare And Swap 中文意思 比較并交換
跟中文意思一樣就是一個比較然後交換的思想 比如
if(value == expectedValue)
{value = newValue;}
就像上面的這個代碼 隻要使用了CAS 當一個線程去記憶體擷取值(稱為b)的時候 會把記憶體的這個值 放到别的變量(稱為a)中 當這個線程對b進行了指派操作 這時會讓b跟a做比較 看看b有沒有被别的線程進行更改 如果b被更改了那麼會傳回a的值也就是舊值 如果b沒有被更改那麼會給b進行指派然後傳回這個新值 這就是cas
cas的使用
在 Java 中,CAS 操作是由 Unsafe 類提供支援的,該類定義了三種針對不同類型變量的 CAS 操作
compareAndSwapObject
compareAndSwapInt
compareAndSwapLong
它們都是 native 方法,由 Java 虛拟機提供具體實作,這意味着不同的 Java 虛拟機對它們的實作可能會略有不同。
以 compareAndSwapInt 為例,Unsafe 的 compareAndSwapInt 方法接收 4 個參數,分别是:對象執行個體、記憶體偏移量、字段期望值、字段新值。該方法會針對指定對象執行個體中的相應偏移量的字段執行 CAS 操作。
public class CASTest {
public static void main(String[] args) {
Entity entity = new Entity();
Unsafe unsafe = UnsafeFactory.getUnsafe();
long offset = UnsafeFactory.getFieldOffset(unsafe, Entity.class, "x");
boolean successful;
// 4個參數分别是:對象執行個體、字段的記憶體偏移量、字段期望值、字段新值
successful = unsafe.compareAndSwapInt(entity, offset, 0, 3);
System.out.println(successful + "\t" + entity.x);
successful = unsafe.compareAndSwapInt(entity, offset, 3, 5);
System.out.println(successful + "\t" + entity.x);
successful = unsafe.compareAndSwapInt(entity, offset, 3, 8);
System.out.println(successful + "\t" + entity.x);
}
}
public class UnsafeFactory {
/**
* 擷取 Unsafe 對象
* @return
*/
public static Unsafe getUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 擷取字段的記憶體偏移量
* @param unsafe
* @param clazz
* @param fieldName
* @return
*/
public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) {
try {
return unsafe.objectFieldOffset(clazz.getDeclaredField(fieldName));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
針對 entity.x 的 3 次 CAS 操作,分别試圖将它從 0 改成 3、從 3 改成 5、從 3 改成 8。
CAS源碼分析
Hotspot 虛拟機對compareAndSwapInt 方法的實作如下:
#unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
// 根據偏移量,計算value的位址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
// Atomic::cmpxchg(x, addr, e) cas邏輯 x:要交換的值 e:要比較的值
//cas成功,傳回期望值e,等于e,此方法傳回true
//cas失敗,傳回記憶體中的value值,不等于e,此方法傳回false
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
核心邏輯在Atomic::cmpxchg方法中,這個根據不同作業系統和不同CPU會有不同的實作。這裡我們以linux_64x的為例,檢視
Atomic::cmpxchg的實作
#atomic_linux_x86.inline.hpp
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
//判斷目前執行環境是否為多處理器環境
int mp = os::is_MP();
//LOCK_IF_MP(%4) 在多處理器環境下,為 cmpxchgl 指令添加 lock 字首,以達到記憶體屏障的效果
//cmpxchgl 指令是包含在 x86 架構及 IA-64 架構中的一個原子條件指令,
//它會首先比較 dest 指針指向的記憶體值是否和 compare_value 的值相等,
//如果相等,則雙向交換 dest 與 exchange_value,否則就單方面地将 dest 指向的記憶體值交給exchange_value。
//這條指令完成了整個 CAS 操作,是以它也被稱為 CAS 指令。
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
CAS實作高效率鎖 自旋鎖
自旋鎖就是讓線程不斷的空循環 不讓線程被休眠 線程休眠就要調用作業系統的指令 進入核心态 這樣效率就大大降低 cas實作自旋鎖線程會一直空循環不會進入核心态 進而達到高效率
public class CASLock {
//加鎖标記
private volatile int state;
private static final Unsafe UNSAFE;
private static final long OFFSET;
static {
try {
UNSAFE = UnsafeFactory.getUnsafe();
OFFSET = UnsafeFactory.getFieldOffset(
UNSAFE, CASLock.class, "state");
} catch (Exception e) {
throw new Error(e);
}
}
public boolean cas() {
return UNSAFE.compareAndSwapInt(this, OFFSET, 0, 1); // cas操作判斷目前對象的state的值是不是0如果是0就更改成功如果不是就失敗 這裡在jvm源碼中傳回的是舊值 但是被jvm封裝了 這裡隻要傳回舊值就是false
}
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
}
public class Test {
private volatile static int sum = 0;
static Object object = "";
static ReentrantLock lock = new ReentrantLock();
static CASLock casLock = new CASLock();
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
//synchronized (object) {
//lock.lock();
//
for(;;){ // 這個死循環就是所謂的自旋鎖
//state=0
if(casLock.getState()==0&&casLock.cas()) {
try {
for (int j = 0; j < 10000; j++) {
sum++;
}
//
System.out.println(casLock.getState());
} finally {
//lock.unlock();
// state=0
casLock.setState(0);
}
break;
}
}
//}
});
thread.start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sum);
}
}
Atomic原子操作類
在并發程式設計中很容易出現并發安全的問題,有一個很簡單的例子就是多線程更新變量i=1,比如多個線程執行i++操作,就有可能擷取不到正确的值,而這個問題,最常用的方法是通過Synchronized進行控制來達到線程安全的目的。但是由于synchronized是采用的是悲觀鎖政策,并不是特别高效的一種解決方案。實際上,在J.U.C下的atomic包提供了一系列的操作簡單,性能高效,并能保證線程安全的類去更新基本類型變量,數組元素,引用類型以及更新對象中的字段類型。atomic包下的這些類都是采用的是樂觀鎖政策去原子更新資料,在java中則是使用CAS操作具體實作。
原子類可分為五種
基本類型:AtomicInteger、AtomicLong、AtomicBoolean;
引用類型:AtomicReference、AtomicStampedRerence、AtomicMarkableReference;
數組類型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
對象屬性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
原子類型累加器(jdk1.8增加的類):DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder、Striped64
原子更新基本類型
以AtomicInteger為例總結常用的方法
//以原子的方式将執行個體中的原值加1,傳回的是自增前的舊值;
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
//getAndSet(int newValue):将執行個體中的值更新為新值,并傳回舊值;
public final boolean getAndSet(boolean newValue) {
boolean prev;
do {
prev = get();
} while (!compareAndSet(prev, newValue));
return prev;
}
//incrementAndGet() :以原子的方式将執行個體中的原值進行加1操作,并傳回最終相加後的結果;
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
//addAndGet(int delta) :以原子方式将輸入的數值與執行個體中原本的值相加,并傳回最後的結果;
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
使用
public class AtomicIntegerTest {
static AtomicInteger sum = new AtomicInteger(0);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 10000; j++) {
// 原子自增 CAS
sum.incrementAndGet();
//TODO
}
});
thread.start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sum.get());
}
incrementAndGet()方法通過CAS自增實作,如果CAS失敗,自旋直到成功+1。
incrementAndGet()方法最終回到這裡
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
// compareAndSwapInt(var1, var2, var5, var5 + var4) 利用CAS不斷的更新值直到成功為止
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
原子更新數組類型
AtomicIntegerArray為例總結常用的方法
//addAndGet(int i, int delta):以原子更新的方式将數組中索引為i的元素與輸入值相加;
public final int addAndGet(int i, int delta) {
return getAndAdd(i, delta) + delta;
}
//getAndIncrement(int i):以原子更新的方式将數組中索引為i的元素自增加1;
public final int getAndIncrement(int i) {
return getAndAdd(i, 1);
}
//compareAndSet(int i, int expect, int update):将數組中索引為i的位置的元素進行更新
public final boolean compareAndSet(int i, int expect, int update) {
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}
測試
public class AtomicIntegerArrayTest {
static int[] value = new int[]{ 1, 2, 3, 4, 5 };
static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(value);
public static void main(String[] args) throws InterruptedException {
//設定索引0的元素為100
atomicIntegerArray.set(0, 100);
System.out.println(atomicIntegerArray.get(0));
//以原子更新的方式将數組中索引為1的元素與輸入值相加
atomicIntegerArray.getAndAdd(1,5);
System.out.println(atomicIntegerArray);
}
LongAdder/DoubleAdder
之前有很多空循環也大大的浪廢cpu的資源LongAdder/DoubleAdder解決了這個問題
AtomicLong是利用了底層的CAS操作來提供并發性的,比如addAndGet方法:
性能測試
public class LongAdderTest {
public static void main(String[] args) {
testAtomicLongVSLongAdder(10, 10000);
System.out.println("==================");
testAtomicLongVSLongAdder(10, 200000);
System.out.println("==================");
testAtomicLongVSLongAdder(100, 200000);
}
static void testAtomicLongVSLongAdder(final int threadCount, final int times) {
try {
long start = System.currentTimeMillis();
testLongAdder(threadCount, times);
long end = System.currentTimeMillis() - start;
System.out.println("條件>>>>>>線程數:" + threadCount + ", 單線程操作計數" + times);
System.out.println("結果>>>>>>LongAdder方式增加計數" + (threadCount * times) + "次,共計耗時:" + end);
long start2 = System.currentTimeMillis();
testAtomicLong(threadCount, times);
long end2 = System.currentTimeMillis() - start2;
System.out.println("條件>>>>>>線程數:" + threadCount + ", 單線程操作計數" + times);
System.out.println("結果>>>>>>AtomicLong方式增加計數" + (threadCount * times) + "次,共計耗時:" + end2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
AtomicLong atomicLong = new AtomicLong();
for (int i = 0; i < threadCount; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < times; j++) {
atomicLong.incrementAndGet();
}
countDownLatch.countDown();
}
}, "my-thread" + i).start();
}
countDownLatch.await();
}
static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
LongAdder longAdder = new LongAdder();
for (int i = 0; i < threadCount; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < times; j++) {
longAdder.add(1);
}
countDownLatch.countDown();
}
}, "my-thread" + i).start();
}
countDownLatch.await();
}
LongAdder原理
10個線程各循環10萬次對i+1 使用了自旋鎖拿不到鎖的線程得到了cpu時間片會空循環 LongAdder解決這個問題的原理 它會建立一個數組 擷取目前線程的哈希值 然後對這個數組抹除 這樣這個線程會進入數組中其中的一個 這時把我們要加的數記下來也就是1 第一次把1指派給 為了好了解 這個建立一個數組 A[] a = new A[10] 假設來了一個線程進入了a[4] 第一次進入的時候把1指派給A類中的屬性(稱為aa)上 第二次進去會對aa屬性進行加1操作 然後所有線程執行完之後 把a數組中的每個A對象裡的aa屬性進行相加 最後的結果就是正确的 把一個屬性拆分成多個屬性最後把拆分的屬性進行相加 得到結果這樣的思想 大大的提升線程的使用率了