juc系列-ConcurrentHashMap

2022-05-12 14:27:10 浏览数 (1)

1 概述

ConcurrentHashMap和HashMap一样都是基于散列的容器,ConcurrentHashMap可以认为是一种线程安全HashMap,它使用了一中完全不同的加锁策略提高并发性和伸缩性。 ConcurrentHashMap并不是将每个方法在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为“分段锁”。

2 静态结构

2.1 ConcurrentHashMap主要构件:

ConcurrentHashMap中主要实体类就是三个:

  • ConcurrentHashMap(整个Hash表)
  • Segment(桶)
  • HashEntry(节点)

对应下面的图可以看出之间的关系,静态内部类HashEntry用来封装映射表的键值对,Segment充当锁的角色,每个 Segment 对象维护散列表的若干个桶。每个桶是由若干个 HashEntry 对象链接起来的链表。一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组。

concurrenthashmap.png

下面是ConcurrentHashMap类的声明:

代码语言:javascript复制
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable

其中实现了ConcurrentMap接口,定义如下:

代码语言:javascript复制
public interface ConcurrentMap<K, V> extendsMap<K, V> {
 
    V putIfAbsent(K key, V value);
 
    boolean remove(Object key, Object value);
 
    boolean replace(K key, V oldValue, V newValue);
 
    V replace(K key, V value);
}

其中定义了4个常用的复合操作:

V putIfAbsent(K key, V value);

如果没有这个key,则放入这个key-value,返回null,否则返回key对应的value。

boolean remove(Object key, Object value);

移除key和对应的value,如果key对应的不是value,移除失败

boolean replace(K key, V oldValue, V newValue);

替代key对应的值,仅当当前值为旧值

V replace(K key, V value);

替代key对应的值,只要当前有值

2.2 Segment 类

代码语言:javascript复制
static final class Segment<K,V> extends ReentrantLock implements Serializable {
    
        private static final long serialVersionUID = 2249069246763182397L;

        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

        transient volatile HashEntry<K,V>[] table;

        transient int count;  //本segment中HashEntry的个数

        transient int modCount; //table被更新的次数

        transient int threshold;  //当table中的HashEntry个数超过该值后,需要table再散列

        final float loadFactor; //装载因子

segment结构示意图:

segment.jpg

Segment是一个可重入锁,每个Segment就相当于一个HashMap,是一个数组和链表结构,维持了一个HashEntry数组。

2.3 HashEntry

Map内的键值对是用hashEntry类封装的,其中hash,key,next域都被声明为final属性,value被声明为volatile类型。下面是HashEntry源码:

代码语言:javascript复制
static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;    
        。。。
}

3 动态算法

3.1 init-初始化流程

ConcurrentHashMap初始化:

代码语言:javascript复制
public ConcurrentHashMap(int initialCapacity,
                        float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1; //ssize为segment个数(默认16)
        while (ssize < concurrencyLevel) {//找到最佳匹配参数(不小于concurrencyLevel的最小2次幂)
              sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;    //总容量/segment个数 = 每个segment容量
        if (c * ssize < initialCapacity)    //确保segment个数*segment容量大于总的容量
              c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY; //找到不小于c的最小2次幂,cap最终为每个segment的容量
        while (cap < c) 
            cap <<= 1;
        // create segments and segments[0]
//这里区别于JDK1.6区别的地方,只创建出segments[0],并且引入了UNSAFE类的putOrderedObject()方法,这个方法特点是低延时,速度快。而1.6中循环segments数组,为每个元素创建segment对象。
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

为什么segments数组大小必须是2的n次方?

为了能通过按位与操作来定位segments数组的索引值。

segmentShift和segmentMask干什么用滴?

我们知道基于散列的容器是通过元素的hashCode值来确定元素在容器中的索引的,那么ConcurrentHashMap中定位一个元素至少需要两步:

  • 定位segment
  • 定位HashEntry

segmentShift和segmentMask的作用就是用来定位segment元素的,如下:

代码语言:javascript复制
//若ssize=16,即segments数组大小为16,那么sshift=4
this.segmentShift = 32 - sshift;//segmentShift=32-4=28
this.segmentMask = ssize - 1;//掩码segmentMask=15,对应二进制1111,每位上都是1

((hash >>> segmentShift) & segmentMask)//定位segmetns
(hash & (tab.length-1))//定位HashEntry

3.2 service-常用操作分析

3.2.1.get(Object key)

ConcurrentHashMap的get方法主要步骤如下:

  1. 通过key的hash值,确定所属的segment的索引。
代码语言:javascript复制
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT)   SBASE;

默认情况下segmentShift=28,segmentMask=15,u是hash后的值,为32位,h >>> segmentShift) & segmentMask)即让hash值得高4位参与到运算。

  1. 通过UNSAFE类的getObjectVolatile方法获取segment。
代码语言:javascript复制
s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u))
  1. 通过hash值确定HashEntry数组位置,遍历链表找出entry
代码语言:javascript复制
        for(HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT)   TBASE);
             e != null; e = e.next) {
            K k;
            if((k = e.key) == key || (e.hash == h && key.equals(k)))
                returne.value;
        }

下面是get方法整体源码:

