1.8 的ConcurrentHashMap要得不

2022-11-02 15:36:35 浏览数 (1)

Unsafe


JAVA中神奇的双刃剑--Unsafe - throwable - 博客园

使用Unsafe要注意以下几个问题:

  • 1、Unsafe有可能在未来的Jdk版本移除或者不允许Java应用代码使用,这一点可能导致使用了Unsafe的应用无法运行在高版本的Jdk。
  • 2、Unsafe的不少方法中必须提供原始地址(内存地址)和被替换对象的地址,偏移量要自己计算,一旦出现问题就是JVM崩溃级别的异常,会导致整个JVM实例崩溃,表现为应用程序直接crash掉。
  • 3、Unsafe提供的直接内存访问的方法中使用的内存不受JVM管理(无法被GC),需要手动管理,一旦出现疏忽很有可能成为内存泄漏的源头。

JAVA 1.8


构造方法

在1.8中它的5个构造方法

代码语言:javascript复制
// 空构造 默认长度16
ConcurrentHashMap defultChm = new ConcurrentHashMap<>();
// hashMap 32; 1.7 ConcurrentHashMap 32;  1.8 ConcurrentHashMap 64;
ConcurrentHashMap initialChm = new ConcurrentHashMap<>(32);
// putAll传递的Map 默认长度16
ConcurrentHashMap mapToChm = new ConcurrentHashMap<>(new HashMap<>());
// 设置初始长度和负载因子
ConcurrentHashMap initLoadChm = new ConcurrentHashMap<>(32,0.75f);
// 设置初始长度、负载因子、正在扩容的线程数
ConcurrentHashMap allChm = new ConcurrentHashMap<>(32,0.75f,1);
代码语言:javascript复制
    public ConcurrentHashMap() {
    }

    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity   (initialCapacity >>> 1)   1));
        this.sizeCtl = cap;
    }

    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0   (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

而在初始化时除了空参构造外其他构造方法都使用了一个非常重要的参数sizeCtl

代码语言:javascript复制
    /**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1   the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     */
    private transient volatile int sizeCtl;

    /**
     * 表初始化和调整大小控制。 当为负时,
     * 表正在初始化或调整大小:-1 表示初始化,
     * else -(1   活动调整线程的数量)。 否则,
     * 当 table 为空时,保存要使用的初始表大小
     * 创建,或默认为 0。 初始化后,保持
     * 调整表格大小的下一个元素计数值。
     */
    private transient volatile int sizeCtl;

sizeCtl有控制表初始化和调整大小的含义。       * sizeCtl为0,代表未初始化,且数组的初始容量为16。      * sizeCtl为正数,如果数组未初始化记录的是初始容量,如果数组已经初始化代表的扩容阈值      * sizeCtl为-1,表示正在初始化      * sizeCtl小于-1,表示数组正在扩容,-(1 n)表示此时有n个线程正在共同完成数组的扩容

PUT方法

ConcurrentHashMap的初始容量并不是在初始化的时候创建的,而是在第一次put的时候实现的

代码语言:javascript复制
// 如果tab对象为空进行初始化
if (tab == null || (n = tab.length) == 0)
    tab = initTable();

初始化后在put的时候如果元素节点为空,创建一个新元素赋值

代码语言:javascript复制
    // 如果tab对象为空进行初始化
    if (tab == null || (n = tab.length) == 0)
        tab = initTable();
    // 如果获取的节点为空
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        // cas校验,如果节点为空创建一个新元素赋值
        if (casTabAt(tab, i, null,
            new Node<K,V>(hash, key, value, null)))
            break;                   // no lock when adding to empty bin 添加到空empty时没有锁定
    }

如果节点不为空并且hash为扩容hash进行协助扩容不进行节点新增

代码语言:javascript复制
    // 如果tab对象为空进行初始化
    if (tab == null || (n = tab.length) == 0)
        tab = initTable();
    // 如果获取的节点为空
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        // cas校验,如果节点为空创建一个新元素赋值
        if (casTabAt(tab, i, null,
            new Node<K,V>(hash, key, value, null)))
            break;                   // no lock when adding to empty bin 添加到空empty时没有锁定
    }    
    // 多线程协助扩容,如果当前节点正在扩容
    else if ((fh = f.hash) == MOVED)
        tab = helpTransfer(tab, f);

else进行数据新增,这时使用了synchronized锁来保证线程安全,sync锁的只有单个节点对象,不锁整个数组,其他节点可以同步操作,这样效率大大增加

代码语言:javascript复制
    synchronized (f) {
        // 防止有元素做完操作后将tab变为了树,元素发生了变化,有可能不是原来的对象
        if (tabAt(tab, i) == f) {
            // 如果hash值大于0则代表是链表结构
            if (fh >= 0) {
                binCount = 1;
                for (Node<K,V> e = f;;   binCount) {
                    K ek;
                    // 如果hash值一样,并且key相同
                    if (e.hash == hash &&
                            ((ek = e.key) == key ||
                                    (ek != null && key.equals(ek)))) {
                        // 获取旧元素的val
                        oldVal = e.val;
                        // 是否替换值,false是替换,true是不替换
                        if (!onlyIfAbsent)
                            e.val = value;
                        break;
                    }
                    Node<K,V> pred = e; // 存放当前引用,下个next为空时以便关联节点
                    if ((e = e.next) == null) {
                        // 新增节点到链表末尾
                        pred.next = new Node<K,V>(hash, key,
                                value, null);
                        break;
                    }
                }
            }
            // 如果是树结构
            else if (f instanceof TreeBin) {
                Node<K,V> p;
                binCount = 2;
                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                        value)) != null) {
                    oldVal = p.val;
                    if (!onlyIfAbsent)
                        p.val = value;
                }
            }
        }
    }

