天天看點

【死磕 Java 并發】—– J.U.C 之 Java 并發容器:ConcurrentLinkedQueue

技術文章第一時間送達!

SkipListSkipList的特性SkipList的查找SkipList的插入SkipList的删除ConcurrentSkipListMapput操作get操作remove操作size操作

到目前為止,我們在Java世界裡看到了兩種實作key-value的資料結構:Hash、TreeMap,這兩種資料結構各自都有着優缺點。

  1. Hash表:插入、查找最快,為O(1);如使用連結清單實作則可實作無鎖;資料有序化需要顯式的排序操作。
  2. 紅黑樹:插入、查找為O(logn),但常數項較小;無鎖實作的複雜性很高,一般需要加鎖;資料天然有序。

然而,這次介紹第三種實作key-value的資料結構:SkipList。SkipList有着不低于紅黑樹的效率,但是其原理和實作的複雜度要比紅黑樹簡單多了。

SkipList

什麼是SkipList?Skip List ,稱之為跳表,它是一種可以替代平衡樹的資料結構,其資料元素預設按照key值升序,天然有序。Skip list讓已排序的資料分布在多層連結清單中,以0-1随機數決定一個資料的向上攀升與否,通過“空間來換取時間”的一個算法,在每個節點中增加了向前的指針,在插入、删除、查找時可以忽略一些不可能涉及到的結點,進而提高了效率。

我們先看一個簡單的連結清單,如下:

【死磕 Java 并發】—– J.U.C 之 Java 并發容器:ConcurrentLinkedQueue

1499585893174201707090001

如果我們需要查詢9、21、30,則需要比較次數為3 + 6 + 8 = 17 次,那麼有沒有優化方案呢?有!我們将該連結清單中的某些元素提煉出來作為一個比較“索引”,如下:

【死磕 Java 并發】—– J.U.C 之 Java 并發容器:ConcurrentLinkedQueue

1499586063109201707090002

我們先與這些索引進行比較來決定下一個元素是往右還是下走,由于存在“索引”的緣故,導緻在檢索的時候會大大減少比較的次數。當然元素不是很多,很難展現出優勢,當元素足夠多的時候,這種索引結構就會大顯身手。

SkipList的特性

SkipList具備如下特性:

  1. 由很多層結構組成,level是通過一定的機率随機産生的
  2. 每一層都是一個有序的連結清單,預設是升序,也可以根據建立映射時所提供的Comparator進行排序,具體取決于使用的構造方法
  3. 最底層(Level 1)的連結清單包含所有元素
  4. 如果一個元素出現在Level i 的連結清單中,則它在Level i 之下的連結清單也都會出現
  5. 每個節點包含兩個指針,一個指向同一連結清單中的下一個元素,一個指向下面一層的元素

我們将上圖再做一些擴充就可以變成一個典型的SkipList結構了

【死磕 Java 并發】—– J.U.C 之 Java 并發容器:ConcurrentLinkedQueue

1499590828559201707090003

SkipList的查找

SkipListd的查找算法較為簡單,對于上面我們我們要查找元素21,其過程如下:

  1. 比較3,大于,往後找(9),
  2. 比9大,繼續往後找(25),但是比25小,則從9的下一層開始找(16)
  3. 16的後面節點依然為25,則繼續從16的下一層找
  4. 找到21

如圖

【死磕 Java 并發】—– J.U.C 之 Java 并發容器:ConcurrentLinkedQueue

201707090004

紅色虛線代表路徑。

SkipList的插入

SkipList的插入操作主要包括:

  1. 查找合适的位置。這裡需要明确一點就是在确認新節點要占據的層次K時,采用丢硬币的方式,完全随機。如果占據的層次K大于連結清單的層次,則重新申請新的層,否則插入指定層次
  2. 申請新的節點
  3. 調整指針

假定我們要插入的元素為23,經過查找可以确認她是位于25後,9、16、21前。當然需要考慮申請的層次K。

如果層次K > 3

需要申請新層次(Level 4)

【死磕 Java 并發】—– J.U.C 之 Java 并發容器:ConcurrentLinkedQueue

201707090005

如果層次 K = 2

直接在Level 2 層插入即可