代码语言:javascript复制
publicV get(Object key) {
    Segment<K,V> s;
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT)   SBASE;
    if((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null&&
        (tab = s.table) != null) {
        for(HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT)   TBASE);
             e != null; e = e.next) {
            K k;
            if((k = e.key) == key || (e.hash == h && key.equals(k)))
                returne.value;
        }
    }
    return null;
}

性能分析:

可以看到整个get方法执行过程中并没有进行同步操作,get操作无需加锁时间复杂度O(1)

3.2.2.put(K key, V value)

put操作步骤:

代码语言:javascript复制
//ConcurrentHashMap的put源码
    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)//value不允许为null
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;//定位segment元素
        if ((s = (Segment<K,V>)UNSAFE.getObject 
             (segments, (j << SSHIFT)   SBASE)) == null)
            s = ensureSegment(j);//第一次访问segment时,创建segment对象
        return s.put(key, hash, value, false);//调用segment的put方法
    }
    
    //下面是segment中的put方法:
        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        //tryLock()加锁,不成功则执行scanAndLockForPut
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                //下面for循环是进行元素定位即更新操作
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        //key已存在,通过onlyIfAbsent决定是否更新值
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                  modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                    //key不存在,构造node节点,通过UNSAFE类插入节点,注意:这里插入是头插法
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);//插入链表头node.next = first;
                        int c = count   1;
                        //是否需要扩容,注意:只对当前segment扩容
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                          modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }
需要注意的点:
  • 插入新节点的时候,是头插法。
  • 如果count > threshold需要扩容,那么只对当前segment扩容。
  • 若元素已存在,不一定更新,只有当onlyIfAbsent=false的时候才更新。

在put方法中,首先进行加锁操作,tryLock失败后并不会按照常规思路阻塞当前线程,而是执行scanAndLockForPut方法,下面重点分析下这个方法做了什么工作?

代码语言:javascript复制
//Segment类中的常量MAX_SCAN_RETRIES:最大扫描次数,在scanAndLockForPut方法中会用到
        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
            
    //scanAndLockForPut的源码:
    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            //定位HashEntry数组位置,获取第一个节点
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1;//扫描次数
            
            while (!tryLock()) {
                HashEntry<K,V> f;
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // 构造新节点
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (  retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // 首节点有变动,更新first,重新扫描
                    retries = -1;
                }
            }
            return node;
        }

scanAndLockForPut的作用:

  • 当前线程获取不到锁的时候并没有闲着,而是查找key是否已经存在,如果当前链表中没有查到,则新建一个HashEntry对象。
  • 新建HashEntry节点后,当retries <= MAX_SCAN_RETRIES时,不断通过tryLock尝试获取锁,retries > MAX_SCAN_RETRIES,则调用lock(),此时若还获取不到锁,那么当前线程就被阻塞,这点类似于自旋锁。
  • 在检索key的时候,别的线程可能正在对segment进行修改,所以要做如下检查:
代码语言:javascript复制
//每间隔一次循环,检查一次first节点是否改变
if ((retries & 1) == 0 &&(f = entryForHash(this, hash)) != first) {
        e = first = f; // 首节点有变动,更新first,重新扫描
        retries = -1;
}

通过scanAndLockForPut方法,当前线程就可以在即使获取不到segment锁的情况下,完成潜在需要添加节点的实例化工作,当获取锁后,就可以直接将该节点插入链表即可。还实现了类似于自旋锁的功能,防止执行put操作的线程频繁阻塞,这些优化都提升了put操作的性能。

3.2.3.全局操作 size()/isEmpty()/clear()

由于分段技术的缘故,ConcurrentHashMap的全局操作(size/isEmpty/clear)就会变得复杂

size操作

要统计ConcurrentHashMap的元素个数,就要将所有segment的元素个数求和。直接累加?肯定不行,在并发环境中,这样得到的结果并不准确。对所有segment加锁再求和?这样做结果肯定正确,但是这违背了ConcurrentHashMap设计的初衷,在并发环境中要有良好的变现。ConcurrentHashMap中size方法实现选择了一个折中方案:

先尝试两次不对Segment加锁方式来统计count值,如果统计过程中count发生变化了,再加锁。如果没有改变,则返回size。

isEmpty操作

isEmpty操作会对segment对象的count进行检查,如果count!=0,那么直接返回false,同时对所有segment的modCount变量进行统计,统计两次的结果如果一致,那么表示容器在此之间没有发生变化,那么返回true.

clear操作

clear操作将容器中的所有元素移除,通过遍历segments数组,将每个segment对象中元素清除,源码如下:

代码语言:javascript复制
    public void clear() {
        final Segment<K,V>[] segments = this.segments;
        for (int j = 0; j < segments.length;   j) {
            Segment<K,V> s = segmentAt(segments, j);
            if (s != null)
                s.clear();
        }
    }

调用clear方法后,容器一定被清空了吗?

考虑这么一种情况,当A线程执行clear方法时,已经将segments[0]对象清空了,此时B线程执行put(key,value)方法,如果key散列到segments[0]上,那么A执行完后容器中还有元素!所以clear方法是弱一致的

ConcurrentHashMap弱一致性问题?

上面分析clear方法的若一致性,其实size()/isEmpty()/clear()都是在无锁清空下执行的,这带来的后果就是数据的不一致。所以在选择容器的时候要注意,如果非常强调全局方法的实时性和一致性,那么ConcurrentHashMap并不是一个好选择。

0 人点赞