爆肝ConcurrentHashMap

2021-05-11 15:16:46 浏览数 (1)

Hi~朋友,不点蓝字关注,我们哪来故事?

特别说明:除第一小节以外,其他均都是以JDK 1.8的ConcurrentHashMap进行分析,本文信息量略大,每一份坚持都是值得被尊重的,希望你可以坚持读完这篇文章,也希望这篇文章对各位读者朋友有所帮助。

摘要

  1. JDK 1.7 CourrentHashMap实现
  2. 为什么放弃分段锁
  3. JDK 1.8 CourrentHashMap实现
  4. ConcurrentHashMap数据结构
  5. ConcurrentHashMap初始化
  6. ConcurrentHashMap的hash算法
  7. Unsafe.getObjectVolatile方法
  8. ConcurrentHashMap的put操作
  9. ConcurrentHashMap如何判断扩容
  10. ConcurrentHashMap扩容
  11. 红黑树退化为链表
  12. 学会问自己

1. JDK 1.7 CourrentHashMap实现

JDK1.7 CourrentHashMap采用分段锁的机制来保证线程安全,底层的数据结构是数组 链表的存储方式。

JDK1.7 中的CourrentHashMap内部有两个关键类,一个是Segement,另一个是HashEntry。

  • Segement继承ReentrantLock来充当锁,每个Segement对象保护若干个Hash桶的线程安全
  • HashEntry用来保存键值对
  • 每个桶是由若干个HashEntry构成的链表

2. JDK 1.8 为什么放弃分段锁

分段锁的优势是不必对所有Hash桶进行安全锁定,只有两个线程在操作同一个Segement中的某些Hash桶时才会有冲突进行线程安全同步。

JDK1.8放弃分段锁的原因是:

  • 分段锁在Segement中的元素越来越多时,锁的粒度会越来越大,分段锁的性能会下降

3. JDK 1.8 ConcurrentHashMap实现

JDK1.8 ConcurrentHashMap底层的数据结构采用的是数组 链表 红黑树的存储结构,并且放弃了分段锁,利用CAS Synchronized来保证线程安全。

4. JDK 1.8 ConcurrentHashMap数据结构

4.1 Node内部类

代码语言:javascript复制
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }

    public final K getKey()       { return key; }
    public final V getValue()     { return val; }
    public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
    public final String toString(){ return key   "="   val; }
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }

}

Node是数组或者链表或者红黑树的一个节点,代表着每一个KEY-VALUE元素, next是链表中的下一个节点元素。

4.2 TreeNode内部类

代码语言:javascript复制
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;

    TreeNode(int hash, K key, V val, Node<K,V> next,
             TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }
}

TreeNode继承自Node,是红黑树中的节点。TreeNode会被包装在TreeBin中,由TreeBin完成红黑树的构造。

4.3 TreeBin内部类

代码语言:javascript复制
static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
        // values for lockState
        static final int WRITER = 1; // set while holding write lock
        static final int WAITER = 2; // set when waiting for write lock
        static final int READER = 4; // increment value for setting read lock
}

TreeBin继承自Node,Concurrent的table中如果是红黑树的形式,存储的是TreeBin并不是TreeNode,但TreeBin并不是红黑树的存储节点,TreeBin通过root属性来维护红黑树的根节点。

红黑树在旋转的时候根节点可能会被原来的子节点替换掉,在这个时间点,如果有其他线程要修改红黑树则会发生线程安全问题,所以TreeBin中使用waiter属性来表明正在操作红黑树的线程,防止其他线程进入

4.4 ForwardingNode内部类

代码语言:javascript复制
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
}

ForwardingNode是一个特殊的节点,他的hash值为-1,使用nextTable存储下一个节点的引用,他通常在扩容的时候起作用,在发生扩容时,他会被当做占位符放在table中,该节点的key、value都为null。

4.5 volatile Node<K,V>[] table内部属性

Hash桶,存储了HashMap的元素,每一项的元素就有可能是普通的Node(链表),也有可能是红黑树(TreeBin),默认为null,初始化发生在第一次插入操作,大小默认是16,扩容时大小是2的幂次方

4.6 volatile Node<K,V>[] nextTable内部属性

默认为null,扩容的时候使用的新数组,大小是原数组的两倍。

4.7 volatile int sizeCtl

默认为0,用于控制table的初始化和扩容。

  • -1代表正在初始化
  • -N表示有N-1个线程在进行扩容操作
  • 正数或0表示Hash桶还没被初始化,这个数值表示初始化或下一次扩容时的大小,大小始终为table容量的0.75