【死磕 Java 并發】—– J.U.C 之 Java 并發容器:ConcurrentLinkedQueue

201707090006

這裡會涉及到以個算法:通過丢硬币決定層次K,該算法我們通過後面ConcurrentSkipListMap源碼來分析。還有一個需要注意的地方就是,在K層插入元素後,需要確定所有小于K層的層次都應該出現新節點。

SkipList的删除

删除節點和插入節點思路基本一緻:找到節點,删除節點,調整指針。

比如删除節點9,如下:

【死磕 Java 并發】—– J.U.C 之 Java 并發容器:ConcurrentLinkedQueue

201707090007

ConcurrentSkipListMap

通過上面我們知道SkipList采用空間換時間的算法,其插入和查找的效率O(logn),其效率不低于紅黑樹,但是其原理和實作的複雜度要比紅黑樹簡單多了。一般來說會操作連結清單List,就會對SkipList毫無壓力。

ConcurrentSkipListMap其内部采用SkipLis資料結構實作。為了實作SkipList,ConcurrentSkipListMap提供了三個内部類來建構這樣的連結清單結構:Node、Index、HeadIndex。其中Node表示最底層的單連結清單有序節點、Index表示為基于Node的索引層,HeadIndex用來維護索引層次。到這裡我們可以這樣說ConcurrentSkipListMap是通過HeadIndex維護索引層次,通過Index從最上層開始往下層查找,一步一步縮小查詢範圍,最後到達最底層Node時,就隻需要比較很小一部分資料了。在JDK中的關系如下圖:

【死磕 Java 并發】—– J.U.C 之 Java 并發容器:ConcurrentLinkedQueue

201707090008

** Node **

static final class Node<K,V> {
        final K key;
        volatile Object value;
        volatile ConcurrentSkipListMap.Node<K, V> next;

        /** 省略些許代碼 */
    }
           

Node的結構和一般的單連結清單毫無差別,key-value和一個指向下一個節點的next。

Index

static class Index<K,V> {
        final ConcurrentSkipListMap.Node<K,V> node;
        final ConcurrentSkipListMap.Index<K,V> down;
        volatile ConcurrentSkipListMap.Index<K,V> right;

        /** 省略些許代碼 */
    }
           

Index提供了一個基于Node節點的索引Node,一個指向下一個Index的right,一個指向下層的down節點。

HeadIndex

static final class HeadIndex<K,V> extends Index<K,V> {
        final int level;  //索引層,從1開始,Node單連結清單層為0
        HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
            super(node, down, right);
            this.level = level;
        }
    }
           

HeadIndex内部就一個level來定義層級。

ConcurrentSkipListMap提供了四個構造函數,每個構造函數都會調用initialize()方法進行初始化工作。

final void initialize() {
        keySet = null;
        entrySet = null;
        values = null;
        descendingMap = null;
        randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
        head = new ConcurrentSkipListMap.HeadIndex<K,V>(new ConcurrentSkipListMap.Node<K,V>(null, BASE_HEADER, null),
                null, null, 1);
    }
           

注意,initialize()方法不僅僅隻在構造函數中被調用,如clone,clear、readObject時都會調用該方法進行初始化步驟。這裡需要注意randomSeed的初始化。

private transient int randomSeed;
 randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
           

randomSeed一個簡單的随機數生成器(在後面介紹)。

put操作

CoucurrentSkipListMap提供了put()方法用于将指定值與此映射中的指定鍵關聯。源碼如下:

public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        return doPut(key, value, false);
    }
           

首先判斷value如果為null,則抛出NullPointerException,否則調用doPut方法,其實如果各位看過JDK的源碼的話,應該對這樣的操作很熟悉了,JDK源碼裡面很多方法都是先做一些必要性的驗證後,然後通過調用do**()方法進行真正的操作。

doPut()方法内容較多,我們分步分析。

