​让我们来看看,多线程下的Map是如何实现线程安全的

2022-05-05 11:40:26 浏览数 (1)

多线程下的Map集合

前语

上一篇文章<<手撕HashMap>>是在大考周前写的有关HashMap的文章,在其开头开头提到过ConcurrentHashMap和HashTable,因为既然讲到了Map那么就绕不开,HashMap、HashTable、ConcurrentHashMap这三兄弟,先简单介绍一下这两个新朋友:HashTable是遗留类,ConcurrentHashMap类是有点高级的,说实话在写这篇文章前,这两个类我是没用过,不是说它不重要,只能说我层次还没到。在阅读本篇文章时,我强烈建议大家先去看看<<手撕HashMap>>

背景

HashMap在多线程环境下是不安全的,jdk1.7中是因为采用的是头插法,在多线程环境下两个线程同时扩容时会出现环链导致死循环;而jdk1.8中改用尾插法,避免了这个情况,但是其put操作在多线程环境下会发生覆盖,导致线程不安全。那在多线程环境下我们应该怎么做呢?

  • 使用HashTable类
  • 使用Collections.synchronizedMap(Map)创建线程安全的map
  • 使用ConcurrentHashMap类

那么接下来我们就来细聊一下这三种方式:

HashTable

HashTable是遗留类,遗留类是指在jdk的集合框架出现前,就已经存在了,并且所有遗留类都重新设计为支持JDK5中的通用类。HashTable的很多功能都与HashMap类似,那么通过查看源代码可以发现,说HashTable是线程安全的是因为HashTable使用了synchronized关键字来保证线程同步。

我们都知道使用synchronized属于独占式的悲观锁,加上的是重量级锁,当一个线程访问HashTable的同步方法时候,其它的线程只能是阻塞或轮询状态,所以HashTable的并发性是比较差的,效率比较低。翻阅源码还可以得到以下结论:

  1. HashTable的默认容量为11,HashMap为16,而且HashTable不要求底层数组容量一定为2的整数次幂,HashMap则要求一定为2的整数次幂!
  2. HashTable没有使用红黑树,无论多大,一直是采用尾插的链表
  3. HashTable扩容时容量为原来的2倍 1,而HashMap为原来的二倍
  4. HashTable直接通过对象的hashCode()方法来获得哈希值,而HashMap中是通过HashCode方法算出的Hash值,然后与其高十六位做异或运算,得到最终的哈希值。
  5. HashTable不允许key和value为NULL。

为什么HashTable的key-value不能为NULL?

在源码注释中有这样一句话:

大致意思是要从Hashtable成功存储和检索对象,用作键key的对象必须实现hashCode方法和equals方法,显然我们的NULL是不具备这两个方法的。

而value值不为空具体的表现为在其源码中有一个判空处理,为空抛出NullPointerException异常。

那么为什么要对value加上这样一个判空处理,来确保value值不为空呢? 我们现在假设允许key-value为null,那么我们使用map.get(key)时,如果返回的结果为null,那么对应的key到底是不存在,还是其value为null呢?在单线程的情况下我们当然可以通过调用map.containsKey(key)来确定key是否存在,而在多线程情况下,为了保证contains和get操作的原子性,显然这种做法在多线程的情况下我们是无法使用的。

Collections.synchronizedMap(Map)

HashTable由jdk1.1中引入,它那种直接采用Synchronized关键字来修饰方法的暴力操作,在很大程度上限制了其拓展性,因此在jdk1.2中引入了Collections.synchronizedMap(Map),这种相对较灵活的方式来保证Map的线程安全。那么让我们来看看它的源码:

通过翻阅源码我们可以看出,在我们调用Collections.synchronizedMap(Map)方法后,它将返回一个名为SynchronizedMap的内部类。