4.8 CounterCell[] counterCells & volatile long baseCount

baseCount和counterCells共同协作用来计算Map中的元素个数,baseCount是在没有竞争的时候使用。

作为一个高并发的安全集合,如果单单依靠baseCount来统计Map的元素个数,意味着多线程之间修改它需要进行锁或者长时间的自旋,为了避免这种情况的发生。在竞争发生时,我们通过CounterCell[]这个数组来存储,通过数组分片来存储元素的部分数量,降低并发竞争带来的损耗。

代码语言:javascript复制
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length;   i) {
            if ((a = as[i]) != null)
                sum  = a.value;
        }
    }
    return sum;
}

上述代码就是计算Map中的元素个数的方法,可以看到是将baseCount和counterCells数组元素中的所有值相加。

5. JDK 1.8 ConcurrentHashMap初始化

ConcurrentHashMap初始化过程并不是在构造的时候,而是在第一次进行put操作的时候通过initTable()方法来进行初始化。

代码语言:javascript复制
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 如果sizeCtl < 0表示Hash桶正在被其他线程初始化,所以该线程就可以让出CPU等待初始化完成。
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // 当前线程成功获取初始化table的权限,通过把sizeCtl改为-1可以告知其他线程Hash桶正在被初始化
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // 如果sizeCtl > 0,初始化table的size就为sizeCtl
                    // 否则,初始化table的size为16(DEFAULT_CAPACITY)
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 计算sizeCtl的新值,当Hash桶的数量达到sizeCtl值时就需要进行扩容,该值恒等于当前容量的0.75
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

6. hash算法

代码语言:javascript复制
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

我们在定位Map的key在hash桶的位置的时候是通过以下方式进行定位的:

代码语言:javascript复制
key的hash值 & (hash桶的size-1)

为什么需要通过spread来重新计算key的hash值,而不是直接使用key.hashCode作为hash值?

如果只是用key.hashCode() & (hash桶的size-1)来确定元素的位置时,其实只是利用了key.hashCode()低若干位信息,高位信息是无效的,这样会加大hash冲突的概率。为了使key更加分散,减少冲突,我们通过spread方法(异或)可以充分使用key.hashCode()高16位的信息,并且避免了Hash值是负数(&HASH_BITS)。

通过spread方法计算的hash值也会在Node节点中通过hash属性进行记录,便于下一次使用。

7. Unsafe.getObjectVolatile方法

在我们获取table中的元素时我们并没有使用table[i]直接去获取,而是通过getObjectVolatile方法去内存中获取指定的数据?

Java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,通过Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。

8. put操作

put操作采用CAS synchronized实现并发插入或更新操作。当一个Hash桶中的元素个数大于等于8时,需要将链表转换为红黑树。

代码语言:javascript复制
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 计算hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // 初始化HashTable
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // i = (n - 1) & hash计算key在table中的位置
            // 如果该位置没有任何元素,将该key-value包装Node存入该位置
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            // 如果
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;;   binCount) {
                            K ek;
                            // 设置链表的元素
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // 设置红黑树元素
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 如果链表中的元素超过8,调用treeifyBin方法将链表转换为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 检查当前容量是否需要扩容
    addCount(1L, binCount);
    return null;
}

9. 如何判断扩容

在判断时是否扩容的逻辑需要调用resizeStamp方法:

代码语言:javascript复制
static final int resizeStamp(int n) {
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

Integer.numberOfLeadingZeros(n)这里的作用是返回高位所有0的个数,假设n是2(二进制为10),那么该结果为30,假设n为20(二进制为10100),那么结果为27。由于n是数组的长度,最小为16,最大是 1 << 30,也就是Integer.numberOfLeadingZeros(n)的结果最小是1,最大是27。

RESIZE_STAMP_BITS - 1的值为15,(1 << (RESIZE_STAMP_BITS - 1))相当于 1 << 15,也就是10000000 00000000。

最后将两步进行或操作,最终返回的高16位全为0,第16位固定为1,低15位就是1-27之间的值。

在put操作完成后,我们调用addCount方法,该方法里面有一段逻辑会判断是否扩容,如果需要扩容,则进行扩容操作。

代码语言:javascript复制
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    
    // 省略其他代码,这里是修改baseCount和counterCells
    ....
    
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // 如果map.size() 大于 sizeCtl(达到扩容阈值需要扩容) 且
        // table 不是空;且 table 的长度小于 1 << 30。(可以扩容)
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
             
            // 根据length获取一个标识符
            int rs = resizeStamp(n);
            // 如果正在扩容
            if (sc < 0) {
                // 如果sc的低16位不等于标识符(校验异常sizeCtl变化了)
                // 如果sc==标识符 1(扩容结束了,不再有线程进行扩容)(默认第一个线程设置sc==rs左移16位 2,当第一个线程结束扩容了,就会将sc减一。这个时候,sc就等于rs 1)
                // 如果sc==标识符 65535(帮助线程数已经达到最大)
                // 如果 nextTable == null(结束扩容了)
                // 如果 transferIndex <= 0 (转移状态变化了)
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs   1 ||
                    sc == rs   MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    // 结束循环
                    break;
                // 如果需要帮助扩容,则需要将sizeCtl 1,如果设置成功,则执行扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc   1))
                    transfer(tab, nt);
            }
            // 如果没有扩容
            // 将sizeCtl更新:标识符左移16位然后 2,也就是变成一个负数,高16位是标识符,低16位初始值为2
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT)   2))
                //开始扩容
                transfer(tab, null);
            // 再次统计数组元素
            s = sumCount();
        }
    }
}

