天天看點

Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析

今天發一篇”水文”,可能很多讀者都會表示不了解,不過我想把它作為并發序列文章中不可缺少的一塊來介紹。本來以為花不了多少時間的,不過最終還是投入了挺多時間來完成這篇文章的。

網上關于 HashMap 和 ConcurrentHashMap 的文章确實不少,不過缺斤少兩的文章比較多,是以才想自己也寫一篇,把細節說清楚說透,尤其像 Java8 中的 ConcurrentHashMap,大部分文章都說不清楚。終歸是希望能降低大家學習的成本,不希望大家到處找各種不是很靠譜的文章,看完一篇又一篇,可是還是模模糊糊。

閱讀建議:四節基本上可以進行獨立閱讀,建議初學者可按照 Java7 HashMap -> Java7 ConcurrentHashMap -> Java8 HashMap -> Java8 ConcurrentHashMap 順序進行閱讀,可适當降低閱讀門檻。

閱讀前提:本文分析的是源碼,是以至少讀者要熟悉它們的接口使用,同時,對于并發,讀者至少要知道 CAS、ReentrantLock、UNSAFE 操作這幾個基本的知識,文中不會對這些知識進行介紹。Java8 用到了紅黑樹,不過本文不會進行展開,感興趣的讀者請自行查找相關資料。

Java7 HashMap

HashMap 是最簡單的,一來我們非常熟悉,二來就是它不支援并發操作,是以源碼也非常簡單。

首先,我們用下面這張圖來介紹 HashMap 的結構。

Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析
這個僅僅是示意圖,因為沒有考慮到數組要擴容的情況,具體的後面再說。

大方向上,HashMap 裡面是一個數組,然後數組中每個元素是一個單向連結清單。

上圖中,每個綠色的實體是嵌套類 Entry 的執行個體,Entry 包含四個屬性:key, value, hash 值和用于單向連結清單的 next。

capacity:目前數組容量,始終保持 2^n,可以擴容,擴容後數組大小為目前的 2 倍。

loadFactor:負載因子,預設為 0.75。

threshold:擴容的門檻值,等于 capacity * loadFactor

put 過程分析

還是比較簡單的,跟着代碼走一遍吧。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

public

V put(K key, V value) {

// 當插入第一個元素的時候,需要先初始化數組大小

if

(table == EMPTY_TABLE) {

inflateTable(threshold);

}

// 如果 key 為 null,感興趣的可以往裡看,最終會将這個 entry 放到 table[0] 中

if

(key ==

null

)

return

putForNullKey(value);

// 1. 求 key 的 hash 值

int

hash = hash(key);

// 2. 找到對應的數組下标

int

i = indexFor(hash, table.length);

// 3. 周遊一下對應下标處的連結清單,看是否有重複的 key 已經存在,

//    如果有,直接覆寫,put 方法傳回舊值就結束了

for

(Entry<K,V> e = table[i]; e !=

null

; e = e.next) {

Object k;

if

(e.hash == hash && ((k = e.key) == key || key.equals(k))) {

V oldValue = e.value;

e.value = value;

e.recordAccess(

this

);

return

oldValue;

}

}

modCount++;

// 4. 不存在重複的 key,将此 entry 添加到連結清單中,細節後面說

addEntry(hash, key, value, i);

return

null

;

}

數組初始化

在第一個元素插入 HashMap 的時候做一次數組的初始化,就是先确定初始的數組大小,并計算數組擴容的門檻值。

private

void

inflateTable(

int

toSize) {

// 保證數組大小一定是 2 的 n 次方。

// 比如這樣初始化:new HashMap(20),那麼處理成初始數組大小是 32

int

capacity = roundUpToPowerOf2(toSize);

// 計算擴容門檻值:capacity * loadFactor

threshold = (

int

) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY +

1

);

// 算是初始化數組吧

table =

new

Entry[capacity];

initHashSeedAsNeeded(capacity);

//ignore

}

這裡有一個将數組大小保持為 2 的 n 次方的做法,Java7 和 Java8 的 HashMap 和 ConcurrentHashMap 都有相應的要求,隻不過實作的代碼稍微有些不同,後面再看到的時候就知道了。

計算具體數組位置

這個簡單,我們自己也能 YY 一個:使用 key 的 hash 值對數組長度進行取模就可以了。

static

int

indexFor(

int

hash,

int

length) {

// assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";

return

hash & (length-

1

);

}

這個方法很簡單,簡單說就是取 hash 值的低 n 位。如在數組長度為 32 的時候,其實取的就是 key 的 hash 值的低 5 位,作為它在數組中的下标位置。

添加節點到連結清單中

找到數組下标後,會先進行 key 判重,如果沒有重複,就準備将新值放入到連結清單的表頭。

void

addEntry(

int

hash, K key, V value,

int

bucketIndex) {

// 如果目前 HashMap 大小已經達到了門檻值,并且新值要插入的數組位置已經有元素了,那麼要擴容

if

((size >= threshold) && (

null

!= table[bucketIndex])) {

// 擴容,後面會介紹一下

resize(

2

* table.length);

// 擴容以後,重新計算 hash 值

hash = (

null

!= key) ? hash(key) :

;

// 重新計算擴容後的新的下标

bucketIndex = indexFor(hash, table.length);

}

// 往下看

createEntry(hash, key, value, bucketIndex);

}

// 這個很簡單,其實就是将新值放到連結清單的表頭,然後 size++

void

createEntry(

int

hash, K key, V value,

int

bucketIndex) {

Entry<K,V> e = table[bucketIndex];

table[bucketIndex] =

new

Entry<>(hash, key, value, e);

size++;

}

這個方法的主要邏輯就是先判斷是否需要擴容,需要的話先擴容,然後再将這個新的資料插入到擴容後的數組的相應位置處的連結清單的表頭。

數組擴容

前面我們看到,在插入新值的時候,如果目前的 size 已經達到了門檻值,并且要插入的數組位置上已經有元素,那麼就會觸發擴容,擴容後,數組大小為原來的 2 倍。

void

resize(

int

newCapacity) {

Entry[] oldTable = table;

int

oldCapacity = oldTable.length;

if

(oldCapacity == MAXIMUM_CAPACITY) {

threshold = Integer.MAX_VALUE;

return

;

}

// 新的數組

Entry[] newTable =

new

Entry[newCapacity];

// 将原來數組中的值遷移到新的更大的數組中

transfer(newTable, initHashSeedAsNeeded(newCapacity));

table = newTable;

threshold = (

int

)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY +

1

);

}

擴容就是用一個新的大數組替換原來的小數組,并将原來數組中的值遷移到新的數組中。

由于是雙倍擴容,遷移過程中,會将原來 table[i] 中的連結清單的所有節點,分拆到新的數組的 newTable[i] 和 newTable[i + oldLength] 位置上。如原來數組長度是 16,那麼擴容後,原來 table[0] 處的連結清單中的所有元素會被配置設定到新數組中 newTable[0] 和 newTable[16] 這兩個位置。代碼比較簡單,這裡就不展開了。

get 過程分析

相對于 put 過程,get 過程是非常簡單的。

  1. 根據 key 計算 hash 值。
  2. 找到相應的數組下标:hash & (length – 1)。
  3. 周遊該數組位置處的連結清單,直到找到相等(==或equals)的 key。

public

V get(Object key) {

// 之前說過,key 為 null 的話,會被放到 table[0],是以隻要周遊下 table[0] 處的連結清單就可以了

if

(key ==

null

)

return

getForNullKey();

//

Entry<K,V> entry = getEntry(key);

return

null

== entry ?

null

: entry.getValue();

}

getEntry(key):

final

