JDK 7 ConcurrentHashMap源码解读

2021-01-29 11:09:57 浏览数 (1)

原理分析

HashMap存在并发问题,jdk有提供HashTable,这个HashTable是对HashMap中的所以方法加锁以达到线程安全,但是,这种方式会使得性能下降,看下面的图,假如有两个线程分别要put kk3和kk4,第一个线程最快,它对kk3进行put操作,这时另一个线程要put kk4就要等待,问题是,这两个元素所要put的位置,互不相干,但还是需要等待,这造成了一种资源浪费,所以才会出现ConcurrentHashMap,它分段式加锁,就能很大程度上避免下面的情况。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1YhqJdMs-1594129846423)

下面的问题就是如何分段?

在ConcurrentHashMap中

它有这样一个结构:

ConcurrentHashMap

----Segment[]

--------HashEntry[]

在ConcurrentHashMap中有最上层的数组segment,segment里每一个元素里还有一个HashEntry,图示如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xaUVjUFw-1594129846425)

按结构来看,就像是segment将HashEntry进行了分段,每段都有相同个数的HashEntry。

源码

它有这些个属性:

代码语言:javascript复制
 	// 默认初始大小
	static final int DEFAULT_INITIAL_CAPACITY = 16;
	// 默认加载因子
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
	// 默认的并发级别
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
	// 最大的容量
    static final int MAXIMUM_CAPACITY = 1 << 30;
	// 每段的最小容量
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
	// 最大段数
    static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
	// 重试次数
    static final int RETRIES_BEFORE_LOCK = 2;
	// hash种子
    private transient final int hashSeed = randomHashSeed(this);

    final int segmentMask;

    final int segmentShift;

    final Segment<K,V>[] segments;

    transient Set<K> keySet;
    transient Set<Map.Entry<K,V>> entrySet;
    transient Collection<V> values;
代码语言:javascript复制
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    // 检查初始化值是否异常
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
     // 判断并发级别是否最大值
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 这里计算ssize和hashmap里的很像,就是找大于等于并发级别的2次方式数
    // 比如,16,那么这个ssize就是16,如果是17,就会是32
    
    // sshift下面再说
    while (ssize < concurrencyLevel) {
          sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // 向上取整,计算,分段下的数组大小
    // 如果,initialCapacity=17,ssize=16,就是我们想让数组初始化大小为17时,c=1
   
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
          c;
     // (默认最小大小)MIN_SEGMENT_TABLE_CAPACITY=2
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // 如果计算后的值大于最小值就左移(*2);当c=1,cap=2;c=3,cap=4;c=5,cap=8
    while (cap < c)
        cap <<= 1;
    // 创建segment数组,参数(加载因子,阈值,内部数组tab)
    // 原型参照对象
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

在构造器中为什么要创建这个s0呢??? 其实这类似一个原型的意思,打个比方,在外层segment数组下,需要一个HashEntry数组,那么每次去添加的时候,都要new HashEntry,那么是不是都需要再计算一次刚刚的步骤呢?没有必要,所以创建这个s0作为一个原型参照。

需要注意的是,创建好segment后,segment的大小就不会变了,变的是segment里的HashEntry

来看put方法

代码语言:javascript复制
 public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
     // 计算下标
        int j = (hash >>> segmentShift) & segmentMask;
     // 判断计算的下标处,segment元素是否null,并获取segment
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT)   SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

计算下标,注意segmentShiftsegmentMask

代码语言:javascript复制
   int j = (hash >>> segmentShift) & segmentMask;

segmentShiftsegmentMask这两个值的计算在构造方法中,这里截出一段。

假设concurrencyLevel = 16,走下面代码

代码语言:javascript复制
	int sshift = 0;
    int ssize = 1;
    
    // 1 < 16 => ssize = 2,sshift = 1
	// 2 < 16 => ssize = 4,sshift = 2
	// 4 < 16 => ssize = 8,sshift = 3
	// 8 < 16 => ssize = 16,sshift = 4
	// 16 < 16 退出循环
    while (ssize < concurrencyLevel) {
          sshift;
        ssize <<= 1;
    }
	// 32 - 4 = 28
	// 不懂这里为什么要32 - 4(待补充)
    this.segmentShift = 32 - sshift;
	// 16 - 1 = 15 => 0000 0000 0000 0000 0000 0000 0000 1111
    this.segmentMask = ssize - 1;

那么hash >>> segmentShift就是右移28位,就会得到4位随机有效值:

代码语言:javascript复制
hash =>

1010 1010 1010 1010 1010 1010 1010 1010

hash >>> 28

0000 0000 0000 0000 0000 0000 0000 1010

segmentMask =>

0000 0000 0000 0000 0000 0000 0000 1111

(hash >>> segmentShift) & segmentMask =>

0000 0000 0000 0000 0000 0000 0000 1010

