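Source code walkthrough
The concurrent-map README describes it as a high-performance, concurrency-safe map. Let's read the source to see how it achieves that performance.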
https://github.com/orcaman/concurrent-map/blob/master/concurrent_map.go
The source is quite compact, only 343 lines. Let's start with how the data structure is designed.
var SHARD_COUNT = 32
// A "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is divided to several (SHARD_COUNT) map shards.
type ConcurrentMap []*ConcurrentMapShared
// A "thread" safe string to anything map.
type ConcurrentMapShared struct {
	items map[string]interface{}
	sync.RWMutex // Read Write mutex, guards access to internal map.
}
// Creates a new concurrent map.
func New() ConcurrentMap {
	m := make(ConcurrentMap, SHARD_COUNT)
	for i := 0; i < SHARD_COUNT; i++ {
		m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
	}
	return m
}
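ConcurrentMap is the type exposed to callers. Internally it is a slice of ConcurrentMapShared pointers, 32 by default, and each ConcurrentMapShared wraps an embedded sync.RWMutex together with a native map.
From this you can already guess how it stays fast under concurrency. Making a plain, non-concurrency-safe map safe normally means guarding it with one global lock. Here there are 32 maps and 32 locks instead, and the finer lock granularity reduces the time goroutines spend waiting on a lock.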
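For contrast, the single-global-lock approach mentioned above might look like the sketch below. `lockedMap` and its methods are hypothetical names used only for illustration and are not part of concurrent-map. Every writer contends on the same mutex, whichever key it touches, and that is the bottleneck sharding is meant to relieve.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedMap is a hypothetical baseline: one plain map guarded by one RWMutex.
type lockedMap struct {
	mu    sync.RWMutex
	items map[string]interface{}
}

func newLockedMap() *lockedMap {
	return &lockedMap{items: make(map[string]interface{})}
}

// Set takes the single write lock, so all writers are serialized
// regardless of which key they touch.
func (m *lockedMap) Set(key string, value interface{}) {
	m.mu.Lock()
	m.items[key] = value
	m.mu.Unlock()
}

// Get takes the read lock; readers can proceed together, but a writer
// holding the lock blocks all of them.
func (m *lockedMap) Get(key string) (interface{}, bool) {
	m.mu.RLock()
	v, ok := m.items[key]
	m.mu.RUnlock()
	return v, ok
}

// Len returns the number of stored keys.
func (m *lockedMap) Len() int {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return len(m.items)
}

func main() {
	m := newLockedMap()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.Set(fmt.Sprintf("key-%d", i), i)
		}(i)
	}
	wg.Wait()
	fmt.Println(m.Len()) // 100 distinct keys were written
}
```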
Basic interface
ConcurrentMap provides the basic operations you would expect from a map:
// Return the shard that holds the given key
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared
// Set multiple entries from a map
func (m ConcurrentMap) MSet(data map[string]interface{})
// Set one element
func (m ConcurrentMap) Set(key string, value interface{})
// Get one element
func (m ConcurrentMap) Get(key string) (interface{}, bool)
// Count the elements
func (m ConcurrentMap) Count() int
// Check whether a key exists
func (m ConcurrentMap) Has(key string) bool
// Remove the given element
func (m ConcurrentMap) Remove(key string)
// Get and remove the given element
func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool)
// Check whether the map is empty
func (m ConcurrentMap) IsEmpty() bool
// Clear the map
func (m ConcurrentMap) Clear()
Set
Start with Set:
func (m ConcurrentMap) Set(key string, value interface{}) {
	// Get map shard.
	shard := m.GetShard(key)
	shard.Lock()
	shard.items[key] = value
	shard.Unlock()
}
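Set finds the shard that owns the key, locks that shard, writes the value, and unlocks. Hashing the key spreads what would otherwise be a single global lock across 32 fine-grained shard locks, which lowers the chance of waiting for a lock and so raises throughput under concurrency.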
// GetShard returns shard under given key
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
	return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}
Shard selection is also simple: hash the key and take the result modulo the shard count.
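The body of fnv32 is not shown in the listings above. The sketch below assumes it is the standard 32-bit FNV-1 hash, writes one by hand, and cross-checks it against Go's hash/fnv package. `shardIndex` and `stdFnv32` are illustrative names, not library functions.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// fnv32 is a hand-written 32-bit FNV-1 hash. This is an assumption about
// what the library's fnv32 does, not a copy of it.
func fnv32(key string) uint32 {
	const (
		offset32 = uint32(2166136261)
		prime32  = uint32(16777619)
	)
	hash := offset32
	for i := 0; i < len(key); i++ {
		hash *= prime32
		hash ^= uint32(key[i])
	}
	return hash
}

// stdFnv32 computes FNV-1 via the standard library, for cross-checking.
func stdFnv32(key string) uint32 {
	h := fnv.New32()
	h.Write([]byte(key))
	return h.Sum32()
}

// shardIndex maps a key to a shard number in [0, shardCount).
func shardIndex(key string, shardCount int) int {
	return int(uint(fnv32(key)) % uint(shardCount))
}

func main() {
	// Spread 3200 keys over 32 shards and print how many land in each.
	counts := make([]int, 32)
	for i := 0; i < 3200; i++ {
		counts[shardIndex(fmt.Sprintf("key-%d", i), 32)]++
	}
	fmt.Println(counts)
	fmt.Println(fnv32("hello") == stdFnv32("hello"))
}
```

Because the index depends only on the key and the shard count, the same key always lands in the same shard, which is what lets Set and Get lock a single shard instead of the whole map.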
Get
// Get retrieves an element from map under given key.
func (m ConcurrentMap) Get(key string) (interface{}, bool) {
	// Get shard
	shard := m.GetShard(key)
	shard.RLock()
	// Get item from shard.
	val, ok := shard.items[key]
	shard.RUnlock()
	return val, ok
}
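Get is almost the same as Set. The difference is the kind of lock, not its granularity: Set takes the write lock so that writes to a shard are serialized, while Get takes the read lock, which any number of goroutines can hold at once as long as no write is in progress.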
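To make the shared nature of the read lock concrete, here is a small sketch that is independent of concurrent-map. The hypothetical helper `readersOverlap` starts several goroutines that each take RLock and then wait until all of them hold it. With an exclusive lock this would deadlock; with sync.RWMutex and no writer present it completes.

```go
package main

import (
	"fmt"
	"sync"
)

// readersOverlap starts n goroutines that each take the read lock and then
// wait until all n are holding it. It can only return if read locks are
// shared between goroutines.
func readersOverlap(n int) bool {
	var mu sync.RWMutex
	var inside, done sync.WaitGroup
	inside.Add(n)
	done.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer done.Done()
			mu.RLock()
			defer mu.RUnlock()
			inside.Done() // announce: this reader holds the lock
			inside.Wait() // block until every reader holds it too
		}()
	}
	done.Wait()
	return true
}

func main() {
	fmt.Println(readersOverlap(8)) // true
}
```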
Count
func (m ConcurrentMap) Count() int {
	count := 0
	for i := 0; i < SHARD_COUNT; i++ {
		shard := m[i]
		shard.RLock()
		count += len(shard.items)
		shard.RUnlock()
	}
	return count
}
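Count walks the shards, locking each one separately and adding up its element count. In my view the result is therefore only approximate under heavy concurrency, in the sense that it may not match the real number of elements at any single moment during or after the call. The locks are per shard: while shard 2 is being counted, a new element can be written to shard 1, because writing to shard 1 does not touch shard 2. At that point the real size of shard 1 differs from what was already added for it. In a highly concurrent setting, though, there is usually little point in insisting on an exact count.
The remaining basic methods follow the same pattern: read operations take the shard's read lock so that readers do not block each other, and modifying operations take the shard's write lock to keep the shard consistent.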
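If an exact point-in-time count were ever needed, one option would be to hold every shard's read lock at the same time. The sketch below shows that idea on a stand-in `shard` type. `snapshotCount` is a hypothetical helper and not something the listings above show the library providing. The cost is that every writer stalls until the call finishes.

```go
package main

import (
	"fmt"
	"sync"
)

// shard mirrors the shape of ConcurrentMapShared, for illustration only.
type shard struct {
	sync.RWMutex
	items map[string]interface{}
}

// snapshotCount (hypothetical) read-locks every shard before counting, so
// no write can land in any shard while the total is being computed.
// Locks are always taken in index order.
func snapshotCount(shards []*shard) int {
	for _, s := range shards {
		s.RLock()
	}
	total := 0
	for _, s := range shards {
		total += len(s.items)
	}
	for _, s := range shards {
		s.RUnlock()
	}
	return total
}

func main() {
	// Shard i is filled with i elements: 0 + 1 + 2 + 3.
	shards := make([]*shard, 4)
	for i := range shards {
		shards[i] = &shard{items: make(map[string]interface{})}
		for j := 0; j < i; j++ {
			shards[i].items[fmt.Sprintf("k%d-%d", i, j)] = j
		}
	}
	fmt.Println(snapshotCount(shards)) // 6
}
```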
Advanced interface
There are also some higher-level methods, such as callback-based ones.
1. Insert-or-update callback
type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}
// Insert or Update - updates existing element or inserts a new one using UpsertCb
func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
	shard := m.GetShard(key)
	shard.Lock()
	v, ok := shard.items[key]
	res = cb(ok, v, value)
	shard.items[key] = res
	shard.Unlock()
	return res
}
2. Remove callback
// RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held
// If returns true, the element will be removed from the map
type RemoveCb func(key string, v interface{}, exists bool) bool
// RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params
// If callback returns true and element exists, it will remove it from the map
// Returns the value returned by the callback (even if element was not present in the map)
func (m ConcurrentMap) RemoveCb(key string, cb RemoveCb) bool {
	// Try to get shard.
	shard := m.GetShard(key)
	shard.Lock()
	v, ok := shard.items[key]
	remove := cb(key, v, ok)
	if remove && ok {
		delete(shard.items, key)
	}
	shard.Unlock()
	return remove
}
3. Iteration callback
// Iterator callback, called for every key,value found in
// maps. RLock is held for all calls for a given shard
// therefore callback sees consistent view of a shard,
// but not across the shards
type IterCb func(key string, v interface{})
// Callback based iterator, cheapest way to read
// all elements in a map.
func (m ConcurrentMap) IterCb(fn IterCb) {
	for idx := range m {
		shard := (m)[idx]
		shard.RLock()
		for key, value := range shard.items {
			fn(key, value)
		}
		shard.RUnlock()
	}
}
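As a usage sketch for the first two callbacks: the example below uses `miniMap`, a hypothetical single-shard stand-in, so that it runs without the library. Its Upsert and RemoveCb bodies follow the listings above. `addInt` and `removeIfZero` are made-up callbacks that treat values as ints.

```go
package main

import (
	"fmt"
	"sync"
)

type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}
type RemoveCb func(key string, v interface{}, exists bool) bool

// miniMap is a single-shard stand-in for ConcurrentMap (illustration only).
type miniMap struct {
	sync.RWMutex
	items map[string]interface{}
}

func (m *miniMap) Upsert(key string, value interface{}, cb UpsertCb) interface{} {
	m.Lock()
	defer m.Unlock()
	v, ok := m.items[key]
	res := cb(ok, v, value)
	m.items[key] = res
	return res
}

func (m *miniMap) RemoveCb(key string, cb RemoveCb) bool {
	m.Lock()
	defer m.Unlock()
	v, ok := m.items[key]
	remove := cb(key, v, ok)
	if remove && ok {
		delete(m.items, key)
	}
	return remove
}

// addInt is an UpsertCb that accumulates int values.
func addInt(exist bool, valueInMap interface{}, newValue interface{}) interface{} {
	if !exist {
		return newValue
	}
	return valueInMap.(int) + newValue.(int)
}

// removeIfZero is a RemoveCb that deletes a key only when its value is 0.
func removeIfZero(key string, v interface{}, exists bool) bool {
	return exists && v.(int) == 0
}

func main() {
	m := &miniMap{items: make(map[string]interface{})}
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.Upsert("hits", 1, addInt) // read-modify-write under the lock
		}()
	}
	wg.Wait()
	fmt.Println(m.items["hits"])                  // 50
	fmt.Println(m.RemoveCb("hits", removeIfZero)) // false: value is 50, key kept
	m.Upsert("hits", -50, addInt)
	fmt.Println(m.RemoveCb("hits", removeIfZero)) // true: value is 0, key removed
}
```

Since the callback runs while the shard's lock is held and sync.RWMutex is not reentrant, a callback that calls back into the map for a key in the same shard would deadlock, so callbacks are best kept to pure computations on the values they are given.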
Benchmarks
The repository benchmarks concurrent-map against Go's sync.Map.
func BenchmarkSingleInsertPresent(b *testing.B) {
	m := New()
	m.Set("key", "value")
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		m.Set("key", "value")
	}
}
func BenchmarkSingleInsertPresentSyncMap(b *testing.B) {
	var m sync.Map
	m.Store("key", "value")
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		m.Store("key", "value")
	}
}
go test -bench=InsertPresent -benchtime 5s
goos: linux
goarch: amd64
pkg: concurrent-map
BenchmarkSingleInsertPresent-8 172822759 34.9 ns/op
BenchmarkSingleInsertPresentSyncMap-8 65351324 92.9 ns/op
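In this run, when the same key is set repeatedly, concurrent-map is roughly 2.7 times as fast as sync.Map (34.9 ns/op versus 92.9 ns/op).
Now look at how they behave when inserting different keys.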
func benchmarkMultiInsertDifferent(b *testing.B) {
	m := New()
	finished := make(chan struct{}, b.N)
	_, set := GetSet(m, finished)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		go set(strconv.Itoa(i), "value")
	}
	for i := 0; i < b.N; i++ {
		<-finished
	}
}
func BenchmarkMultiInsertDifferentSyncMap(b *testing.B) {
	var m sync.Map
	finished := make(chan struct{}, b.N)
	_, set := GetSetSyncMap(&m, finished)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		go set(strconv.Itoa(i), "value")
	}
	for i := 0; i < b.N; i++ {
		<-finished
	}
}
func BenchmarkMultiInsertDifferent_1_Shard(b *testing.B) {
	runWithShards(benchmarkMultiInsertDifferent, b, 1)
}
func BenchmarkMultiInsertDifferent_16_Shard(b *testing.B) {
	runWithShards(benchmarkMultiInsertDifferent, b, 16)
}
func BenchmarkMultiInsertDifferent_32_Shard(b *testing.B) {
	runWithShards(benchmarkMultiInsertDifferent, b, 32)
}
func BenchmarkMultiInsertDifferent_256_Shard(b *testing.B) {
	runWithShards(benchmarkMultiGetSetDifferent, b, 256)
}
// runWithShards temporarily overrides SHARD_COUNT for a single benchmark run.
func runWithShards(bench func(b *testing.B), b *testing.B, shardsCount int) {
	oldShardsCount := SHARD_COUNT
	SHARD_COUNT = shardsCount
	bench(b)
	SHARD_COUNT = oldShardsCount
}
go test -bench=InsertDifferent -benchtime 5s
goos: linux
goarch: amd64
pkg: concurrent-map
BenchmarkMultiInsertDifferentSyncMap-8 560900 11996 ns/op
BenchmarkMultiInsertDifferent_1_Shard-8 1000000 7499 ns/op
BenchmarkMultiInsertDifferent_16_Shard-8 10377100 662 ns/op
BenchmarkMultiInsertDifferent_32_Shard-8 10511775 603 ns/op
BenchmarkMultiInsertDifferent_64_Shard-8 11624546 590 ns/op
BenchmarkMultiInsertDifferent_128_Shard-8 11773946 578 ns/op
BenchmarkMultiInsertDifferent_256_Shard-8 7914397 912 ns/op
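sync.Map appears to perform worst when inserting different keys. With the shard count set to 1, concurrent-map is effectively a single map behind one global read-write lock, yet in this run it is still faster than sync.Map. That said, the gap between sync.Map and single-shard concurrent-map varied a lot across runs: sometimes sync.Map was faster, sometimes single-shard concurrent-map was. Neither was ever faster than concurrent-map with 16 or more shards. More shards does not always mean faster either: at 256 shards, execution has already started to slow down. Note, though, that the 256-shard benchmark as listed calls benchmarkMultiGetSetDifferent rather than benchmarkMultiInsertDifferent, so its figure may not be directly comparable with the other rows.
References
- concurrent-map Git repository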