一文吃透ConcurrentHashMap的前世与今生

2022-08-23 14:11:23 浏览数 (1)

前言

hello,everyone。上篇博客中介绍了HashMap,本文给大家带来ConcurrentHashMap。

【 一文吃透HashMap的前世与今生】:https://cloud.tencent.com/developer/article/2079820

上文中提到了,HashMap是线程不安全的类,k-v类型数据操作在多线程下推荐使用ConcurrentHashMap。本文将会延续HashMap的解读思路,对ConcurrentHashMap从关键成员变量,核心方法与常见面试点出发,帮助大家深入浅出的理解ConcurrentHashMap这个在java中核心数据结构。

一.ConcurrentHashMap介绍

ConcurrentHashMap与HashMap一样,同样是对K-V类型数据进行操作的数据结构。HashMap中通过modCount来避免多线程下的冲突,但是是通过fast-fail机制解决。无法解决在多线程情况下,对同一个key值进行有序的赋值动作。ConcurrentHashMap在1.7版本中使用segment分段锁的形式控制并发写入,JDK1.8版本的时候使用CAS synchronized关键字控制并发写入。本文将针对JDK1.8介绍ConcurrentHashMap的核心概念。

二.关键概念介绍

ConcurrentHashMap的关键数据结构概念与HashMap是一致的。

  • 数组
  • 线性链表
  • 二叉树
  • 哈希表
  • 哈希冲突

【一问吃透HashMap的前世与今生:https://juejin.cn/post/6959584804375363620】

三.关键成员变量

代码语言:javascript复制
//hashamp的最大容量,2的30次方
private static final int MAXIMUM_CAPACITY = 1 << 30;

//构造hashmap时,默认初始化容量为16
private static final int DEFAULT_CAPACITY = 16;

//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//默认扩容的扩展因子,当hashmap中的元素个数达到当前容量的75%时,触发扩容
private static final float LOAD_FACTOR = 0.75f;

//ConcurrentHashMap数组节点上链表转换为红黑的的阈值,链表节点达到8个时转换为红黑树【注意:这里不是绝对,切往后看】
static final int TREEIFY_THRESHOLD = 8;

//链表节点数小于6个时,从红黑树转换为链表
static final int UNTREEIFY_THRESHOLD = 6;

//链表转化为红黑的第二个要求,与TREEIFY_THRESHOLD对应,最小的链转树的数组大小
static final int MIN_TREEIFY_CAPACITY = 64;

//2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

//32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

//记录目标的hash值
static final int MOVED     = -1; 

//目标的红黑树根节点hash值
static final int TREEBIN   = -2; 

//ReservationNode的hash值
static final int RESERVED  = -3;

//获取当前CPU核数
static final int NCPU = Runtime.getRuntime().availableProcessors();

//存放node的数组
transient volatile Node<K,V>[] table;

/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
*当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容
*当为0时:代表当时的table还没有被初始化
*当为正数时:表示初始化或者下一次进行扩容的大小
*/
private transient volatile int sizeCtl;

//默认为null,初始化发生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方
transient volatile Node<K,V>[] table;

//默认为null,扩容时新生成的数组,其大小为原数组的两倍
private transient volatile Node<K,V>[] nextTable;

四.关键方法解析

4.1.构造方法

ConcurrentHashMap拥有空参与有参的构造方法。

4.1.1.空参构造方法

代码语言:javascript复制
/**
 * Creates a new, empty map with the default initial table size (16).
 */
public ConcurrentHashMap() {
}

从注释可以看出来,空参构造方法新建一个空的map,初始化大小为默认的16。空参构造的时候,当我们放入第一个元素的时候将会初始化map。我们来看一下put方法中putVal方法

initTable()方法解析

代码语言:javascript复制
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        //如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片
        if ((sc = sizeCtl) < 0)
            Thread.yield();
        //如果cas成功,修改sizeCtl变量未-1,表示正在初始化
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                    //table为空,设置一个默认大小的数组
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    //设置一个默认扩容大小为12,根据扩容因子0.75得到。因为这里是空参构造方法,所有参数
                    为默认
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

从上的值,空构造方本身没有什么含义,只会初始化一些有默认值的变量,真正的结构数据初始化需要等到第一次put元素进去。

4.1.2.有参构造方法

代码语言:javascript复制
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               //将当前容量设置为1.5倍 1
               tableSizeFor(initialCapacity   (initialCapacity >>> 1)   1));
    this.sizeCtl = cap;
}