在新增完元素后需要判断链表长度判断是否转为树

代码语言:javascript复制
    if (binCount != 0) {
        // 如果链表长度大于等于8时不一定转树
        if (binCount >= TREEIFY_THRESHOLD)
            treeifyBin(tab, i);
        if (oldVal != null)
            return oldVal;
        break;
    }

最后一步增加集合长度和协助扩容

代码语言:javascript复制
    // 集合长度增加
    addCount(1L, binCount);

在线程对ConcurrentHashMap进行元素新增后要对元素个数进行维护

addCount方法分为两部分:

1. 维护集合长度 如果counterCells数组为空,再通过cas的方式修改BASECOUNT,如果修改成功则不需要对counterCells数组进行维护

2. 协助扩容

如果新增后的元素个数大于等于扩容阈值,并且table长度小于1073741824时进行协助扩容逻辑,然后判断当前扩容动作是否完成,当前线程有没有达到最大线程,如果符合直接break不参与扩容,否则通过cas的方式增加SIZECTL的值来同步增加的线程扩容数量,然后进行协助扩容操作

代码语言:javascript复制
    /**
     * Adds to count, and if table is too small and not already      添加计数,如果表太小且还没有
     * resizing, initiates transfer. If already resizing, helps      调整大小,启动传输。 如果已经调整大小,有帮助
     * perform transfer if work is available.  Rechecks occupancy    如果有工作,执行转移。 重新检查入住率
     * after a transfer to see if another resize is already needed   传输后查看是否已经需要再次调整大小
     * because resizings are lagging additions.                      因为调整大小是滞后的添加。
     *
     * @param x the count to add  要添加的计数
     * @param check if <0, don't check resize, if <= 1 only check if uncontended  如果 <0,不检查调整大小,如果 <= 1 只检查是否无竞争
     */
    private final void addCount(long x, int check) {
        /*
         维护集合长度
          */
        CounterCell[] as; long b, s;
        // counterCells是内部长度数组,默认是空
        // 如果当前baseCount与内存BASECOUNT相等对baseCount x
        if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b   x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            // 判断内部长度数组是否为空
            if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandomLab.getProbe() & m]) == null ||
                    !(uncontended =
                            U.compareAndSwapLong(a, CELLVALUE, v = a.value, v   x))) {
                // 如果没有对BASECOUNT增加成功将会增加到counterCells内部长度数组中
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount(); // 获取baseCount和counterCells数组value值累加的总和
        }
        /*
         协助扩容
          */
        if (check >= 0) { // check在移除和替换时大于0
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                    (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n); // 获取标记
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs   1 ||
                            sc == rs   MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0) // 判断当前扩容动作有没有完成、当前线程有没有达到最大线程
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc   1)) // 协助扩容,并增加sc线程数量
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                        (rs << RESIZE_STAMP_SHIFT)   2)) // (rs << RESIZE_STAMP_SHIFT)左移16位,SIZECTL变为负数,状态变更为扩容中
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

协助扩容:transfer方法

第一步:计算分配任务量

如果当前cpu核数是单个直接分配给全量长度,单线程不需要分配,如果核数>1则得到一个(1/4n) / 核数的值,如果得到的值小于16直接赋予16,最小执行任务量为16

第二步:如果新数组为空创建新数组

创建一个之前两倍大小的新Node数组

第三步:计算当前现场负责部分

同样使用cas进行更新要负责迁移数组的大小,例如之前数组64-16=48,更新为48-16=32

第四步:扩容是否完成逻辑

如果扩容完成finishing状态为True,nextTable置空,table引用nextTable,sizeCtl更新为下一次扩容阈值

如果扩容没有完成返回继续协助扩容

第五步:如果当前位置没有元素不进行迁移,在此位置增加fwd元素标识

第六步:如果当前节点已经是fwd节点不需再迁移

第七步:实际迁移动作,对当前对象进行synchronized加锁操作,防止被新增

代码语言:javascript复制
    /**
     * Moves and/or copies the nodes in each bin to new table. See ( 将每个 bin 中的节点移动和/或复制到新表。)
     * above for explanation.
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        /*
        计算最小任务量
         */
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) // 判断cpu大于1的时候进行多线程协助
            stride = MIN_TRANSFER_STRIDE; // subdivide range ( 细分范围 ) 最小的单元为16
        /*
        创建新数组
         */
        if (nextTab == null) {            // initiating ( 开始 )
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 新建一个2倍容量的数组
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab; // nextTable记录新的数组
            transferIndex = n;  // 使用transferIndex记录当前旧数组的size
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 定义为fwd节点代表当前位置正在被迁移,并且hash也是MOVED
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                /*
                计算当前线程负责那一部分
                 */
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ? // 如果下一次的节点大于16进行分割
                                        nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i   n >= nextn) {
                int sc;
                /**
                 * 扩容完成
                 */
                if (finishing) {
                    nextTable = null;
                    table = nextTab; // table引用
                    sizeCtl = (n << 1) - (n >>> 1); // 设置sizeCtl (2n) - (1/2n) = 1.5n = 0.75 * 2n
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    /**
                     * 扩容没有完成
                     */
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null) // 如果当前位置没有元素无需迁移,增加fwd元素标识
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED) // 如果已经迁移过则不需再迁移
                advance = true; // already processed
            else {
                /**
                 * 迁移操作,加锁防止其他线程增加元素导致数据丢失
                 */
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i   n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                      lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                      hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i   n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

0 人点赞