原理分析
HashMap存在并发问题,jdk有提供HashTable,这个HashTable是对HashMap中的所以方法加锁以达到线程安全,但是,这种方式会使得性能下降,看下面的图,假如有两个线程分别要put kk3和kk4,第一个线程最快,它对kk3进行put操作,这时另一个线程要put kk4就要等待,问题是,这两个元素所要put的位置,互不相干,但还是需要等待,这造成了一种资源浪费,所以才会出现ConcurrentHashMap,它分段式加锁,就能很大程度上避免下面的情况。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1YhqJdMs-1594129846423)
下面的问题就是如何分段?
在ConcurrentHashMap中
它有这样一个结构:
ConcurrentHashMap
----Segment[]
--------HashEntry[]
在ConcurrentHashMap中有最上层的数组segment,segment里每一个元素里还有一个HashEntry,图示如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xaUVjUFw-1594129846425)
按结构来看,就像是segment将HashEntry进行了分段,每段都有相同个数的HashEntry。
源码
它有这些个属性:
代码语言:javascript复制 // 默认初始大小
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 默认加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 默认的并发级别
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 最大的容量
static final int MAXIMUM_CAPACITY = 1 << 30;
// 每段的最小容量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// 最大段数
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
// 重试次数
static final int RETRIES_BEFORE_LOCK = 2;
// hash种子
private transient final int hashSeed = randomHashSeed(this);
final int segmentMask;
final int segmentShift;
final Segment<K,V>[] segments;
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
transient Collection<V> values;
代码语言:javascript复制public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
// 检查初始化值是否异常
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 判断并发级别是否最大值
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
// 这里计算ssize和hashmap里的很像,就是找大于等于并发级别的2次方式数
// 比如,16,那么这个ssize就是16,如果是17,就会是32
// sshift下面再说
while (ssize < concurrencyLevel) {
sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 向上取整,计算,分段下的数组大小
// 如果,initialCapacity=17,ssize=16,就是我们想让数组初始化大小为17时,c=1
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
c;
// (默认最小大小)MIN_SEGMENT_TABLE_CAPACITY=2
int cap = MIN_SEGMENT_TABLE_CAPACITY;
// 如果计算后的值大于最小值就左移(*2);当c=1,cap=2;c=3,cap=4;c=5,cap=8
while (cap < c)
cap <<= 1;
// 创建segment数组,参数(加载因子,阈值,内部数组tab)
// 原型参照对象
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
在构造器中为什么要创建这个s0呢??? 其实这类似一个原型的意思,打个比方,在外层segment数组下,需要一个HashEntry数组,那么每次去添加的时候,都要new HashEntry,那么是不是都需要再计算一次刚刚的步骤呢?没有必要,所以创建这个s0作为一个原型参照。
需要注意的是,创建好segment后,segment的大小就不会变了,变的是segment里的HashEntry
来看put方法
代码语言:javascript复制 public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// 计算下标
int j = (hash >>> segmentShift) & segmentMask;
// 判断计算的下标处,segment元素是否null,并获取segment
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
计算下标,注意segmentShift
和segmentMask
int j = (hash >>> segmentShift) & segmentMask;
segmentShift
和segmentMask
这两个值的计算在构造方法中,这里截出一段。
假设concurrencyLevel = 16,走下面代码
代码语言:javascript复制 int sshift = 0;
int ssize = 1;
// 1 < 16 => ssize = 2,sshift = 1
// 2 < 16 => ssize = 4,sshift = 2
// 4 < 16 => ssize = 8,sshift = 3
// 8 < 16 => ssize = 16,sshift = 4
// 16 < 16 退出循环
while (ssize < concurrencyLevel) {
sshift;
ssize <<= 1;
}
// 32 - 4 = 28
// 不懂这里为什么要32 - 4(待补充)
this.segmentShift = 32 - sshift;
// 16 - 1 = 15 => 0000 0000 0000 0000 0000 0000 0000 1111
this.segmentMask = ssize - 1;
那么hash >>> segmentShift
就是右移28位,就会得到4位随机有效值:
hash =>
1010 1010 1010 1010 1010 1010 1010 1010
hash >>> 28
0000 0000 0000 0000 0000 0000 0000 1010
segmentMask =>
0000 0000 0000 0000 0000 0000 0000 1111
(hash >>> segmentShift) & segmentMask =>
0000 0000 0000 0000 0000 0000 0000 1010
这里的计算是为了使数组随机散列的更均匀。
然后是接下去的一句,意思是取segments数组的(j << SSHIFT) SBASE
个位置
UNSAFE.getObject(segments, (j << SSHIFT) SBASE)
查看SSHIFT发现它的值在静态代码快里定义好了:31 - Integer.numberOfLeadingZeros(ss)
,那么上面的表达式可以转为:
(j << 31 - Integer.numberOfLeadingZeros(ss)) SBASE
代码语言:javascript复制 static {
int ss, ts;
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class tc = HashEntry[].class;
Class sc = Segment[].class;
// 获取数组基本偏移量
TBASE = UNSAFE.arrayBaseOffset(tc);
SBASE = UNSAFE.arrayBaseOffset(sc);
// 获取元素间隔比例
ts = UNSAFE.arrayIndexScale(tc);
ss = UNSAFE.arrayIndexScale(sc);
HASHSEED_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("hashSeed"));
SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("segmentShift"));
SEGMASK_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("segmentMask"));
SEGMENTS_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("segments"));
} catch (Exception e) {
throw new Error(e);
}
if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
throw new Error("data type scale not a power of two");
// 取高位前面的0的个数
SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
}
UNSAFE可以定位到对象属性位置,也可以修改值,但是他需要偏移量。
那么Integer.numberOfLeadingZeros(ss)
又表示什么?
它表示的是高位前面的0的个数,比如8的二进制是0000 0000 0000 0000 0000 0000 0000 1000
28 = Integer.numberOfLeadingZeros(8)
然而(j << 31 - Integer.numberOfLeadingZeros(ss)) SBASE = j * ss
和hashmap中计算下标一样。
补充证明过程;
如果,在计算的下标处,元素为null,就会进入ensureSegment
方法。
因为会出现多个线程会在同一个位置申请空间,所以这里出现很多unsafe操作,确保只有一个线程能够进行赋值
代码语言:javascript复制 private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 获取原型元素
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
// cap初始化大小,看上面几行代码,可以知道,它是获取的是原型元素ss[0]的,没有去计算,计算就和上面说过的一样,是在构造函数中初始化好的,以ss[0]作为原型去创建。
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
// 再次判断还是不是空的,
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
// 如果是空的,就生成一个新的
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 通过cas操作,将生成的放入
// 这里while是以为可能存在多个线程,第一个线程更新中,但第二个线程也同样操作,但第二个操作更快,那么cas操作失败,循环,再次判断是否==null,这种写法比较严谨
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
最后进入segment的put方法
代码语言:javascript复制 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 这里和hashmap中的一样,
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
// 这里是获取hashEntry的头结点,不是第一个元素
HashEntry<K,V> first = entryAt(tab, index);
// 这里就有两种情况,实现的原理就和hashmap一样。
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
// key重复就覆盖,并返回旧值
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
modCount;
}
break;
}
// 没有重复的就下一个节点,再次循环
e = e.next;
}
else {
// 如果已经new出node,就直接设置节点
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
// 扩容
rehash(node);
else
// 利用UNSAFE在指定位置设置值
setEntryAt(tab, index, node);
modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
中间的for有两种情况
1.头尾null
2.头不为空
它继承了reetrantlock,下面这个方法是为了加锁的,最后他会返回new的node,然而在上一层方法中他也会进行判断和new node,注意这里的处理方式,如果一个while循环里什么都不干,就空转,那会使CPU占用率高,在循环里做一些操作,使它循环的没那么快(这一知识点可以百度看一下),提高性能,既然要做一些操作,那么就可以去new 一个Entry。
代码语言:javascript复制 private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 循环尝试加锁
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
// 这一串是目的是为了创建node,retries表示是否需要new这个对象
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
// 当达到一定次数就加锁
else if ( retries > MAX_SCAN_RETRIES) {
lock();
break;
}
// 每一次重试都检查一次链表有没有发生变化,如果第一个节点发生变化,就重新遍历一下
else if ((retries & 1) == 0 &&
// 获取当前数组的头结点位置
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
扩容方法
代码语言:javascript复制 private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i ) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 新数组下标
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
// 如果下一个为空,就单个node,就放到新节点上
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// 这一串:遍历hashEntry,判断下一个元素的下标是否相等,
// 不相等,则记录下一个节点的下标和对象,(补充画图)一直遍历
// 其结果就是记录hashEntry最后key相同的元素,
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
// 计算新的下标,
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 把记录的最后的元素放到新数组
newTable[lastIdx] = lastRun;
// 头插法,遍历hashEntry,除刚刚记录的最后的元素外,全部通过头插法加入到新数组
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 将我们新的添加对象放到新数组里
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}