内部类SynchronizedMap类维护了一个final类型的Map对象,以及一个互斥锁mutex。SynchronizedMap类有两个构造方法,一个只是把Map作为参数,那么就将排斥锁mutex赋值为该Map。如果再传入一个mutex参数,那么将对象排斥锁赋值为传入的对象,也就是说通过该参数,对Map方法的锁定将仅在该Object(mutex)上进行,因此,其灵活性要优于Hashtable,可以特定的为某个对象上锁。

然后我们发现,该Map的相应方法已经使用Synchronized关键字修饰,上锁。所以它同HashTable相比其实只是在灵活性上更强了,而且是允许NULL的存在的!

ConcurrentHashMap

ConcurrentHashMap的出现就是为了提高Map的并发能力,而且同HashMap一样在DJK1.8中也对其做出来较大的改造,那么接下来就让我们从jdk1.7到jdk1.8,来聊系统的一下这个高级的ConcurrentHashMap类。

JDK1.7中的ConcurrentHashMap类

在JDK1.7中ConcurrentHashMap类采用的是分段锁的思想来实现并发操作的,其具体的数据结构是由一个Segment数组和多个HashEntry组成的。采用了分段锁技术,ConcurrentHashMap持有一组锁(segment),并将数据尽可能的分散在不同的锁断中(HashEntry),这样一来不同segment就不会相互竞争锁,从而提高并发效率,保证线程安全。

接下来让我们结合源码来看一下,特此说明下,因为我本机为jdk1.8,所以这里使用的是OpenJDK7。

通过源码我们可以看出每个Segment都继承ReentranLock(可重入锁)并单独加锁,因此每次进行加锁操作时锁住的就是一个Segment,这样我们只要保证每个Segment都是线程安全的,就能保证全局的线程安全。

ConcurrentHashMap的重要属性:

代码语言:javascript复制
//散列映射表的默认初始容量为 16。
 static final int DEFAULT_INITIAL_CAPACITY = 16;
 
 //散列映射表的默认装载因子为 0.75,用于表示segment中包含的HashEntry元素的个数与HashEntry[]数组长度的比值。当某个segment中包含的HashEntry元素的个数超过了HashEntry[]数组的长度与装载因子的乘积时,将触发扩容操作。
 static final float DEFAULT_LOAD_FACTOR = 0.75f;
 
 //散列表的默认并发级别为 16。该值表示segment[]数组的长度,也就是锁的总数。可以在构造函数中更改
 static final int DEFAULT_CONCURRENCY_LEVEL = 16;
 
 //散列表的最大容量
 static final int MAXIMUM_CAPACITY = 1 << 30;
 
 //segment中HashEntry[]数组最小长度
 static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
 
 //散列表的最大段数,也就是segment[]数组的最大长度
 static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
 
 //在执行size()和containsValue(value)操作时,ConcurrentHashMap的做法是先尝试 RETRIES_BEFORE_LOCK 次( 即,2次 )通过不锁住segment的方式来统计、查询各个segment,如果2次执行过程中,容器的modCount发生了变化,则再采用加锁的方式来操作所有的segment
 static final int RETRIES_BEFORE_LOCK = 2;
 
 //segmentMask用于定位segment在segment[]数组中的位置。segmentMask是与运算的掩码,等于segment[]数组size减1,掩码的二进制各个位的值都是1( 因为,数组长度总是2^N )。
 final int segmentMask;
 
 //segmentShift用于决定H(key)参与segmentMask与运算的位数(高位),这里指在从segment[]数组定位segment:通过key的哈希结果的高位与segmentMask进行与运算哈希的结果。(详见下面举例)
 final int segmentShift;
 
 //Segment 类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色。
 final ConcurrentHashMap.Segment<K, V>[] segments;

接下来让我们来看一下ConcurrentHashMap的put操作:

从上图源码中我们可以看出ConcurrentHashMap插入一个元素的过程需要进行两次Hash操作。第一次也就是上面的代码Hash定位到Segment,第二次如下所示Hash定位到元素所在的链表的具体的位置。