容量设置保证为2的倍数。这里一个有一个特别重要的地方,在上面备注中已经指出来,传入的容量在初始化的时候会设置为传入容量的1.5倍 1,这么做的原因是,如果你需要7个元素的map容量,你传入了7,按照常理会设置一个8容量给你,但是你已经知道容量是7,元素个数到了第6个时,将会发生扩容。这里为了避免初始化容量设置不合理,导致后续扩容的耗时操作,则根据0.75f的扩容因子进行了初始化容量的设置。

如果输入7,则容量为16

如果输入15,则容量为32

4.2.put方法

代码语言:javascript复制
public V put(K key, V value) {
    return putVal(key, value, false);
}
代码语言:javascript复制
//onlyIfAbsent参数为,当key值不存在的时候在进行设置
final V putVal(K key, V value, boolean onlyIfAbsent) {
        //这里与HashMap不同,key与value都不允许为空
    if (key == null || value == null) throw new NullPointerException();
    //计算key的hash
    int hash = spread(key.hashCode());
    int binCount = 0;
    //数组遍历
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //如果为空参构造方法,table参数还未被初始化,则先进行初始化动作
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
            //如果根据hash定位到的数组节点为空则使用cas进行新节点的设置
        else if ((f = tabAt(tab, i = (n - 1) &amp; hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //如果定位到的hash值为-1,则说明正在扩容,当前线程去帮忙进行扩容操作
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
                //最后一种情况,说明不是直接在数组上的节点,则遍历链表或者红黑树,遍历时使用synchronized加锁
            V oldVal = null;
            synchronized (f) {
                    //在节点 f 上进行同步,节点插入之前,再次利用tabAt(tab, i) == f判断,防止被其它线程修改
                if (tabAt(tab, i) == f) {
                        //根据在第二点的值,hash值大于0时为链表节点,则进行链表遍历
                    if (fh >= 0) {
                            //链表节点计数
                        binCount = 1;
                        //链表遍历
                        for (Node<K,V> e = f;;   binCount) {
                            K ek;
                            if (e.hash == hash &amp;&amp;
                                ((ek = e.key) == key ||
                                 (ek != null &amp;&amp; key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //这里判断节点是不是等于TreeBin,如果是,则进行红黑树遍历
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        //红黑树设置值
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            //计数值如果大于8
            if (binCount != 0) {
                    //计数大于8则链表转换红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                 //如果key值节点已经存在则进行返回
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //容量增加,决定是否扩容
    addCount(1L, binCount);
    return null;
}

put方法是整个ConcurrentHashMap的核心,使用cas synchronized关键字的方式保证了各个节点的线程安全。在这里不知道小伙伴有没有疑问为什么要使用cas synchronized加锁,替换了JDK1.7版本里面Segment ReentrantLock的方式。这里的主要原因是因为锁被细化了,Segment ReentrantLock是分段加锁,锁的冲突相对频率高、而cas synchronized是针对数组节点或者数组上整条链而言的,大大减少了锁的竞争冲突。JDK1.6之后,对synchronized关键字又做了关键的性能提升,做了各种的锁粗化,升级,消除等等一系列优化。

因此在ConcurrentHashMap这么细粒度下加锁,性能是不会比ReentrantLock差的。

synchronized锁升级过程:https://www.jianshu.com/p/704eb56aa52a

4.3.get方法

代码语言:javascript复制
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //hash值计算
    int h = spread(key.hashCode());
    if ((tab = table) != null &amp;&amp; (n = tab.length) > 0 &amp;&amp;
        (e = tabAt(tab, (n - 1) &amp; h)) != null) {
        //如果数组上的节点符合直接返回
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null &amp;&amp; key.equals(ek)))
                return e.val;
        }
        //小于0,则进行红黑树遍历,其中find方法点击进入,根据注释的值,将会调用子类TreeBin内部find方法,红黑树遍历查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //链表遍历
        while ((e = e.next) != null) {
            if (e.hash == h &amp;&amp;
                ((ek = e.key) == key || (ek != null &amp;&amp; key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get方法的逻辑比较简单,全程也没有加锁的动作出现。这里大家应该会有一点疑问,那我在扩容的过程中,节点重新hash了,还能正确遍历吗?这里就要看一下存储数据的Node<K,V>

代码语言:javascript复制
static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
}

node节点的数据都是用volatile关键字进行修饰的,可以从主存中加载到最新的地址信息数据,也就不需要加锁来实现了。

4.4.remove方法

代码语言:javascript复制
final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) &amp; hash)) == null)
            break;
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &amp;&amp;
                                ((ek = e.key) == key ||
                                 (ek != null &amp;&amp; key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null &amp;&amp; cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &amp;&amp;
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null &amp;&amp; cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

remove方法整体上与put方法类似,都是属于写操作,直接定位到数组头结点则直接删除,否则加synchronized关键字进行处理。

4.5.扩容

扩容发生的时机是当 table 容量不足的时候,即 table 的元素数量达到容量阈值 sizeCtl,需要对 table 进行扩容。

触发的入口分析:

1.put方法,塞入元素个数达到扩容的阈值

2.触发了tryPresize。这个动作有两种可能触发,

第一种:但数组有一个单位的链表节点达到了8个,但是数组长度还是小于64的情况,会尝试去扩容。

第二种:则是调用方法,putAll时。

ok,知道了入口,我们逐层方法点击定位到扩容的方法为:transfer

代码语言:javascript复制
private final void transfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 多线程扩容,每核处理的量小于16,则强制赋值16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; 
    // nextTab 为空,先实例化一个新的数组
    if (nextTab == null) { 
        try {
            @SuppressWarnings("unchecked")
            // 新数组的大小是原来的两倍
            ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) { 
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // 更新成员变量
        nextTable = nextTab;
        // 更新转移下标,就是 老的 tab 的 length
        transferIndex = n;
    }
    // bound :该线程此次可以处理的区间的最小下标,超过这个下标,就需要重新领取区间或者结束扩容
    // advance: 该参数
    int nextn = nextTab.length;
    // 创建一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。
    ConcurrentHashMap.ForwardingNode<K,V> fwd = new ConcurrentHashMap.ForwardingNode<K,V>(nextTab);
    // advance 变量指的是是否继续递减转移下一个桶,如果为 true,表示可以继续向后推进,反之,说明还没有处理好当前桶,不能推进
    boolean advance = true;
    // 完成状态,如果是 true,表示扩容结束
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标
    for (int i = 0, bound = 0;;) {
        ConcurrentHashMap.Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                    (this, TRANSFERINDEX, nextIndex,
                            nextBound = (nextIndex > stride ?
                                    nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i   n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            synchronized (f) {
            // 这儿多判断一次,是否为了防止可能出现的remove()操作
                if (tabAt(tab, i) == f) {
                    // 旧链表上该节点的数据,会被分成低位和高位,低位就是在新链表上的位置跟旧链表上一样,
                    // 高位就是在新链表的位置是旧链表位置加上旧链表的长度
                    ConcurrentHashMap.Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh &amp; n;
                        ConcurrentHashMap.Node<K,V> lastRun = f;
                        for (ConcurrentHashMap.Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash &amp; n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (ConcurrentHashMap.Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            // 该节点哈希值与旧链表长度与运算,结果为0,则在低位节点上,反之,在高位节点上
                            if ((ph &amp; n) == 0)
                                ln = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        // 在nextTable i   n 位置处插上链表
                        setTabAt(nextTab, i   n, hn);
                        // 在table i 位置处插上ForwardingNode 表示该节点已经处理过了
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof ConcurrentHashMap.TreeBin) {
                        // 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致
                        // 红黑树的逻辑跟节点一模一样,最后也会分高位和低位
                        ConcurrentHashMap.TreeBin<K,V> t = (ConcurrentHashMap.TreeBin<K,V>)f;
                        ConcurrentHashMap.TreeNode<K,V> lo = null, loTail = null;
                        ConcurrentHashMap.TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (ConcurrentHashMap.Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                            if ((h &amp; n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                  lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                  hc;
                            }
                        }
                        // 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i   n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

五.总结

本文详细介绍了ConcurrentHashMap数据结构的核心逻辑与实现。现做以下总结

1.ConcurrentHashMap初始化容量为2的幂次,空参构造默认容量为16,有参构造容量为大于1.5倍 1参数的最小2的幂次。

2.put方法中使用数组cas自旋设值,链表或者红黑树synchronized关键字加锁设值,根据判断数组上的hash值判断当前节点状态,-1为正在发生扩容,大于0为hash值直接进行值的覆盖,节点类型为TreeBin则为红黑树,否则链表遍历赋值。

3.红黑树转换要求,链表节点数大于等于8,且数组长度大于等于64.

4.get方法不加锁,支持并发,通过volatile关键字保证内存可见性,拿到最新的节点数据。

六.参考

https://juejin.cn/post/6844904018729254926

https://www.jianshu.com/p/cf5e024d9432

https://segmentfault.com/a/1190000021237438

0 人点赞