Entry<K,V> getEntry(Object key) {

if

(size ==

) {

return

null

;

}

int

hash = (key ==

null

) ?

: hash(key);

// 确定數組下标,然後從頭開始周遊連結清單,直到找到為止

for

(Entry<K,V> e = table[indexFor(hash, table.length)];

e !=

null

;

e = e.next) {

Object k;

if

(e.hash == hash &&

((k = e.key) == key || (key !=

null

&& key.equals(k))))

return

e;

}

return

null

;

}

Java7 ConcurrentHashMap

ConcurrentHashMap 和 HashMap 思路是差不多的,但是因為它支援并發操作,是以要複雜一些。

整個 ConcurrentHashMap 由一個個 Segment 組成,Segment 代表”部分“或”一段“的意思,是以很多地方都會将其描述為分段鎖。注意,行文中,我很多地方用了“槽”來代表一個 segment。

簡單了解就是,ConcurrentHashMap 是一個 Segment 數組,Segment 通過繼承 ReentrantLock 來進行加鎖,是以每次需要加鎖的操作鎖住的是一個 segment,這樣隻要保證每個 Segment 是線程安全的,也就實作了全局的線程安全。

Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析

concurrencyLevel:并行級别、并發數、Segment 數,怎麼翻譯不重要,了解它。預設是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,是以理論上,這個時候,最多可以同時支援 16 個線程并發寫,隻要它們的操作分别分布在不同的 Segment 上。這個值可以在初始化的時候設定為其他值,但是一旦初始化以後,它是不可以擴容的。

再具體到每個 Segment 内部,其實每個 Segment 很像之前介紹的 HashMap,不過它要保證線程安全,是以處理起來要麻煩些。

初始化

initialCapacity:初始容量,這個值指的是整個 ConcurrentHashMap 的初始容量,實際操作的時候需要平均分給每個 Segment。

loadFactor:負載因子,之前我們說了,Segment 數組不可以擴容,是以這個負載因子是給每個 Segment 内部使用的。

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

public

ConcurrentHashMap(

int

initialCapacity,

float

loadFactor,

int

concurrencyLevel) {

if

(!(loadFactor >

) || initialCapacity <

|| concurrencyLevel <=

)

throw

new

IllegalArgumentException();

if

(concurrencyLevel > MAX_SEGMENTS)

concurrencyLevel = MAX_SEGMENTS;

// Find power-of-two sizes best matching arguments

int

sshift =

;

int

ssize =

1

;

// 計算并行級别 ssize,因為要保持并行級别是 2 的 n 次方

while

(ssize < concurrencyLevel) {

++sshift;

ssize <<=

1

;

}

// 我們這裡先不要那麼燒腦,用預設值,concurrencyLevel 為 16,sshift 為 4

// 那麼計算出 segmentShift 為 28,segmentMask 為 15,後面會用到這兩個值

this

.segmentShift =

32

- sshift;

this

.segmentMask = ssize -

1

;

if

(initialCapacity > MAXIMUM_CAPACITY)

initialCapacity = MAXIMUM_CAPACITY;

// initialCapacity 是設定整個 map 初始的大小,

// 這裡根據 initialCapacity 計算 Segment 數組中每個位置可以分到的大小

// 如 initialCapacity 為 64,那麼每個 Segment 或稱之為"槽"可以分到 4 個

int

c = initialCapacity / ssize;

if

(c * ssize < initialCapacity)

++c;

// 預設 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,因為這樣的話,對于具體的槽上,

// 插入一個元素不至于擴容,插入第二個的時候才會擴容

int

cap = MIN_SEGMENT_TABLE_CAPACITY;

while

(cap < c)

cap <<=

1

;

// 建立 Segment 數組,

// 并建立數組的第一個元素 segment[0]

Segment<K,V> s0 =

new

Segment<K,V>(loadFactor, (

int

)(cap * loadFactor),

(HashEntry<K,V>[])

new

HashEntry[cap]);

Segment<K,V>[] ss = (Segment<K,V>[])

new

Segment[ssize];

// 往數組寫入 segment[0]

UNSAFE.putOrderedObject(ss, SBASE, s0);

// ordered write of segments[0]

this

.segments = ss;

}

初始化完成,我們得到了一個 Segment 數組。

我們就當是用 new ConcurrentHashMap() 無參構造函數進行初始化的,那麼初始化完成後:

  • Segment 數組長度為 16,不可以擴容
  • Segment[i] 的預設大小為 2,負載因子是 0.75,得出初始門檻值為 1.5,也就是以後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
  • 這裡初始化了 segment[0],其他位置還是 null,至于為什麼要初始化 segment[0],後面的代碼會介紹
  • 目前 segmentShift 的值為 32 – 4 = 28,segmentMask 為 16 – 1 = 15,姑且把它們簡單翻譯為移位數和掩碼,這兩個值馬上就會用到

我們先看 put 的主流程,對于其中的一些關鍵細節操作,後面會進行詳細介紹。

public

V put(K key, V value) {

Segment<K,V> s;

if

(value ==

null

)

throw

new

NullPointerException();

// 1. 計算 key 的 hash 值

int

hash = hash(key);

// 2. 根據 hash 值找到 Segment 數組中的位置 j

//    hash 是 32 位,無符号右移 segmentShift(28) 位,剩下低 4 位,

//    然後和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的最後 4 位,也就是槽的數組下标

int

j = (hash >>> segmentShift) & segmentMask;

// 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null,

// ensureSegment(j) 對 segment[j] 進行初始化

if

((s = (Segment<K,V>)UNSAFE.getObject         

// nonvolatile; recheck

(segments, (j << SSHIFT) + SBASE)) ==

null

)

//  in ensureSegment

s = ensureSegment(j);

// 3. 插入新值到 槽 s 中

return

s.put(key, hash, value,

false

);

}

第一層皮很簡單,根據 hash 值很快就能找到相應的 Segment,之後就是 Segment 内部的 put 操作了。

Segment 内部是由 數組+連結清單 組成的。

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

final

V put(K key,

int

hash, V value,

boolean

onlyIfAbsent) {

// 在往該 segment 寫入前,需要先擷取該 segment 的獨占鎖

//    先看主流程,後面還會具體介紹這部分内容

HashEntry<K,V> node = tryLock() ?

null

:

scanAndLockForPut(key, hash, value);

V oldValue;

try

{

// 這個是 segment 内部的數組

HashEntry<K,V>[] tab = table;

// 再利用 hash 值,求應該放置的數組下标

int

index = (tab.length -

1

) & hash;

// first 是數組該位置處的連結清單的表頭

HashEntry<K,V> first = entryAt(tab, index);

// 下面這串 for 循環雖然很長,不過也很好了解,想想該位置沒有任何元素和已經存在一個連結清單這兩種情況

for

(HashEntry<K,V> e = first;;) {

if

(e !=

null

) {

K k;

if

((k = e.key) == key ||

(e.hash == hash && key.equals(k))) {

oldValue = e.value;

if

(!onlyIfAbsent) {

// 覆寫舊值

e.value = value;

++modCount;

}

break

;

}

// 繼續順着連結清單走

e = e.next;

}

else

{

// node 到底是不是 null,這個要看擷取鎖的過程,不過和這裡都沒有關系。

// 如果不為 null,那就直接将它設定為連結清單表頭;如果是null,初始化并設定為連結清單表頭。

if

(node !=

null

)

node.setNext(first);

else

node =

new

HashEntry<K,V>(hash, key, value, first);

int

c = count +

1

;

// 如果超過了該 segment 的門檻值,這個 segment 需要擴容

if

(c > threshold && tab.length < MAXIMUM_CAPACITY)

rehash(node);

// 擴容後面也會具體分析

else

// 沒有達到門檻值,将 node 放到數組 tab 的 index 位置,

// 其實就是将新的節點設定成原連結清單的表頭

setEntryAt(tab, index, node);

++modCount;

count = c;

oldValue =

null

;

break

;

}

}

}

finally

{

// 解鎖

unlock();

}

return

oldValue;

}