代码语言:javascript复制
/*****************put方法****************/
final V put(K key, int hash, V value, boolean onlyIfAbsent) { 
  //尝试获取锁,失败则表明已被其它线程锁住,调用scanAndLockForPut()方法,自旋获取锁。
  HashEntry<K,V> node = tryLock() ? null :
      scanAndLockForPut(key, hash, value);
  V oldValue;
  try {
      HashEntry<K,V>[] tab = table;
      int index = (tab.length - 1) & hash;
      HashEntry<K,V> first = entryAt(tab, index);
      //遍历
      for (HashEntry<K,V> e = first;;) {
          if (e != null) {
              K k;
              if ((k = e.key) == key ||
                  (e.hash == hash && key.equals(k))) {
                  oldValue = e.value;
                  if (!onlyIfAbsent) {
                      e.value = value;
                        modCount;
                  }
                  break;
              }
              e = e.next;
          }
          else {
              if (node != null)
                  node.setNext(first);
              else
                  node = new HashEntry<K,V>(hash, key, value, first);
              int c = count   1;
              if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                  rehash(node);
              else
                  setEntryAt(tab, index, node);
                modCount;
              count = c;
              oldValue = null;
              break;
          }
      }
  } finally {
      //释放锁 
      unlock();
  }
  return oldValue;
}
/************scanAndLockForPut方法**************/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
  HashEntry<K,V> first = entryForHash(this, hash);
  HashEntry<K,V> e = first;
  HashEntry<K,V> node = null;
  int retries = -1; // negative while locating node
  while (!tryLock()) {
      HashEntry<K,V> f; // to recheck first below
      if (retries < 0) {
          if (e == null) {
              if (node == null) // speculatively create node
                  node = new HashEntry<K,V>(hash, key, value, null);
              retries = 0;
          }
          else if (key.equals(e.key))
              retries = 0;
          else
              e = e.next;
      }
      else if (  retries > MAX_SCAN_RETRIES) {
          lock();
          break;
      }
      else if ((retries & 1) == 0 &&
               (f = entryForHash(this, hash)) != first) {
          e = first = f; // re-traverse if entry changed
          retries = -1;
      }
  }
  return node;
}


从上面的源码我们可以看出,jdk1.7中是如何保证segment中的线程安全的:首次进入该方法时先尝试获取该segment的锁,若获取失败,则调用 scanAndLockForPut(key, hash, value)方法来尝试自旋获取锁,如果自旋的次数达到了MAX_SCAN_RETRIES(最大自旋次数),就会变成阻塞获取,确保可以成功获取锁;

接下来让我们来看一下ConcurrentHashMap的get操作:

我们可以得出,get()方法不用加锁,而且ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作。第一次Hash定位到Segment,第二次Hash定位到元素所在的具体位置上。

总结:

这种数组 链表的结构,在写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment,在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作,可以在很大程度上提高并发能力,但是所带来的副作用就是在查询数据的时候需要遍历链表,导致Hash过程较长,效率低下。

JDK1.8中的ConcurrentHashMap类

在JDK1.8中舍弃了Segment的概念,直接用Node数组 链表 红黑树的数据结构来实现,了解过JDK1.8中HashMap结构的同学,应该可以看出在JDK1.8中 ConcurrentHashMap结构基本上和HashMap一样,而且它们确实是有着有很多相同之处:

  1. 数组的默认容量是16,最大容量是1<<30
  2. 当添加元素的时候,将列表转成树的阈值是8
  3. 在对数组扩容的时候,当树中元素个数小于或等于6时,将树转成链表

从上图中我们可以看出JDK1.8使用Node来代替HashEntry。

而且我们从源码中也可以看出,其value值和next使用volatile修饰,保证了内存可见性,以及禁止指令重排。

非阻塞算法CAS

CAS(Compare And Swap):CAS算法包含三个参数CAS(V, E, N),V表示要更新的内存变量,E表示期望值(期望内存中的值,也就是原来的值),N表示新值,判断预期值E和内存旧值V是否相同(Compare),如果相等用新值N覆盖旧值V(Swap),否则失败,表示已经有其它线程做了更新,当前线程被告知失败(不是阻塞);