10. 扩容

ConcurrentHashMap的transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)方法负责完成整个Map的扩容操作。在该方法中,有以下几个重要的方法变量:

  • boolean advance:首次推进为 true,如果等于 true,说明需要再次推进一个下标(处理的桶区间)(i--),反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
  • boolean finishing:是否需要结束扩容操作
  • bound:当前线程可以处理的Hash桶区间的最小下标

扩容步骤主要有以下几个操作。

10.1 计算每个线程需要处理的Hash桶个数

代码语言:javascript复制
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range

通过计算CPU核心数和Map中Hash桶的个数得到每个线程(CPU)要帮助处理多少个桶,并且这里每个线程处理都是平均的。默认每个线程处理16个桶。因此,如果长度是16的时候,扩容的时候只会有一个线程扩容。

10.2 初始化nextTab

代码语言:javascript复制
if (nextTab == null) {            // initiating
    try {
        @SuppressWarnings("unchecked")
        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
        nextTab = nt;
    } catch (Throwable ex) {      // try to cope with OOME
        sizeCtl = Integer.MAX_VALUE;
        return;
    }
    nextTable = nextTab;
    transferIndex = n;
}

如果nextTab为空(扩容操作未初始化),需要对nextTable和transferIndex初始化,每一次扩容初始化,都会将原来的Hash桶的数组长度扩容成2倍(n << 1)。

10.3 For循环扩容转移操作

在经过计算每个线程处理的桶数和初始化以后,便进入一个死循环,开始真正的扩容转移操作。

代码语言:javascript复制
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
    ....扩容代码
}

10.3.2 分配桶区间给线程

代码语言:javascript复制
Node<K,V> f; int fh;
while (advance) {
    int nextIndex, nextBound;
    //判断i-1是否大于等于bound(正常情况下,如果大于bound不成立,说明该线程上次领取的任务已经完成了。那么,需要在下面继续领取任务)
    //如果对i-大于等于bound(还需要继续做任务,或者扩容结束,修改推进状态为false,不能推进了。任务成功后修改推进状态为true。
    //通常,第一次进入循环,i--这个判断会无法通过,从而走下面 nextIndex的CAS赋值操作(获取最新的转移下标)。其余情况都是:如果可以推进,将i-1,然后修改成不可推进。如果i对应的桶处理成功了,改成可以推进。
    if (--i >= bound || finishing)
        advance = false;
    //如果transferIndex小于等于0,说明没有区间了,i改成-1,推进状态变成false,扩容结束了,当前线程可以退出了
     // 这个-1会在下面的后续的扩容代码里使用将finishing改为true,然后表示扩容结束
    else if ((nextIndex = transferIndex) <= 0) {
        i = -1;
        advance = false;
    }
    // CAS修改transferIndex,即length-区间值,留下剩余的区间值供后面的线程使用
    else if (U.compareAndSwapInt
             (this, TRANSFERINDEX, nextIndex,
              nextBound = (nextIndex > stride ?
                           nextIndex - stride : 0))) {
        //
        bound = nextBound;
        i = nextIndex - 1;
        advance = false;
    }
}

此处的While循环主要是为了分配Hash桶区间给线程,并且确认要处理的Hash桶的下标然后跳出while循环执行扩容转移操作。

10.3.2 扩容转移

