文章目录
- 面试
-
- JDK中的CHM的数据结构?
- JDK7/8中的CHM是怎么保证并发安全的?
- JDK7在高并发下如何保证取得的元素是最新的?
- JDK7和JDK8中的CHM的不同点
- CHM一定线程安全吗?
- 说说jdk8的扩容
- 1.7详解
-
- 核心
- hash
- put
-
- 总结
- get
- size
- rehash
- CAS操作
- 1.8
-
- 核心
- 重要参数
- 五个构造方法
- put
-
- initTable
- unsafe
- addCount
- TreeBin
- tryPresize
- transfer 扩容
- get
- clear
面试
JDK中的CHM的数据结构?
1.7: Segment数组+HashEntry数组+链表
1.8:Node数组+链表/红黑树
JDK7/8中的CHM是怎么保证并发安全的?
JDK7:
主要利⽤Unsafe操作+ReentrantLock+分段锁思想。
分段思想是为了提⾼CHM的
并发量
,分段数越⾼则⽀持的最⼤并发量越⾼,程序员可以 通过
concurrencyLevel
参数来指定
并发量
。
put数据流程
当调⽤CHM的put⽅法时,先获得Segment的可重⼊锁,然后获得HashEntry的锁,加锁成功后,再将待插⼊的
key,value
插⼊到HashEntry的链表中,插⼊完成后
解锁
。
JDK8
主要利⽤Unsafe操作+synchronized+CAS。
JDK7在高并发下如何保证取得的元素是最新的?
用于存储键值对数据的HashEntry,在设计上,它的成员变量value跟next都是
volatile
修饰的,这样就保证别的线程对value值的修改,get方法
可以马上看到
。
JDK7和JDK8中的CHM的不同点
核心差异
- 取消了segment分段锁数组,直接在Node数组的每个位置上加
锁,并利用CAS优化性能 ,使得锁的粒度更细,进一步减少并发冲突的概率Synchronized
- JDK8中的扩容性能更⾼,⽀持多线程同时扩容,虽然JDK7中也⽀持多线程扩容,但是性能没有JDK8⾼,因为JDK8中任意⼀个线程都可以去帮助扩容
方法差异
put
1.7:先定位Segment,再定位桶,put全程加锁,没有获取锁的线程提前找桶的位置,并最多自旋
64
次获取锁,超过则挂起。
1.8:由于移除了Segment,类似HashMap可以直接定位到桶,拿到首节点后进行判断
- 为空则CAS插入;
- 若正在扩容则跟着一起扩容;
- 其他情况则加锁put(类似1.7)
get
基本类似,由于value声明为
volatile
,保证了修改的可见性,因此不需要加锁。
resize
1.7:跟HashMap步骤一样,只不过是搬到单线程中执行,避免了HashMap在1.7中扩容时
死循环
的问题,保证线程安全。
1.8:支持并发扩容
1.8的HashMap扩容由
头插改为尾插
(为了避免死循环问题)CHM也一样。
迁移也是从尾部开始,扩容前在桶的头部放置一个hash值为-1的节点,这样别的线程访问时就能判断该桶是否已经被其他线程处理过了。
size
1.7:很经典的思路:计算两次,如果不变则返回计算结果,若不一致,则锁住所有的Segment求和。
1.8:用
baseCount
来存储当前的节点个数
CHM一定线程安全吗?
不一定,看下面的代码
结果:
get和 put都是原子操作,但
组合在一起就不是原子操作
了,所以最终结果并不是200
说说jdk8的扩容
和扩容有关的重要参数:sizeCtl
- 0: 表示
未初始化,记录table需要初始化的大小,例如默认为16table
- 正数:说明已初始化,记录数组的扩容阈值
。(capcity*0.75)
- -1 :说明正在初始化
-
-N:
纠错:不是代表有
个线程。而是表示取对应的二进制的N-1
数值为低16位
,代表此时有M
个M-1
扩容线程
扩容流程
新增节点之后,会调用
addCount
方法记录元素个数,并检查是否需要进行扩容,当数组元素个数达到阈值时(8和64),会触发扩容
具体扩容流程:首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素f,初始化一个实例
forwardNode
fwd
如果f == null,则在table中的i位置放入fwd
否则采用头插法的方式把当前旧table数组的指定任务范围的数据给迁移到新的数组中,然后给旧table原位置赋值fwd。
直到遍历过所有的节点以后就完成了复制工作,把table指向nextTable,并更新
为新数组大小的
sizeCtl
0.75
倍 ,扩容完成。
在此期间如果其他线程的有读写操作都会判断head节点是否为forwardNode节点,如果是就帮助扩容。
扩容时是否可以进行读写
- 对于
get
读操作
如果当前链表已经迁移完成,那么头节点会被设置成fwd节点,此时get线程
会帮助扩容
。
如果当前节点有数据,还没迁移完成,此时不影响读,能够正常进行。
- 对于
put/remove
写操作
如果当前链表已经迁移完成,那么头节点会被设置成fwd节点,此时写线程
如果扩容没有完成,当前链表的头节点会被锁住,所以写线程会被会帮助扩容
,直到扩容完成。阻塞
1.7详解
核心
ConcurrentHashMap使用
分段锁
技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现
真正的并发访问
。如下图是ConcurrentHashMap的内部结构图:
可以看到组成CHM最重要的两个数据结构:
一个
不可扩容的Segment数组结构
+
可以扩容的HashEntry数组
。
Segment+HashEntry
默认 Segment 的个数是
16
个,且一旦初始化完成就
不能改变
,相当于
ConcurrentHashMap
默认支持最多 16 个线程并发。
Segment继承自
可重入锁(ReentrantLock)
,在
ConcurrentHashMap
中作为锁来
确保线程安全
;
一个
Segment
里包含一个
HashEntry
(HashEntry类似于HashMap),所以segment可以理解为:每个segment都是
实现了Lock功能的HashMap
,利用
Segment
数组来实现
锁分离
。
CHM定位一个数据需要进行
两次Hash操作
。第一次Hash定位到
Segment
,第二次Hash定位到元素所在的
HashEntry链表
的头部。
HashEntry数组初始大小,默认16
HashEntry数组类似于 HashMap 的结构。所以每一个HashEntry数组的
初始化大小
都是
2的次幂
,默认
16
。
loadFactor: 扩容因子,默认
0.75
当一个Segment存储的元素数量大于
initialCapacity* loadFactor
时,该Segment会进行一次
扩容
,扩容为原来的
1.5倍
。
concurrencyLevel:并发度,默认16
并发度可以理解为程序运行时能够
「同时更新」
ConccurentHashMap且
不产生锁竞争
的
最大线程数
,实际上就是
ConcurrentHashMap
中的
分段锁个数
,即
Segment[]
的数组长度。
如果并发度设置的
过小
,会带来严重的
锁竞争
问题;
如果并发度设置的
过大
,原本位于
同一个Segment
内的访问会扩散到不同的Segment中,引起
程序性能下降
。
hash
不管是我们的get操作还是put操作要需要通过hash来对数据进行定位。
整体思想就是通过多次不同方式的位运算来努力将数据均匀的分布到目标table中,都是些扰动函数
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
put
计算插入的索引值通过:
key.hashcode & segmentMask
(segmentMask=ssize-1, 类似hashmap的n-1)
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// hash 值无符号右移 28位(初始化时获得),然后与 segmentMask=15 做与运算
// 其实也就是把高4位与segmentMask(1111)做与运算
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// 如果查找到的 Segment 为空,初始化
s = ensureSegment(j);
return s.put(key, hash, value, false); //待分析
}
上面的源码分析了 ConcurrentHashMap 在 put 一个数据时的处理流程,下面梳理下具体流程。
1、计算要 put 的 key 的位置,获取指定位置的 Segment。
2、如果指定位置的 Segment 为空,则初始化这个 Segment.
3、初始化 Segment 流程:
检查计算得到的位置的 Segment 是否为null.
为 null 继续初始化,使用 Segment[0] 的容量和负载因子创建一个 HashEntry 数组。
再次检查计算得到的指定位置的 Segment 是否为null.
使用创建的 HashEntry 数组初始化这个 Segment.
自旋判断计算得到的指定位置的 Segment 是否为null,使用 CAS 在这个位置赋值为 Segment.
Segment.put 插入 key,value 值。
上面探究了获取 Segment 段和初始化 Segment 段的操作。最后一行的 Segment 的 put 方法还没有查看,继续分析。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 获取 ReentrantLock 独占锁,如果获取不到,则用scanAndLockForPut 循环获取。
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
// 计算要put的数据位置
int index = (tab.length - 1) & hash;
// CAS 获取 index 坐标的值
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
// 检查是否 key 已经存在,如果存在,则遍历链表寻找位置,找到后替换 value
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
// first 有值,说明 index 位置已经有值了,有冲突,链表头插法。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 容量大于扩容阀值,小于最大容量,进行扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// index 位置赋值 node,node 可能是一个元素,也可能是一个链表的表头
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
由于 Segment 继承了 ReentrantLock,所以 Segment 内部可以很方便的获取锁,put 流程就用到了这个功能。下面是对上面源码的说明:
1、tryLock() 获取锁,获取不到使用 scanAndLockForPut 方法继续获取。
2、计算 put 的数据要放入的 index 位置,然后获取这个位置上的 HashEntry 。
3、遍历 put 新元素,为什么要遍历?因为这里获取的 HashEntry 可能是一个空元素,也可能是链表已存在,所以要区别对待。
当这个位置上的 HashEntry 不存在:
如果当前容量大于扩容阀值,小于最大容量,进行扩容。
小于扩容直接头插法插入。
当这个位置上的 HashEntry 存在:
判断链表当前元素 Key 和 hash 值是否和要 put 的 key 和 hash 值一致。
一致则替换值
不一致,获取链表下一个节点,直到发现相同进行值替换,如果链表里没有相同的,则:
如果当前容量大于扩容阀值,小于最大容量,进行扩容。
小于扩容直接链表头插法插入。
4、如果要插入的位置之前已经存在,替换后返回旧值,否则返回 null.
scanAndLockForPut 分析
第一步中的 scanAndLockForPut 操作这里没有介绍,这个方法做的操作就是不断的自旋 tryLock() 获取锁。当自旋次数大于指定次数时,使用 lock() 阻塞获取锁。在自旋时顺便获取下 hash 位置的 HashEntry。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 自旋获取锁
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
// 自旋达到指定次数后,阻塞等待直到获取到锁
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
总结
当执行put操作时,会进行第一次key的hash来定位Segment的位置,如果该Segment还没有初始化,即通过CAS操作进行赋值,然后进行第二次hash操作,找到相应的HashEntry的位置,这里会利用继承过来的锁的特性,在将数据插入指定的HashEntry位置时(链表的尾端),会通过继承ReentrantLock的tryLock()方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用tryLock()方法去获取锁,超过指定次数就挂起,等待唤醒。
get
ConcurrentHashMap的get操作跟HashMap类似,只是ConcurrentHashMap第一次需要经过一次hash定位到Segment的位置,然后再hash定位到指定的HashEntry,遍历该HashEntry下的链表进行对比,成功就返回,不成功就返回null
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key); // JDK7中标准的hash值获取算法
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // hash值如何映射到对应的segment上
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
// 无非就是获得hash值对应的segment 是否存在,
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
// 看下这个hash值对应的是segment(HashEntry)中的具体位置。然后遍历查询该链表
K k;
if ((k = e.key) key || (e.hash h && key.equals(k)))
return e.value;
}
}
return null;
}
size
计算ConcurrentHashMap的元素大小是一个有趣的问题,因为他是并发操作的,就是在你计算size的时候,他还在并发的插入数据,可能会导致你计算出来的size和你实际的size有相差(在你return size的时候,插入了多个数据),要解决这个问题,JDK1.7版本用两种方案
1、第一种方案他会使用不加锁的模式去尝试多次计算ConcurrentHashMap的size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的
2、第二种方案是如果第一种方案不符合,他就会给每个Segment加上锁,然后计算ConcurrentHashMap的size返回
所以size方法要谨慎使用
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ RETRIES_BEFORE_LOCK) { // 超过2次则全部加锁
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // 直接对全部segment加锁消耗性太大
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount; // 统计的是modCount,涉及到增删该都会加1
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum last) // 每一个前后的修改次数一样 则认为一样,但凡有一个不一样则直接break。
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
rehash
segment 数组初始化后就不可变了,也就是说
「并发性不可变」
,不过segment里的
table可以扩容为2倍
,该方法没有考虑并发,因为执行该方法之前已经获取了锁。其中JDK7中的rehash思路跟JDK8 中扩容后处理链表的思路一样。
// 方法参数中的node 是这次扩容后,需要添加到新的数组中的数据。
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// 2 倍
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
// 创建新数组
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’
int sizeMask = newCapacity - 1;
// 遍历原数组,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置
for (int i = 0; i < oldCapacity ; i++) {
// e 是链表的第一个元素
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 计算应该放置在新数组中的位置,
// 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19
int idx = e.hash & sizeMask; // 新位置
if (next null) // 该位置处只有一个元素
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// e 是链表表头
HashEntry<K,V> lastRun = e;
// idx 是当前链表的头结点 e 的新位置
int lastIdx = idx;
// for 循环找到一个 lastRun 结点,这个结点之后的所有元素是将要放到一起的
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 将 lastRun 及其之后的所有结点组成的这个链表放到 lastIdx 这个位置
newTable[lastIdx] = lastRun;
// 下面的操作是处理 lastRun 之前的结点,
//这些结点可能分配在另一个链表中,也可能分配到上面的那个链表中
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
CAS操作
在JDK7里在
ConcurrentHashMap
中通过
原子操作sun.misc.Unsafe
来
查找元素、替换元素和设置元素
。通过这样的硬件级别获得数据可以保证即使是多线程,每次获得的数据也是最新的。这些原子操作起着非常关键的作用,你可以在所有ConcurrentHashMap的基本功能中看到。
1.8
核心
JDK1.8已经摒弃了Segment的概念,而是直接用Node数组+链表/红黑树的数据结构来实现,并发控制使用
Synchronized+CAS
,
初始化,扩容等都和HashMap一样,所以CHM整个看起来就像是
优化过且线程安全的HashMap
虽然在JDK1.8中还能看到Segment的数据结构,但是已经简化了属性,只是为了兼容旧版本
JDK1.8的Node节点中
value和next
都用
volatile
修饰,保证并发的可见性。
synchronized 只锁定当前链表或红⿊树的
⾸节点
,这样只要 hash 不冲突,就不会产⽣并发,效率⼜提升 N 倍。
3、JDK1.7版本锁的粒度是基于
Segment
即
HashEntry[]
数组,而JDK1.8锁的粒度就是
Node
节点,锁的粒度粗化使得锁竞争变小,性能更高。
5、JDK1.8为什么使用
内置锁synchronized
来代替
重入锁ReentrantLock
,有以下几点:
- 因为粒度降低了,在相对而言的低粒度加锁方式,synchronized并不比ReentrantLock差
-
JVM的开发团队从来都没有放弃synchronized,而且基于JVM的synchronized优化空间更大,使用内嵌的关键字比使用API更加自然
在大量的数据操作下,对于JVM的内存压力,基于API的
会开销更多的内存,虽然不是瓶颈,但是也是一个选择依据。ReentrantLock
重要参数
private static final int MAXIMUM_CAPACITY = 1 << 30; // 数组的最大值
private static final int DEFAULT_CAPACITY = 16; // 默认数组长度
static final int TREEIFY_THRESHOLD = 8; // 链表转红黑树的一个条件
static final int MIN_TREEIFY_CAPACITY = 64; // 链表转红黑树的第二个条件
static final int UNTREEIFY_THRESHOLD = 6; // 红黑树转链表的条件
static final int MOVED = -1; // 表示正在扩容转移
static final int TREEBIN = -2; // 表示已经转换成树
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // 获得hash值的辅助参数
transient volatile Node<K,V>[] table;// 默认没初始化的数组,用来保存元素
private transient volatile Node<K,V>[] nextTable; // 转移的时候用的数组
static final int NCPU = Runtime.getRuntime().availableProcessors();// 获取可用的CPU个数
private transient volatile Node<K,V>[] nextTable; // 连接表,用于哈希表扩容,扩容完成后会被重置为 null
private transient volatile long baseCount; //保存着整个哈希表中存储的所有的结点的个数总和,有点类似于 HashMap 的 size 属性。
private transient volatile int sizeCtl;
siezctl详解
五个构造方法
1、ConcurrentHashMap()
无参构造函数用于创建一个带有默认
初始容量 (16)、加载因子 (0.75) 和 concurrencyLevel (16)
的新的CHM对象。
2、ConcurrentHashMap(int
initialCapacity
)
得到一个
>initialCapacity
的
2的幂次方
的初始容量的CHM对象
和1.7以及hashMap不一样,即使initialCapacity是2的幂次方,也会计算得到一个大于initialCapacity的值,
例如initialCapacity为16,会初始化为32
3、ConcurrentHashMap(
int initialCapacity, float loadFactor
)
创建一个带有指定
初始容量、加载因子和默认 concurrencyLevel
的新的空映射。
注意看下面框起来的,concurrencyLevel=1
4、ConcurrentHashMap(
int initialCapacity, float loadFactor, int concurrencyLevel
)
创建一个
>=initialCapacity
的
2的幂次方
的初始容量的CHM对象
如initialCapacity=15,则sizeCtl=16,
若initialCapacity为16,则sizeCtl为16。若initialCapacity大小超过了允许的最大值,则sizeCtl为最大值。值得注意的是,构造函数中的concurrencyLevel参数已经在JDK1.8中的意义发生了很大的变化,其并不代表所允许的并发数,其只是用来
确定sizeCtl大小
,在JDK1.8中的并发控制都是针对具体的桶而言,即有多少个桶就可以允许多少个并发数。
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
5、该构造函数用于构造一个与给定映射具有相同映射关系的新映射
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
put
put⼀个key,value时的流程:
- ⾸先根据key计算对应的
,如果该位置没有元素,则通过数组下标i
的方式向该位置赋值。⾃旋
- 如果该位置有元素,则会
加锁synchronized
-
加锁成功之后,再判断该元素的类型
a. 如果是
则进⾏添加节点到链表节点
链表
中
b. 如果是
则添加节点到红⿊树
红⿊树
- 添加成功后,判断是否需要进⾏树化
- 执行
,这个⽅法的意思是addCount
的元素个数ConcurrentHashMap
,但是这个操作也是需要+1
的,并且元素个数加1成功后,会继续判断并发安全
,如果需要,则会进⾏扩容,所以这个⽅法 很重要。是否要进⾏扩容
-
同时⼀个线程在put时如果发现当前ConcurrentHashMap正在进⾏扩容,则会去帮助扩容。
synchronized 只锁定当前链表或红黑树的
,这样只要 hash 不冲突,就不会产生并发,效率又首节点
。提升 N 倍
源码分析:
public V put(K key, V value) {
return putVal(key, value, false);
}
put函数底层调用了putVal进行数据的插入,对于putVal函数的流程大体如下:
① 判断存储的key、value是否为空,若为空,则抛出异常,不为空进入步骤②
② 计算key的hash值,随后进入无限循环,该无限循环可以确保成功插入数据,若table表为空或者长度为0,则初始化table表,否则,进入步骤③
③ 根据key的hash值取出table表中的结点元素,若取出的结点为空(该桶为空),则使用CAS将
key、value、hash
值生成的结点放入桶中。否则,进入步骤④
④ 若该结点的的hash值为
MOVED
,则对该桶中的结点进行转移,否则,进入步骤⑤
⑤ 对桶中的第一个结点进行
加锁
,对该桶进行遍历,桶中的结点的hash值和key值与给定的hash值和key值相等,则根据标识选择是否进行更新操作(用给定的value值替换该结点的value值),若遍历完桶仍没有找到
hash值与key值
和
指定的hash值与key值
相等的结点,则直接新生一个结点并插入到链表末尾。进入步骤⑥
⑥ 若
binCount
值达到红黑树转化的阈值,则将桶中的结构转化为红黑树存储,最后,增加
binCount
的值。
final V putVal(K key, V value, boolean onlyIfAbsent) {
// ①键或值为空,抛出异常
if (key == null || value == null) throw new NullPointerException();
// 键的hash值经过计算获得hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) { // 无限循环
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) // 表为空或者表的长度为0
// ②初始化表
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 表不为空并且表的长度大于0,并且该桶不为空
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null))) // ③比较并且交换值,如tab的第i项为空则用新生成的node替换
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) // 该结点的hash值为MOVED
// ④进行结点的转移(在扩容的过程中)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) { //⑤ 加锁同步
if (tabAt(tab, i) == f) { // 找到table表下标为i的节点
if (fh >= 0) { // 该table表中该结点的hash值大于0
// binCount赋值为1
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { // 无限循环
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // 结点的hash值相等并且key也相等
// 保存该结点的val值
oldVal = e.val;
if (!onlyIfAbsent) // 进行判断
// 将指定的value保存至结点,即进行了结点值的更新
e.val = value;
break;
}
// 保存当前结点
Node<K,V> pred = e;
if ((e = e.next) == null) { // 当前结点的下一个结点为空,即为最后一个结点
// 新生一个结点并且赋值给next域
pred.next = new Node<K,V>(hash, key,
value, null);
// 退出循环
break;
}
}
}
else if (f instanceof TreeBin) { // 结点为红黑树结点类型
Node<K,V> p;
// binCount赋值为2
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) { // 将hash、key、value放入红黑树
// 保存结点的val
oldVal = p.val;
if (!onlyIfAbsent) // 判断
// 赋值结点value值
p.val = value;
}
}
}
}
if (binCount != 0) { // binCount不为0
//⑥ 如果binCount大于等于转化为红黑树的阈值
if (binCount >= TREEIFY_THRESHOLD)
// 进行转化
treeifyBin(tab, i);
if (oldVal != null) // 旧值不为空
// 返回旧值
return oldVal;
break;
}
}
}
// 增加binCount的数量
addCount(1L, binCount);
return null;
}
总结:
如果没有初始化就先调用
initTable()
方法来进行初始化过程
如果没有hash冲突就直接CAS插入
如果还在进行扩容操作就先进行扩容
如果存在hash冲突,就加锁来保证线程安全,这里有两种情况:
- 一种是链表形式就直接遍历到尾端插入,
- 一种是红黑树就按照红黑树结构插入,
最后一个如果Hash冲突时会形成
Node链表
,在链表长度超过8,Node数组超过64时会将链表结构转换为红黑树的结构,break再一次进入循环
如果添加成功就调用
addCount()
方法统计size,并且检查是否需要扩容
在putVal函数中会涉及到如下几个函数:
initTable、tabAt、casTabAt、helpTransfer、putTreeVal、treeifyBin、addCount函数
。下面对其中涉及到的函数进行分析。
initTable
initTable
只允许一个线程对表进行初始化,如果不巧有其他线程进来了,那么会让其他线程交出 CPU 等待下次系统调度。这样保证了表同时只会被一个线程初始化,
对于table的大小,会根据
sizeCtl
的值进行设置,如果没有设置
szieCtl
的值,那么默认大小为16.
// 容器初始化 操作
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) null || tab.length 0) {
if ((sc = sizeCtl) < 0) // 如果正在初始化-1,-N 正在扩容。
Thread.yield(); // 进行线程让步等待
// 让掉当前线程 CPU 的时间片,使正在运行中的线程重新变成就绪状态,并重新竞争 CPU 的调度权。
// 它可能会获取到,也有可能被其他线程获取到。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 比较sizeCtl的值与sc是否相等,相等则用 -1 替换,这表明我这个线程在进行初始化了!
try {
if ((tab = table) null || tab.length 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 默认为16
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2); // sc = 0.75n
}
} finally {
sizeCtl = sc; //设置sizeCtl 类似threshold
}
break;
}
}
return tab;
}
unsafe
在ConcurrentHashMap中使用了unSafe方法,通过直接操作内存的方式来保证并发处理的安全性,使用的是硬件的安全机制。
table[i]数据
是通过
Unsafe
对象通过
反射
获取的,
取数据直接table[index]不可以么,为什么要这么复杂?
在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的「副本」,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,
Unsafe.getObjectVolatile
可以直接获取指定内存的数据,「保证了每次拿到数据都是最新的」。
addCount
主要就2件事:一是更新 baseCount,二是判断是否需要扩容。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 首先如果没有并发 此时countCells is null, 此时尝试CAS设置数据值。
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 如果 counterCells不为空以为此时有并发的设置 或者 CAS设置 baseCount 失败了
CounterCell a; long v; int m;
boolean uncontended = true;
if (as null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 1. 如果没出现并发 此时计数盒子为 null
// 2. 随机取出一个数组位置发现为空
// 3. 出现并发后修改这个cellvalue 失败了
// 执行funAddCount
fullAddCount(x, uncontended);// 死循环操作
return;
}
if (check <= 1)
return;
s = sumCount(); // 吧counterCells数组中的每一个数据进行累加给baseCount。
}
// 如果需要扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);// 获得高位标识符
if (sc < 0) { // 是否需要帮忙去扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc rs + 1 ||
sc rs + MAX_RESIZERS || (nt = nextTable) null || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
} // 第一次扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
baseCount添加ConcurrentHashMap提供了
baseCount、counterCells 两个辅助变量和一个 CounterCell辅助内部类
。
sumCount() 就是迭代 counterCells来统计 sum 的过程。
put 操作时,肯定会影响 size(),在 put() 方法最后会调用
addCount
()方法。整体的思维方法跟LongAdder类似,用的思维就是借鉴的ConcurrentHashMap。每一个Cell都用Contended修饰来避免伪共享。
JDK1.7 和 JDK1.8 对 size 的计算是不一样的。1.7 中是先不加锁计算三次,如果三次结果不一样在加锁。
JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。
JDK 8 推荐使用mappingCount 方法,因为这个方法的返回值是 long 类型,不会因为 size 方法是 int 类型限制最大值。
TreeBin
主要功能就是当链表变化为红黑树时,这个红黑树用
TreeBin来包装
。并且要注意 转成红黑树以后以前链表的结构信息还是有的,最终信息如下:
TreeBin.first = 链表中第一个节点。
TreeBin.root = 红黑树中的root节点。
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
//创建空节点 hash = -2
this.first = b;
TreeNode<K,V> r = null; // root 节点
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r null) {
x.parent = null;
x.red = false;
r = x; // root 节点设置为x
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
// x代表的是转换为树之前的顺序遍历到链表的位置的节点,r代表的是根节点
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc null &&
(kc = comparableClassFor(k)) null) ||
(dir = compareComparables(kc, k, pk)) 0)
dir = tieBreakOrder(k, pk);
// 当key不可以比较,或者相等的时候采取的一种排序措施
TreeNode<K,V> xp = p;
// 放一定是放在叶子节点上,如果还没找到叶子节点则进行循环往下找。
// 找到了目前叶子节点才会进入 再放置数据
if ((p = (dir <= 0) ? p.left : p.right) null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
// 每次插入一个元素的时候都调用 balanceInsertion 来保持红黑树的平衡
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
tryPresize
当数组长度小于64的时候,扩张数组长度一倍,调用此函数。扩容后容量大小的核对,可能涉及到初始化容器大小。并且扩容的时候又跟2的次幂联系上了!,其中初始化时传入的map会调用
putAll
方法直接put一个map的话,在
「putAll」
方法中没有调用
initTable
方法去初始化table,而是直接调用了
tryPresize
方法,所以这里需要做一个是不是需要初始化table的判断。
PS:默认第一个线程设置 sc = rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1,这个时候说明扩容完毕了。
transfer 扩容
这里代码量比较大主要分文三部分,并且感觉思路很精髓,尤其「是其他线程帮着去扩容的骚操作」。
1、 主要是 单个线程能处理的最少桶结点个数的计算和一些属性的初始化操作。
2、每个线程进来会先领取自己的任务区间[bound,i],然后开始 --i 来遍历自己的任务区间,对每个桶进行处理。如果遇到桶的头结点是空的,那么使用 ForwardingNode标识旧table中该桶已经被处理完成了。如果遇到已经处理完成的桶,直接跳过进行下一个桶的处理。如果是正常的桶,对桶首节点加锁,正常的迁移即可(跟HashMap第三部分一样思路),迁移结束后依然会将原表的该位置标识位已经处理。
该函数中的finish= true 则说明整张表的迁移操作已经「全部」完成了,我们只需要重置 table的引用并将 nextTable 赋为空即可。否则,CAS 式的将 sizeCtl减一,表示当前线程已经完成了任务,退出扩容操作。如果退出成功,那么需要进一步判断当前线程是否就是最后一个在执行扩容的。
第一次扩容时在addCount中有写到(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 表示当前只有一个线程正在工作,「相对应的」,如果 (sc - 2) resizeStamp(n) << RESIZE_STAMP_SHIFT,说明当前线程就是最后一个还在扩容的线程,那么会将 finishing 标识为 true,并在下一次循环中退出扩容方法。
f ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
3、几乎跟HashMap大致思路类似的遍历链表/红黑树然后扩容操作。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) //MIN_TRANSFER_STRIDE 用来控制不要占用太多CPU
stride = MIN_TRANSFER_STRIDE; // subdivide range //MIN_TRANSFER_STRIDE=16 每个CPU处理最小长度个数
if (nextTab null) { // 新表格为空则直接新建二倍,别的辅助线程来帮忙扩容则不会进入此if条件
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n; // transferIndex 指向最后一个桶,方便从后向前遍历
}
int nextn = nextTab.length; // 新表长度
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节点
boolean advance = true; //是否继续向前查找的标志位
boolean finishing = false; // to ensure sweep(清扫) before committing nextTab,在完成之前重新在扫描一遍数组,看看有没完成的没
// 第一部分
// i 指向当前桶, bound 指向当前线程需要处理的桶结点的区间下限【bound,i】 这样来跟线程划分任务。
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 这个 while 循环的目的就是通过 --i 遍历当前线程所分配到的桶结点
// 一个桶一个桶的处理
while (advance) {// 每一次成功处理操作都会将advance设置为true,然里来处理区间的上一个数据
int nextIndex, nextBound;
if (--i >= bound || finishing) { //通过此处进行任务区间的遍历
advance = false;
}
else if ((nextIndex = transferIndex) <= 0) {
i = -1;// 任务分配完了
advance = false;
}
// 更新 transferIndex
// 为当前线程分配任务,处理的桶结点区间为(nextBound,nextIndex)
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
// nextIndex本来等于末尾数字,
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 当前线程所有任务完成
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // 已经完成转移 则直接赋值操作
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); //设置sizeCtl为扩容后的0.75
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // sizeCtl-1 表示当前线程任务完成。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
// 判断当前线程完成的线程是不是最后一个在扩容的,思路精髓
return;
}
finishing = advance = true;// 如果是则相应的设置参数
i = n;
}
}
else if ((f = tabAt(tab, i)) null) // 数组中把null的元素设置为ForwardingNode节点(hash值为MOVED[-1])
advance = casTabAt(tab, i, null, fwd); // 如果老节点数据是空的则直接进行CAS设置为fwd
else if ((fh = f.hash) MOVED) //已经是个fwd了,因为是多线程操作 可能别人已经给你弄好了,
advance = true; // already processed
else {
synchronized (f) { //加锁操作
if (tabAt(tab, i) f) {
Node<K,V> ln, hn;
if (fh >= 0) { //该节点的hash值大于等于0,说明是一个Node节点
// 关于链表的操作整体跟HashMap类似不过 感觉好像更扰一些。
int runBit = fh & n; // fh= f.hash first hash的意思,看第一个点 放老位置还是新位置
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n; //n的值为扩张前的数组的长度
if (b != runBit) {
runBit = b;
lastRun = p;//最后导致发生变化的节点
}
}
if (runBit 0) { //看最后一个变化点是新还是旧 旧
ln = lastRun;
hn = null;
}
else {
hn = lastRun; //看最后一个变化点是新还是旧 旧
ln = null;
}
/*
* 构造两个链表,顺序大部分和原来是反的,不过顺序也有差异
* 分别放到原来的位置和新增加的长度的相同位置(i/n+i)
*/
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) 0)
/*
* 假设runBit的值为0,
* 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同为0的节点)设置到旧的table的第一个hash计算后为0的节点下一个节点
* 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点
*/
ln = new Node<K,V>(ph, pk, pv, ln);
else
/*
* 假设runBit的值不为0,
* 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同不为0的节点)设置到旧的table的第一个hash计算后不为0的节点下一个节点
* 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点
*/
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) { // 该节点hash值是个负数否则的话是一个树节点
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null; // 旧 头尾
TreeNode<K,V> hi = null, hiTail = null; //新头围
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) 0) {
if ((p.prev = loTail) null)
lo = p;
else
loTail.next = p; //旧头尾设置
loTail = p;
++lc;
}
else { // 新头围设置
if ((p.prev = hiTail) null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//ln 如果老位置数字<=6 则要对老位置链表进行红黑树降级到链表,否则就看是否还需要对老位置数据进行新建红黑树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd); //老表中i位置节点设置下
advance = true;
}
}
}
}
}
}
get
- 计算hash值,定位到该table的索引位置
- 如果是首节点符合就返回
- 如果遇到扩容,查找该节点,匹配就返回
- 以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回
null
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算key的hash值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 表不为空并且表的长度大于0并且key所在的桶不为空
if ((eh = e.hash) == h) {
// 表中的元素的hash值与key的hash值相等
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
// 键相等
return e.val; // 返回值
}
//hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable
else if (eh < 0)
// 在桶(链表/红黑树)中查找
return (p = e.find(h, key)) != null ? p.val : null;
//既不是首节点也不是ForwardingNode,那就往下遍历
while ((e = e.next) != null) { // 对于结点hash值大于0的情况
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
clear
关于清空也相对简单 ,无非就是遍历桶数组,然后通过CAS来置空。
public void clear() {
long delta = 0L;
int i = 0;
Node<K,V>[] tab = table;
while (tab != null && i < tab.length) {
int fh;
Node<K,V> f = tabAt(tab, i);
if (f null)
++i; //这个桶是空的直接跳过
else if ((fh = f.hash) MOVED) { // 这个桶的数据还在扩容中,要去扩容同时等待。
tab = helpTransfer(tab, f);
i = 0; // restart
}
else {
synchronized (f) { // 真正的删除
if (tabAt(tab, i) f) {
Node<K,V> p = (fh >= 0 ? f :(f instanceof TreeBin) ?((TreeBin<K,V>)f).first : null);
//循环到链表/者红黑树的尾部
while (p != null) {
--delta; // 记录删除了多少个
p = p.next;
}
//利用CAS无锁置null
setTabAt(tab, i++, null);
}
}
}
}
if (delta != 0L)
addCount(delta, -1); //调整count
}