经过对hashMap的学习,现在我们来学习一下ConcurrentHashMap的机理。我们知道juc包是高并发工具包,按照之前提供的高并发集合,那么ConcurrentHashMap也是用来解决多线程中HashMap的问题。那么高并发中HashMap又有什么问题要让人家特意开发这样一个ConrrentHashMap里?我们知道高并发的本质是将多副本代码的运行,但是数据是公用的,也就是多副本代码多数据的操作要一致。而HashMap也不是内存可见的。那么多线程条件下肯定会有问题。在添加的时候我们知道是通过hashMap是通过key的hash值定位的,如果线程1刚定位到下标,然后线程二也定位到了这个地址然后进行了操作,那么线程1肯定会把线程2的value覆盖掉。除此之外比如在扩容的时候,线程1扩容到一半,需要重新计算下标进行移动的时候发生了阻塞,这时候线程2完成了扩容并将元素移动已经移动了新的地址空间。那么线程1的移动必然会覆盖线程2的移动结果。在jdk1.7中的扩容阶段,会导致死循环,死循环的原因主要是线程1扩容到一半,然后线程2完成了扩容,根据java中数据存储于堆的原则,线程1操作的数据不再是之前的数据。因此从逻辑上说可能会发生死循环。比如线程2移动了,线程1去移动移动的地址可能指向的时候线程2移动的地址。所以会导致死循环,而且还可能会产生数据缺失,缺失的原因是对线程2的覆盖。其基本原因还是因为死循环的产生过程。另外在java8的扩容中,要重新添加元素,如果多线程添加的话,会有一个问题。线程1添加一个元素,容量 1,在判断是否要扩容的时候它挂起了。然后线程2完成了这一步操作,之后线程1再给容量 1,再去判断,也就是两次加1操作的最终结果只是添加1而不是2.所以这样在多线程条件下肯定会有问题。虽然不太明显。但是必然会提高hash冲突的可能性了。
从类结构上看,ConcurrentHashMap的代码还是挺多的。有很多内部类也说明concurrentHashMap的功能比较复杂。当然这些内部类也是专门定制的。从类的属性上看有很多和hashmap类似的属性,但是增加了一些。那么这些增加的又是怎么组织的。还有如何做到高并发情况下数据的一致性?是使用了CAS和volitile吗?带着这些疑问,我们向concurrentHashMap开炮!
代码语言:javascript复制 public ConcurrentHashMap() {
}
//传入concurrentHashMap的容量
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
如果大于最大值得右移一位,那么就取最大值,否则进行扩展两倍,然后再求其最接近的2的次幂数据
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?MAXIMUM_CAPACITY :tableSizeFor(initialCapacity (initialCapacity >>> 1) 1));
this.sizeCtl = cap;
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
默认为16
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
设置一个最小的容量
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 (long)initialCapacity / loadFactor);
设置容量
int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
可以看出初始化的过程和hashMap没啥两样。都是对限制条件的完善。
再添加元素的时候
public V put(K key, V value) {
return putVal(key, value, false);
}
static final int HASH_BITS = 0x7fffffff
//这里的异或运算主要也是为了找到对应的index,因为高位在的都在,地位不一定。所以区分度还是看传入的数据。
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null)
//concurrentHashMap的key和value都不允许为空
throw new NullPointerException();
计算hash值,这块与hashmap的好像不一样。
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
初始化tab
tab = initTable();
这里的tabAt是我们探索concurrentHashMap的结构的基础。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
找到hash对应的index的数据,这块是CasTabAt(index),大概得意思是使用cas策略来更新这个元素,我们看到concurrentHashMap中好多数据都是用volitile修饰的。
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
设置成功之后,直接中断
}
//意思是数组中没有找到
使用fh作为hash值得缓存
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
这里加锁了,线程安全。可以看出这里的锁是加载链表上的
synchronized (f) {
if (tabAt(tab, i) == f) {
查看两次取到的值是否一样
if (fh >= 0) {
binCount = 1;
f是节点,遍历f的链表
for (Node<K,V> e = f;; binCount) {
K ek;
if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {
如果已经存在的话就覆盖
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
否则的话用链表的方式添加到末尾
pred.next = new Node<K,V>(hash, key,value, null);
break;
}
}
}
//如果他的hash值为负数,那么判断是不是tree
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
更新红黑树的值
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
如果链表的长度大于8的话就进行转红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
在tabAt方法中,我们看到了熟悉的Unsafe,这块是把下标和Node数组的首地址的偏移量进行计算正式的地址。
代码语言:javascript复制@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) ABASE);
}
这里的AddCount是做什么的?按理需要处理的也做完了。看字面意思应该就是怼binCount进行 1,但是又好像不是这么回事。再说这样有什么意义。但是bincount肯定是链表定的长度了。
代码语言:javascript复制private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
这里check是链表长度
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
sc是扩容上线
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {
新的容量
int rs = resizeStamp(n);
if (sc < 0) {
???
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs 1 ||
sc == rs MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
容量进行 1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc 1))
扩容
transfer(tab, nt);
}
扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) 2))
transfer(tab, null);
获取总容量
s = sumCount();
}
}
}
addCount的大概就是扩容了。这块逻辑上看还是挺复杂的
在获取元素的时候。
代码语言:javascript复制 public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
先定位到tab,然后通过tab的hash计算到具体的链表,如果在tab这里找到的话,就直接返回。
if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
如果其hash值小于0,说明是红黑树哦,那么通过红黑树进行查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
否则遍历链表
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
综合上述:concurrentHashMap是基于分段锁实现的,其中加锁的地方在链表和树节点。如果分三层去看就是将锁添加到最后一层。这样做也是为了让添加节点没有后顾之忧。其他部分与hashMap差异不大。