天天看點

聊聊java中的哪些Map:(七)ConcurrentHashMap的size方法的一緻性分析

關于ConcurrentHashMap的size方法,有資料說size不能提供強的一緻性,但是也有人說size是強一緻性的。那麼對于這個問題,我們從源碼出發,來看看size的實作機制。最終看看能否得到正确的答案。

1.一緻性定義

關于一緻性的定義,大概如下:

一緻性(Consistency)是指多副本(Replications)問題中的資料一緻性。可以分為強一緻性、順序一緻性與弱一緻性。

1.1 強一緻性(Strict Consistency)

強一緻性也被可以被稱做:

原子一緻性(Atomic Consistency)

線性一緻性(Linearizable Consistency)

要滿足強一緻性,必須符合以下兩個要求:

  • 任何一次讀都能讀到某個資料的最近一次寫的資料。
  • 系統中的所有程序,看到的操作順序,都和全局時鐘下的順序一緻。

上述定義用通俗的話來解釋就是,假定對同一個資料集合,分别有兩個線程A、B進行操作,假定A首先進行的修改操作,那麼從時序上在A這個操作之後發生的所有B的操作都應該能看到A修改操作的結果。

1.2 弱一緻性

資料更新之後,如果能容忍通路不到或者隻能部分通路的情況,就是弱一緻性。最終一緻性是弱一緻性的一個特例。

也就是說,對于資料集,分别有兩個線程A、B進行操作,假定A首先進行了修改操作,那麼可能從時許上滞後的B進行的讀取操作在一段時間内還讀取不到這個結果。讀取的還是A操作之前的結果。這就是弱一緻性。

最終一緻性就是說,隻要A、B的都不進行任何更新操作,一段時間之後,資料都能讀取到最新的資料。

2.size方法源碼

2.1 jdk1.8實作

2.1.1 size方法

我們來看看1.8版本中的ConcurrnetHashMap中size方法的源碼:

/**
 * {@inheritDoc}
 */
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}           

複制

2.1.2 sumCount

實際上底層調用的是sumCount方法:

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}           

複制

可以看到,這個count,實際上是對CounterCell數組進行周遊,中間沒有任何鎖操作。

2.1.3 CounterCell

CounterCell源碼如下:

/**
 * A padded cell for distributing counts.  Adapted from LongAdder
 * and Striped64.  See their internal docs for explanation.
 */
@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}           

複制

這實際上就是一個volatile修飾的計數器。除了Contended這個注解之外,沒有什麼特别之處,在put、remove的時候,對這個計數器進行增減。

Contended這個注解我們在後面再來詳細解釋。

counterCells這個數組,實際上size和table一緻,這樣Counter中的value就是這個數組中index對應到table中bucket的長度。

在table擴容的時候,這個計數器數組也會擴容:

CounterCell[] rs = new CounterCell[n << 1];           

複制

2.1.4 addCount

那麼在put和remove以及clear等方式對size數量有影響的方法中,都會調用addCount對size進行增減。

x為正數表示增加,負數表示減小。同時check如果大于0則需要對結果進行check,避免在并發過程中由于并發操作帶來的計算不準确。

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    //判斷是否為空
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
        //a是計算出來的槽位
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
            //cas方式
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
              //增加
            fullAddCount(x, uncontended);
            return;
        }
        //如果不需要檢查就直接傳回
        if (check <= 1)
            return;
        s = sumCount();
    }
    
    //如果需要檢查
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        //周遊并重新計算
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}           

複制

2.1.5 fullAddCount

這是執行增加的核心方法,其中大量使用了cas操作,另外還必須考慮到執行的并行性。

// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    //死循環,cas方式修改
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        if ((as = counterCells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            else if (counterCells != as || n >= NCPU)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h);
        }
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}           

複制

2.1.6 總結

通過對上述方法分析不難看出,size方法是弱一緻性的,這是因為,如果有A線程正在進行put操作,之後觸發了擴容或者紅黑樹轉置,那麼立即就會synchronized鎖定root節點。之後開始進行對應的操作,這個操作是需要時間的。但是這個時候,如果線程B來調用size方法,那麼size方法由于沒有任何鎖機制,肯定是能夠傳回的,此時傳回的size就是put之前的值。那麼這個結果就導緻了弱一緻性。即put在前的操作并不能馬上讓時許在其後面的操作得到結果,需要等一段時間。待synchronized執行完成。

ConcurrenthashMap的counter機制就是為了增加讀取性能而設計的,如果為了強一緻性,那麼隻能按HashTable的方式整個讀取方法都加鎖,那麼這樣肯定會影響性能的。

另外addCount,在增加操作的時候還會對數量進行檢查。以避免并發操作帶來的不一緻性。

2.2 jdk1.7源碼實作

由于1.7采用分段鎖的機制,是以設計沒有1.8複雜。

2.2.1 size方法源碼

/**
 * Returns the number of key-value mappings in this map.  If the
 * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
 * <tt>Integer.MAX_VALUE</tt>.
 *
 * @return the number of key-value mappings in this map
 */
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
    //死循環
        for (;;) {
        //周遊 逐漸鎖定段
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            //假定初始的modCount
            sum = 0L;
            size = 0;
            overflow = false;
            //計算bucket
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    //将modCount相加
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            //如果modCount在這個計算過程中沒有改變則說明size計算有效,否則會重置last之後重新計算
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
    //将所有的lock進行unlock操作
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}           

複制

這個方法的邏輯是,在一開始,周遊segment的時候,先鎖定一個段,計算size,然後判斷在這個過程中,modCount是否發生了改變,如果發生改變則說明計算結果會産生誤差,則重新計算。直到modCount在計算前後相等,則說計算可行,之後再移動到下要給bucket。

可以看到這實際上是個低效的操作,隻有在所有的bucket都計算完成之後,才會統一在finally中進行unlock。這樣會導緻全部的段都被鎖定。

也就是說,1.7中的size方法,最開始是個樂觀鎖,最終會轉換為悲觀鎖,這樣實際上是個強一緻性的方法。

2.3 說明

通過上述對1.7和1.8源碼中對size方法的對比,在1.7中,size能做到強一緻性,但是這樣是有代價的,對分段鎖的lock導緻了整體性能的降低。而在1.8中,為了增加性能,而增加了一大段複雜的代碼将size變成了弱一緻性。但是好處是在put的過程中不會對size造成阻塞。

由此可見源碼作者為了提升ConcurrentHashMap所做的各種努力。

這也是我們在編碼過程中值得借鑒的地方。

至于@sun.misc.Contended,這是通過緩存行對齊來避免僞共享問題,這個将在後續單獨介紹。