jdk1.8中实现分段锁的原理

首先ConcurrentHashMap中Node数组被volatile数组修饰,保证了内存可见性,以及禁止指令重排。

代码语言:javascript复制
transient volatile Node<K,V>[] table;

再使用CAS Synchronized结合来实现赋值的操作,用以确保当前线程操作只锁住该线程操作的节点所在的链表或红黑树,来保证线程安全。让我们通过put源码来具体了解这句话的含义!

代码语言:javascript复制
public V put(K key, V value) {
    //简简单单一个跳转
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
//真正的put操作
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
    // 如果table为空,初始化;否则,根据hash值计算得到数组索引i,如果tab[i]为空,直接新建节点Node即可。注:tab[i]实质为链表或者红黑树的首节点。
    if (tab == null || (n = tab.length) == 0)
        tab = initTable();
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        if (casTabAt(tab, i, null,
                     new Node<K,V>(hash, key, value, null)))
            break;                   
    }
    // 如果tab[i]不为空并且hash值为MOVED,说明该链表正在进行transfer操作,返回扩容完成后的table。
    else if ((fh = f.hash) == MOVED)
        tab = helpTransfer(tab, f);
    else {
        V oldVal = null;
        // 针对首个节点进行加锁操作,而不是segment,进一步减少线程冲突
        synchronized (f) {
            if (tabAt(tab, i) == f) {
                if (fh >= 0) {
                    binCount = 1;
                    for (Node<K,V> e = f;;   binCount) {
                        K ek;
                        // 如果在链表中找到值为key的节点e,直接设置e.val = value即可。
                        if (e.hash == hash &&
                            ((ek = e.key) == key ||
                             (ek != null && key.equals(ek)))) {
                            oldVal = e.val;
                            if (!onlyIfAbsent)
                                e.val = value;
                            break;
                        }
                        // 如果没有找到值为key的节点,直接新建Node并加入链表即可。
                        Node<K,V> pred = e;
                        if ((e = e.next) == null) {
                            pred.next = new Node<K,V>(hash, key,
                                                      value, null);
                            break;
                        }
                    }
                }
                // 如果首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操作。
                else if (f instanceof TreeBin) {
                    Node<K,V> p;
                    binCount = 2;
                    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                   value)) != null) {
                        oldVal = p.val;
                        if (!onlyIfAbsent)
                            p.val = value;
                    }
                }
            }
        }
        if (binCount != 0) {
            // 如果节点数>=8,那么转换链表结构为红黑树结构。
            if (binCount >= TREEIFY_THRESHOLD)
                treeifyBin(tab, i);
            if (oldVal != null)
                return oldVal;
            break;
        }
    }
}
// 计数增加1,有可能触发transfer操作(扩容)。
addCount(1L, binCount);
return null;
}

put操作的具体流程:

  1. 先计算hash值;
  2. 若首次插入,则初始化;
  3. 查找并尝试插入,如果为空,则利用CAS算法尝试写入;
  4. 如果(fh = f.hash) == MOVED,则表示需要扩容;
  5. 以上情况均不满足,也表明该位置有值,则利用Synchronized锁住这个位置的值,然后插入数据,如果长度大于TREEIFY_THRESHOLD则树化,即转为红黑树。(这里使用的Synchronized以及经过优化,是升级过后的锁,其性能得到了提升)

总结:

  1. 数据结构:取消了Segment分段锁的数据结构,取而代之的是数组 链表 红黑树的结构,提高了遍历的效率,从遍历链表的O(n),到遍历红黑树的O(logN)。
  2. 保证线程安全机制:JDK1.7采用segment的分段锁机制实现线程安全,其中segment继承自ReentrantLock。JDK1.8采用CAS Synchronized保证线程安全。
  3. 锁的粒度:原来是对需要进行数据操作的Segment加锁,现调整为对每个数组元素加锁(Node)。

终于放假了,可以好好学习了!寒假期间,可以保证稳定更新!一起快乐的学Java吧!

0 人点赞