整體流程還是比較簡單的,由于有獨占鎖的保護,是以 segment 内部的操作并不複雜。至于這裡面的并發問題,我們稍後再進行介紹。

到這裡 put 操作就結束了,接下來,我們說一說其中幾步關鍵的操作。

初始化槽: ensureSegment

ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對于其他槽來說,在插入第一個值的時候進行初始化。

這裡需要考慮并發,因為很可能會有多個線程同時進來初始化同一個槽 segment[k],不過隻要有一個成功了就可以。

private

Segment<K,V> ensureSegment(

int

k) {

final

Segment<K,V>[] ss =

this

.segments;

long

u = (k << SSHIFT) + SBASE;

// raw offset

Segment<K,V> seg;

if

((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) ==

null

) {

// 這裡看到為什麼之前要初始化 segment[0] 了,

// 使用目前 segment[0] 處的數組長度和負載因子來初始化 segment[k]

// 為什麼要用“目前”,因為 segment[0] 可能早就擴容過了

Segment<K,V> proto = ss[

];

int

cap = proto.table.length;

float

lf = proto.loadFactor;

int

threshold = (

int

)(cap * lf);

// 初始化 segment[k] 内部的數組

HashEntry<K,V>[] tab = (HashEntry<K,V>[])

new

HashEntry[cap];

if

((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))

==

null

) {

// 再次檢查一遍該槽是否被其他線程初始化了。

Segment<K,V> s =

new

Segment<K,V>(lf, threshold, tab);

// 使用 while 循環,内部用 CAS,目前線程成功設值或其他線程成功設值後,退出

while

((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))

==

null

) {

if

(UNSAFE.compareAndSwapObject(ss, u,

null

, seg = s))

break

;

}

}

}

return

seg;

}

總的來說,ensureSegment(int k) 比較簡單,對于并發操作使用 CAS 進行控制。

我沒搞懂這裡為什麼要搞一個 while 循環,CAS 失敗不就代表有其他線程成功了嗎,為什麼要再進行判斷?

擷取寫入鎖: scanAndLockForPut

前面我們看到,在往某個 segment 中 put 的時候,首先會調用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是說先進行一次 tryLock() 快速擷取該 segment 的獨占鎖,如果失敗,那麼進入到 scanAndLockForPut 這個方法來擷取鎖。

下面我們來具體分析這個方法中是怎麼控制加鎖的。

private

HashEntry<K,V> scanAndLockForPut(K key,

int

hash, V value) {

HashEntry<K,V> first = entryForHash(

this

, hash);

HashEntry<K,V> e = first;

HashEntry<K,V> node =

null

;

int

retries = -

1

;

// negative while locating node

// 循環擷取鎖

while

(!tryLock()) {

HashEntry<K,V> f;

// to recheck first below

if

(retries <

) {

if

(e ==

null

) {

if

(node ==

null

)

// speculatively create node

// 進到這裡說明數組該位置的連結清單是空的,沒有任何元素

// 當然,進到這裡的另一個原因是 tryLock() 失敗,是以該槽存在并發,不一定是該位置

node =

new

HashEntry<K,V>(hash, key, value,

null

);

retries =

;

}

else

if

(key.equals(e.key))

retries =

;

else

// 順着連結清單往下走

e = e.next;

}

// 重試次數如果超過 MAX_SCAN_RETRIES(單核1多核64),那麼不搶了,進入到阻塞隊列等待鎖

//    lock() 是阻塞方法,直到擷取鎖後傳回

else

if

(++retries > MAX_SCAN_RETRIES) {

lock();

break

;

}

else

if

((retries &

1

) ==

&&

// 這個時候是有大問題了,那就是有新的元素進到了連結清單,成為了新的表頭

//     是以這邊的政策是,相當于重新走一遍這個 scanAndLockForPut 方法

(f = entryForHash(

this

, hash)) != first) {

e = first = f;

// re-traverse if entry changed

retries = -

1

;

}

}

return

node;

}

這個方法有兩個出口,一個是 tryLock() 成功了,循環終止,另一個就是重試次數超過了 MAX_SCAN_RETRIES,進到 lock() 方法,此方法會阻塞等待,直到成功拿到獨占鎖。

這個方法就是看似複雜,但是其實就是做了一件事,那就是擷取該 segment 的獨占鎖,如果需要的話順便執行個體化了一下 node。

擴容: rehash

重複一下,segment 數組不能擴容,擴容是 segment 數組某個位置内部的數組 HashEntry\[] 進行擴容,擴容後,容量為原來的 2 倍。

首先,我們要回顧一下觸發擴容的地方,put 的時候,如果判斷該值的插入會導緻該 segment 的元素個數超過門檻值,那麼先進行擴容,再插值,讀者這個時候可以回去 put 方法看一眼。

該方法不需要考慮并發,因為到這裡的時候,是持有該 segment 的獨占鎖的。

60

// 方法參數上的 node 是這次擴容後,需要添加到新的數組中的資料。

private

void

rehash(HashEntry<K,V> node) {

HashEntry<K,V>[] oldTable = table;

int

oldCapacity = oldTable.length;

// 2 倍

int

newCapacity = oldCapacity <<

1

;

threshold = (

int

)(newCapacity * loadFactor);

// 建立新數組

HashEntry<K,V>[] newTable =

(HashEntry<K,V>[])

new

HashEntry[newCapacity];

// 新的掩碼,如從 16 擴容到 32,那麼 sizeMask 為 31,對應二進制 ‘000...00011111’

int

sizeMask = newCapacity -

1

;

// 周遊原數組,老套路,将原數組位置 i 處的連結清單拆分到 新數組位置 i 和 i+oldCap 兩個位置

for

(

int

i =

; i < oldCapacity ; i++) {

// e 是連結清單的第一個元素

HashEntry<K,V> e = oldTable[i];

if

(e !=

null

) {

HashEntry<K,V> next = e.next;

// 計算應該放置在新數組中的位置,

// 假設原數組長度為 16,e 在 oldTable[3] 處,那麼 idx 隻可能是 3 或者是 3 + 16 = 19

int

idx = e.hash & sizeMask;

if

(next ==

null

)  

// 該位置處隻有一個元素,那比較好辦

newTable[idx] = e;

else

{

// Reuse consecutive sequence at same slot

// e 是連結清單表頭

HashEntry<K,V> lastRun = e;

// idx 是目前連結清單的頭結點 e 的新位置

int

lastIdx = idx;

// 下面這個 for 循環會找到一個 lastRun 節點,這個節點之後的所有元素是将要放到一起的

for

(HashEntry<K,V> last = next;

last !=

null

;

last = last.next) {

int

k = last.hash & sizeMask;

if

(k != lastIdx) {

lastIdx = k;

lastRun = last;

}

}

// 将 lastRun 及其之後的所有節點組成的這個連結清單放到 lastIdx 這個位置

newTable[lastIdx] = lastRun;

// 下面的操作是處理 lastRun 之前的節點,

//    這些節點可能配置設定在另一個連結清單中,也可能配置設定到上面的那個連結清單中

for

(HashEntry<K,V> p = e; p != lastRun; p = p.next) {

V v = p.value;

int

h = p.hash;

int

k = h & sizeMask;

HashEntry<K,V> n = newTable[k];

newTable[k] =

new

HashEntry<K,V>(h, p.key, v, n);

}

}

}

}

// 将新來的 node 放到新數組中剛剛的 兩個連結清單之一 的 頭部

int

nodeIndex = node.hash & sizeMask;

// add the new node

node.setNext(newTable[nodeIndex]);

newTable[nodeIndex] = node;

table = newTable;

}