这里的计算是为了使数组随机散列的更均匀。

然后是接下去的一句,意思是取segments数组的(j << SSHIFT) SBASE个位置

代码语言:javascript复制
UNSAFE.getObject(segments, (j << SSHIFT)   SBASE)

查看SSHIFT发现它的值在静态代码快里定义好了:31 - Integer.numberOfLeadingZeros(ss),那么上面的表达式可以转为:

(j << 31 - Integer.numberOfLeadingZeros(ss)) SBASE

代码语言:javascript复制
 static {
        int ss, ts;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class tc = HashEntry[].class;
            Class sc = Segment[].class;
            // 获取数组基本偏移量
            TBASE = UNSAFE.arrayBaseOffset(tc);
            SBASE = UNSAFE.arrayBaseOffset(sc);
            // 获取元素间隔比例
            ts = UNSAFE.arrayIndexScale(tc);
            ss = UNSAFE.arrayIndexScale(sc);
            HASHSEED_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("hashSeed"));
            SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segmentShift"));
            SEGMASK_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segmentMask"));
            SEGMENTS_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segments"));
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
            throw new Error("data type scale not a power of two");
     // 取高位前面的0的个数
        SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
        TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
    }

UNSAFE可以定位到对象属性位置,也可以修改值,但是他需要偏移量。

那么Integer.numberOfLeadingZeros(ss)又表示什么?

它表示的是高位前面的0的个数,比如8的二进制是0000 0000 0000 0000 0000 0000 0000 1000

28 = Integer.numberOfLeadingZeros(8)

然而(j << 31 - Integer.numberOfLeadingZeros(ss)) SBASE = j * ss和hashmap中计算下标一样。

补充证明过程;

如果,在计算的下标处,元素为null,就会进入ensureSegment方法。

因为会出现多个线程会在同一个位置申请空间,所以这里出现很多unsafe操作,确保只有一个线程能够进行赋值

代码语言:javascript复制
 private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT)   SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            // 获取原型元素
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
   
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            // cap初始化大小,看上面几行代码,可以知道,它是获取的是原型元素ss[0]的,没有去计算,计算就和上面说过的一样,是在构造函数中初始化好的,以ss[0]作为原型去创建。
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            // 再次判断还是不是空的,
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                // 如果是空的,就生成一个新的
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                // 通过cas操作,将生成的放入
                // 这里while是以为可能存在多个线程,第一个线程更新中,但第二个线程也同样操作,但第二个操作更快,那么cas操作失败,循环,再次判断是否==null,这种写法比较严谨
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

最后进入segment的put方法

代码语言:javascript复制
 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                 // 这里和hashmap中的一样,
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                // 这里是获取hashEntry的头结点,不是第一个元素
                HashEntry<K,V> first = entryAt(tab, index);
                // 这里就有两种情况,实现的原理就和hashmap一样。
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            // key重复就覆盖,并返回旧值
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                  modCount;
                            }
                            break;
                        }
                        // 没有重复的就下一个节点,再次循环
                        e = e.next;
                    }
                    else {
                        // 如果已经new出node,就直接设置节点
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count   1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            // 扩容
                            rehash(node);
                        else
                            // 利用UNSAFE在指定位置设置值
                            setEntryAt(tab, index, node);
                          modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

中间的for有两种情况

1.头尾null

2.头不为空

它继承了reetrantlock,下面这个方法是为了加锁的,最后他会返回new的node,然而在上一层方法中他也会进行判断和new node,注意这里的处理方式,如果一个while循环里什么都不干,就空转,那会使CPU占用率高,在循环里做一些操作,使它循环的没那么快(这一知识点可以百度看一下),提高性能,既然要做一些操作,那么就可以去new 一个Entry。

代码语言:javascript复制
 private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
     // 循环尝试加锁
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                // 这一串是目的是为了创建node,retries表示是否需要new这个对象
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                // 当达到一定次数就加锁
                else if (  retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                // 每一次重试都检查一次链表有没有发生变化,如果第一个节点发生变化,就重新遍历一下
                else if ((retries & 1) == 0 &&
                         // 获取当前数组的头结点位置
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

扩容方法

代码语言:javascript复制
 private void rehash(HashEntry<K,V> node) {
 
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i  ) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    // 新数组下标
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        // 如果下一个为空,就单个node,就放到新节点上
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        // 这一串:遍历hashEntry,判断下一个元素的下标是否相等,
                        // 不相等,则记录下一个节点的下标和对象,(补充画图)一直遍历
                        // 其结果就是记录hashEntry最后key相同的元素,
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            // 计算新的下标,
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        // 把记录的最后的元素放到新数组
                        newTable[lastIdx] = lastRun;
                        // 头插法,遍历hashEntry,除刚刚记录的最后的元素外,全部通过头插法加入到新数组
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
     // 将我们新的添加对象放到新数组里
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }

0 人点赞