private V doPut(K key, V value, boolean onlyIfAbsent) {
        Node<K,V> z;             // added node
        if (key == null)
            throw new NullPointerException();
        // 比較器
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K, V> b = findPredecessor(key, cmp), n = b.next; ; ) {

            /** 省略代碼 */
           

doPut()方法有三個參數,除了key,value外還有一個boolean類型的onlyIfAbsent,該參數作用與如果存在目前key時,該做何動作。當onlyIfAbsent為false時,替換value,為true時,則傳回該value。用代碼解釋為:

if (!map.containsKey(key))
      return map.put(key, value);
  else
       return map.get(key);
           

首先判斷key是否為null,如果為null,則抛出NullPointerException,從這裡我們可以确認ConcurrentSkipList是不支援key或者value為null的。然後調用findPredecessor()方法,傳入key來确認位置。findPredecessor()方法其實就是确認key要插入的位置。

private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
        if (key == null)
            throw new NullPointerException(); // don't postpone errors
        for (;;) {
            // 從head節點開始,head是level最進階别的headIndex
            for (Index<K,V> q = head, r = q.right, d;;) {

                // r != null,表示該節點右邊還有節點,需要比較
                if (r != null) {
                    Node<K,V> n = r.node;
                    K k = n.key;
                    // value == null,表示該節點已經被删除了
                    // 通過unlink()方法過濾掉該節點
                    if (n.value == null) {
                        //删掉r節點
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }

                    // value != null,節點存在
                    // 如果key 大于r節點的key 則往前進一步
                    if (cpr(cmp, key, k) > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                // 到達最右邊,如果dowm == null,表示指針已經達到最下層了,直接傳回該節點
                if ((d = q.down) == null)
                    return q.node;
                q = d;
                r = d.right;
            }
        }
    }
           

findPredecessor()方法意思非常明确:尋找前輩。從最高層的headIndex開始向右一步一步比較,直到right為null或者右邊節點的Node的key大于目前key為止,然後再向下尋找,依次重複該過程,直到down為null為止,即找到了前輩,看傳回的結果注意是Node,不是Item,是以插入的位置應該是最底層的Node連結清單。

在這個過程中ConcurrentSkipListMap賦予了該方法一個其他的功能,就是通過判斷節點的value是否為null,如果為null,表示該節點已經被删除了,通過調用unlink()方法删除該節點。

final boolean unlink(Index<K,V> succ) {
            return node.value != null && casRight(succ, succ.right);
        }
           

删除節點過程非常簡單,更改下right指針即可。

通過findPredecessor()找到前輩節點後,做什麼呢?看下面:

for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
        // 前輩節點的next != null
        if (n != null) {
            Object v; int c;
            Node<K,V> f = n.next;

            // 不一緻讀,主要原因是并發,有節點捷足先登
            if (n != b.next)               // inconsistent read
                break;

            // n.value == null,該節點已經被删除了
            if ((v = n.value) == null) {   // n is deleted
                n.helpDelete(b, f);
                break;
            }

            // 前輩節點b已經被删除
            if (b.value == null || v == n) // b is deleted
                break;

            // 節點大于,往前移
            if ((c = cpr(cmp, key, n.key)) > 0) {
                b = n;
                n = f;
                continue;
            }

            // c == 0 表示,找到一個key相等的節點,根據onlyIfAbsent參數來做判斷
            // onlyIfAbsent ==false,則通過casValue,替換value
            // onlyIfAbsent == true,傳回該value
            if (c == 0) {
                if (onlyIfAbsent || n.casValue(v, value)) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                break; // restart if lost race to replace value
            }
            // else c < 0; fall through
        }

        // 将key-value包裝成一個node,插入
        z = new Node<K,V>(key, value, n);
        if (!b.casNext(n, z))
            break;         // restart if lost race to append to b
        break outer;
    }
           

找到合适的位置後,就是在該位置插入節點咯。插入節點的過程比較簡單,就是将key-value包裝成一個Node,然後通過casNext()方法加入到連結清單當中。當然是插入之前需要進行一系列的校驗工作。

在最下層插入節點後,下一步工作是什麼?建立索引。前面部落客提過,在插入節點的時候,會根據采用抛硬币的方式來決定新節點所插入的層次,由于存在并發的可能,ConcurrentSkipListMap采用ThreadLocalRandom來生成随機數。如下:

int rnd = ThreadLocalRandom.nextSecondarySeed();
           

抛硬币決定層次的思想很簡單,就是通過抛硬币如果硬币為正面則層次level + 1 ,否則停止,如下:

// 抛硬币決定層次
            while (((rnd >>>= 1) & 1) != 0)
                ++level;
           

在闡述SkipList插入節點的時候說明了,決定的層次level會分為兩種情況進行處理,一是如果層次level大于最大的層次話則需要新增一層,否則就在相應層次以及小于該level的層次進行節點新增處理。

level <= headIndex.level

// 如果決定的層次level比最高層次head.level小,直接生成最高層次的index
            // 由于需要确認每一層次的down,是以需要從最下層依次往上生成
            if (level <= (max = h.level)) {
                for (int i = 1; i <= level; ++i)
                    idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);
            }
           