這裡的擴容比之前的 HashMap 要複雜一些,代碼難懂一點。上面有兩個挨着的 for 循環,第一個 for 有什麼用呢?

仔細一看發現,如果沒有第一個 for 循環,也是可以工作的,但是,這個 for 循環下來,如果 lastRun 的後面還有比較多的節點,那麼這次就是值得的。因為我們隻需要克隆 lastRun 前面的節點,後面的一串節點跟着 lastRun 走就是了,不需要做任何操作。

我覺得 Doug Lea 的這個想法也是挺有意思的,不過比較壞的情況就是每次 lastRun 都是連結清單的最後一個元素或者很靠後的元素,那麼這次周遊就有點浪費了。不過 Doug Lea 也說了,根據統計,如果使用預設的門檻值,大約隻有 1/6 的節點需要克隆。

相對于 put 來說,get 真的不要太簡單。

  1. 計算 hash 值,找到 segment 數組中的具體位置,或我們前面用的“槽”
  2. 槽中也是一個數組,根據 hash 找到數組中具體的位置
  3. 到這裡是連結清單了,順着連結清單進行查找即可

public

V get(Object key) {

Segment<K,V> s;

// manually integrate access methods to reduce overhead

HashEntry<K,V>[] tab;

// 1. hash 值

int

h = hash(key);

long

u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;

// 2. 根據 hash 找到對應的 segment

if

((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) !=

null

&&

(tab = s.table) !=

null

) {

// 3. 找到segment 内部數組相應位置的連結清單,周遊

for

(HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile

(tab, ((

long

)(((tab.length -

1

) & h)) << TSHIFT) + TBASE);

e !=

null

; e = e.next) {

K k;

if

((k = e.key) == key || (e.hash == h && key.equals(k)))

return

e.value;

}

}

return

null

;

}

并發問題分析

現在我們已經說完了 put 過程和 get 過程,我們可以看到 get 過程中是沒有加鎖的,那自然我們就需要去考慮并發問題。

添加節點的操作 put 和删除節點的操作 remove 都是要加 segment 上的獨占鎖的,是以它們之間自然不會有問題,我們需要考慮的問題就是 get 的時候在同一個 segment 中發生了 put 或 remove 操作。

  1. put 操作的線程安全性。
    • 初始化槽,這個我們之前就說過了,使用了 CAS 來初始化 Segment 中的數組。
    • 添加節點到連結清單的操作是插入到表頭的,是以,如果這個時候 get 操作在連結清單周遊的過程已經到了中間,是不會影響的。當然,另一個并發問題就是 get 操作在 put 之後,需要保證剛剛插入表頭的節點被讀取,這個依賴于 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
    • 擴容。擴容是新建立了數組,然後進行遷移資料,最後面将 newTable 設定給屬性 table。是以,如果 get 操作此時也在進行,那麼也沒關系,如果 get 先行,那麼就是在舊的 table 上做查詢操作;而 put 先行,那麼 put 操作的可見性保證就是 table 使用了 volatile 關鍵字。
  2. remove 操作的線程安全性。

    remove 操作我們沒有分析源碼,是以這裡說的讀者感興趣的話還是需要到源碼中去求實一下的。

    get 操作需要周遊連結清單,但是 remove 操作會”破壞”連結清單。

    如果 remove 破壞的節點 get 操作已經過去了,那麼這裡不存在任何問題。

    如果 remove 先破壞了一個節點,分兩種情況考慮。 1、如果此節點是頭結點,那麼需要将頭結點的 next 設定為數組該位置的元素,table 雖然使用了 volatile 修飾,但是 volatile 并不能提供數組内部操作的可見性保證,是以源碼中使用了 UNSAFE 來操作數組,請看方法 setEntryAt。2、如果要删除的節點不是頭結點,它會将要删除節點的後繼節點接到前驅節點中,這裡的并發保證就是 next 屬性是 volatile 的。

Java8 HashMap

Java8 對 HashMap 進行了一些修改,最大的不同就是利用了紅黑樹,是以其由 數組+連結清單+紅黑樹 組成。

根據 Java7 HashMap 的介紹,我們知道,查找的時候,根據 hash 值我們能夠快速定位到數組的具體下标,但是之後的話,需要順着連結清單一個個比較下去才能找到我們需要的,時間複雜度取決于連結清單的長度,為 O(n)。

為了降低這部分的開銷,在 Java8 中,當連結清單中的元素超過了 8 個以後,會将連結清單轉換為紅黑樹,在這些位置進行查找的時候可以降低時間複雜度為 O(logN)。

來一張圖簡單示意一下吧:

Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析
注意,上圖是示意圖,主要是描述結構,不會達到這個狀态的,因為這麼多資料的時候早就擴容了。

下面,我們還是用代碼來介紹吧,個人感覺,Java8 的源碼可讀性要差一些,不過精簡一些。

Java7 中使用 Entry 來代表每個 HashMap 中的資料節點,Java8 中使用 Node,基本沒有差別,都是 key,value,hash 和 next 這四個屬性,不過,Node 隻能用于連結清單的情況,紅黑樹的情況需要使用 TreeNode。

我們根據數組元素中,第一個節點資料類型是 Node 還是 TreeNode 來判斷該位置下是連結清單還是紅黑樹的。

61

62

63

public

V put(K key, V value) {

return

putVal(hash(key), key, value,

false

,

true

);

}

// 第三個參數 onlyIfAbsent 如果是 true,那麼隻有在不存在該 key 時才會進行 put 操作

// 第四個參數 evict 我們這裡不關心

final

V putVal(

int

hash, K key, V value,

boolean

onlyIfAbsent,

boolean

evict) {

Node<K,V>[] tab; Node<K,V> p;

int

n, i;

// 第一次 put 值的時候,會觸發下面的 resize(),類似 java7 的第一次 put 也要初始化數組長度

// 第一次 resize 和後續的擴容有些不一樣,因為這次是數組從 null 初始化到預設的 16 或自定義的初始容量

if

((tab = table) ==

null

|| (n = tab.length) ==

)

n = (tab = resize()).length;

// 找到具體的數組下标,如果此位置沒有值,那麼直接初始化一下 Node 并放置在這個位置就可以了

if

((p = tab[i = (n -

1

) & hash]) ==

null

)

tab[i] = newNode(hash, key, value,

null

);

else

{

// 數組該位置有資料

Node<K,V> e; K k;

// 首先,判斷該位置的第一個資料和我們要插入的資料,key 是不是"相等",如果是,取出這個節點

if

(p.hash == hash &&

((k = p.key) == key || (key !=

null

&& key.equals(k))))

e = p;

// 如果該節點是代表紅黑樹的節點,調用紅黑樹的插值方法,本文不展開說紅黑樹

else

if

(p

instanceof

TreeNode)

e = ((TreeNode<K,V>)p).putTreeVal(

this

, tab, hash, key, value);

else

{

// 到這裡,說明數組該位置上是一個連結清單

for

(

int

binCount =

; ; ++binCount) {

// 插入到連結清單的最後面(Java7 是插入到連結清單的最前面)

if

((e = p.next) ==

null

) {

p.next = newNode(hash, key, value,

null

);

// TREEIFY_THRESHOLD 為 8,是以,如果新插入的值是連結清單中的第 9 個

// 會觸發下面的 treeifyBin,也就是将連結清單轉換為紅黑樹

if

(binCount >= TREEIFY_THRESHOLD -

1

)

// -1 for 1st

treeifyBin(tab, hash);

break

;

}

// 如果在該連結清單中找到了"相等"的 key(== 或 equals)

if

(e.hash == hash &&

((k = e.key) == key || (key !=

null

&& key.equals(k))))

// 此時 break,那麼 e 為連結清單中[與要插入的新值的 key "相等"]的 node

break

;

p = e;

}

}

// e!=null 說明存在舊值的key與要插入的key"相等"

// 對于我們分析的put操作,下面這個 if 其實就是進行 "值覆寫",然後傳回舊值

if

(e !=

null

) {

V oldValue = e.value;

if

(!onlyIfAbsent || oldValue ==

null

)

e.value = value;

afterNodeAccess(e);

return

oldValue;

}

}

++modCount;

// 如果 HashMap 由于新插入這個值導緻 size 已經超過了門檻值,需要進行擴容

if

(++size > threshold)

resize();

afterNodeInsertion(evict);

return

null

;

}

