前言
hello,everyone。上篇博客中介绍了HashMap,本文给大家带来ConcurrentHashMap。
【 一文吃透HashMap的前世与今生】:https://cloud.tencent.com/developer/article/2079820
上文中提到了,HashMap是线程不安全的类,k-v类型数据操作在多线程下推荐使用ConcurrentHashMap。本文将会延续HashMap的解读思路,对ConcurrentHashMap从关键成员变量,核心方法与常见面试点出发,帮助大家深入浅出的理解ConcurrentHashMap这个在java中核心数据结构。
一.ConcurrentHashMap介绍
ConcurrentHashMap与HashMap一样,同样是对K-V类型数据进行操作的数据结构。HashMap中通过modCount来避免多线程下的冲突,但是是通过fast-fail机制解决。无法解决在多线程情况下,对同一个key值进行有序的赋值动作。ConcurrentHashMap在1.7版本中使用segment分段锁的形式控制并发写入,JDK1.8版本的时候使用CAS synchronized关键字控制并发写入。本文将针对JDK1.8介绍ConcurrentHashMap的核心概念。
二.关键概念介绍
ConcurrentHashMap的关键数据结构概念与HashMap是一致的。
- 数组
- 线性链表
- 二叉树
- 哈希表
- 哈希冲突
【一问吃透HashMap的前世与今生:https://juejin.cn/post/6959584804375363620】
三.关键成员变量
代码语言:javascript复制//hashamp的最大容量,2的30次方
private static final int MAXIMUM_CAPACITY = 1 << 30;
//构造hashmap时,默认初始化容量为16
private static final int DEFAULT_CAPACITY = 16;
//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//默认扩容的扩展因子,当hashmap中的元素个数达到当前容量的75%时,触发扩容
private static final float LOAD_FACTOR = 0.75f;
//ConcurrentHashMap数组节点上链表转换为红黑的的阈值,链表节点达到8个时转换为红黑树【注意:这里不是绝对,切往后看】
static final int TREEIFY_THRESHOLD = 8;
//链表节点数小于6个时,从红黑树转换为链表
static final int UNTREEIFY_THRESHOLD = 6;
//链表转化为红黑的第二个要求,与TREEIFY_THRESHOLD对应,最小的链转树的数组大小
static final int MIN_TREEIFY_CAPACITY = 64;
//2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//记录目标的hash值
static final int MOVED = -1;
//目标的红黑树根节点hash值
static final int TREEBIN = -2;
//ReservationNode的hash值
static final int RESERVED = -3;
//获取当前CPU核数
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的数组
transient volatile Node<K,V>[] table;
/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
*当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容
*当为0时:代表当时的table还没有被初始化
*当为正数时:表示初始化或者下一次进行扩容的大小
*/
private transient volatile int sizeCtl;
//默认为null,初始化发生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方
transient volatile Node<K,V>[] table;
//默认为null,扩容时新生成的数组,其大小为原数组的两倍
private transient volatile Node<K,V>[] nextTable;
四.关键方法解析
4.1.构造方法
ConcurrentHashMap拥有空参与有参的构造方法。
4.1.1.空参构造方法
代码语言:javascript复制/**
* Creates a new, empty map with the default initial table size (16).
*/
public ConcurrentHashMap() {
}
从注释可以看出来,空参构造方法新建一个空的map,初始化大小为默认的16。空参构造的时候,当我们放入第一个元素的时候将会初始化map。我们来看一下put方法中putVal方法
initTable()方法解析
代码语言:javascript复制private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片
if ((sc = sizeCtl) < 0)
Thread.yield();
//如果cas成功,修改sizeCtl变量未-1,表示正在初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//table为空,设置一个默认大小的数组
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//设置一个默认扩容大小为12,根据扩容因子0.75得到。因为这里是空参构造方法,所有参数
为默认
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
从上的值,空构造方本身没有什么含义,只会初始化一些有默认值的变量,真正的结构数据初始化需要等到第一次put元素进去。
4.1.2.有参构造方法
代码语言:javascript复制public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
//将当前容量设置为1.5倍 1
tableSizeFor(initialCapacity (initialCapacity >>> 1) 1));
this.sizeCtl = cap;
}
容量设置保证为2的倍数。这里一个有一个特别重要的地方,在上面备注中已经指出来,传入的容量在初始化的时候会设置为传入容量的1.5倍 1,这么做的原因是,如果你需要7个元素的map容量,你传入了7,按照常理会设置一个8容量给你,但是你已经知道容量是7,元素个数到了第6个时,将会发生扩容。这里为了避免初始化容量设置不合理,导致后续扩容的耗时操作,则根据0.75f的扩容因子进行了初始化容量的设置。
如果输入7,则容量为16
如果输入15,则容量为32
4.2.put方法
代码语言:javascript复制public V put(K key, V value) {
return putVal(key, value, false);
}
代码语言:javascript复制//onlyIfAbsent参数为,当key值不存在的时候在进行设置
final V putVal(K key, V value, boolean onlyIfAbsent) {
//这里与HashMap不同,key与value都不允许为空
if (key == null || value == null) throw new NullPointerException();
//计算key的hash
int hash = spread(key.hashCode());
int binCount = 0;
//数组遍历
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果为空参构造方法,table参数还未被初始化,则先进行初始化动作
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果根据hash定位到的数组节点为空则使用cas进行新节点的设置
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果定位到的hash值为-1,则说明正在扩容,当前线程去帮忙进行扩容操作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
//最后一种情况,说明不是直接在数组上的节点,则遍历链表或者红黑树,遍历时使用synchronized加锁
V oldVal = null;
synchronized (f) {
//在节点 f 上进行同步,节点插入之前,再次利用tabAt(tab, i) == f判断,防止被其它线程修改
if (tabAt(tab, i) == f) {
//根据在第二点的值,hash值大于0时为链表节点,则进行链表遍历
if (fh >= 0) {
//链表节点计数
binCount = 1;
//链表遍历
for (Node<K,V> e = f;; binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//这里判断节点是不是等于TreeBin,如果是,则进行红黑树遍历
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//红黑树设置值
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//计数值如果大于8
if (binCount != 0) {
//计数大于8则链表转换红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
//如果key值节点已经存在则进行返回
if (oldVal != null)
return oldVal;
break;
}
}
}
//容量增加,决定是否扩容
addCount(1L, binCount);
return null;
}
put方法是整个ConcurrentHashMap的核心,使用cas synchronized关键字的方式保证了各个节点的线程安全。在这里不知道小伙伴有没有疑问为什么要使用cas synchronized加锁,替换了JDK1.7版本里面Segment ReentrantLock的方式。这里的主要原因是因为锁被细化了,Segment ReentrantLock是分段加锁,锁的冲突相对频率高、而cas synchronized是针对数组节点或者数组上整条链而言的,大大减少了锁的竞争冲突。JDK1.6之后,对synchronized关键字又做了关键的性能提升,做了各种的锁粗化,升级,消除等等一系列优化。
因此在ConcurrentHashMap这么细粒度下加锁,性能是不会比ReentrantLock差的。
synchronized锁升级过程:https://www.jianshu.com/p/704eb56aa52a
4.3.get方法
代码语言:javascript复制public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//hash值计算
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果数组上的节点符合直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//小于0,则进行红黑树遍历,其中find方法点击进入,根据注释的值,将会调用子类TreeBin内部find方法,红黑树遍历查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//链表遍历
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get方法的逻辑比较简单,全程也没有加锁的动作出现。这里大家应该会有一点疑问,那我在扩容的过程中,节点重新hash了,还能正确遍历吗?这里就要看一下存储数据的Node<K,V>
代码语言:javascript复制static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
node节点的数据都是用volatile关键字进行修饰的,可以从主存中加载到最新的地址信息数据,也就不需要加锁来实现了。
4.4.remove方法
代码语言:javascript复制final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
remove方法整体上与put方法类似,都是属于写操作,直接定位到数组头结点则直接删除,否则加synchronized关键字进行处理。
4.5.扩容
扩容发生的时机是当 table 容量不足的时候,即 table 的元素数量达到容量阈值 sizeCtl,需要对 table 进行扩容。
触发的入口分析:
1.put方法,塞入元素个数达到扩容的阈值
2.触发了tryPresize。这个动作有两种可能触发,
第一种:但数组有一个单位的链表节点达到了8个,但是数组长度还是小于64的情况,会尝试去扩容。
第二种:则是调用方法,putAll时。
ok,知道了入口,我们逐层方法点击定位到扩容的方法为:transfer
代码语言:javascript复制private final void transfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 多线程扩容,每核处理的量小于16,则强制赋值16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// nextTab 为空,先实例化一个新的数组
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
// 新数组的大小是原来的两倍
ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
// 更新成员变量
nextTable = nextTab;
// 更新转移下标,就是 老的 tab 的 length
transferIndex = n;
}
// bound :该线程此次可以处理的区间的最小下标,超过这个下标,就需要重新领取区间或者结束扩容
// advance: 该参数
int nextn = nextTab.length;
// 创建一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。
ConcurrentHashMap.ForwardingNode<K,V> fwd = new ConcurrentHashMap.ForwardingNode<K,V>(nextTab);
// advance 变量指的是是否继续递减转移下一个桶,如果为 true,表示可以继续向后推进,反之,说明还没有处理好当前桶,不能推进
boolean advance = true;
// 完成状态,如果是 true,表示扩容结束
boolean finishing = false; // to ensure sweep before committing nextTab
// 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标
for (int i = 0, bound = 0;;) {
ConcurrentHashMap.Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
// 这儿多判断一次,是否为了防止可能出现的remove()操作
if (tabAt(tab, i) == f) {
// 旧链表上该节点的数据,会被分成低位和高位,低位就是在新链表上的位置跟旧链表上一样,
// 高位就是在新链表的位置是旧链表位置加上旧链表的长度
ConcurrentHashMap.Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
ConcurrentHashMap.Node<K,V> lastRun = f;
for (ConcurrentHashMap.Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (ConcurrentHashMap.Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 该节点哈希值与旧链表长度与运算,结果为0,则在低位节点上,反之,在高位节点上
if ((ph & n) == 0)
ln = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, ln);
else
hn = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
// 在nextTable i n 位置处插上链表
setTabAt(nextTab, i n, hn);
// 在table i 位置处插上ForwardingNode 表示该节点已经处理过了
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof ConcurrentHashMap.TreeBin) {
// 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致
// 红黑树的逻辑跟节点一模一样,最后也会分高位和低位
ConcurrentHashMap.TreeBin<K,V> t = (ConcurrentHashMap.TreeBin<K,V>)f;
ConcurrentHashMap.TreeNode<K,V> lo = null, loTail = null;
ConcurrentHashMap.TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (ConcurrentHashMap.Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
hc;
}
}
// 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
五.总结
本文详细介绍了ConcurrentHashMap数据结构的核心逻辑与实现。现做以下总结
1.ConcurrentHashMap初始化容量为2的幂次,空参构造默认容量为16,有参构造容量为大于1.5倍 1参数的最小2的幂次。
2.put方法中使用数组cas自旋设值,链表或者红黑树synchronized关键字加锁设值,根据判断数组上的hash值判断当前节点状态,-1为正在发生扩容,大于0为hash值直接进行值的覆盖,节点类型为TreeBin则为红黑树,否则链表遍历赋值。
3.红黑树转换要求,链表节点数大于等于8,且数组长度大于等于64.
4.get方法不加锁,支持并发,通过volatile关键字保证内存可见性,拿到最新的节点数据。
六.参考
https://juejin.cn/post/6844904018729254926
https://www.jianshu.com/p/cf5e024d9432
https://segmentfault.com/a/1190000021237438