從底層開始,小于level的每一層都初始化一個index,每次的node都指向新加入的node,down指向下一層的item,右側next全部為null。整個處理過程非常簡單:為小于level的每一層初始化一個index,然後加入到原來的index鍊條中去。

level > headIndex.level

// leve > head.level 則新增一層
            else { // try to grow by one level
                // 新增一層
                level = max + 1;

                // 初始化 level個item節點
                @SuppressWarnings("unchecked")
                ConcurrentSkipListMap.Index<K,V>[] idxs =
                        (ConcurrentSkipListMap.Index<K,V>[])new ConcurrentSkipListMap.Index<?,?>[level+1];
                for (int i = 1; i <= level; ++i)
                    idxs[i] = idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);

                //
                for (;;) {
                    h = head;
                    int oldLevel = h.level;
                    // 層次擴大了,需要重新開始(有新線程節點加入)
                    if (level <= oldLevel) // lost race to add level
                        break;
                    // 新的頭結點HeadIndex
                    ConcurrentSkipListMap.HeadIndex<K,V> newh = h;
                    ConcurrentSkipListMap.Node<K,V> oldbase = h.node;
                    // 生成新的HeadIndex節點,該HeadIndex指向新增層次
                    for (int j = oldLevel+1; j <= level; ++j)
                        newh = new ConcurrentSkipListMap.HeadIndex<K,V>(oldbase, newh, idxs[j], j);

                    // HeadIndex CAS替換
                    if (casHead(h, newh)) {
                        h = newh;
                        idx = idxs[level = oldLevel];
                        break;
                    }
                }
           

當抛硬币決定的level大于最大層次level時,需要新增一層進行處理。處理邏輯如下:

  1. 初始化一個對應的index數組,大小為level + 1,然後為每個機關都建立一個index,個中參數為:Node為新增的Z,down為下一層index,right為null
  2. 通過for循環來進行擴容操作。從最高層進行處理,新增一個HeadIndex,個中參數:節點Node,down都為最高層的Node和HeadIndex,right為剛剛建立的對應層次的index,level為相對應的層次level。最後通過CAS把目前的head與新加入層的head進行替換。

通過上面步驟我們發現,盡管已經找到了前輩節點,也将node插入了,也确定确定了層次并生成了相應的Index,但是并沒有将這些Index插入到相應的層次當中,是以下面的代碼就是将index插入到相對應的層當中。

