天天看點

ConcurrentHashMap源碼解析_03 put方法源碼分析

JDK1.8 ConcurrentHashMap結構圖:

ConcurrentHashMap源碼解析_03 put方法源碼分析

1、put方法源碼解析

// 向并發Map中put一個資料
public V put(K key, V value) {
    return putVal(key, value, false);
}

// 向并發Map中put一個資料
// Key: 資料的鍵
// value:資料的值
// onlyIfAbsent:是否替換資料:
// 如果為false,則當put資料時,遇到Map中有相同K,V的資料,則将其替換
// 如果為true,則當put資料時,遇到Map中有相同K,V的資料,則不做替換,不插入
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 控制k 和 v 不能為null
    if (key == null || value == null) throw new NullPointerException();
    // 通過spread方法,可以讓高位也能參與進尋址運算,使最終得到的hash值更分散
    int hash = spread(key.hashCode());
    
    // binCount表示目前k-v 封裝成node插入到指定桶位後,在桶位中的所屬連結清單的下标位置
    // =0 表示目前桶位為null,node可以直接放入
    // >0 表示目前桶位中有節點,周遊連結清單,将封裝k-v的新node放傳入連結表末尾,并記錄連結清單末尾的下标binCount
    // =2 表示目前桶位**可能**已經樹化為紅黑樹了
    int binCount = 0;
    
    // tab 引用map對象的table
    // 自旋
    for (Node<K,V>[] tab = table;;) {
        // f 表示桶位的頭結點
        // n 表示散清單數組的長度
        // i 表示key通過尋址計算後,得到的桶位下标
        // fh 表示桶位頭結點的hash值
        Node<K,V> f; int n, i, fh;
        // -----------------------------------------------------------------------------
        // CASE1:成立,表示目前map中的table尚未初始化...
        if (tab == null || (n = tab.length) == 0)
            // 對map中的table進行初始化
            tab = initTable();
        // -----------------------------------------------------------------------------
        // CASE2:table已經初始化,此時根據尋址算法确定一個桶,并且桶的頭結點f為null
        // i 表示key使用路由尋址算法得到key對應table數組的下标位置,tabAt方法擷取指定桶位i的頭結點f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 這時候就可以将封裝k-v的結點直接放入桶
            // casTabAt通過CAS的方式去向Node數組指定位置i設定節點值,設定成功傳回true 否則傳回false
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;
        }
        // -----------------------------------------------------------------------------
        // CASE3:table已經初始化,尋址得到的桶位中的頭結點f不是null,如果該頭結點f的hash值fh=-1:
        // 則,表示目前節點是FWD(forwarding)節點
        // 如果CASE3條件成立:表示目前桶位的頭結點為 FWD結點,表示目前map正處于擴容過程中..
        else if ((fh = f.hash) == MOVED)
            // 發現f是FWD結點後,目前結點有義務去幫助目前map對象完成資料遷移工作...
            // 等學完擴容再來分析這裡~
            tab = helpTransfer(tab, f);
        // -----------------------------------------------------------------------------
        // CASE4:CASE1-3都不滿足時,那麼目前桶位存放的可能是連結清單也可能是紅黑樹代理結點TreeBin
        else {
            // 保留替換之前的資料引用:當新插入的key存在後,會将舊值賦給OldVal,傳回給put方法調用
            V oldVal = null;
            // 使用synchronized加鎖“頭節點”(**理論上是“頭結點”**)
            synchronized (f) {
                // -----------------------------------------------------------------------
                // CASE5:tabAt(tab, i) == f
                // 對比一下目前桶位的頭節點是否為之前擷取的頭結點:為什麼又要對比一下?
                // 如果其它線程在目前線程之前将該桶位的頭結點修改掉了,目前線程再使用sync對該節點f加鎖就有問題了(鎖本身加錯了地方~) 
                if (tabAt(tab, i) == f) {// 如果條件成立,說明加鎖的對象f沒有問題,持有鎖!
                    // ------------------------------------------------------------------
                    // CASE6:fh >= 0)
                    // 如果條件成立,說明目前桶位就是普通連結清單桶位,這裡回顧下常量屬性字段:
                    // static final int MOVED = -1; 表示目前節點是FWD(forwarding)節點
                    // static final int TREEBIN = -2; 表示目前節點已經樹化
                    if (fh >= 0) {
                        // 1.目前插入key與連結清單當中所有元素的key都不一緻時,目前的插入操作是追加到連結清單的末尾,binCount此時表示連結清單長度
                        // 2.目前插入key與連結清單當中的某個元素的key一緻時,目前插入操作可能就是替換了。binCount表示沖突位置(binCount - 1)
                        binCount = 1;
                        // 疊代循環目前桶位的連結清單,e是每次循環處理節點。
                        for (Node<K,V> e = f;; ++binCount) {
                            // 目前循環周遊節點的key
                            K ek;
                            // --------------------------------------------------------
                            // CASE7:
                            // 條件一:e.hash == hash
                            // 成立:表示循環的目前元素的hash值與插入節點的hash值一緻,需要進一步判斷
                            // 條件二:((ek = e.key) == key ||(ek != null && key.equals(ek)))
                            // 成立:說明循環的目前節點與插入節點的key一緻,确實發生沖突了~
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // 将目前循環的元素的值指派給oldVal
                                oldVal = e.val;
                                // 傳入的putVal方法參數onlyIfAbsent:是否替換資料:
                                // false,當put資料時,遇到Map中有相同K,V的資料,則将其替換
                                // true,當put資料時,遇到Map中有相同K,V的資料,則不做替換,不插入
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // --------------------------------------------------------
                            // CASE8:
                            // 目前元素與插入元素的key不一緻時,會走下面程式:
                            // 1.更新循環周遊的節點為目前節點的下一個節點
                            // 2.判斷下一個節點是否為null,如果是null,說明目前節點已經是隊尾了,插入資料需要追加到隊尾節點的後面。
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // ------------------------------------------------------------------
                    // CASE9:f instanceof TreeBin
                    // 如果條件成立,表示目前桶位是紅黑樹代理結點TreeBin
                    //(前置條件:該桶位一定不是連結清單)
                    else if (f instanceof TreeBin) {
                        // p表示紅黑樹中如果與你插入節點的key 有沖突節點的話,則putTreeVal方法會傳回沖突節點的引用。
                        Node<K,V> p;
                        // 強制設定binCount為2,因為binCount <= 1 時有其它含義,是以這裡設定為了2 (回頭講addCount的時候會詳細介紹)
                        binCount = 2;
                        
                        // 條件一成立,說明目前插入節點的key與紅黑樹中的某個節點的key一緻,沖突了:
                        // putTreeVal向紅黑樹中插入結點,插入成功傳回null,否則傳回沖突結點對象
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            // 将沖突節點的值指派給oldVal
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // ------------------------------------------------------------------
            // CASE10:binCount != 0
            // 說明目前桶位不為null,可能是紅黑樹也可能是連結清單
            if (binCount != 0) {
                // 如果binCount>=8 表示處理的桶位一定是連結清單
                if (binCount >= TREEIFY_THRESHOLD)
                    // 因為桶中連結清單長度大于了8,需要樹化:
                    // 調用轉化連結清單為紅黑樹的方法
                    treeifyBin(tab, i);
                // 說明目前線程插入的資料key,與原有k-v發生沖突,需要将原資料v傳回給調用者。
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // Map中的元素資料量累加方法:有額外的以下功能
    // 1.統計目前table一共有多少資料
    // 2.并判斷是否達到擴容門檻值标準,觸發擴容。
    addCount(1L, binCount);
    
    return null;
}
      

2、initTable方法源碼分析

第一次放元素時,初始化桶數組:

/** 
 * table初始化
 */
private final Node<K,V>[] initTable() {
    // tab: 引用map.table
    // sc: sizeCtl的臨時值
    // sizeCtl:預設為0,用來控制table的狀态、以及初始化和擴容操作:
    // sizeCtl<0表示table的狀态:
    //(1)=-1,表示有其他線程正在進行初始化操作。(其他線程就不能再進行初始化,相當于一把鎖)
    //(2)=-(1 + nThreads),表示有n個線程正在一起擴容。
    // sizeCtl>=0表示table的初始化和擴容相關操作:
    //(3)=0,預設值,後續在真正初始化table的時候使用,設定為預設容量DEFAULT_CAPACITY --> 16。
    //(4)>0,将sizeCtl設定為table初始容量或擴容完成後的下一次擴容的門檻。
    Node<K,V>[] tab; int sc;
    
    // 附加條件的自旋: 條件是map.table尚未初始化...
    while ((tab = table) == null || tab.length == 0) {
        // -----------------------------------------------------------------------------
        // CASE1: sizeCtl) < 0
        // sizeCtl < 0可能是以下2種情況:
        //(1)-1,表示有其他線程正在進行table初始化操作。
        //(2)-(1 + nThreads),表示有n個線程正在一起擴容。
        if ((sc = sizeCtl) < 0)
            // 這裡sizeCtl大機率就是-1,表示其它線程正在進行建立table的過程,目前線程沒有競争到初始化table的鎖,進而目前線程被迫等待...
            Thread.yield();
        // -----------------------------------------------------------------------------
        // CASE2:sizeCtl) >= 0 且U.compareAndSwapInt(this, SIZECTL, sc, -1)結果為true
        // U.compareAndSwapInt(this, SIZECTL, sc, -1):以CAS的方式修改目前線程的sizeCtl為-1,
        // sizeCtl如果成功被修改為-1,就傳回true,否則傳回false。
        // 當傳回true時,則該線程就可以進入下面的else if代碼塊中,這時候sizeCtl=-1相當于是一把鎖,表示下面的else if代碼塊已經被該線程占用,其他線程不能再進入~
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                // ----------------------------------------------------------------------
                // CASE3: 這裡為什麼又要判斷呢? 
                // 為了防止其它線程已經初始化table完畢了,然後目前線程再次對其初始化..導緻丢失資料。
                // 如果條件成立,說明其它線程都沒有進入過這個if塊,目前線程就是具備初始化table權利了。
                if ((tab = table) == null || tab.length == 0) {
                    // sc大于等于0的情況如下:
                    //(3)=0,預設值,後續在真正初始化table的時候使用,設定為預設容量DEFAULT_CAPACITY --> 16。
                    //(4)>0,将sizeCtl設定為table初始容量或擴容完成後的下一次擴容的門檻。
                    // 如果sc大于0,則建立table時使用sc為指定table初始容量大小,
                    // 否則使用16預設值DEFAULT_CAPACITY
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    // 建立新數組nt
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // 将新數組nt指派給table、tab
                    table = tab = nt;
                    // sc設定為下次散清單擴容的門檻:0.75n
                    sc = n - (n >>> 2);
                }
            } finally {
                // 将sc指派給sizeCtl,分為一下2種情況:
                // 1、目前線程既通過了CASE2的判斷,也通過了CASE3的判斷:
                // 則,目前線程是第一次建立map.table的線程,那麼,sc就表示下一次擴容的門檻值。
                // 2、當線程通過了CASE2的判斷,但是沒有通過CASE3的判斷:
                // 則,目前線程并不是第一次建立map.table的線程,目前線程通過CASE2的判斷時,将
                // sizeCtl設定為了-1 ,那麼在該線程結束上面的代碼邏輯之後,需要将sc修改回進入CASE2之前的sc值。
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
      

小節:

(1)使用CAS鎖控制隻有一個線程初始化桶數組;

(2)sizeCtl在初始化後存儲的是擴容門檻;

(3)擴容門檻寫死的是桶數組大小的0.75倍,桶數組大小即map的容量,也就是最多存儲多少個元素。

3、addCount方法(難點)

在閱讀addCount方法源碼之前,最好再去熟悉下LongAdder源碼:LongAdder源碼解析。

addCount方法作用:每次添加元素後,元素數量加1,并判斷是否達到擴容門檻,達到了則進行擴容或協助擴容。

/**
 * Map中的元素資料量累加方法:有額外的以下功能
 * 1.統計目前table一共有多少資料
 * 2.并判斷是否達到擴容門檻值标準,觸發擴容
 */
private final void addCount(long x, int check) {
    // as 表示 LongAdder.cells
    // b 表示LongAdder.base
    // s 表示目前map.table中元素的數量
    CounterCell[] as; long b, s;
    // -------------------------統計目前table一共有多少資料----------------------------------
    // CASE1: 
    // (as = counterCells) != null
    // 條件一:true->表示cells已經初始化了,目前線程應該去使用hash尋址找到合适的cell 去累加資料
    //        false->表示目前線程應該直接将資料累加到base(沒有線程競争)
    // !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
    // 條件二:false->表示寫base成功,資料累加到base中了,目前競争不激烈,不需要建立cells
    //        true->表示寫base失敗,與其他線程在base上發生了競争,目前線程應該去嘗試建立cells。
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        // 有幾種情況進入到if塊中?
        // 條件一為true->表示cells已經初始化了,目前線程應該去使用hash尋址找到合适的cell去累加資料
        // 條件二為true->表示寫base失敗,與其他線程在base上發生了競争,目前線程應該去嘗試建立cells。

        // a 表示目前線程hash尋址命中的cell
        CounterCell a;
        // v 表示目前線程寫cell時的期望值
        long v;
        // m 表示目前cells數組的長度
        int m;
        // true -> 未發生線程競争  false->發生線程競争
        boolean uncontended = true;

        // ---------------------------------------------------------------------------
        // CASE2: 
        // 條件一:as == null || (m = as.length - 1) < 0
        // true-> 表示目前線程是通過寫base競争失敗(CASE1的條件二),然後進入的if塊,就需要調用fullAddCount方法去擴容或者重試.. 
        // (fullAddCount方法就類似于LongAdder.longAccumulate方法)
        // 條件二:a = as[ThreadLocalRandom.getProbe() & m]) == null 前置條件:cells已經初始化了
        // true->表示目前線程命中的cell表格是個空的,需要目前線程進入fullAddCount方法去初始化cell,放入目前位置.
        // 條件三:!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)
        // 前置條件:條件二中目前線程命中的cell表格不是空的~
        //       false->取反得到false,表示目前線程使用cas方式更新目前命中的cell成功
        //       true->取反得到true,表示目前線程使用cas方式更新目前命中的cell失敗,需要進入fullAddCount進行重試或者擴容cells。
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
        ) {
            fullAddCount(x, uncontended);
            // 考慮到fullAddCount裡面的事情比較累,就讓目前線程不參與到擴容相關的邏輯了,直接傳回到調用點。
            return;
        }
        
        if (check <= 1)
            return;

        // sumCount統計目前散清單元素個數sum = base + cells[0] + ... + cells[n],這是一個期望值
        s = sumCount();
    }
    
    // -------------------------判斷是否達到擴容門檻值标準,觸發擴容----------------------------
    // CASE3: 
    // check >= 0表示一定是一個put操作調用的addCount,check < 0是remove操作調用的addCount方法
    if (check >= 0) {
        // tab 表示 map.table
        // nt 表示 map.nextTable
        // n 表示map.table數組的長度
        // sc 表示sizeCtl的臨時值
        Node<K,V>[] tab, nt; int n, sc;

        /**
         * sizeCtl < 0
         * 1. -1 表示目前table正在初始化(有線程在建立table數組),目前線程需要自旋等待..
         * 2.表示目前table數組正在進行擴容 ,高16位表示:擴容的辨別戳   低16位表示:(1 + nThread) 目前參與并發擴容的線程數量
         *
         * sizeCtl = 0,表示建立table數組時 使用DEFAULT_CAPACITY為大小
         *
         * sizeCtl > 0
         *
         * 1. 如果table未初始化,表示初始化大小
         * 2. 如果table已經初始化,表示下次擴容時的 觸發條件(門檻值)
         */

        // 有條件自旋~
        // 條件一:s >= (long)(sc = sizeCtl)
        //        true -> 1.目前sizeCtl為一個負數 表示正在擴容中..
        //                2.目前sizeCtl是一個正數,表示擴容門檻值,但是s已經達到擴容門檻值(需要擴容)
        //        false -> 表示目前table尚未達到擴容門檻值條件(不需要擴容)
        // 條件二:(tab = table) != null 恒成立 true
        // 條件三:(n = tab.length) < MAXIMUM_CAPACITY
        //        true -> 目前table長度小于最大值限制,則可以進行擴容。
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {

            // 擷取擴容唯一辨別戳
            // eg: 16 -> 32 擴容辨別戳為:1000 0000 0001 1011
            // 什麼意思呢?
            // 即,所有将map容量由16擴容到32的線程,其拿到的擴容唯一辨別戳都是1000 0000 0001 1011
            int rs = resizeStamp(n);
            
            // --------------------------------------------------------------------------
            // CASE4: 
            // 條件成立:表示目前table正在擴容,則,目前線程理論上應該協助table完成擴容
            if (sc < 0) {
                // --------------------------------------------------------------
                // CASE2: 條件1~4隻要有個為true就會跳出循環,不會繼續擴容~
                // 條件一:(sc >>> RESIZE_STAMP_SHIFT) != rs
                //       true -> 說明目前線程擷取到的擴容唯一辨別戳 非 本批次擴容
                //       false -> 說明目前線程擷取到的擴容唯一辨別戳 是 本批次擴容
                // 條件二:JDK1.8 中有bug,jira已經提出來了,其實想表達的是 sc == (rs << 16 ) + 1
                //        true -> 表示擴容完畢,目前線程不需要再參與進來了
                //        false -> 擴容還在進行中,目前線程可以參與
                // 條件三:JDK1.8 中有bug,jira已經提出來了,其實想表達的是:
                // sc == (rs << 16) + MAX_RESIZERS
                //        true-> 表示目前參與并發擴容的線程達到了最大值 65535 - 1
                //        false->表示目前線程可以參與進來
                //條件四:(nt = nextTable) == null
                //        true -> 表示本次擴容結束
                //        false -> 擴容正在進行中
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    // 條件1~4隻要有個為true就會跳出循環,不會繼續擴容~
                    break;

                // --------------------------------------------------------------
                // CASE5: 
                // 前置條件:目前table正在執行擴容中(即,CASE4沒有被通過)目前線程有機會參與進擴容。
                // 條件成立:說明目前線程成功參與到擴容任務中,并且将sc低16位值加1,表示多了一個線程參與工作~
                // 條件失敗:
                // 1.目前有很多線程都在此處嘗試修改sizeCtl,有其它一個線程修改成功了
                // 導緻你的sc期望值與記憶體中的值不一緻,CAS修改失敗。(下次自選大機率不會在來到這裡~)
                // 2.transfer任務内部的線程也修改了sizeCtl。
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    // 擴容(遷移資料):協助擴容線程,持有nextTable參數,進入該方法協助擴容~
                    transfer(tab, nt);
            }
            // --------------------------------------------------------------------------
            // CASE6: 以CAS的方式去更新sc,更新成功傳回true,否則傳回false
            // 條件成立,說明目前線程是觸發擴容的**第一個**線程,在transfer方法需要做一些擴容準備工作:
            // rs << RESIZE_STAMP_SHIFT:将擴容唯一辨別戳左移16位 eg:
            // 1000 0000 0001 1011 << 16 得到 1000 0000 0001 1011 0000 0000 0000 0000
            // 緊接這 (rs << RESIZE_STAMP_SHIFT) + 2)操作:
            // 1000 0000 0001 1011 0000 0000 0000 0000 + 2 
            // => 1000 0000 0001 1011 0000 0000 0000 0010,這個二進制數有如下含義:
            // sizeCtl的高16位: 1000 0000 0001 1011 表示目前的擴容辨別戳
            // sizeCtl的低16位: 0000 0000 0000 0010 表示(1 + nThread),即,目前有多少個線程正在參與擴容~
            // 那麼這裡的n是怎麼來的呢? 
            // eg: (rs << RESIZE_STAMP_SHIFT) + 2 這裡的2,就是1 + 1,後面的1就是對1*Thread,即(1+1*Threads)
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                // 擴容(遷移資料):觸發擴容條件的線程 不持有nextTable
                transfer(tab, null);
            // 重新計算table中的元素個數
            s = sumCount();
        }
    }
}
      

(1)元素個數的存儲方式類似于LongAdder類,存儲在不同的段上,減少不同線程同時更新size時的沖突;

(2)計算元素個數時把這些段的值及baseCount相加算出總的元素個數;

(3)正常情況下sizeCtl存儲着擴容門檻,擴容門檻為容量的0.75倍;

(4)擴容時sizeCtl高位存儲擴容郵戳(resizeStamp),低位存儲擴容線程數加1(1+nThreads);

(5)其它線程添加元素後如果發現存在擴容,也會加入的擴容行列中來;

以上就是ConcurrentHashMap源碼的put存入資料的方法以及相關方法,由于transfer 遷移資料這個方法比較複雜,我們放在下一篇文章中單獨分析~