map是什么
❝in computer science, an associative array, map, symbol table, or dictionary is an abstract data type composed of a collection of (key, value) pairs, such that each possible key appears at most once in the collection. ❞
上面引用的是维基百科对map的定义,意思是说,在计算机学科中,map是一种抽象的数据结构,它由key和value组成组成键值对的集合,在集合中每个key最多出现一次。像关联数组、符号表、字典数据结构都是map的一种具体实现 map数据结构在实际的项目使用的非常频繁,很多语言都提供了mpa数据结构,像Java语言的HashMap,Go语言中的map和sync.Map数据类型。map基本操作包含添加key和value键值对,获取key对应的value, 删除key,遍历操作。
map实现方法
map的实现主要有两种方法:hash table(哈希表)和search tree(搜索树)
哈希表
哈希表背后的基本思想是通过索引访问数组元素是一个简单的、恒定时间的操作,对于给定的key通过哈希函数计算,得到一个数值,然后将该数值映射到数组下表,value就存储在这个位置。因此,哈希表操作的平均开销只是计算键的哈希值,并结合访问数组中的相应桶。因此,哈希表通常在 O(1) 时间内完成操作,并且在大多数情况下优于其他方案。哈希表的两个关键要素是哈希函数和哈希冲突的处理,理解了这两个元素,基本搞懂了哈希表。下面分别讲讲哈希函数和冲突处理。哈希函数是可以用于将任意大小的数据映射到固定大小值的函数,通俗解释是对一串数据data1进行杂糅,输出另一段固定长度的数据data2,作为这段数据的特征,常见的哈希函数有MD5、SHA。
那这个hash函数有什么特别的呢?是不是任意一个函数都可以作为hash函数,显然不是的。hash函数选择对于Map的性能启动关键性的作用,一个好的hash函数,应该具有以下5性:
- 高效性,hash函数计算复杂度直接影响到哈希表的性能,当然是效率越高越好
- 均匀性,一个好的哈希函数应该在其输出范围内尽可能均匀地映射,也就是说,应以大致相同的概率生成输出范围内的每个哈希值,具有很好的随机性
- 不可逆性,hash函数是单向的,只能从输入值计算得到一个输出值,但给定输出值不能逆向还原出输入值
- 雪崩性,微小的输入值变化也会让输出值发生巨大的变化,即对于两个相似度很高的输入数据,得到的数据值差异很大
- 确定性,hash函数计算出的值必须是确定性的,对于给定的输入值,它必须始终生成相同的哈希值
哈希冲突,根据前面哈希函数的定义,是将任意大小的输入数据映射到固定大小的输出,也就是说输入是无限的,输出是有限的,从无限到有限的集合映射很难找到一个一一映射,很可能存在着多个输入对应到一个输出的情况,即多个key映射到同一个value,这就是哈希冲突,既然哈希冲突不能避免,我们需要处理哈希冲突的方法。常用的哈希冲突解决方法有链地址法和开放寻址法两种。
- 链地址方法,链地址方法是使用一个链表数组来存储相应的数据,当hash遇到冲突的时候依次在链表末尾添加数据
- 开放寻址法,用大小为M的数组保存N个键值对(满足M>N),当出现哈希冲突时通过数组中空闲位置保存数据。根据空闲位置的选取方法不同,开放寻址法有多种不同的具体实现。常见的实现方法有线性探测法、二次探测法、和双重散列法。这里以线性探测法为例说下开放寻址法具体是怎么做的,线性探测法数学公式为 h(k,i)=(h(k,0) i)mod m。公式中i表示当前进行的是第几轮探测,正常情况下k落在h(k,0)mod m的位置,当前该位置已有已被占据时,进行第一轮探测,这时i=1, 即探测的是h(k,0)的下一个位置。如果h(k,0)下一个位置也被占据时,即i=2探测h(k,0)的下下一个位置,直到找到一个没有被占据的位置。
下面通过一个实际的例子,说明链地址方法和开放寻址法的一些细节。需要装载的数据为一组整数数据[21,2,1,56,6,40],哈希函数f(k)=k%7。通过链地址创建哈希的过程如下
通过线性探测法创建哈希的过程如下
上面介绍了哈希表中两个关键点哈希函数和哈希冲突解决方法,还有一个概念需要我们理解,此概念是装填因子。装填因子是表示哈希表中元素的填满程度。它的计算公式:装载因子=填入哈希表中的元素个数/哈希表的长度。装载因子越大,填入的元素越多,空间利用率就越高,但发生哈希冲突的几率就变大。反之,装载因子越小,填入的元素越少,冲突发生的几率减小,但空间浪费也会变得更多,而且还会提高扩容操作的次数。装载因子也是决定哈希表是否进行扩容的关键指标,在Go中,当装填因子达到6.5时就会触发扩容操作。
对比链地址法和开放寻址法,两种方法各有优缺点。
实现方法 | 优点 | 缺点 |
---|---|---|
链地址法 | 内存利用率较链表法更高,链表节点可以灵活创建,不像开放寻址是固定的,对大装填因子的容忍度更高 | 当节点规模比较小时,相比开放地址法比较费空间 |
开放寻址法 | 实现结构比较简单,只用一个数组结构存储数据,对CPU缓存友好,易于序列化 | 内存的利用率没有链地址法高,出现冲突时处理代价比较高,装填因子不能>1 |
实际使用的时候要根据具体的业务场景进行选择,当数据量已知,装填因子小的的时候,适合采用开放寻址法,当存储的数据量很大,大小不明确时,采用链地址法更灵活,冲突处理效率高。
搜索树
搜索树也是一种map的实现方式,无论是用链地址法还是开放寻址法实现的mpa,本质都是使用的线性数据结构实现的map。用搜索树实现map本质是用树形结构实现map.根据采用的树形结构不同,利用搜索树实现map也有多种实现方法,用的比较多的有二叉搜索树和平衡搜索二叉树。像Java中提供的TreeMap结构,就是用搜索树实现的map.
- 使用搜索树,就不会使用哈希函数了,避免了哈希的开销和选择哈希函数的困难。
- 核心操作的运行时间与log(N)成正比,并不像常数时间那么好,但仍然非常好
下面以二叉搜索树实现map为例,采用图形化方式解释以二叉搜索树实现的mpa是如何工作的。添加一组数据到map中{<21,"hello">,<2,"mingyong">,<1,"smile">,<56,"cheap">,<40,"shop">}
Go中map实现原理
Go语言中map是使用哈希表方式实现的,解决哈希冲突采用的是链地址法。整个结构由桶和链表组成,下面是它的整体结构图。先对map有个整体印象,后面会结合map的操作代码实现进行说明。桶指的是下图中buckets指向的[]bmap数组。bmap的overflow串联成链表的结构
下面会先讲map的结构定义,然后从创建、查找、赋值、遍历、删除和扩容6个方面分别详细讲述实现细节。注意下面分析Go源码是1.14版本,涉及到的文件有 src/runtim/map.go, src/cmd/compile/internal/gc/reflect.go,
map数据结构
hmap是map的结构体头,它维护了一个map对象的所有信息。申明一个map对象,var m map[int]int, 此时的m是一个nil指针。当通过m=make(map[int]int)初始化之后,m是指向一个hmap对象的指针。也就是说此时的m类型是*hmap.hmap结构体中最重要的一个字段是buckets,它指向bucket数组,此外还有记录map中已存放数据个数的count,描述桶的数量即bucket数组大小的B.
代码语言:javascript复制// map的结构体头信息
type hmap struct {
// 记录当前map中k-v键值对的数量,即map当前有多少元素
count int
// 标记状态map操作过程中的状态用的,后面会有介绍
flags uint8
// B决定哈希桶的数量,桶的数量为2^B个
B uint8
// 溢出桶的大概数量
noverflow uint16
// 哈希种子,计算key的哈希值时会用到
hash0 uint32
// 指向桶的数组,桶的个数为2^B个,每个桶中的元素是bmap,如果map中没有数据,buckets可能为nil
buckets unsafe.Pointer
// 指向旧桶的指针,map在扩容的时候会用到
oldbuckets unsafe.Pointer
// 标识扩容进度,小于此地址的buckets代表已搬迁完成,在后面的扩容讲解中会有说明
nevacuate uintptr
// extra是为了降低GC扫描而设计的。当key和value均不包含指针,并且都可以inline时使用,桶的数据是存在extra.overflow中的
extra *mapextra
}
type mapextra struct {
// 对于key和value中的值都不包含指针,并且可以内联(不大于128字节)的情况下,用hmap的extra字段来存储overflow buckets
// 可以避免GC扫描整个map. 然而bmap中overflow字段是个指针,为了保证overflow指向桶的生命周期,将溢出桶保存在extra.overflow
// 切片中,为了保证oldbucket中的桶溢出桶生命周期,将其保存在oldoverflow切片中
overflow *[]*bmap
oldoverflow *[]*bmap
// 指向预分配的溢出桶的位置
nextOverflow *bmap
}
申明一个map对象,var m map[int]int, 此时的m是一个nil指针。当通过m=make(map[int]int)初始化之后,m是指向一个hmap对象的指针。也就是说此时的m类型是*hmap. hmap结构体中最重要的一个字段是buckets,它指向bucket数组,此外还有记录map中已存放数据个数的count,描述桶的数量即bucket数组大小的B.
下面着重看bucket(桶)结构,定义在bmap结构中,包含的字段如下
代码语言:javascript复制type bmap struct {
// tophash是一个[8]uint8数组,存储该桶中每个键的哈希值的最高8位信息,因为该数组大小为8
// 所以也就是说每个桶能够存储的键数量有8个,注意该结构在编译期间会动态添加字段。添加字段存储
// 有8个key和8个value,还有一个*overflow字段。
tophash [bucketCnt]uint8
}
需要注意的是,上面看到的bmap只是静态结构,在编译期间会动态添加存储8个key和8个value,以及一个*overflow信息。动态添加处理的代码在src/cmd/compile/internal/gc/reflect.go中的bmap,下面附上的是添加的关键代码,省略了不其他内容。
代码语言:javascript复制// 动态创建bmap
func bmap(t *types.Type) *types.Type {
...
field := make([]*types.Field, 0, 5)
// The first field is: uint8 topbits[BUCKETSIZE].
// 第一个字段是[8]uint8
arr := types.NewArray(types.Types[TUINT8], BUCKETSIZE)
field = append(field, makefield("topbits", arr))
// 添加8个key
arr = types.NewArray(keytype, BUCKETSIZE)
arr.SetNoalg(true)
keys := makefield("keys", arr)
field = append(field, keys)
// 添加8个value
arr = types.NewArray(elemtype, BUCKETSIZE)
arr.SetNoalg(true)
elems := makefield("elems", arr)
field = append(field, elems)
...
return bucket
}
综上,可以得到bucket(桶)的内存结构,这里画了一张bucket的结构图,方便读者理解,图中每个tophash称之为一个槽位,一共有8个槽位。每个槽位对应了一个存放key和value的位置,每个tophash、key和value占用的内存空间都是固定的,所以根据tophash结合偏移量很容易得到key和value的位置。
如上图,并不是按照一个key一个value形式存储的,而是先存储8个key,然后在存储8个value,作者解释了这样存储的好处是是能让我们消除例如map[int64]int所需要的填充(padding)。最后一个字段overflow指针,该字段用于指向下一个桶的位置,因为一个桶中最多只能装8个键值对,如果有多余的键值对落到了当前桶,那么就需要再构建一个溢出桶,通过overflow指针串联起来。
map创建
map只能通过make进行初始化,在初始化的时候可以指定大小也可以不指定大小。
代码语言:javascript复制// 未指定初始化大小,实际上创建的大小为0
m:=make(map[int]int)
// 指定创建存储8个数据大小的map
m=make(map[int]int,8)
对于初始化小于等于8个数据的map时,会调用makemap_small函数。大于8个的时候,会调用makemap函数。
代码语言:javascript复制// makemap_small用于创建小数据量的map,当不指定初始化大小(即大小为0)或者当前初始化大小
// 小于等于8(bucketCnt)的时候,会调用次函数,并直接在堆上分配内存
func makemap_small() *hmap {
h := new(hmap)
h.hash0 = fastrand()
return h
}
创建map大小大于8时,会调用makemap函数,编译器会自动分析要创建的map或map中的bucket是否能够放在栈上。当桶的数量不小于16的时候,会产生2^(b-4)个溢出桶,采用了预分配的方法,提前多分配几个桶,保存在hmap.extra.nextOverflow中。
代码语言:javascript复制// makemap是通过make(map[key]value,9)创建一个map的实现
// 编译器会自动分析要创建的map或map中的bucket是否能够放在栈上,
// 如果不能放在栈上,会创建在堆上,具体什么情况下会放在栈上,下面
// 的解析中有说
// 在创建的时候,如果map结构头h已经非空,直接使用不会新建,同样
// 对于h中指向的bucket地址已存在,不会重新分配
func makemap(t *maptype, hint int, h *hmap) *hmap {
// 计算创建map要分配的内存
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
// 如果需要分配的内存超过最大可分配的内存大小或是内存过大数字溢出了,
// 强行设置hint为0,即初始化创建有0个元素的map
if overflow || mem > maxAlloc {
hint = 0
}
// 如果h为空,新分配map结构头h内存
if h == nil {
h = new(hmap)
}
// 产生一个随机数据,作为哈希计算时候的种子
h.hash0 = fastrand()
B := uint8(0)
// 根据传入的map的大小hint,找到一个合适的B值,这个B值是满足装填因子6.5*2^B不小于
// hint的一个最小值
// 如果创建map大小不大于8,则B值为0
for overLoadFactor(hint, B) {
B
}
h.B = B
// B值不为0,根据B值创建2^B个桶,也就是B值为4,会创建16个桶
// 对于B值为0的情况,这里不做处理,会延迟在mapassign即向map添加元素的时候分配
// 如果hint是一个很大的值,即B也会很大,清理这片内存会花费比较长的时间
if h.B != 0 {
var nextOverflow *bmap
// 创建桶,分配相应的内存
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
// 如果有溢出桶产生,将溢出桶的位置保存在h.extra.nextOverflow中
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
// 计算桶的数量=2^b
base := bucketShift(b)
nbuckets := base
// 对于b很小的情况,桶的数量也会很少,使用到溢出桶的概率会很小,就不
// 分配溢出桶,也就不用做一些产生溢出桶相关的计算
if b >= 4 {
// 当b大于等于4,即桶的数量不小于16的时候,会产生2^(b-4)个溢出桶
nbuckets = bucketShift(b - 4)
sz := t.bucket.size * nbuckets
up := roundupsize(sz)
if up != sz {
nbuckets = up / t.bucket.size
}
}
if dirtyalloc == nil {
// 新创建nbuckets个桶
buckets = newarray(t.bucket, int(nbuckets))
} else {
// dirtyalloc已经有值,之前已经有分配过bucket,将之前的桶中内容清理掉,
// 继续复用之前已经分配的桶
buckets = dirtyalloc
size := t.bucket.size * nbuckets
if t.bucket.ptrdata != 0 {
// 清理掉有指针的buckets的数据
memclrHasPointers(buckets, size)
} else {
// 清理掉没有堆上指针的buckets的数据
memclrNoHeapPointers(buckets, size)
}
}
if base != nbuckets {
// 在b大于等于4的时候,会预先分配一些溢出桶
// 为了将跟踪这些溢出桶的开销保持在最低限度,使用了下面的约定:
// 如果预先分配的桶的overflow字段值是nil的,通过碰撞指针可以获得更多的桶
// 对于最后一个预先分配的溢出桶last, 需要将其overflow指向一个非nil值,这里实际
// 指向的是第一个桶buckets的位置
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
last.setoverflow(t, (*bmap)(buckets))
}
return buckets, nextOverflow
}
读到这里,读者可能会有一个疑问,创建map的时候哈希函数是怎么选择的呢?其实哈希函数的选择是在运行的时候初始化的,不用我们关心。具体初始的逻辑在src/runtime/proc.go文件的schedinit函数中。我们先来看下schedinit函数。schedinit函数中的alginit函数就是处理哈希函数选取的。根据当前程序编译的目标架构判断是否支持aes,如果支持就选取ase hash,具体实现是用汇编写的,在runtime下的asm_{386,amd64,arm64}.s中。若不支持,采用启发时产生,实现在runtime下的hash32.go和hash64.go中。
代码语言:javascript复制func schedinit() {
...
stackinit()
mallocinit()
fastrandinit() // must run before mcommoninit
mcommoninit(_g_.m)
cpuinit() // must run before alginit
// 在使用map的时候,alginit函数必须已经被调用
alginit() // maps must not be used before this call
modulesinit() // provides activeModules
...
}
func alginit() {
// Install AES hash algorithms if the instructions needed are present.
if (GOARCH == "386" || GOARCH == "amd64") &&
cpu.X86.HasAES && // AESENC
cpu.X86.HasSSSE3 && // PSHUFB
cpu.X86.HasSSE41 { // PINSR{D,Q}
initAlgAES()
return
}
if GOARCH == "arm64" && cpu.ARM64.HasAES {
initAlgAES()
return
}
getRandomData((*[len(hashkey) * sys.PtrSize]byte)(unsafe.Pointer(&hashkey))[:])
hashkey[0] |= 1 // make sure these numbers are odd
hashkey[1] |= 1
hashkey[2] |= 1
hashkey[3] |= 1
}
maptype(runtime下的type.go文件中)结构体定义的是map的type类型,里面有个hasher字段,该字段会存储上面选择的哈希函数。hasher函数接收2个入参,第一个参数就是指向key的地址,第二个参数是h.hash0,h.hash0是通过fastrand()得到的。调用haser函数之后,就得到了key对应的哈希值。
map查找
map查找就是获取map中key对应的value.有两种获取方法。分别是
代码语言:javascript复制// 只有一个返回值,如果key在map中存在,就返回vlaue的值。如果key不在map中,返回的value是一个零值,是一个无效的值。哪怎么知道value到底是有效的还是无效的,于是就有了下面的这种带有2个返回值的调用
value:=m[key]
// 第二返回值表示key在map中是否存在,如果存在,返回true,否则返回false.
value,ok:=m[key]
对应到实现来说,就是mapaccess1和mapaccess2函数。在分析查找实现前,我们先来看一个key的定位问题,就是给一个key怎么确定它在哪个桶的哪位槽位。理解了这个,下面的mapaccess1就很好理解了。假设hmap中的B为5,即map的桶有2^5=32个,对添加的key通过哈希函数计算后得到的值为64bit数据。现在有一个键值对{k,v}被添加到map中,对k计算出来的哈希值为hash(k),现在如果是你设计一个方法将hash(k)放在哪个桶,你会采用什么方法?对hash(k)取模就可以,非常棒。官方作者就是这么实现的,落在桶的公式为hash(k)2=hash(k)%(2^B), 就是将对key计算出来的哈希值模与桶的数量,得到的余数肯定是在[0,2^B)中。实现的时候,作者并没有采用%运算,而是更巧妙的对二进制进行&运算。bucketMask计算的是通的掩码,它的返回值是2^B-1, 对 hash(k) &bucketMask实际就是hash(k)%(2^B). 那落在桶内的哪个槽位是怎么确定的呢?因为桶内的槽位有8个,取哈希值的最高8bit对应的值放在tophash中,8bit的最大值为255,tophash是uint8类型是完全可用放入的。具体放在哪个槽位,是根据从小到大顺序。先放到槽位0即tophash0的位置,如果该位置已放有数据,就继续看一下个位置tophash1,如果tophahs0到tophash7都有数据了,那说明当前桶已经满了,继续看它的溢出桶即overflow指向的桶。这里注意一点,怎么判断一个槽位是否被占用。采用的方法是判断tophash值,tophash值的0到4是状态标志。最小有效的值为高8bit的值 5.具体状态标志见下面的tophash值特殊含义。
上图中B为5,一共有34个桶,其中32和33号是溢出桶,常规桶是32个(0至31号),key经过哈希函数计算之后得到的64bit数据为0010110000001010100000111010101101010011100001110101010101000111,根据上述规则,最后5个bit位00111,对应的十进制为7,所以此key落在第8个桶(编号为7的桶),然后在根据最高8位是00101100,对应的十进制是44,在桶7中从左向右找到一个未被占用的槽位,找到了第5个槽位(绿色标注的)未被占用,然后将44放在第5桶的hashtop中,最后将原本的key和value放到第5key槽位和第5个value槽位。
代码语言:javascript复制// bucketMask桶的掩码,值为2^b-1
func bucketMask(b uint8) uintptr {
return bucketShift(b) - 1
}
代码语言:javascript复制// mapaccess1 返回map中key对应的value,如果key在map中不存在,不会返回nil值,而是返回
// value类型的零值,需要注意的是返回指针可能导致map长时间存活,所以不能长时间持有返回的指针
// 对应到应用层的使用方式是 value:=m[key]
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if raceenabled && h != nil {
callerpc := getcallerpc()
pc := funcPC(mapaccess1)
racereadpc(unsafe.Pointer(h), callerpc, pc)
raceReadObjectPC(t.key, key, callerpc, pc)
}
if msanenabled && h != nil {
msanread(key, t.key.size)
}
// h为nil表示map还没初始化,h.count为0说明map中还没有元素
// 这里两种情况下,直接返回value类型的零值,并不是返回nil
if h == nil || h.count == 0 {
if t.hashMightPanic() {
t.hasher(key, 0) // see issue 23734
}
return unsafe.Pointer(&zeroVal[0])
}
// map正在扩容,即map已经有写操作,现在又在进行读操作,不被允许,panic
if h.flags&hashWriting != 0 {
throw("concurrent map read and map write")
}
// 计算key的哈希值
hash := t.hasher(key, uintptr(h.hash0))
// 计算桶的掩码,假如B为5,那桶的数量=2^B=2^5=32个,编号从0到31
// bucketMask计算出来值为32-1,即5个1
m := bucketMask(h.B)
// 所有的桶是一个数组,根据数组第一个元素即第一个桶h.buckets的位置
// hash&m计算出了key所在桶数组的下标,桶的大小t.bucketsize
// 根据第一个桶的位置,加上桶的偏移量(下标)*桶的大小,可以定位到key所在桶
// 的地址
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
if c := h.oldbuckets; c != nil {
// 判断map是否正常扩容,如果没有正在扩容,将m扩大一倍
if !h.sameSizeGrow() {
// m变为原来的一半,新桶backet是旧桶oldbuckets的两倍大
m >>= 1
}
// 获取到key在旧桶中的地址
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
if !evacuated(oldb) {
b = oldb
}
}
top := tophash(hash)
bucketloop:
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i {
// key的哈希值高位不匹配
if b.tophash[i] != top {
// 当前索引位置已经没有数据,所以不用继续循环查找更大索引位置,也不用再overflow中继续查找了
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
// 根据基地址b和偏移量计算出key所在的位置
k := add(unsafe.Pointer(b), dataOffset i*uintptr(t.keysize))
// 如果key是间接引用,获取它间接引用的值
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
// 比较用户传入的key和map存储的k是否相同,如果相等,获取value的值并返回
if t.key.equal(key, k) {
// 根据基地址的位置 8个key占用的空间大小 当前value的偏移量计算出value所在的位置
e := add(unsafe.Pointer(b), dataOffset bucketCnt*uintptr(t.keysize) i*uintptr(t.elemsize))
// 如果value元素是间接引用,获取它间接引用的值
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
// 返回value
return e
}
}
}
// key不在map中,返回value类型的零值,并不是返回nil
return unsafe.Pointer(&zeroVal[0])
}
// mapaccess2 也是查询key是否在map中,与mapaccess1不同的是有两个返回值,第二个返回值
// 表示key在不在map中,对应到应用使用方式是 value,ok:=m[key]
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {
...
// mapaccess2处理逻辑除多了一个bool类型的返回值,其他与mapaccess1一模一样,这里省略了
return unsafe.Pointer(&zeroVal[0]), false
}
tophash值的特殊值含义
代码语言:javascript复制// emptyRest 当前的索引位置是空的,比当前位置还大的索引位置没有数据了,在溢出桶overflows中的bucket也没有数据了
emptyRest = 0
// 表示槽位是空的,即还没有放数据
emptyOne = 1
// 表示槽位正在扩容,数据会被搬迁到新桶的前半部分,前后半部分是指新桶是旧的2倍大小,新桶的前一半为前半部分
evacuatedX = 2
// 表示槽位正在扩容,数据会被搬迁到新桶的后半部分
evacuatedY = 3
// 表示该槽位已经搬迁完毕
evacuatedEmpty = 4
// tophash的最小值
minTopHash = 5
上面这张图描述了mapaccess的处理流程,结合前面key的定位过程,就非常容易理解,整个大体流程就是找到key所在的桶之后,在外层循环访问桶,内层循环访问桶中的槽位,先判断tophash值是否相等,不相等肯定不是同一个key,相等之后要进一步取出key进行完全比较,只有全相等才算找到了。如果当前桶没有找到,继续在当前桶指向的溢出桶中继续查找。
map赋值
赋值操作是在mapassign中处理的,当添加的键值对中的key存在时,更新value的值,如果不存在,找到key落在的桶的合适槽位,将其key和value的值保存起来。
代码语言:javascript复制// mapassign 向map中添加元素
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
// h为nil,即map还未初始化,没有执行make操作,向里面添加数据,直接产生panic
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
// 检查data race
if raceenabled {
callerpc := getcallerpc()
pc := funcPC(mapassign)
racewritepc(unsafe.Pointer(h), callerpc, pc)
raceReadObjectPC(t.key, key, callerpc, pc)
}
if msanenabled {
msanread(key, t.key.size)
}
// map 正在扩容,抛出异常
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
// 计算key的哈希值
hash := t.hasher(key, uintptr(h.hash0))
// 设置map正在写入标记,从这里开始,其他地方不能再对map进行读取和写入操作了
// 设置标记这一步操作要放在计算hash之后,因为t.hasher可能产生panic.但是
// map还未开始真正写入
h.flags ^= hashWriting
// 桶buckets还未初始化,调用newobject先初始化buckets,桶的大小为1
if h.buckets == nil {
h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}
again:
// 计算key落在哪个桶
bucket := hash & bucketMask(h.B)
// 检查map是否正在扩容,判断依据是oldbuckets是否不为nil
if h.growing() {
growWork(t, h, bucket)
}
// 计算出bucket桶所在的内存地址,计算法方法=基地址(h.buckets) 落在哪个桶(bucket)*桶的大小(t.bucketsize)
b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) bucket*uintptr(t.bucketsize)))
// 提取出hash最高位8个bit对应的值,如果值小于5会被设置为基础值5
top := tophash(hash)
var inserti *uint8
var insertk unsafe.Pointer
var elem unsafe.Pointer
bucketloop:
for {
// 在桶内按照槽位cell从0到7顺序查找是否有key存在,每个桶装有8个k-v数据
for i := uintptr(0); i < bucketCnt; i {
// hash出来的top值不相同,进一步判断b.tophash[i]是不是空的
if b.tophash[i] != top {
// 判断当前的第i个槽位是不是空的,如果是空的,很开心终于找到一个可以插入的位置
// inserti、insertk和elem是一组赋值变量记录插入元素的槽位地址、插入的key所在的地址、插入的value所在的地址
if isEmpty(b.tophash[i]) && inserti == nil {
inserti = &b.tophash[i]
insertk = add(unsafe.Pointer(b), dataOffset i*uintptr(t.keysize))
elem = add(unsafe.Pointer(b), dataOffset bucketCnt*uintptr(t.keysize) i*uintptr(t.elemsize))
}
// 槽位值0和1都表示是空操作,这里进一步判断槽位值是不是0,如果是0表示比当前还大的槽位k-v已经被重置了,
// 就不用继续循环检查后面槽位是否存在相同的key了,因为肯定是不存在的
if b.tophash[i] == emptyRest {
// 后面槽位中肯定不存在key值了,直接跳出循环
break bucketloop
}
// 这里continue,就是还要继续走循环处理逻辑,为啥还要继续呢?因为当前的槽位key不相同,并不代表后面槽位中
// 不存在该key了。因为正常来说向map中添加数据,肯定是从槽位0-7这样的顺序添加的,即
// h[0],h[1],h[2],empty,empty,empty,empty,empty (empty表示当前槽位还未有数据)
// 但随着对map的删除,会出现空洞,即可能不像上面这样,没有槽位是连续的并集中在右边,出现的是下面的情况
// h[0],empty,h[2],empty,empty,empty,empty,empty
continue
}
// 走到这里,说明key计算出的哈希值值的高8bit值和b.tophash[i]相同
k := add(unsafe.Pointer(b), dataOffset i*uintptr(t.keysize))
// 如果直接存储的不是key,进行一级寻址获取里面的内容
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
// 真正判断key是否相同,前面只是判断了高8bit相同,并不一定key就是相同的
if !t.key.equal(key, k) {
continue
}
// already have a mapping for key. Update it.
// 走到这里表示map中key值已经存在了,如果有必要更新,就更新key值
if t.needkeyupdate() {
typedmemmove(t.key, k, key)
}
// elem保存key对应value的地址
elem = add(unsafe.Pointer(b), dataOffset bucketCnt*uintptr(t.keysize) i*uintptr(t.elemsize))
// key已经存在了,不用继续循环查找了,直接跳到done的地方执行后面的处理
goto done
}
// 当前的桶中8个槽位没有找到,继续在溢出桶中查找
ovf := b.overflow(t)
// 如果没有溢出桶,跳出循环,key在当前的map中不存在的
if ovf == nil {
break
}
b = ovf
}
// 走到这里,说明在当前桶和它的溢出桶中都没有找到合适的槽位来保存key和value
// 检查map是否需要扩容,扩容的触发条件有overLoadFactor和tooManyOverflowBuckets
// 满足任何一个都会触发,overLoadFactor检查装填因子是否大于6.5,大于6.5会触发扩容
// tooManyOverflowBuckets检查map中溢出桶的数量是否过多,过多也会触发扩容,
// 详细分析见map扩容
if !h.growing() && (overLoadFactor(h.count 1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}
// 走到这里key肯定是不在map中的,如果key在map中,会直接跳到下面done的位置
// inserti为空,就是在当前桶的槽位中和当前桶的溢出桶的槽位中都没有找到插入位置
// 表明已经没有插入位置,需要新添加一个溢出桶,将数据添加到里面
if inserti == nil {
// all current buckets are full, allocate a new one.
// 新建一个溢出桶,将数据插在0号槽位
newb := h.newoverflow(t, b)
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
elem = add(insertk, bucketCnt*uintptr(t.keysize))
}
// 将key-value值添加到待添加到insertk和elme保存的地址中
if t.indirectkey() {
// new一个key类型的对象kmem,将insertk中的内容设置为kmem地址
kmem := newobject(t.key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.indirectelem() {
// new一个elem类型的对象vmem,将elem中的内容设置为vmem地址
vmem := newobject(t.elem)
*(*unsafe.Pointer)(elem) = vmem
}
// 将key内容拷贝到insertk的位置
// 注意这里只看到有将key的内容拷贝到insertk保存起来,但没有看到将elem
// 保存起来,是漏了吗?是编译器做了额外的处理,可以通过反汇编查看
typedmemmove(t.key, insertk, key)
*inserti = top
// map中元素 1
h.count
done:
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
// 清理掉map的真正写操作标记
h.flags &^= hashWriting
if t.indirectelem() {
elem = *((*unsafe.Pointer)(elem))
}
// 返回数据elem的地址
return elem
}
func (h *hmap) newoverflow(t *maptype, b *bmap) *bmap {
var ovf *bmap
// makemap的时候有给h.extra和h.extra.nextOverflow赋值
// h.extra.nextOverflow记录的是溢出桶的位置,在make创建
// 一个新map的时候,会预分配一些溢出桶,这里在需要新的桶的时候
// 先从预分配的溢出桶里面获取,如果没有获取到,才会new一个新
// bucket
if h.extra != nil && h.extra.nextOverflow != nil {
ovf = h.extra.nextOverflow
// 检查ovf是不是预分配溢出桶的最后一个桶,预分配的溢出桶的最后一个桶的
// overflow做了特殊的指向处理,指向的是map的一个bucket地址,不是最后
// 一个桶的overflow是空的
if ovf.overflow(t) == nil {
// 不是最后一个预分配溢出桶的最后一个桶,那就使用当前的这个桶,并将记录下一个可供使用的
// 桶的标记nextOverflow向后移动一个位置,供下一次调用时使用
h.extra.nextOverflow = (*bmap)(add(unsafe.Pointer(ovf), uintptr(t.bucketsize)))
} else {
// 如果桶的overflow为非空,那说明当前的这个桶已经是预分配溢出桶的最后一个桶了,
// 将当前桶overflow指向标记去掉(makemap的时候指向的是第一个桶的位置),并
// 将记录预分溢出桶标记nextOverflow置空,因为所有的预分配溢出桶都已经使用完了
// 后续在调用newoverflow方法时就需要进行下面的new分配了
ovf.setoverflow(t, nil)
h.extra.nextOverflow = nil
}
} else {
// 预分配的溢出桶都已经使用完了,new一个新的桶
ovf = (*bmap)(newobject(t.bucket))
}
h.incrnoverflow()
// map中的存储的数据value不含有指针的情况
if t.bucket.ptrdata == 0 {
h.createOverflow()
// 溢出桶保存在extra.overflow切片中,如果存储数据value含有指针
// 不会存在在extra中,这样处理是在不含有指针时不用进行GC检查
*h.extra.overflow = append(*h.extra.overflow, ovf)
}
// b的overflow指向新的桶ovf,这样新的桶ovf可以跟前面的桶串起来了
b.setoverflow(t, ovf)
return ovf
}
// incrnoverflow 方法用来增加h.noverflow的值,noverflow字段保存的是溢出桶的数量
// 该方法是给tooManyOverflowBuckets调用的,目的是为了触发同等map的扩容
// 为了让hmap比较小,noverflow使用的是uint16类型,当桶的数量比较小时,
// noverflow记录的溢出桶数是一个精确值,当桶的数量很多时,noverflow记录的是一个近似值
func (h *hmap) incrnoverflow() {
// 如果溢出桶与桶一样多,会触发相同大小的mpa扩容,
// noverflow能够计数到2^B
if h.B < 16 {
// 直接加1
h.noverflow
return
}
// 当B值大于等于16时,概率性增加noverflow的值,概率大小为1/(2^(h.B-15))
// 也就是当B的值达到2^15-1时候,近似记录溢出桶的数量,因为noverflow是uint16类型
// 如果还是直接不断加1,会存在溢出
mask := uint32(1)<<(h.B-15) - 1
// 例如当h.B=18的时候,mask的值为7,然后调用fastrand()产生一个随机数,再对该随机数
// 与7进行相&处理,即对该数除8求其余数,如果余数是0,才对noverflow加1,可见这个概率为1/8
if fastrand()&mask == 0 {
h.noverflow
}
}
看完了mapassign逻辑,可能有点疑惑,函数签名中咋看不到value参数呢?函数返回的值是elem指向地址,即value所在的内存地址,代码中看到有对key保存处理,typedmemmove(t.key, insertk, key)这个行就是,但是没有看到对value的保存处理。下面写个demo翻译成汇编,看看能否有新发现。
代码语言:javascript复制package main
func main() {
m := make(map[int]int)
m[1] = 123
}
上述代码保存在main.go文件中,将其翻译成汇编代码,执行 go tool compile -S main.go的关键内容如下图所示,最后一步MOVQ $123, (AX)就是保存value,这是由编译器额外生成的汇编指令来完成的,可见靠 runtime有些工作是没有做完的。
map扩容
搞清楚四个问题,我们就理解了map扩容。这四个问题分别是:1为什么要扩容?2 什么时候扩容?3 怎么进行扩容?4 扩容会产生哪些影响?下面将依次回答这个几个问题。为什么要扩容:我们使用map存取数据的目的就是因为它非常高效,哈希函数取的非常好,理想情况下,获取一个键值对只需要O(1)时间复杂度。但是实际是很难达到的,随着map中存放的数据越来越多,向里面添加数据发送碰撞的概率会变大,访问数据效率会下降。为了保证访问效率,就需要在效率不高的时候进行扩容,衡量效率不高的依据是装填因子。还是一种情况是,当一个大型map中大量数据被删除之后,会留下很多empty的空槽位,在查找key的时候,不得不与这些大量空的tophash进行比较,但是这些比较是无意义的,只会浪费时间。所以将这种稀疏的桶进行搬迁之后,进行重新整理,尽量让他们相邻存储,以提升查询效率。
什么时候扩容:在map赋值和删除key的时候,会检查是否需要扩容。触发扩容有两种情况:一种是装填因子超过临界值(6.5)时,即map中元素的数量/桶的数量>6.5,因为一个桶装载8个元素,这说明大部分桶都快满了,有些甚至有在使用溢出桶了。这个时候添加新元素,碰撞的概率就比较大。
代码语言:javascript复制func overLoadFactor(count int, B uint8) bool {
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
还有一种是溢出桶是否太多,当桶的数量超过2^15个时,如果溢出桶数量也超过2^15,就判断为溢出桶太多,当桶的数量小于等于2^15时,如果溢出桶的数量大于桶的数量,也判断为溢出桶太多。
代码语言:javascript复制// tooManyOverflowBuckets 判断溢出桶的数量是不是跟桶(2^B)的数量一样多, 如果溢出桶的数量很多,
// 也就是出现了桶有大量槽位是空的情况,频繁的删除会导致槽位变空。大量的空槽位会使得查询的时候效率很低,
// 可能要遍历找完所以的槽位,为了让槽位变得密集,不留什么"空洞",需要触发扩容操作,新创建桶,之前的桶
// 称为旧桶,将旧桶中的数据搬迁到新桶中,这样新桶中槽位是密集型的
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
// 判断是否有太多桶的阈值不能太大或太小,太小就进行扩容并没有大的意义,做的是无用功
// 如果太大,无论是扩容还是缩容可能有大量分配的空间没有被使用,白白浪费内存
if B > 15 {
B = 15
}
// 如果B大于15,强行设置B为15,即B超过15,不管有多少,noverflow直接与2^15比较,如果大于表示map很稀疏
// 如果B小于等于15,noverflow直接与2^B进行比较,如果大于表示map很稀疏
return noverflow >= uint16(1)<<(B&15)
}
怎么扩容:前面介绍了触发扩容的两种情况,根据触发情况不同,在实现扩容的时候采用了不同策略。对于装填因子超过临界值6.5引起的扩容,会新建一个桶数组,B值会加1,即新桶的数量是原来桶的2倍,这是一种增量扩容,然后将旧桶中的数据搬迁到新桶中。对于有太多溢出桶引起的扩容,也会新建一个桶数组,但B值不变,即新桶的数量与旧桶的一样,然后将旧桶中的数据搬迁到新桶,这个过程可以理解为一个整理过程,相当于磁盘碎片整理,旧桶中的数据很分散,搬到新桶之后会相邻存储,新桶中数据密度很高。这种并不需要扩大桶的数量,因为桶的数量是够的,只不过是装的数据稀疏。称这种扩容为等量扩容,其实是等量搬迁。下面的两幅图展示了增量扩容和等量扩容的流程,可以将图结合下面的代码一起看,更容易理解。
上图展示的是增量扩容的流程,扩容之前,B为2,即有4个桶,key0和key1计算出来的哈希值分别为1001001010010010010101001110001001101010100001101001111000000010和1001001000010010010101001110001001101010100001101001111000000110,它们的最后2个bit都是10,转成十进制就是2,所以它们落在2号桶,当增量扩容后,此时B为3,也就扩容之后有8个桶,现在需要根据key哈希值的最后3个bit,决定它们落在新桶的位置,key0的最后3位为010,对应的十进制还是2,key1的最后3位为110,对应的十进制为6,所以它们分别落在新桶的2号桶和6号桶,从根据最后2bit到看最后3bit的这个过程,称之为rehash。总结起来,对于增量扩容,一个数据落在新桶的位置取决于它的key算出来的哈希值的第B(扩容器前的) 1位,设它在旧桶的桶号为X,如果B 1位为1,则它落在新桶的X 2^B号桶,如果B 1位为0,则它落在新桶的X号桶,即保存桶号不变。
代码语言:javascript复制
等量扩容,新桶的数量和旧桶的数量相同,所以一个元素在老桶的桶号为X,则它在新桶的桶号也为X,变化的只是在桶中槽位,例如上图中,key0和key7,它们都在一个桶号2中,只不过key7在溢出桶中,当进行扩容搬迁后,它们放在了新桶中的2号桶的相邻位置,进行重新排位整合后,溢出桶可能就不需要了。下面来看扩容的代码,跟扩容相关的函数有两个,hashGrow和growWork函数。hashGrow函数只是分配好了新的buckets,并没有做搬迁工作,并将老的buckets挂到了oldbuckets 字段上。真正搬迁buckets的动作在 growWork函数中,而调用growWork函数的动作是在mapassign和 mapdelete函数中。也就是插入/更新、删除 key 的时候,都会尝试进行搬迁buckets的工作。根据oldbuckets是否非nil,来进行搬迁工作。
代码语言:javascript复制// hashGrow 进行扩容字段的创建初始化分配工作
func hashGrow(t *maptype, h *hmap) {
// 如果是装填超过装填因子导致的map扩容,说明之前的桶数量小了,需要扩容更多的桶,
// 新分配桶的数量是原来的2倍,如果是太多溢出桶导致的扩容,则进行等量扩容,不用
// 申请更大数量的桶,只是之前的桶中数据太稀疏了,进行一次重新整理即可
bigger := uint8(1)
if !overLoadFactor(h.count 1, h.B) {
// 是有太多溢出桶导致的扩容,进行等量扩容,bigger调整为0
bigger = 0
h.flags |= sameSizeGrow
}
// 将现在的桶h.buckets挂载到h.oldbuckets中,新创建的桶保存在h.buckets中
oldbuckets := h.buckets
newbuckets, nextOverflow := makeBucketArray(t, h.B bigger, nil)
// 将h.flags中的iterator和oldIterator对应位置清0保存在flags
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
// 如果有在进行迭代操作,将迭代标记设置在旧桶上,因为h.buckets中数据将放入到h.oldbuckets中
flags |= oldIterator
}
// commit the grow (atomic wrt gc)
// 更新一批h字段内容
h.B = bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0
// 对h.extra进行处理,如果非空,该挂载到h.extra.oldoverflow就挂载在它下面
if h.extra != nil && h.extra.overflow != nil {
// Promote current overflow buckets to the old generation.
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil")
}
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
}
// h.extra.nextOverflow指向新分配桶的溢出桶的位置
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}
}
// growWork 进行真正的搬迁工作,调用一次growWork搬迁两个桶(如只剩一个桶未搬迁,只搬迁一个)
func growWork(t *maptype, h *hmap, bucket uintptr) {
// evacuate进行搬迁工作,执行一次该函数调用只会搬迁一个桶
evacuate(t, h, bucket&h.oldbucketmask())
// 如果h处在扩容状态,还未搬迁完成
if h.growing() {
// 又调用用一次evacuate,再搬迁一个桶
evacuate(t, h, h.nevacuate)
}
}
// evacuate 进行搬迁工作,调用该函数一次只搬迁一个桶
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
// 获取要搬迁桶的地址,搬迁是将旧桶oldbuckets中数据搬迁到新桶buckets
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// 计算旧桶的桶数
newbit := h.noldbuckets()
// 如果b这个桶中的数据还未被搬迁,执行搬迁操作
if !evacuated(b) {
// xy是一个两个元素的数组,x[0]中的bmap指向新桶的前半部分,x[1]中的bmap指向新桶的后半部分
// 举个例子,扩容前h.B为2,那么扩容前桶的数量为2^2=4个,即h.oldbuckets中有4个桶,当加倍
// 扩容之后,即h.B增加到3(2 1),扩容之后桶的数量为2^3=8个,即h.buckets中有8个桶。这8
// 个桶分为两部分,每个部分有4个
// 假如h.oldbuckets旧桶数据tophash, 传入的oldbucket为3,指向210这个桶,e表示桶是空的
// 0 1 2 3
// |7|e|10|210|
// h.buckets新桶情况为,因为这是第一次搬迁,所以都是tophash空的
// 0 1 2 3 4 5 6 7
// |e|e|e|e|e|e|e|e|
var xy [2]evacDst
x := &xy[0]
// 设传入的oldbucket为3,x.b指向新桶第4(索引3)这个桶位置
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
// x.k指向索引3这个桶第一个key的位置
x.k = add(unsafe.Pointer(x.b), dataOffset)
// x.e指向索引3这个桶第一个value的位置
x.e = add(x.k, bucketCnt*uintptr(t.keysize))
// 如果是非等量扩容
if !h.sameSizeGrow() {
y := &xy[1]
// y指向后半部分中的桶,计算公式为 基地址 偏移量, 基地址为h.buckets
// 偏移量为(3 4)*uintptr(t.bucketsize), 即指向新桶索引为7的位置
y.b = (*bmap)(add(h.buckets, (oldbucket newbit)*uintptr(t.bucketsize)))
// y.k指向索引7这个桶第一个key的位置
y.k = add(unsafe.Pointer(y.b), dataOffset)
// y.e指向索引7这个桶第一个value的位置
y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}
// 将桶b中的数据搬迁到x或y的桶中,具体是x还是y,根据key对应hash的h.B位的值
for ; b != nil; b = b.overflow(t) {
// k和e分别指向待搬迁桶b中第一个key和第一个value的位置
k := add(unsafe.Pointer(b), dataOffset)
e := add(k, bucketCnt*uintptr(t.keysize))
// 依次对桶b中的8个k-v进行搬迁
for i := 0; i < bucketCnt; i, k, e = i 1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
top := b.tophash[i]
// 第i个槽位是不是没有放数据,如果没有放数据,continue跳过后面处理
if isEmpty(top) {
// 标记该槽位状态为已进行搬迁处理
b.tophash[i] = evacuatedEmpty
continue
}
// top值理论上不会为2,3,4,因为b肯定是没有搬迁过的,2,3,4表示桶已进了搬迁处理
if top < minTopHash {
throw("bad map state")
}
k2 := k
// 如果k是指针,进行解引用
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
var useY uint8
// 如果是非等量扩容,重新确定key在新桶中的位置
if !h.sameSizeGrow() {
// 计算key的哈希值
hash := t.hasher(k2, uintptr(h.hash0))
if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
// 对于NaN类型的key,每次对它计算 hash,得到的结果都不一样
// 这个 key是通过math.NaN()产生的,它的含义是 not a number,类型是 float64。
// 把NaN类型作为map的key时,会遇到一个问题:再次计算它的哈希值和它当初插入map时计算出来的哈希值不同
// 这种类型的key一旦放入map是无法被Get操作获取的,当使用 m[math.NaN()]语句的时候,是查不出来结果的
// 这种类型的key只有在对map进行遍历的时候才能被找到,并且,可以向一个 map插入多个数量的math.NaN()作为key,
// 它们并不会被互相覆盖。当搬迁碰到 math.NaN() 的 key 时,只通过tophash的最低位决定分配到
// 扩容后桶的前半部分还是后版本部分。如果tophash的最低位是0 ,分配到前半部分,如果是1分配到后半部分
useY = top & 1
// 计算hash值的tophash
top = tophash(hash)
} else {
// 假设库容前h.B为2,在划分桶的时候根据的是hash值的最后的h.B个bit位,即最后2个bit位
// 现在容量扩大一倍了,要看新的h.B为3(2 1),现在要看最后3个bit位。hash&newbit计算出
// 就是判断倒数第3个bit是1还是0,如果还是0,在新桶的位置在前半部分,如果是1在新桶的后半
// 部分
if hash&newbit != 0 {
useY = 1
}
}
}
if evacuatedX 1 != evacuatedY || evacuatedX^1 != evacuatedY {
throw("bad evacuatedN")
}
//对b.tophash[i]进行标记,标记是当前槽位扩容后在新桶的前半部分还是后半部分
b.tophash[i] = evacuatedX useY // evacuatedX 1 == evacuatedY
// dst为拷贝到的目标位置,可能是新桶的前半部分还是后半部分,取决于前面判断
dst := &xy[useY] // evacuation destination
// dst.i是从0开始的,当到达8时,表示dst中bmap中已经装满8个数据
if dst.i == bucketCnt {
// 对桶dst.b创建溢出桶
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
// 设置dst.k,dst.e分别为溢出桶的第一个key和value的位置
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
// 将top值填充到目标桶的tophash中
dst.b.tophash[dst.i&(bucketCnt-1)] = top
// 分别将源桶的key和value拷贝给目标桶的key和value中,如果key或value是指针,进行解引用操作
if t.indirectkey() {
*(*unsafe.Pointer)(dst.k) = k2
} else {
typedmemmove(t.key, dst.k, k)
}
if t.indirectelem() {
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
typedmemmove(t.elem, dst.e, e)
}
// 目标桶的记录key/value的索引 1
dst.i
// 目标桶dst的key和value分别向后偏移1个位置,因为bmap结构体中最后有一个overflow指针
// 保护,这里不用担心偏移之后出现踩内存的问题
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize))
}
}
if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
ptr := add(b, dataOffset)
n := uintptr(t.bucketsize) - dataOffset
memclrHasPointers(ptr, n)
}
}
// h.nevacuate记录的是当前已经搬迁完桶的下一个未搬迁桶的位置,表示的是搬迁的进度,如果当前被搬迁的桶oldbucket
// 与h.nevacuate相同,进行h.nevacuate更新操作,这里注意一下,什么时候出现oldbucket与h.nevacuate相同呢?
// 回到growWork中看,第二次调用evacuate(t, h, h.nevacuate),传递的参数就是h.nevacuate
if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
}
// advanceEvacuationMark处理核心是检查所有的桶是否全部搬迁完成
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
// 小于等于h.nevacuate的桶都已经搬迁完,这里h.nevacuate 1,接下来会检查从
// h.nevacuate 1的桶是否已完成搬迁
h.nevacuate
// 经过试验验证,将stop设置为当前进度 1024比较合理,因为1024至少会比newbit高出一个数量级
// 所以,用当前进度加上1024用于确保O(1)行为
stop := h.nevacuate 1024
if stop > newbit {
// stop强行设置为newbit,最大的桶才为newbit
stop = newbit
}
// 在stop的范围内,探测出一个还未搬迁桶的位置
for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
h.nevacuate
}
// 当还未搬迁的桶在newbit位置时,说明所有的桶都搬迁完成
if h.nevacuate == newbit { // newbit == # of oldbuckets
// 旧桶全部搬迁完成,置为nil,旧桶的数据可以被GC回收了
h.oldbuckets = nil
// 如果map中存储数据的key和value都不含指针,保存它们的bucket桶数组是存储在hmap.extra中的
// 这种情况下,搬迁的桶其实是h.extra.oldoverflow中的桶,现在已经搬迁完毕,需要将
// h.extra.oldoverflow置为nil
if h.extra != nil {
h.extra.oldoverflow = nil
}
// 扩容搬迁完毕,清理扩容的标志位
h.flags &^= sameSizeGrow
}
}
上面的扩容代码已做了详尽的注解,结合前面的扩容流程图,理解不难。这里稍微说明下代码中出现的xy区,对于增量扩容来说,新桶的数量会是旧桶的2倍大,新桶的前一半区域称为x区,后一半区域称为y区。对于等量扩容来说,新桶数量和旧桶一样大,可以理解成只有x区没有y区。在扩容的时候,每次只搬迁2个桶,growWork中调了两次evacuate,evacuate每次只搬迁一个桶。这么做是为了一种性能考量,如果map存储了数以亿计的键值对,一次性全部搬迁将会造成比较大的延时,在添加元素或删除元素的时候出现卡顿,因此采用逐步搬迁策略。
扩容会产生什么影响:扩容会对性能有影响,在我们添加或删除元素的时候,存在扩容的可能,希望扩容越快越好,所以官方实现的时候采用了逐步搬迁的办法,每次只搬迁两个桶。还有一个是对遍历有影响,逐步搬迁是一个过程,当我们对map进行遍历的时候,map可能没有扩容完毕,这时候遍历的时候要防止不能从老桶中获取数据又从新桶中获取到了数据。扩容会增加遍历代码实现的难度。
map遍历
通过for range遍历map的时候,实现是通过初始化一个迭代器对象,迭代器结构如下,每次从迭代器对象中找到要访问的bmap和当前的槽位。map在每次遍历的时候,返回的顺序是不一样的,这是怎么做到的呢?每次遍历的时候,会随机产生从哪个桶开始迭代,并且从头bmap中的哪个槽位开始也是随机的,这些信息都记录在hiter的startBucket和offset中,具体实现细节看下面的代码注解。
遍历map的迭代器结构如下
代码语言:javascript复制// map的迭代器结构,遍历map的时候使用
type hiter struct {
// 指向迭代map的key
key unsafe.Pointer
// 指向迭代map的value
elem unsafe.Pointer
// 存储迭代map的类型
t *maptype
// 存储迭代的map的结构体头
h *hmap
// 记录迭代的map中hmap.buckets的值
buckets unsafe.Pointer
// 记录当前正在访问的bmap
bptr *bmap
// 保存hmap.buckets的值
overflow *[]*bmap
// 保存hmap.oldbuckets的值
oldoverflow *[]*bmap
// 记录开始是从哪个桶开始遍历的,起始桶的选择是随机的
startBucket uintptr
// offset记录开始遍历时是从桶中哪个槽位开始的,起始槽位的选择也是随机的
offset uint8
// 记录是否已经到桶数组中最后一个桶了,是用来辅助终止遍历桶用的
wrapped bool
// 迭代的map中的桶数的log2,与hmap中B的值相同
B uint8
// 记录下一次要访问的元素在哪个槽位
i uint8
// 记录桶数组中下一代访问桶的偏移值
bucket uintptr
checkBucket uintptr
}
遍历实现代码在mapiterinit和mapiternext函数里,从起始为startBucket桶中的offset槽位开始遍历,整个遍历都是在新桶buckets中进行的。每对一个新桶进行遍历的时候,先检查该桶对应老桶的状态,当该桶位于前半部分的时候,该桶的序号和老桶的是相同,直接检查老桶中对应序号桶的状态,如果是已搬迁状态,不用遍历该老桶了。如果是还未搬迁,需要将老桶中以后会搬迁到当前新桶的元素提取出来完成遍历,如果没有直接跳过。当该桶位于后半部分的时候,该桶的序号和老桶相差2^(B-1),直接减去2^(B-1)后定位到老桶,然后检查老桶的状态,如果是已搬迁,不用遍历老桶,如果还未搬迁,将老桶中以后会搬迁到当前新桶的元素提取出来完成遍历。
如上图所示,起始桶是4号桶,桶的遍历顺序为4->5->6->7->0->1->2->3.桶内的起始槽位为1。先检查4号对应老桶序号为0(4-2^(3-1)),发现老桶0号桶已搬迁完,直接不管老桶,遍历4号桶,从槽位1->2->3->...->7->1访问到槽位0中的{2,b}键值对,然后遍历5号桶,发现5号桶对应的老桶1号桶已搬迁,5号桶又没有元素,继续遍历6号桶,发现6号桶对应的老桶2还未搬迁,开始对老桶2号桶进行遍历,也是从槽位1开始,槽位1的key值为6,计算哈希值后发现它将搬迁到新桶的6号桶,所以访问它,然后继续查看槽位2,3...直到槽位0,槽位0的key值为5,计算哈希值后发现它将搬迁到新桶的2号桶,跳过不访问它。这样遍历一圈之后,得到的键值对为{2,b},{6,f},{1,a},{4,d},{3,c},{5,e}
代码语言:javascript复制// mapiterinit初始化一个对map进行for range迭代使用到的hiter结构对象it
func mapiterinit(t *maptype, h *hmap, it *hiter) {
// data race检查
if raceenabled && h != nil {
callerpc := getcallerpc()
racereadpc(unsafe.Pointer(h), callerpc, funcPC(mapiterinit))
}
// 如果map还未初始化,或者还没有存放数据,直接返回
if h == nil || h.count == 0 {
return
}
// 检查hiter{}大小是否为96
if unsafe.Sizeof(hiter{})/sys.PtrSize != 12 {
throw("hash_iter size incorrect")
}
// 对it做一些初始化,将t和h信息赋值给it
it.t = t
it.h = h
// 保存当前map的B值以及桶的位置
it.B = h.B
it.buckets = h.buckets
// 桶不含指针类型数据
if t.bucket.ptrdata == 0 {
h.createOverflow()
it.overflow = h.extra.overflow
it.oldoverflow = h.extra.oldoverflow
}
// 生产一个随机数,确定从map的哪个桶开始遍历
r := uintptr(fastrand())
// h.B大于28
if h.B > 31-bucketCntBits {
r = uintptr(fastrand()) << 31
}
// r&bucketMask(h.B)其实就是r%(2^h.B)操作,随机确定一个起始位置保存在startBucket中
// 然后会从当前位置的桶开始向后遍历,走一圈后又回到到startBucket的时候,表示遍历结束
it.startBucket = r & bucketMask(h.B)
// offset记录的是从哪个槽位开始,可以看到offset产生与r有关,r是一个随机数,那说明offset
// 也是一个随机值。map的for range遍历是从随机的桶中随机的一个槽位开始的,这就是为什么对
// map进行遍历的时候,产生的顺序不同的原因
it.offset = uint8(r >> h.B & (bucketCnt - 1))
// it.bucket记录的是当前要正在等待要迭代访问的桶
it.bucket = it.startBucket
// 当前的迭代是可以和另一个迭代并行的,将h的flags标记设置为3
if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
atomic.Or8(&h.flags, iterator|oldIterator)
}
mapiternext(it)
}
func mapiternext(it *hiter) {
// 拿到it中存储的hmap
h := it.h
if raceenabled {
callerpc := getcallerpc()
racereadpc(unsafe.Pointer(h), callerpc, funcPC(mapiternext))
}
// 检查map是不是有写操作,迭代是读操作,不能与写操作并行,如果正在对map进行写操作,抛出错误
if h.flags&hashWriting != 0 {
throw("concurrent map iteration and map write")
}
// 拿到map的类型
t := it.t
// 当前待访问的桶
bucket := it.bucket
// b为nil
b := it.bptr
// i为0
i := it.i
checkBucket := it.checkBucket
next:
if b == nil {
// 如果当前的桶地址和开始遍历的桶地址相同,并且顺序遍历了一圈(it.wrapped),表明已经遍历完成了
if bucket == it.startBucket && it.wrapped {
it.key = nil
it.elem = nil
// 遍历完成,直接退出
return
}
// 如果已在扩容
if h.growing() && it.B == h.B {
// bucket&操作是为了防止溢出,oldbucket为当前要处理桶的位置,可以理解为桶数组的下标
oldbucket := bucket & it.h.oldbucketmask()
// b为旧桶数组中距离第一桶偏移量为oldbucket个桶的内存地址
b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// 检查桶b数据是否已搬迁
if !evacuated(b) {
// 如果未搬迁,checkBucket记录下偏移量(bucket)
checkBucket = bucket
} else {
// 如果已搬迁,要访问的桶为新桶数组中偏移量为bucket地方的桶
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
// noCheck为1<<(8*8)-1,标记用
checkBucket = noCheck
}
} else {
// 没有进行扩容,即oldbuckets是空的,所有的数据都保存当前桶it.buckets中
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
bucket
// bucket已经走到桶数组的末尾了,将回到开头的位置,wrapped为true表示走了一圈了
if bucket == bucketShift(it.B) {
bucket = 0
it.wrapped = true
}
i = 0
}
for ; i < bucketCnt; i {
// offi槽位的偏移量
offi := (i it.offset) & (bucketCnt - 1)
// 如果槽位offi没有数据或数据已搬迁,跳过后面处理,直接判断下一个槽位
if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty {
continue
}
// 获取槽位中的key
k := add(unsafe.Pointer(b), dataOffset uintptr(offi)*uintptr(t.keysize))
// 如果是指针类型,进行解引用
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
// 获取槽位中的value
e := add(unsafe.Pointer(b), dataOffset bucketCnt*uintptr(t.keysize) uintptr(offi)*uintptr(t.elemsize))
// checkBucket不为noCheck,需要进一步检查该桶是否已搬迁还是未搬迁
if checkBucket != noCheck && !h.sameSizeGrow() {
// 计算两次key是否相等,对于math.NaN()值的key计算哈希值每次得到都会不一样
if t.reflexivekey() || t.key.equal(k, k) {
// 计算key的哈希值保存在hash中
hash := t.hasher(k, uintptr(h.hash0))
// it.B是扩容后的B,将hash与it.B的桶掩码进行相与处理之后,得到是落在新桶的
// 位置,注意checkBucket值来自于bucket,bucket是当前要被访问的桶,bucket
// 范围是新桶的范围,所以这里将hash与B桶掩码相与之后,如果hash值的第B个bit位为1
// 说明落在扩容后的桶的后半部分,如果为0则落在前半部分。如果当前访问的桶checkBucket
// 在后半部分,恰好hash值的第B个bit位为1,它们是相等的,会继续走后面的逻辑,否则不会
// 处理
if hash&bucketMask(it.B) != checkBucket {
continue
}
} else {
// 对于math.NaN()值为key的处理,由于计算出来的哈希值每次都在变化,
// checkBucket有可能位于y区(后半区)也有可能位于x区(前半区),当前
// checkBucket位于前半区时,checkBucket>>(it.B-1)后得到值始终都是
// 0,当前checkBucket位于后半区时,得到值始终都是1. 此时hash值每次都是
// 变化的不能再根据它决定是否要访问,得到找一个固定不变的值,对,这里找的就是
// b.tophash[offi],将它&1之后也会得到两种值,要么是0要么1.下面处理得到
// 的效果是,如果当前访问的bucket位于后半区,并且offi的tophash值最低位为1
// 这次就访问它,否则不访问,将offi的tophash值最低位为0留在bucket在前半区
// 的时候访问
if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
continue
}
}
}
if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
!(t.reflexivekey() || t.key.equal(k, k)) {
// 槽位offi没有处在扩容的状态,直接获取它的key和value值给it
it.key = k
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
it.elem = e
} else {
// mapaccessK获取map中键为k的键值对,整个mapaccessK的处理逻辑与mapaccess1基本一样
rk, re := mapaccessK(t, h, k)
// 如果key已被删除,直接continue,跳过当前的key的访问
if rk == nil {
continue
}
// 将获取到的key和value值赋值给迭代器it
it.key = rk
it.elem = re
}
// 更新迭代器中待访问的桶bucket位置
it.bucket = bucket
if it.bptr != b {
// it.bptr记录当前正在访问哪个桶,下一次迭代的时候,继续从it.bptr桶中取数据
it.bptr = b
}
// 更迭代器中的槽位位置,为下一次迭代做准备
it.i = i 1
it.checkBucket = checkBucket
return
}
// 检查是否有溢出桶,如果由,继续遍历溢出桶
b = b.overflow(t)
i = 0
goto next
}
map删除
map删除代码是在mapdelete函数中实现的,代码比较简单,由于篇幅原因,这里就不贴代码详细注解了,理解起来有不明白的非常欢迎与我沟通交流。删除的整体逻辑也是根据key定位到存储key-value的位置,然后对应位置"置空",在定位key的位置前,检查map是否处在扩容过程中,如果已在扩容中,进行一次搬迁操作,最后将key对应位置的tophash设置为empty状态,将记录map中元素数据的计数器count减一。
总结
Go中map实现采用的哈希表,通过拉链法来解决冲突问题,物理实现结构是用一个固定大小的桶数组加链表构成。在实现的过程中采用了很多优化点,总结如下。
- 每个map都有一个随机哈希种子,用于降低哈希碰撞的概率
- 根据key的类型确定哈希函数,对不同类型针对性优化哈希计算方式
- 定位key在哪个桶的操作采用&运算符而不是%,直接进行二进制运算
- 每个bmap存储8个key-value,存储的时先存储8个key然后在存储8个value,而不是安装key-value-key-value的形式存储,便于内存对齐,减少了不必要的内存浪费
- 当向桶中添加了超多的元素,超过了装填因子的临界值(6.5),或者多次增删操作,造成溢出桶过多,均会触发扩容
- 触发扩容时对不同的原因做不同处理,如果是装填因子不满足,则进行一倍扩容,保证桶的装载能力,对于溢出桶太多,进行等量扩容,相当于重新整理内存,减少不必要的溢出桶,加快读写速度。
- 采用渐进扩容,每次只搬迁2个桶,防止一次扩容需要搬迁的key数量过多,引发性能问题。触发扩容的时机是在增加元素的时候,桶搬迁的时机则发生添加元素和删除元素的时候
- 桶内若只有overflow桶链表是指针,则overflow类型转为uintptr,并使用mapextra引用该桶,避免桶的gc扫描又保证其overflow桶存活
- 对指定不同大小的map初始化,区别对待,不必要的桶预分配就避免,桶较多的情况下(B>=4),也增加overflow桶的预分配
开放寻址法VS链表法[1]treemap[2]Go之读懂map的底层设计[3]深入解析Golang的map设计[4]
Reference
[1]
开放寻址法VS链表法: https://blog.csdn.net/weixin_41563161/article/details/105104239
[2]
treemap: https://wizardforcel.gitbooks.io/think-dast/content/12.html
[3]
Go之读懂map的底层设计: http://blog.newbmiao.com/2020/02/04/dig101-golang-map.html
[4]
深入解析Golang的map设计: https://zhuanlan.zhihu.com/p/273666774