和 Java7 稍微有點不一樣的地方就是,Java7 是先擴容後插入新值的,Java8 先插值再擴容,不過這個不重要。

resize() 方法用于初始化數組或數組擴容,每次擴容後,容量為原來的 2 倍,并進行資料遷移。

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

final

Node<K,V>[] resize() {

Node<K,V>[] oldTab = table;

int

oldCap = (oldTab ==

null

) ?

: oldTab.length;

int

oldThr = threshold;

int

newCap, newThr =

;

if

(oldCap >

) {

// 對應數組擴容

if

(oldCap >= MAXIMUM_CAPACITY) {

threshold = Integer.MAX_VALUE;

return

oldTab;

}

// 将數組大小擴大一倍

else

if

((newCap = oldCap <<

1

) < MAXIMUM_CAPACITY &&

oldCap >= DEFAULT_INITIAL_CAPACITY)

// 将門檻值擴大一倍

newThr = oldThr <<

1

;

// double threshold

}

else

if

(oldThr >

)

// 對應使用 new HashMap(int initialCapacity) 初始化後,第一次 put 的時候

newCap = oldThr;

else

{

// 對應使用 new HashMap() 初始化後,第一次 put 的時候

newCap = DEFAULT_INITIAL_CAPACITY;

newThr = (

int

)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);

}

if

(newThr ==

) {

float

ft = (

float

)newCap * loadFactor;

newThr = (newCap < MAXIMUM_CAPACITY && ft < (

float

)MAXIMUM_CAPACITY ?

(

int

)ft : Integer.MAX_VALUE);

}

threshold = newThr;

// 用新的數組大小初始化新的數組

Node<K,V>[] newTab = (Node<K,V>[])

new

Node[newCap];

table = newTab;

// 如果是初始化數組,到這裡就結束了,傳回 newTab 即可

if

(oldTab !=

null

) {

// 開始周遊原數組,進行資料遷移。

for

(

int

j =

; j < oldCap; ++j) {

Node<K,V> e;

if

((e = oldTab[j]) !=

null

) {

oldTab[j] =

null

;

// 如果該數組位置上隻有單個元素,那就簡單了,簡單遷移這個元素就可以了

if

(e.next ==

null

)

newTab[e.hash & (newCap -

1

)] = e;

// 如果是紅黑樹,具體我們就不展開了

else

if

(e

instanceof

TreeNode)

((TreeNode<K,V>)e).split(

this

, newTab, j, oldCap);

else

{

// 這塊是處理連結清單的情況,

// 需要将此連結清單拆成兩個連結清單,放到新的數組中,并且保留原來的先後順序

// loHead、loTail 對應一條連結清單,hiHead、hiTail 對應另一條連結清單,代碼還是比較簡單的

Node<K,V> loHead =

null

, loTail =

null

;

Node<K,V> hiHead =

null

, hiTail =

null

;

Node<K,V> next;

do

{

next = e.next;

if

((e.hash & oldCap) ==

) {

if

(loTail ==

null

)

loHead = e;

else

loTail.next = e;

loTail = e;

}

else

{

if

(hiTail ==

null

)

hiHead = e;

else

hiTail.next = e;

hiTail = e;

}

}

while

((e = next) !=

null

);

if

(loTail !=

null

) {

loTail.next =

null

;

// 第一條連結清單

newTab[j] = loHead;

}

if

(hiTail !=

null

) {

hiTail.next =

null

;

// 第二條連結清單的新的位置是 j + oldCap,這個很好了解

newTab[j + oldCap] = hiHead;

}

}

}

}

}

return

newTab;

}

相對于 put 來說,get 真的太簡單了。

  1. 計算 key 的 hash 值,根據 hash 值找到對應數組下标: hash & (length-1)
  2. 判斷數組該位置處的元素是否剛好就是我們要找的,如果不是,走第三步
  3. 判斷該元素類型是否是 TreeNode,如果是,用紅黑樹的方法取資料,如果不是,走第四步
  4. 周遊連結清單,直到找到相等(==或equals)的 key

public

V get(Object key) {

Node<K,V> e;

return

(e = getNode(hash(key), key)) ==

null

?

null

: e.value;

}

final

Node<K,V> getNode(

int

hash, Object key) {

Node<K,V>[] tab; Node<K,V> first, e;

int

n; K k;

if

((tab = table) !=

null

&& (n = tab.length) >

&&

(first = tab[(n -

1

) & hash]) !=

null

) {

// 判斷第一個節點是不是就是需要的

if

(first.hash == hash &&

// always check first node

((k = first.key) == key || (key !=

null

&& key.equals(k))))

return

first;

if

((e = first.next) !=

null

) {

// 判斷是否是紅黑樹

if

(first

instanceof

TreeNode)

return

((TreeNode<K,V>)first).getTreeNode(hash, key);

// 連結清單周遊

do

{

if

(e.hash == hash &&

((k = e.key) == key || (key !=

null

&& key.equals(k))))

return

e;

}

while

((e = e.next) !=

null

);

}

}

return

null

;

}

Java8 ConcurrentHashMap

Java7 中實作的 ConcurrentHashMap 說實話還是比較複雜的,Java8 對 ConcurrentHashMap 進行了比較大的改動。建議讀者可以參考 Java8 中 HashMap 相對于 Java7 HashMap 的改動,對于 ConcurrentHashMap,Java8 也引入了紅黑樹。

說實話,Java8 ConcurrentHashMap 源碼真心不簡單,最難的在于擴容,資料遷移操作不容易看懂。

我們先用一個示意圖來描述下其結構:

Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析

結構上和 Java8 的 HashMap 基本上一樣,不過它要保證線程安全性,是以在源碼上确實要複雜一些。

// 這構造函數裡,什麼都不幹

public

ConcurrentHashMap() {

}

public

ConcurrentHashMap(

int

initialCapacity) {

if

(initialCapacity <

)

throw

new

IllegalArgumentException();

int

cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>>

1

)) ?

MAXIMUM_CAPACITY :

tableSizeFor(initialCapacity + (initialCapacity >>>

1

) +

1

));

this

.sizeCtl = cap;

}

這個初始化方法有點意思,通過提供初始容量,計算了 sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),然後向上取最近的 2 的 n 次方】。如 initialCapacity 為 10,那麼得到 sizeCtl 為 16,如果 initialCapacity 為 11,得到 sizeCtl 為 32。

sizeCtl 這個屬性使用的場景很多,不過隻要跟着文章的思路來,就不會被它搞暈了。

