關于ConcurrentHashMap的size方法,有資料說size不能提供強的一緻性,但是也有人說size是強一緻性的。那麼對于這個問題,我們從源碼出發,來看看size的實作機制。最終看看能否得到正确的答案。
1.一緻性定義
關于一緻性的定義,大概如下:
一緻性(Consistency)是指多副本(Replications)問題中的資料一緻性。可以分為強一緻性、順序一緻性與弱一緻性。
1.1 強一緻性(Strict Consistency)
強一緻性也被可以被稱做:
原子一緻性(Atomic Consistency)
線性一緻性(Linearizable Consistency)
要滿足強一緻性,必須符合以下兩個要求:
- 任何一次讀都能讀到某個資料的最近一次寫的資料。
- 系統中的所有程序,看到的操作順序,都和全局時鐘下的順序一緻。
上述定義用通俗的話來解釋就是,假定對同一個資料集合,分别有兩個線程A、B進行操作,假定A首先進行的修改操作,那麼從時序上在A這個操作之後發生的所有B的操作都應該能看到A修改操作的結果。
1.2 弱一緻性
資料更新之後,如果能容忍通路不到或者隻能部分通路的情況,就是弱一緻性。最終一緻性是弱一緻性的一個特例。
也就是說,對于資料集,分别有兩個線程A、B進行操作,假定A首先進行了修改操作,那麼可能從時許上滞後的B進行的讀取操作在一段時間内還讀取不到這個結果。讀取的還是A操作之前的結果。這就是弱一緻性。
最終一緻性就是說,隻要A、B的都不進行任何更新操作,一段時間之後,資料都能讀取到最新的資料。
2.size方法源碼
2.1 jdk1.8實作
2.1.1 size方法
我們來看看1.8版本中的ConcurrnetHashMap中size方法的源碼:
/**
* {@inheritDoc}
*/
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
複制
2.1.2 sumCount
實際上底層調用的是sumCount方法:
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
複制
可以看到,這個count,實際上是對CounterCell數組進行周遊,中間沒有任何鎖操作。
2.1.3 CounterCell
CounterCell源碼如下:
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
複制
這實際上就是一個volatile修飾的計數器。除了Contended這個注解之外,沒有什麼特别之處,在put、remove的時候,對這個計數器進行增減。
Contended這個注解我們在後面再來詳細解釋。
counterCells這個數組,實際上size和table一緻,這樣Counter中的value就是這個數組中index對應到table中bucket的長度。
在table擴容的時候,這個計數器數組也會擴容:
CounterCell[] rs = new CounterCell[n << 1];
複制
2.1.4 addCount
那麼在put和remove以及clear等方式對size數量有影響的方法中,都會調用addCount對size進行增減。
x為正數表示增加,負數表示減小。同時check如果大于0則需要對結果進行check,避免在并發過程中由于并發操作帶來的計算不準确。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//判斷是否為空
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
//a是計算出來的槽位
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
//cas方式
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//增加
fullAddCount(x, uncontended);
return;
}
//如果不需要檢查就直接傳回
if (check <= 1)
return;
s = sumCount();
}
//如果需要檢查
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//周遊并重新計算
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
複制
2.1.5 fullAddCount
這是執行增加的核心方法,其中大量使用了cas操作,另外還必須考慮到執行的并行性。
// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
//死循環,cas方式修改
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
複制
2.1.6 總結
通過對上述方法分析不難看出,size方法是弱一緻性的,這是因為,如果有A線程正在進行put操作,之後觸發了擴容或者紅黑樹轉置,那麼立即就會synchronized鎖定root節點。之後開始進行對應的操作,這個操作是需要時間的。但是這個時候,如果線程B來調用size方法,那麼size方法由于沒有任何鎖機制,肯定是能夠傳回的,此時傳回的size就是put之前的值。那麼這個結果就導緻了弱一緻性。即put在前的操作并不能馬上讓時許在其後面的操作得到結果,需要等一段時間。待synchronized執行完成。
ConcurrenthashMap的counter機制就是為了增加讀取性能而設計的,如果為了強一緻性,那麼隻能按HashTable的方式整個讀取方法都加鎖,那麼這樣肯定會影響性能的。
另外addCount,在增加操作的時候還會對數量進行檢查。以避免并發操作帶來的不一緻性。
2.2 jdk1.7源碼實作
由于1.7采用分段鎖的機制,是以設計沒有1.8複雜。
2.2.1 size方法源碼
/**
* Returns the number of key-value mappings in this map. If the
* map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
* <tt>Integer.MAX_VALUE</tt>.
*
* @return the number of key-value mappings in this map
*/
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
//死循環
for (;;) {
//周遊 逐漸鎖定段
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
//假定初始的modCount
sum = 0L;
size = 0;
overflow = false;
//計算bucket
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
//将modCount相加
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
//如果modCount在這個計算過程中沒有改變則說明size計算有效,否則會重置last之後重新計算
if (sum == last)
break;
last = sum;
}
} finally {
//将所有的lock進行unlock操作
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
複制
這個方法的邏輯是,在一開始,周遊segment的時候,先鎖定一個段,計算size,然後判斷在這個過程中,modCount是否發生了改變,如果發生改變則說明計算結果會産生誤差,則重新計算。直到modCount在計算前後相等,則說計算可行,之後再移動到下要給bucket。
可以看到這實際上是個低效的操作,隻有在所有的bucket都計算完成之後,才會統一在finally中進行unlock。這樣會導緻全部的段都被鎖定。
也就是說,1.7中的size方法,最開始是個樂觀鎖,最終會轉換為悲觀鎖,這樣實際上是個強一緻性的方法。
2.3 說明
通過上述對1.7和1.8源碼中對size方法的對比,在1.7中,size能做到強一緻性,但是這樣是有代價的,對分段鎖的lock導緻了整體性能的降低。而在1.8中,為了增加性能,而增加了一大段複雜的代碼将size變成了弱一緻性。但是好處是在put的過程中不會對size造成阻塞。
由此可見源碼作者為了提升ConcurrentHashMap所做的各種努力。
這也是我們在編碼過程中值得借鑒的地方。
至于@sun.misc.Contended,這是通過緩存行對齊來避免僞共享問題,這個将在後續單獨介紹。