Hi~朋友,不点蓝字关注,我们哪来故事?
特别说明:除第一小节以外,其他均都是以JDK 1.8的ConcurrentHashMap进行分析,本文信息量略大,每一份坚持都是值得被尊重的,希望你可以坚持读完这篇文章,也希望这篇文章对各位读者朋友有所帮助。
摘要
- JDK 1.7 CourrentHashMap实现
- 为什么放弃分段锁
- JDK 1.8 CourrentHashMap实现
- ConcurrentHashMap数据结构
- ConcurrentHashMap初始化
- ConcurrentHashMap的hash算法
- Unsafe.getObjectVolatile方法
- ConcurrentHashMap的put操作
- ConcurrentHashMap如何判断扩容
- ConcurrentHashMap扩容
- 红黑树退化为链表
- 学会问自己
1. JDK 1.7 CourrentHashMap实现
JDK1.7 CourrentHashMap采用分段锁的机制来保证线程安全,底层的数据结构是数组 链表的存储方式。
JDK1.7 中的CourrentHashMap内部有两个关键类,一个是Segement,另一个是HashEntry。
- Segement继承ReentrantLock来充当锁,每个Segement对象保护若干个Hash桶的线程安全
- HashEntry用来保存键值对
- 每个桶是由若干个HashEntry构成的链表
2. JDK 1.8 为什么放弃分段锁
分段锁的优势是不必对所有Hash桶进行安全锁定,只有两个线程在操作同一个Segement中的某些Hash桶时才会有冲突进行线程安全同步。
JDK1.8放弃分段锁的原因是:
- 分段锁在Segement中的元素越来越多时,锁的粒度会越来越大,分段锁的性能会下降
3. JDK 1.8 ConcurrentHashMap实现
JDK1.8 ConcurrentHashMap底层的数据结构采用的是数组 链表 红黑树的存储结构,并且放弃了分段锁,利用CAS Synchronized来保证线程安全。
4. JDK 1.8 ConcurrentHashMap数据结构
4.1 Node内部类
代码语言:javascript复制static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key "=" val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
}
Node是数组或者链表或者红黑树的一个节点,代表着每一个KEY-VALUE元素, next是链表中的下一个节点元素。
4.2 TreeNode内部类
代码语言:javascript复制static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
}
TreeNode继承自Node,是红黑树中的节点。TreeNode会被包装在TreeBin中,由TreeBin完成红黑树的构造。
4.3 TreeBin内部类
代码语言:javascript复制static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
}
TreeBin继承自Node,Concurrent的table中如果是红黑树的形式,存储的是TreeBin并不是TreeNode,但TreeBin并不是红黑树的存储节点,TreeBin通过root属性来维护红黑树的根节点。
红黑树在旋转的时候根节点可能会被原来的子节点替换掉,在这个时间点,如果有其他线程要修改红黑树则会发生线程安全问题,所以TreeBin中使用waiter属性来表明正在操作红黑树的线程,防止其他线程进入。
4.4 ForwardingNode内部类
代码语言:javascript复制static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
ForwardingNode是一个特殊的节点,他的hash值为-1,使用nextTable存储下一个节点的引用,他通常在扩容的时候起作用,在发生扩容时,他会被当做占位符放在table中,该节点的key、value都为null。
4.5 volatile Node<K,V>[] table内部属性
Hash桶,存储了HashMap的元素,每一项的元素就有可能是普通的Node(链表),也有可能是红黑树(TreeBin),默认为null,初始化发生在第一次插入操作,大小默认是16,扩容时大小是2的幂次方
4.6 volatile Node<K,V>[] nextTable内部属性
默认为null,扩容的时候使用的新数组,大小是原数组的两倍。
4.7 volatile int sizeCtl
默认为0,用于控制table的初始化和扩容。
- -1代表正在初始化
- -N表示有N-1个线程在进行扩容操作
- 正数或0表示Hash桶还没被初始化,这个数值表示初始化或下一次扩容时的大小,大小始终为table容量的0.75
4.8 CounterCell[] counterCells & volatile long baseCount
baseCount和counterCells共同协作用来计算Map中的元素个数,baseCount是在没有竞争的时候使用。
作为一个高并发的安全集合,如果单单依靠baseCount来统计Map的元素个数,意味着多线程之间修改它需要进行锁或者长时间的自旋,为了避免这种情况的发生。在竞争发生时,我们通过CounterCell[]这个数组来存储,通过数组分片来存储元素的部分数量,降低并发竞争带来的损耗。
代码语言:javascript复制final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; i) {
if ((a = as[i]) != null)
sum = a.value;
}
}
return sum;
}
上述代码就是计算Map中的元素个数的方法,可以看到是将baseCount和counterCells数组元素中的所有值相加。
5. JDK 1.8 ConcurrentHashMap初始化
ConcurrentHashMap初始化过程并不是在构造的时候,而是在第一次进行put操作的时候通过initTable()方法来进行初始化。
代码语言:javascript复制private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 如果sizeCtl < 0表示Hash桶正在被其他线程初始化,所以该线程就可以让出CPU等待初始化完成。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 当前线程成功获取初始化table的权限,通过把sizeCtl改为-1可以告知其他线程Hash桶正在被初始化
try {
if ((tab = table) == null || tab.length == 0) {
// 如果sizeCtl > 0,初始化table的size就为sizeCtl
// 否则,初始化table的size为16(DEFAULT_CAPACITY)
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 计算sizeCtl的新值,当Hash桶的数量达到sizeCtl值时就需要进行扩容,该值恒等于当前容量的0.75
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
6. hash算法
代码语言:javascript复制static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
我们在定位Map的key在hash桶的位置的时候是通过以下方式进行定位的:
代码语言:javascript复制key的hash值 & (hash桶的size-1)
为什么需要通过spread来重新计算key的hash值,而不是直接使用key.hashCode作为hash值?
如果只是用key.hashCode() & (hash桶的size-1)来确定元素的位置时,其实只是利用了key.hashCode()低若干位信息,高位信息是无效的,这样会加大hash冲突的概率。为了使key更加分散,减少冲突,我们通过spread方法(异或)可以充分使用key.hashCode()高16位的信息,并且避免了Hash值是负数(&HASH_BITS)。
通过spread方法计算的hash值也会在Node节点中通过hash属性进行记录,便于下一次使用。
7. Unsafe.getObjectVolatile方法
在我们获取table中的元素时我们并没有使用table[i]直接去获取,而是通过getObjectVolatile方法去内存中获取指定的数据?
在Java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,通过Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。
8. put操作
put操作采用CAS synchronized实现并发插入或更新操作。当一个Hash桶中的元素个数大于等于8时,需要将链表转换为红黑树。
代码语言:javascript复制final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 初始化HashTable
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// i = (n - 1) & hash计算key在table中的位置
// 如果该位置没有任何元素,将该key-value包装Node存入该位置
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
// 如果
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; binCount) {
K ek;
// 设置链表的元素
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// 设置红黑树元素
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果链表中的元素超过8,调用treeifyBin方法将链表转换为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 检查当前容量是否需要扩容
addCount(1L, binCount);
return null;
}
9. 如何判断扩容
在判断时是否扩容的逻辑需要调用resizeStamp方法:
代码语言:javascript复制static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
Integer.numberOfLeadingZeros(n)这里的作用是返回高位所有0的个数,假设n是2(二进制为10),那么该结果为30,假设n为20(二进制为10100),那么结果为27。由于n是数组的长度,最小为16,最大是 1 << 30,也就是Integer.numberOfLeadingZeros(n)的结果最小是1,最大是27。
RESIZE_STAMP_BITS - 1的值为15,(1 << (RESIZE_STAMP_BITS - 1))相当于 1 << 15,也就是10000000 00000000。
最后将两步进行或操作,最终返回的高16位全为0,第16位固定为1,低15位就是1-27之间的值。
在put操作完成后,我们调用addCount方法,该方法里面有一段逻辑会判断是否扩容,如果需要扩容,则进行扩容操作。
代码语言:javascript复制private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 省略其他代码,这里是修改baseCount和counterCells
....
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 如果map.size() 大于 sizeCtl(达到扩容阈值需要扩容) 且
// table 不是空;且 table 的长度小于 1 << 30。(可以扩容)
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 根据length获取一个标识符
int rs = resizeStamp(n);
// 如果正在扩容
if (sc < 0) {
// 如果sc的低16位不等于标识符(校验异常sizeCtl变化了)
// 如果sc==标识符 1(扩容结束了,不再有线程进行扩容)(默认第一个线程设置sc==rs左移16位 2,当第一个线程结束扩容了,就会将sc减一。这个时候,sc就等于rs 1)
// 如果sc==标识符 65535(帮助线程数已经达到最大)
// 如果 nextTable == null(结束扩容了)
// 如果 transferIndex <= 0 (转移状态变化了)
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs 1 ||
sc == rs MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
// 结束循环
break;
// 如果需要帮助扩容,则需要将sizeCtl 1,如果设置成功,则执行扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc 1))
transfer(tab, nt);
}
// 如果没有扩容
// 将sizeCtl更新:标识符左移16位然后 2,也就是变成一个负数,高16位是标识符,低16位初始值为2
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) 2))
//开始扩容
transfer(tab, null);
// 再次统计数组元素
s = sumCount();
}
}
}
10. 扩容
ConcurrentHashMap的transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)方法负责完成整个Map的扩容操作。在该方法中,有以下几个重要的方法变量:
- boolean advance:首次推进为 true,如果等于 true,说明需要再次推进一个下标(处理的桶区间)(i--),反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
- boolean finishing:是否需要结束扩容操作
- bound:当前线程可以处理的Hash桶区间的最小下标
扩容步骤主要有以下几个操作。
10.1 计算每个线程需要处理的Hash桶个数
代码语言:javascript复制 int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
通过计算CPU核心数和Map中Hash桶的个数得到每个线程(CPU)要帮助处理多少个桶,并且这里每个线程处理都是平均的。默认每个线程处理16个桶。因此,如果长度是16的时候,扩容的时候只会有一个线程扩容。
10.2 初始化nextTab
代码语言:javascript复制if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
如果nextTab为空(扩容操作未初始化),需要对nextTable和transferIndex初始化,每一次扩容初始化,都会将原来的Hash桶的数组长度扩容成2倍(n << 1)。
10.3 For循环扩容转移操作
在经过计算每个线程处理的桶数和初始化以后,便进入一个死循环,开始真正的扩容转移操作。
代码语言:javascript复制int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
....扩容代码
}
10.3.2 分配桶区间给线程
代码语言:javascript复制Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
//判断i-1是否大于等于bound(正常情况下,如果大于bound不成立,说明该线程上次领取的任务已经完成了。那么,需要在下面继续领取任务)
//如果对i-大于等于bound(还需要继续做任务,或者扩容结束,修改推进状态为false,不能推进了。任务成功后修改推进状态为true。
//通常,第一次进入循环,i--这个判断会无法通过,从而走下面 nextIndex的CAS赋值操作(获取最新的转移下标)。其余情况都是:如果可以推进,将i-1,然后修改成不可推进。如果i对应的桶处理成功了,改成可以推进。
if (--i >= bound || finishing)
advance = false;
//如果transferIndex小于等于0,说明没有区间了,i改成-1,推进状态变成false,扩容结束了,当前线程可以退出了
// 这个-1会在下面的后续的扩容代码里使用将finishing改为true,然后表示扩容结束
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// CAS修改transferIndex,即length-区间值,留下剩余的区间值供后面的线程使用
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
此处的While循环主要是为了分配Hash桶区间给线程,并且确认要处理的Hash桶的下标然后跳出while循环执行扩容转移操作。
10.3.2 扩容转移
扩容转移的具体操作步骤如下:
- 首先判断是否扩容转移结束,如果结束,重置一些协助属性(nextTable),修改sizeCtl的值
- 判断处理的桶是否是空桶,如果是空桶,就使用fwd占位,继续下一个桶的处理
- 判断处理的桶是否已被其他线程处理,如果是其他线程已经在处理,继续下一个桶的处理
- 上述三个条件如果均为false,那么开始处理该桶,执行元素转移
if (i < 0 || i >= n || i n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//判断sc-2是否等于标识符左移16位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
//获取旧的Hash桶i下标位置的节点,如果是 null,就使用fwd占位,推进下一个桶的处理
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
//如果节点是hash是MOVED,说明别的线程已经处理过该桶了,不再重复处理,结束转移,推进下一个桶的处理
advance = true; // already processed
else {
//假如代码走到这里,说明这个位置有实际值了,且不是占位符,需要开始进行处理。对这个节点上锁。为什么上锁,防止putVal的时候向链表插入数据。
synchronized (f) {
// 判断i下标处的桶节点是否和f相同,只有相等才开始处理。
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
....链表转移
advance = true;
}
else if (f instanceof TreeBin) {
...树转移
advance = true;
}
}
}
}
11. 红黑树退化为链表
11.1 remove元素时发生退化
红黑树退化为链表发生在我们对ConcurrentHashMap的元素进行移除时,也就是调用其remove方法时,会有下面一段逻辑:
代码语言:javascript复制else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
// 这里判断移除
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
在remove方法中,如果Hash桶中是一个红黑树,则会调用removeTreeNode()方法对元素进行移除,在removeTreeNode中会有如下判断:
代码语言:javascript复制// 该Hash桶中没有元素,退化为链表
if (first == null) {
root = null;
return true;
}
// 红黑树节点过少时,也会退化为链表
if ((r = root) == null || r.right == null || // too small
(rl = r.left) == null || rl.left == null)
return true;
通过判断removeTreeNode方法的返回值,如果为true,我们则需要将该Hash桶中的元素由红黑树修改为链表。
11.2 扩容时发生退化
扩容时每个桶中的元素会发生转移,当某一个桶中的元素数量过少时,将会从红黑树退化为链表,这个元素的数量为固定值6,该段逻辑在扩容transfer方法中,如下:
代码语言:javascript复制else if (f instanceof TreeBin) {
.... 其他扩容逻辑
// UNTREEIFY_THRESHOLD这里是固定值为6
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
.... 其他扩容逻辑
}
12. 学会问自己
由于本篇知识量确实过大,各位读者朋友需要反复研读,反复思考,希望理解ConcurrentHashMap以后可以问一下自己能不能答上以下问题:
- 1.8为什么要选用CAS Synchronized Node来作为整个ConcurrentHashMap基础实现
- sizeCtl属性的作用是什么?
- 为什么需要counterCells和baseCount两个属性来共同计算ConcurrentHashMap的元素数量?
- CourrentHashMap的hash算法有什么优势?
- CourrentHashMap的put操作主要包含哪些流程?
- CourrentHashMap如何判断需要扩容以及扩容操作的流程
- CourrentHashMap的红黑树退化为链表的触发条件
本期的Java ConcurrentHashMap介绍到这,我是shysh95,我们下期再见!!!