如果你愛折騰,也可以看下另一個有三個參數的構造方法,這裡我就不說了,大部分時候,我們會使用無參構造函數進行執行個體化,我們也按照這個思路來進行源碼分析吧。

仔細地一行一行代碼看下去:

87

88

89

90

91

public

V put(K key, V value) {

return

putVal(key, value,

false

);

}

final

V putVal(K key, V value,

boolean

onlyIfAbsent) {

if

(key ==

null

|| value ==

null

)

throw

new

NullPointerException();

// 得到 hash 值

int

hash = spread(key.hashCode());

// 用于記錄相應連結清單的長度

int

binCount =

;

for

(Node<K,V>[] tab = table;;) {

Node<K,V> f;

int

n, i, fh;

// 如果數組"空",進行數組初始化

if

(tab ==

null

|| (n = tab.length) ==

)

// 初始化數組,後面會詳細介紹

tab = initTable();

// 找該 hash 值對應的數組下标,得到第一個節點 f

else

if

((f = tabAt(tab, i = (n -

1

) & hash)) ==

null

) {

// 如果數組該位置為空,

//    用一次 CAS 操作将這個新值放入其中即可,這個 put 操作差不多就結束了,可以拉到最後面了

//          如果 CAS 失敗,那就是有并發操作,進到下一個循環就好了

if

(casTabAt(tab, i,

null

,

new

Node<K,V>(hash, key, value,

null

)))

break

;                  

// no lock when adding to empty bin

}

// hash 居然可以等于 MOVED,這個需要到後面才能看明白,不過從名字上也能猜到,肯定是因為在擴容

else

if

((fh = f.hash) == MOVED)

// 幫助資料遷移,這個等到看完資料遷移部分的介紹後,再了解這個就很簡單了

tab = helpTransfer(tab, f);

else

{

// 到這裡就是說,f 是該位置的頭結點,而且不為空

V oldVal =

null

;

// 擷取數組該位置的頭結點的螢幕鎖

synchronized

(f) {

if

(tabAt(tab, i) == f) {

if

(fh >=

) {

// 頭結點的 hash 值大于 0,說明是連結清單

// 用于累加,記錄連結清單的長度

binCount =

1

;

// 周遊連結清單

for

(Node<K,V> e = f;; ++binCount) {

K ek;

// 如果發現了"相等"的 key,判斷是否要進行值覆寫,然後也就可以 break 了

if

(e.hash == hash &&

((ek = e.key) == key ||

(ek !=

null

&& key.equals(ek)))) {

oldVal = e.val;

if

(!onlyIfAbsent)

e.val = value;

break

;

}

// 到了連結清單的最末端,将這個新值放到連結清單的最後面

Node<K,V> pred = e;

if

((e = e.next) ==

null

) {

pred.next =

new

Node<K,V>(hash, key,

value,

null

);

break

;

}

}

}

else

if

(f

instanceof

TreeBin) {

// 紅黑樹

Node<K,V> p;

binCount =

2

;

// 調用紅黑樹的插值方法插入新節點

if

((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,

value)) !=

null

) {

oldVal = p.val;

if

(!onlyIfAbsent)

p.val = value;

}

}

}

}

// binCount != 0 說明上面在做連結清單操作

if

(binCount !=

) {

// 判斷是否要将連結清單轉換為紅黑樹,臨界值和 HashMap 一樣,也是 8

if

(binCount >= TREEIFY_THRESHOLD)

// 這個方法和 HashMap 中稍微有一點點不同,那就是它不是一定會進行紅黑樹轉換,

// 如果目前數組的長度小于 64,那麼會選擇進行數組擴容,而不是轉換為紅黑樹

//    具體源碼我們就不看了,擴容部分後面說

treeifyBin(tab, i);

if

(oldVal !=

null

)

return

oldVal;

break

;

}

}

}

//

addCount(1L, binCount);

return

null

;

}

put 的主流程看完了,但是至少留下了幾個問題,第一個是初始化,第二個是擴容,第三個是幫助資料遷移,這些我們都會在後面進行一一介紹。

初始化數組:initTable

這個比較簡單,主要就是初始化一個合适大小的數組,然後會設定 sizeCtl。

初始化方法中的并發問題是通過對 sizeCtl 進行一個 CAS 操作來控制的。

private

final

Node<K,V>[] initTable() {

Node<K,V>[] tab;

int

sc;

while

((tab = table) ==

null

|| tab.length ==

) {

// 初始化的"功勞"被其他線程"搶去"了

if

((sc = sizeCtl) <

)

Thread.yield();

// lost initialization race; just spin

// CAS 一下,将 sizeCtl 設定為 -1,代表搶到了鎖

else

if

(U.compareAndSwapInt(

this

, SIZECTL, sc, -

1

)) {

try

{

if

((tab = table) ==

null

|| tab.length ==

) {

// DEFAULT_CAPACITY 預設初始容量是 16

int

n = (sc >

) ? sc : DEFAULT_CAPACITY;

// 初始化數組,長度為 16 或初始化時提供的長度

Node<K,V>[] nt = (Node<K,V>[])

new

Node<?,?>[n];

// 将這個數組指派給 table,table 是 volatile 的

table = tab = nt;

// 如果 n 為 16 的話,那麼這裡 sc = 12

// 其實就是 0.75 * n

sc = n - (n >>>

2

);

}

}

finally

{

// 設定 sizeCtl 為 sc,我們就當是 12 吧

sizeCtl = sc;

}

break

;

}

}

return

tab;

}

連結清單轉紅黑樹: treeifyBin

前面我們在 put 源碼分析也說過,treeifyBin 不一定就會進行紅黑樹轉換,也可能是僅僅做數組擴容。我們還是進行源碼分析吧。

private

final

void

treeifyBin(Node<K,V>[] tab,

int

index) {

Node<K,V> b;

int

n, sc;

if

(tab !=

null

) {

// MIN_TREEIFY_CAPACITY 為 64

// 是以,如果數組長度小于 64 的時候,其實也就是 32 或者 16 或者更小的時候,會進行數組擴容

if

((n = tab.length) < MIN_TREEIFY_CAPACITY)

// 後面我們再詳細分析這個方法

tryPresize(n <<

1

);

// b 是頭結點

else

if

((b = tabAt(tab, index)) !=

null

&& b.hash >=

) {

// 加鎖

synchronized

(b) {

if

(tabAt(tab, index) == b) {

// 下面就是周遊連結清單,建立一顆紅黑樹

TreeNode<K,V> hd =

null

, tl =

null

;

for

(Node<K,V> e = b; e !=

null

; e = e.next) {

TreeNode<K,V> p =

new

TreeNode<K,V>(e.hash, e.key, e.val,

null

,

null

);

if

((p.prev = tl) ==

null

)

hd = p;

else

tl.next = p;

tl = p;

}

// 将紅黑樹設定到數組相應位置中

setTabAt(tab, index,

new

TreeBin<K,V>(hd));

}

}

}

}

}

擴容:tryPresize

如果說 Java8 ConcurrentHashMap 的源碼不簡單,那麼說的就是擴容操作和遷移操作。

這個方法要完完全全看懂還需要看之後的 transfer 方法,讀者應該提前知道這點。

這裡的擴容也是做翻倍擴容的,擴容後數組容量為原來的 2 倍。

// 首先要說明的是,方法參數 size 傳進來的時候就已經翻了倍了

private

final

void