// 從插入的層次level開始
    splice: for (int insertionLevel = level;;) {
        int j = h.level;
        //  從headIndex開始
        for (ConcurrentSkipListMap.Index<K,V> q = h, r = q.right, t = idx;;) {
            if (q == null || t == null)
                break splice;

            // r != null;這裡是找到相應層次的插入節點位置,注意這裡隻橫向找
            if (r != null) {
                ConcurrentSkipListMap.Node<K,V> n = r.node;

                int c = cpr(cmp, key, n.key);

                // n.value == null ,解除關系,r右移
                if (n.value == null) {
                    if (!q.unlink(r))
                        break;
                    r = q.right;
                    continue;
                }

                // key > n.key 右移
                if (c > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }

            // 上面找到節點要插入的位置,這裡就插入
            // 目前層是最頂層
            if (j == insertionLevel) {
                // 建立聯系
                if (!q.link(r, t))
                    break; // restart
                if (t.node.value == null) {
                    findNode(key);
                    break splice;
                }
                // 标志的插入層 -- ,如果== 0 ,表示已經到底了,插入完畢,退出循環
                if (--insertionLevel == 0)
                    break splice;
            }

            // 上面節點已經插入完畢了,插入下一個節點
            if (--j >= insertionLevel && j < level)
                t = t.down;
            q = q.down;
            r = q.right;
        }
    }
           

這段代碼分為兩部分看,一部分是找到相應層次的該節點插入的位置,第二部分在該位置插入,然後下移。

至此,ConcurrentSkipListMap的put操作到此就結束了。代碼量有點兒多,這裡總結下:

  1. 首先通過findPredecessor()方法找到前輩節點Node
  2. 根據傳回的前輩節點以及key-value,建立Node節點,同時通過CAS設定next
  3. 設定節點Node,再設定索引節點。采取抛硬币方式決定層次,如果所決定的層次大于現存的最大層次,則新增一層,然後建立一個Item連結清單。
  4. 最後,将建立的Item連結清單插入到SkipList結構中。

get操作

相比于put操作 ,get操作會簡單很多,其過程其實就隻相當于put操作的第一步:

private V doGet(Object key) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                ConcurrentSkipListMap.Node<K,V> f = n.next;
                if (n != b.next)                // inconsistent read
                    break;
                if ((v = n.value) == null) {    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) == 0) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                if (c < 0)
                    break outer;
                b = n;
                n = f;
            }
        }
        return null;
    }
           

與put操作第一步相似,首先調用findPredecessor()方法找到前輩節點,然後順着right一直往右找即可,同時在這個過程中同樣承擔了一個删除value為null的節點的職責。

remove操作

remove操作為删除指定key節點,如下:

public V remove(Object key) {
        return doRemove(key, null);
    }
           

直接調用doRemove()方法,這裡remove有兩個參數,一個是key,另外一個是value,是以doRemove方法即提供remove key,也提供同時滿足key-value。

final V doRemove(Object key, Object value) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                ConcurrentSkipListMap.Node<K,V> f = n.next;

                // 不一緻讀,重新開始
                if (n != b.next)                    // inconsistent read
                    break;

                // n節點已删除
                if ((v = n.value) == null) {        // n is deleted
                    n.helpDelete(b, f);
                    break;
                }

                // b節點已删除
                if (b.value == null || v == n)      // b is deleted
                    break;

                if ((c = cpr(cmp, key, n.key)) < 0)
                    break outer;

                // 右移
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }

                /*
                 * 找到節點
                 */

                // value != null 表示需要同時校驗key-value值
                if (value != null && !value.equals(v))
                    break outer;

                // CAS替換value
                if (!n.casValue(v, null))
                    break;
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    findNode(key);                  // retry via findNode
                else {
                    // 清理節點
                    findPredecessor(key, cmp);      // clean index

                    // head.right == null表示該層已經沒有節點,删掉該層
                    if (head.right == null)
                        tryReduceLevel();
                }
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
        }
        return null;
    }
           

調用findPredecessor()方法找到前輩節點,然後通過右移,然後比較,找到後利用CAS把value替換為null,然後判斷該節點是不是這層唯一的index,如果是的話,調用tryReduceLevel()方法把這層幹掉,完成删除。

其實從這裡可以看出,remove方法僅僅是把Node的value設定null,并沒有真正删除該節點Node,其實從上面的put操作、get操作我們可以看出,他們在尋找節點的時候都會判斷節點的value是否為null,如果為null,則調用unLink()方法取消關聯關系,如下:

if (n.value == null) {
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }
           

size操作

ConcurrentSkipListMap的size()操作和ConcurrentHashMap不同,它并沒有維護一個全局變量來統計元素的個數,是以每次調用該方法的時候都需要去周遊。

public int size() {
        long count = 0;
        for (Node<K,V> n = findFirst(); n != null; n = n.next) {
            if (n.getValidValue() != null)
                ++count;
        }
        return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
    }
           

調用findFirst()方法找到第一個Node,然後利用node的next去統計。最後傳回統計資料,最多能傳回Integer.MAX_VALUE。注意這裡線上程并發下是安全的。

ConcurrentSkipListMap過程其實不複雜,相比于ConcurrentHashMap而言,是簡單的不能再簡單了。對跳表SkipList熟悉的話,ConcurrentSkipListMap 應該是盤中餐了。