扩容转移的具体操作步骤如下:

  1. 首先判断是否扩容转移结束,如果结束,重置一些协助属性(nextTable),修改sizeCtl的值
  2. 判断处理的桶是否是空桶,如果是空桶,就使用fwd占位,继续下一个桶的处理
  3. 判断处理的桶是否已被其他线程处理,如果是其他线程已经在处理,继续下一个桶的处理
  4. 上述三个条件如果均为false,那么开始处理该桶,执行元素转移
代码语言:javascript复制
if (i < 0 || i >= n || i   n >= nextn) {
    int sc;
    if (finishing) {
        nextTable = null;
        table = nextTab;
        sizeCtl = (n << 1) - (n >>> 1);
        return;
    }
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        //判断sc-2是否等于标识符左移16位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        finishing = advance = true;
        i = n; // recheck before commit
    }
}
else if ((f = tabAt(tab, i)) == null)
    //获取旧的Hash桶i下标位置的节点,如果是 null,就使用fwd占位,推进下一个桶的处理
    advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
    //如果节点是hash是MOVED,说明别的线程已经处理过该桶了,不再重复处理,结束转移,推进下一个桶的处理
    advance = true; // already processed
else {
    //假如代码走到这里,说明这个位置有实际值了,且不是占位符,需要开始进行处理。对这个节点上锁。为什么上锁,防止putVal的时候向链表插入数据。
    synchronized (f) {
        //  判断i下标处的桶节点是否和f相同,只有相等才开始处理。
        if (tabAt(tab, i) == f) {
            Node<K,V> ln, hn;
            if (fh >= 0) {
                ....链表转移
                advance = true;
            }
            else if (f instanceof TreeBin) {
                ...树转移
                advance = true;
            }
        }
    }
}

11. 红黑树退化为链表

11.1 remove元素时发生退化

红黑树退化为链表发生在我们对ConcurrentHashMap的元素进行移除时,也就是调用其remove方法时,会有下面一段逻辑:

代码语言:javascript复制
else if (f instanceof TreeBin) {
    validated = true;
    TreeBin<K,V> t = (TreeBin<K,V>)f;
    TreeNode<K,V> r, p;
    if ((r = t.root) != null &&
        (p = r.findTreeNode(hash, key, null)) != null) {
        V pv = p.val;
        if (cv == null || cv == pv ||
            (pv != null && cv.equals(pv))) {
            oldVal = pv;
            if (value != null)
                p.val = value;
            // 这里判断移除
            else if (t.removeTreeNode(p))
                setTabAt(tab, i, untreeify(t.first));
        }
    }
}

在remove方法中,如果Hash桶中是一个红黑树,则会调用removeTreeNode()方法对元素进行移除,在removeTreeNode中会有如下判断:

代码语言:javascript复制
// 该Hash桶中没有元素,退化为链表
if (first == null) {
    root = null;
    return true;
}
// 红黑树节点过少时,也会退化为链表
if ((r = root) == null || r.right == null || // too small
    (rl = r.left) == null || rl.left == null)
    return true;

通过判断removeTreeNode方法的返回值,如果为true,我们则需要将该Hash桶中的元素由红黑树修改为链表。

11.2 扩容时发生退化

扩容时每个桶中的元素会发生转移,当某一个桶中的元素数量过少时,将会从红黑树退化为链表,这个元素的数量为固定值6,该段逻辑在扩容transfer方法中,如下:

代码语言:javascript复制
else if (f instanceof TreeBin) {
    .... 其他扩容逻辑
    // UNTREEIFY_THRESHOLD这里是固定值为6
    ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
        (hc != 0) ? new TreeBin<K,V>(lo) : t;
    hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
        (lc != 0) ? new TreeBin<K,V>(hi) : t;
    .... 其他扩容逻辑
}

12. 学会问自己

由于本篇知识量确实过大,各位读者朋友需要反复研读,反复思考,希望理解ConcurrentHashMap以后可以问一下自己能不能答上以下问题:

  1. 1.8为什么要选用CAS Synchronized Node来作为整个ConcurrentHashMap基础实现
  2. sizeCtl属性的作用是什么?
  3. 为什么需要counterCells和baseCount两个属性来共同计算ConcurrentHashMap的元素数量?
  4. CourrentHashMap的hash算法有什么优势?
  5. CourrentHashMap的put操作主要包含哪些流程?
  6. CourrentHashMap如何判断需要扩容以及扩容操作的流程
  7. CourrentHashMap的红黑树退化为链表的触发条件

本期的Java ConcurrentHashMap介绍到这,我是shysh95,我们下期再见!!!

0 人点赞