tryPresize(

int

size) {

// c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。

int

c = (size >= (MAXIMUM_CAPACITY >>>

1

)) ? MAXIMUM_CAPACITY :

tableSizeFor(size + (size >>>

1

) +

1

);

int

sc;

while

((sc = sizeCtl) >=

) {

Node<K,V>[] tab = table;

int

n;

// 這個 if 分支和之前說的初始化數組的代碼基本上是一樣的,在這裡,我們可以不用管這塊代碼

if

(tab ==

null

|| (n = tab.length) ==

) {

n = (sc > c) ? sc : c;

if

(U.compareAndSwapInt(

this

, SIZECTL, sc, -

1

)) {

try

{

if

(table == tab) {

@SuppressWarnings

(

"unchecked"

)

Node<K,V>[] nt = (Node<K,V>[])

new

Node<?,?>[n];

table = nt;

sc = n - (n >>>

2

);

// 0.75 * n

}

}

finally

{

sizeCtl = sc;

}

}

}

else

if

(c <= sc || n >= MAXIMUM_CAPACITY)

break

;

else

if

(tab == table) {

// 我沒看懂 rs 的真正含義是什麼,不過也關系不大

int

rs = resizeStamp(n);

if

(sc <

) {

Node<K,V>[] nt;

if

((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs +

1

||

sc == rs + MAX_RESIZERS || (nt = nextTable) ==

null

||

transferIndex <=

)

break

;

// 2. 用 CAS 将 sizeCtl 加 1,然後執行 transfer 方法

//    此時 nextTab 不為 null

if

(U.compareAndSwapInt(

this

, SIZECTL, sc, sc +

1

))

transfer(tab, nt);

}

// 1. 将 sizeCtl 設定為 (rs << RESIZE_STAMP_SHIFT) + 2)

//     我是沒看懂這個值真正的意義是什麼?不過可以計算出來的是,結果是一個比較大的負數

//  調用 transfer 方法,此時 nextTab 參數為 null

else

if

(U.compareAndSwapInt(

this

, SIZECTL, sc,

(rs << RESIZE_STAMP_SHIFT) +

2

))

transfer(tab,

null

);

}

}

}

這個方法的核心在于 sizeCtl 值的操作,首先将其設定為一個負數,然後執行 transfer(tab, null),再下一個循環将 sizeCtl 加 1,并執行 transfer(tab, nt),之後可能是繼續 sizeCtl 加 1,并執行 transfer(tab, nt)。

是以,可能的操作就是執行 1 次 transfer(tab, null) + 多次 transfer(tab, nt),這裡怎麼結束循環的需要看完 transfer 源碼才清楚。

資料遷移:transfer

下面這個方法很點長,将原來的 tab 數組的元素遷移到新的 nextTab 數組中。

雖然我們之前說的 tryPresize 方法中多次調用 transfer 不涉及多線程,但是這個 transfer 方法可以在其他地方被調用,典型地,我們之前在說 put 方法的時候就說過了,請往上看 put 方法,是不是有個地方調用了 helpTransfer 方法,helpTransfer 方法會調用 transfer 方法的。

此方法支援多線程執行,外圍調用此方法的時候,會保證第一個發起資料遷移的線程,nextTab 參數為 null,之後再調用此方法的時候,nextTab 不會為 null。

閱讀源碼之前,先要了解并發操作的機制。原數組長度為 n,是以我們有 n 個遷移任務,讓每個線程每次負責一個小任務是最簡單的,每做完一個任務再檢測是否有其他沒做完的任務,幫助遷移就可以了,而 Doug Lea 使用了一個 stride,簡單了解就是步長,每個線程每次負責遷移其中的一部分,如每次遷移 16 個小任務。是以,我們就需要一個全局的排程者來安排哪個線程執行哪幾個任務,這個就是屬性 transferIndex 的作用。

第一個發起資料遷移的線程會将 transferIndex 指向原數組最後的位置,然後從後往前的 stride 個任務屬于第一個線程,然後将 transferIndex 指向新的位置,再往前的 stride 個任務屬于第二個線程,依此類推。當然,這裡說的第二個線程不是真的一定指代了第二個線程,也可以是同一個線程,這個讀者應該能了解吧。其實就是将一個大的遷移任務分為了一個個任務包。

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

private

final

void

transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {

int

n = tab.length, stride;

// stride 在單核下直接等于 n,多核模式下為 (n>>>3)/NCPU,最小值是 16

// stride 可以了解為”步長“,有 n 個位置是需要進行遷移的,

//   将這 n 個任務分為多個任務包,每個任務包有 stride 個任務

if

((stride = (NCPU >

1

) ? (n >>>

3

) / NCPU : n) < MIN_TRANSFER_STRIDE)

stride = MIN_TRANSFER_STRIDE;

// subdivide range

// 如果 nextTab 為 null,先進行一次初始化

//    前面我們說了,外圍會保證第一個發起遷移的線程調用此方法時,參數 nextTab 為 null

//       之後參與遷移的線程調用此方法時,nextTab 不會為 null

if

(nextTab ==

null

) {

try

{

// 容量翻倍

Node<K,V>[] nt = (Node<K,V>[])

new

Node<?,?>[n <<

1

];

nextTab = nt;

}

catch

(Throwable ex) {     

// try to cope with OOME

sizeCtl = Integer.MAX_VALUE;

return

;

}

// nextTable 是 ConcurrentHashMap 中的屬性

nextTable = nextTab;

// transferIndex 也是 ConcurrentHashMap 的屬性,用于控制遷移的位置

transferIndex = n;

}

int

nextn = nextTab.length;

// ForwardingNode 翻譯過來就是正在被遷移的 Node

// 這個構造方法會生成一個Node,key、value 和 next 都為 null,關鍵是 hash 為 MOVED

// 後面我們會看到,原數組中位置 i 處的節點完成遷移工作後,

//    就會将位置 i 處設定為這個 ForwardingNode,用來告訴其他線程該位置已經處理過了

//    是以它其實相當于是一個标志。

ForwardingNode<K,V> fwd =

new

ForwardingNode<K,V>(nextTab);

// advance 指的是做完了一個位置的遷移工作,可以準備做下一個位置的了

boolean

advance =

true

;

boolean

finishing =

false

;

// to ensure sweep before committing nextTab

/*

* 下面這個 for 循環,最難了解的在前面,而要看懂它們,應該先看懂後面的,然後再倒回來看

*

*/

// i 是位置索引,bound 是邊界,注意是從後往前

for

(

int

i =

, bound =

;;) {

Node<K,V> f;

int

fh;

// 下面這個 while 真的是不好了解

// advance 為 true 表示可以進行下一個位置的遷移了

//   簡單了解結局:i 指向了 transferIndex,bound 指向了 transferIndex-stride

while

(advance) {

int

nextIndex, nextBound;

if

(--i >= bound || finishing)

advance =

false

;

// 将 transferIndex 值賦給 nextIndex

// 這裡 transferIndex 一旦小于等于 0,說明原數組的所有位置都有相應的線程去處理了

else

if

((nextIndex = transferIndex) <=

) {

i = -

1

;

advance =

false

;

}

else

if

(U.compareAndSwapInt

(

this

, TRANSFERINDEX, nextIndex,

nextBound = (nextIndex > stride ?

nextIndex - stride :

))) {

// 看括号中的代碼,nextBound 是這次遷移任務的邊界,注意,是從後往前

bound = nextBound;

i = nextIndex -

1

;

advance =

false

;

}

}

if

(i <

|| i >= n || i + n >= nextn) {

int

sc;

if

(finishing) {

// 所有的遷移操作已經完成

nextTable =

null

;

// 将新的 nextTab 指派給 table 屬性,完成遷移

table = nextTab;

// 重新計算 sizeCtl:n 是原數組長度,是以 sizeCtl 得出的值将是新數組長度的 0.75 倍

sizeCtl = (n <<

1

) - (n >>>

1

);

return

;

}

// 之前我們說過,sizeCtl 在遷移前會設定為 (rs << RESIZE_STAMP_SHIFT) + 2

// 然後,每有一個線程參與遷移就會将 sizeCtl 加 1,

// 這裡使用 CAS 操作對 sizeCtl 進行減 1,代表做完了屬于自己的任務

if

(U.compareAndSwapInt(

this

, SIZECTL, sc = sizeCtl, sc -

1

)) {

// 任務結束,方法退出

if

((sc -

2

) != resizeStamp(n) << RESIZE_STAMP_SHIFT)

return

;

// 到這裡,說明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,

// 也就是說,所有的遷移任務都做完了,也就會進入到上面的 if(finishing){} 分支了

finishing = advance =

true

;

i = n;

// recheck before commit

}

}

// 如果位置 i 處是空的,沒有任何節點,那麼放入剛剛初始化的 ForwardingNode ”空節點“

else

if

((f = tabAt(tab, i)) ==

null

)

advance = casTabAt(tab, i,

null

, fwd);

// 該位置處是一個 ForwardingNode,代表該位置已經遷移過了

else

if

((fh = f.hash) == MOVED)

advance =

true

;

// already processed

else

{

// 對數組該位置處的結點加鎖,開始處理數組該位置處的遷移工作

synchronized

(f) {

if

(tabAt(tab, i) == f) {

Node<K,V> ln, hn;

// 頭結點的 hash 大于 0,說明是連結清單的 Node 節點

if

(fh >=

) {

// 下面這一塊和 Java7 中的 ConcurrentHashMap 遷移是差不多的,

// 需要将連結清單一分為二,

//   找到原連結清單中的 lastRun,然後 lastRun 及其之後的節點是一起進行遷移的

//   lastRun 之前的節點需要進行克隆,然後分到兩個連結清單中

int

runBit = fh & n;

Node<K,V> lastRun = f;

for

(Node<K,V> p = f.next; p !=

null

; p = p.next) {

int

b = p.hash & n;

if

(b != runBit) {

runBit = b;

lastRun = p;

}

}

if

(runBit ==

) {

ln = lastRun;

hn =

null

;

}

else

{

hn = lastRun;

ln =

null

;

}

for

(Node<K,V> p = f; p != lastRun; p = p.next) {

int

ph = p.hash; K pk = p.key; V pv = p.val;

if

((ph & n) ==

)

ln =

new

Node<K,V>(ph, pk, pv, ln);

else

hn =

new

Node<K,V>(ph, pk, pv, hn);

}

// 其中的一個連結清單放在新數組的位置 i

setTabAt(nextTab, i, ln);

// 另一個連結清單放在新數組的位置 i+n

setTabAt(nextTab, i + n, hn);

// 将原數組該位置處設定為 fwd,代表該位置已經處理完畢,

//    其他線程一旦看到該位置的 hash 值為 MOVED,就不會進行遷移了

setTabAt(tab, i, fwd);

// advance 設定為 true,代表該位置已經遷移完畢

advance =

true

;

}

else

if

(f

instanceof

TreeBin) {

// 紅黑樹的遷移

TreeBin<K,V> t = (TreeBin<K,V>)f;

TreeNode<K,V> lo =

null

, loTail =

null

;

TreeNode<K,V> hi =

null

, hiTail =

null

;

int

lc =

, hc =

;

for

(Node<K,V> e = t.first; e !=

null

; e = e.next) {

int

h = e.hash;

TreeNode<K,V> p =

new

TreeNode<K,V>

(h, e.key, e.val,

null

,

null

);

if

((h & n) ==

) {

if

((p.prev = loTail) ==

null

)

lo = p;

else

loTail.next = p;

loTail = p;

++lc;

}

else

{

if

((p.prev = hiTail) ==

null

)

hi = p;

else

hiTail.next = p;

hiTail = p;

++hc;

}

}

// 如果一分為二後,節點數少于 8,那麼将紅黑樹轉換回連結清單

ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :

(hc !=

) ?

new

TreeBin<K,V>(lo) : t;

hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :

(lc !=

) ?

new

TreeBin<K,V>(hi) : t;

// 将 ln 放置在新數組的位置 i

setTabAt(nextTab, i, ln);

// 将 hn 放置在新數組的位置 i+n

setTabAt(nextTab, i + n, hn);

// 将原數組該位置處設定為 fwd,代表該位置已經處理完畢,

//    其他線程一旦看到該位置的 hash 值為 MOVED,就不會進行遷移了

setTabAt(tab, i, fwd);

// advance 設定為 true,代表該位置已經遷移完畢

advance =

true

;

}

}

}

}

}

}

說到底,transfer 這個方法并沒有實作所有的遷移任務,每次調用這個方法隻實作了 transferIndex 往前 stride 個位置的遷移工作,其他的需要由外圍來控制。

這個時候,再回去仔細看 tryPresize 方法可能就會更加清晰一些了。

get 方法從來都是最簡單的,這裡也不例外:

  1. 計算 hash 值
  2. 根據 hash 值找到數組對應位置: (n – 1) & h
  3. 根據該位置處結點性質進行相應查找
    • 如果該位置為 null,那麼直接傳回 null 就可以了
    • 如果該位置處的節點剛好就是我們需要的,傳回該節點的值即可
    • 如果該位置節點的 hash 值小于 0,說明正在擴容,或者是紅黑樹,後面我們再介紹 find 方法
    • 如果以上 3 條都不滿足,那就是連結清單,進行周遊比對即可

public

V get(Object key) {

Node<K,V>[] tab; Node<K,V> e, p;

int

n, eh; K ek;

int

h = spread(key.hashCode());

if

((tab = table) !=

null

&& (n = tab.length) >

&&

(e = tabAt(tab, (n -

1

) & h)) !=

null

) {

// 判斷頭結點是否就是我們需要的節點

if

((eh = e.hash) == h) {

if

((ek = e.key) == key || (ek !=

null

&& key.equals(ek)))

return

e.val;

}

// 如果頭結點的 hash 小于 0,說明 正在擴容,或者該位置是紅黑樹

else

if

(eh <

)

// 參考 ForwardingNode.find(int h, Object k) 和 TreeBin.find(int h, Object k)

return

(p = e.find(h, key)) !=

null

? p.val :

null

;

// 周遊連結清單

while

((e = e.next) !=

null

) {

if

(e.hash == h &&

((ek = e.key) == key || (ek !=

null

&& key.equals(ek))))

return

e.val;

}

}

return

null

;

}

簡單說一句,此方法的大部分内容都很簡單,隻有正好碰到擴容的情況,ForwardingNode.find(int h, Object k) 稍微複雜一些,不過在了解了資料遷移的過程後,這個也就不難了,是以限于篇幅這裡也不展開說了。

總結

其實也不是很難嘛,雖然沒有像之前的 AQS 和線程池一樣一行一行源碼進行分析,但還是把所有初學者可能會糊塗的地方都進行了深入的介紹,隻要是稍微有點基礎的讀者,應該是很容易就能看懂 HashMap 和 ConcurrentHashMap 源碼了。

參考文章

  • Java中的幾個HashMap/ConcurrentHashMap實作分析
  • HashMap 實作原理
  • Java HashMap 周遊方式性能探讨
  • 深入并發包 ConcurrentHashMap
  • Java 并發實踐 — ConcurrentHashMap 與 CAS
  • HashMap 和 HashTable 到底哪不同 ?
  • 深入分析ConcurrentHashMap1.8的擴容實作
  • ConcurrentHashMap的紅黑樹實作分析
  • 談談ConcurrentHashMap1.7和1.8的不同實作
  • JDK7與JDK8中HashMap的實作

繼續閱讀