Golang source code analysis: fastcache

2023-09-06 19:28:39

https://github.com/VictoriaMetrics/fastcache is a caching library built on a ring buffer, which keeps GC overhead out of the picture. When the cache reaches the maximum size set at creation time, fastcache automatically evicts old entries. It is thread-safe, and both keys and values are byte slices. The library was written by the author of fasthttp and follows the same idea as bigcache, but improves on bigcache's BytesQueue design: storage is a ring of chunks ([][]byte), so growing only requires an append. Data is sharded into 512 buckets; chunk memory is allocated with mmap, outside the Go heap, which removes the GC-scan pressure a large in-heap array would cause. Each chunk in a bucket is 64KB, which avoids CPU false sharing. Expiration times (TTL) are not supported. Let's first look at how to use it, then walk through the source code.

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/fastcache"
)

func main() {
	// Create the cache; 1024 is the maximum cache size in bytes,
	// tune it for your use case.
	c := fastcache.New(1024)
	defer c.Reset()

	for i := 0; i < 100; i++ {
		k := []byte(fmt.Sprintf("key %d", i))
		v := []byte(fmt.Sprintf("value %d", i))

		fmt.Println(len(v))

		// Store the key.
		c.Set(k, v)
		vv := c.Get(nil, k)
		if string(vv) != string(v) {
			fmt.Printf("unexpected value for key %q; got %q; want %q\n", k, vv, v)
		} else {
			fmt.Println("set value ok", string(vv))
		}

		c.Del(k)
		vv = c.Get(nil, k)
		if len(vv) > 0 {
			fmt.Printf("unexpected non-empty value got for key %q: %q\n", k, vv)
		} else {
			fmt.Println("delete value ok", string(vv), string(k))
		}
	}
}
```

Now let's walk through the source. The constructor New is defined in fastcache.go; it allocates a Cache struct and then initializes each bucket in turn.

```go
func New(maxBytes int) *Cache {
	if maxBytes <= 0 {
		panic(fmt.Errorf("maxBytes must be greater than 0; got %d", maxBytes))
	}
	var c Cache
	maxBucketBytes := uint64((maxBytes + bucketsCount - 1) / bucketsCount)
	for i := range c.buckets[:] {
		c.buckets[i].Init(maxBucketBytes)
	}
	return &c
}
```
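The rounding-up division in New splits the total budget evenly across buckets without losing any capacity to truncation. A minimal sketch of just that step:

```go
package main

import "fmt"

const bucketsCount = 512

// maxBucketBytes mirrors New()'s rounding-up division: the total byte
// budget is divided across the buckets, rounding up so that small
// budgets still give every bucket a non-zero share.
func maxBucketBytes(maxBytes int) uint64 {
	return uint64((maxBytes + bucketsCount - 1) / bucketsCount)
}

func main() {
	fmt.Println(maxBucketBytes(1024))             // 2
	fmt.Println(maxBucketBytes(32 * 1024 * 1024)) // 65536
}
```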

The Cache struct itself is just the buckets plus the statistics counters.

```go
type Cache struct {
	buckets [bucketsCount]bucket

	bigStats BigStats
}
```

```go
// BigStats contains stats for GetBig/SetBig methods.
type BigStats struct {
	// GetBigCalls is the number of GetBig calls.
	GetBigCalls uint64

	// SetBigCalls is the number of SetBig calls.
	SetBigCalls uint64

	// TooBigKeyErrors is the number of calls to SetBig with too big key.
	TooBigKeyErrors uint64

	// InvalidMetavalueErrors is the number of calls to GetBig resulting
	// to invalid metavalue.
	InvalidMetavalueErrors uint64

	// InvalidValueLenErrors is the number of calls to GetBig resulting
	// to a chunk with invalid length.
	InvalidValueLenErrors uint64

	// InvalidValueHashErrors is the number of calls to GetBig resulting
	// to a chunk with invalid hash value.
	InvalidValueHashErrors uint64
}
```

The number of buckets is 512:

```go
const bucketsCount = 512
```
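As a minimal sketch of the routing step, keys are hashed and mapped onto one of the 512 buckets. Here the standard library's hash/fnv stands in for the xxhash.Sum64 function fastcache actually uses:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

const bucketsCount = 512 // fastcache shards its data into 512 buckets

// bucketIndex hashes a key and maps it onto one of the buckets,
// mirroring the idx := h % bucketsCount step in Cache.Set/Get.
func bucketIndex(key []byte) uint64 {
	h := fnv.New64a() // stand-in hash; fastcache uses xxhash.Sum64
	h.Write(key)
	return h.Sum64() % bucketsCount
}

func main() {
	for _, k := range []string{"foo", "bar", "baz"} {
		fmt.Printf("key %q -> bucket %d\n", k, bucketIndex([]byte(k)))
	}
}
```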

The bucket type is defined as follows; Init allocates the ring buffer of chunks and the index map.

```go
type bucket struct {
	mu sync.RWMutex

	// chunks is a ring buffer with encoded (k, v) pairs.
	// It consists of 64KB chunks.
	chunks [][]byte

	// m maps hash(k) to idx of (k, v) pair in chunks.
	m map[uint64]uint64

	// idx points to chunks for writing the next (k, v) pair.
	idx uint64

	// gen is the generation of chunks.
	gen uint64

	getCalls    uint64
	setCalls    uint64
	misses      uint64
	collisions  uint64
	corruptions uint64
}
```

```go
func (b *bucket) Init(maxBytes uint64) {
	if maxBytes == 0 {
		panic(fmt.Errorf("maxBytes cannot be zero"))
	}
	if maxBytes >= maxBucketSize {
		panic(fmt.Errorf("too big maxBytes=%d; should be smaller than %d", maxBytes, maxBucketSize))
	}
	maxChunks := (maxBytes + chunkSize - 1) / chunkSize
	b.chunks = make([][]byte, maxChunks)
	b.m = make(map[uint64]uint64)
	b.Reset()
}
```

Storing data follows the same pattern: hash the key, pick the bucket, then store k, v and the hash in that bucket.

```go
func (c *Cache) Set(k, v []byte) {
	h := xxhash.Sum64(k)
	idx := h % bucketsCount
	c.buckets[idx].Set(k, v, h)
}
```

```go
const chunkSize = 64 * 1024
```

Writing into a bucket works as follows: the key and value lengths are first encoded into a 4-byte header, which is appended to the current chunk together with k and v. The map then records the hash with the entry's offset, and finally the bucket's write index is advanced.

```go
// Abridged: locking, capacity checks and ring-buffer wrap-around are omitted.
func (b *bucket) Set(k, v []byte, h uint64) {
	chunks := b.chunks
	// Encode the key and value lengths as two big-endian uint16s.
	var kvLenBuf [4]byte
	kvLenBuf[0] = byte(uint16(len(k)) >> 8)
	kvLenBuf[1] = byte(len(k))
	kvLenBuf[2] = byte(uint16(len(v)) >> 8)
	kvLenBuf[3] = byte(len(v))

	idx := b.idx
	idxNew := idx + uint64(len(kvLenBuf)+len(k)+len(v))
	chunkIdx := idx / chunkSize
	chunk := chunks[chunkIdx]

	// Append header, key and value to the current chunk.
	chunk = append(chunk, kvLenBuf[:]...)
	chunk = append(chunk, k...)
	chunk = append(chunk, v...)
	chunks[chunkIdx] = chunk

	// Record the entry's offset together with the current generation.
	b.m[h] = idx | (b.gen << bucketSizeBits)
	b.idx = idxNew
}
```
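The 4-byte header encoding above is easy to verify in isolation. Note that two big-endian uint16s cap both key and value length at 64KB, matching the chunk size. A minimal round-trip sketch:

```go
package main

import "fmt"

// encodeKVLen packs the key and value lengths into the 4-byte header
// that bucket.Set prepends to every (k, v) pair: two big-endian uint16s.
func encodeKVLen(k, v []byte) [4]byte {
	var b [4]byte
	b[0] = byte(uint16(len(k)) >> 8)
	b[1] = byte(len(k))
	b[2] = byte(uint16(len(v)) >> 8)
	b[3] = byte(len(v))
	return b
}

// decodeKVLen reverses the packing, as bucket.Get does when reading a chunk.
func decodeKVLen(b [4]byte) (keyLen, valLen uint64) {
	keyLen = (uint64(b[0]) << 8) | uint64(b[1])
	valLen = (uint64(b[2]) << 8) | uint64(b[3])
	return
}

func main() {
	k, v := []byte("key 1"), []byte("value 1")
	hdr := encodeKVLen(k, v)
	kl, vl := decodeKVLen(hdr)
	fmt.Println(kl, vl) // 5 7
}
```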

If there is no free chunk to reuse, a new batch of chunks is allocated via unix.Mmap. That memory is not managed by the Go GC, so it incurs no GC-scan cost. The allocator lives in malloc_mmap.go:

```go
func getChunk() []byte {
	freeChunksLock.Lock()
	if len(freeChunks) == 0 {
		// Allocate offheap memory, so GOGC won't take into account cache size.
		// This should reduce free memory waste.
		data, err := unix.Mmap(-1, 0, chunkSize*chunksPerAlloc, unix.PROT_READ|unix.PROT_WRITE, unix.MAP_ANON|unix.MAP_PRIVATE)
		if err != nil {
			panic(fmt.Errorf("cannot allocate %d bytes via mmap: %s", chunkSize*chunksPerAlloc, err))
		}
		for len(data) > 0 {
			p := (*[chunkSize]byte)(unsafe.Pointer(&data[0]))
			freeChunks = append(freeChunks, p)
			data = data[chunkSize:]
		}
	}
	n := len(freeChunks) - 1
	p := freeChunks[n]
	freeChunks[n] = nil
	freeChunks = freeChunks[:n]
	freeChunksLock.Unlock()
	return p[:]
}
```
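The carving loop in getChunk can be sketched without the mmap call. Here an ordinary make stands in for the off-heap allocation, so this version is portable and testable but, unlike the real code, its memory is still scanned by the GC:

```go
package main

import "fmt"

const (
	chunkSize      = 64 * 1024
	chunksPerAlloc = 1024
)

// carveChunks mimics getChunk's slicing step: one large allocation is
// cut into fixed 64KB chunks that get handed out from a free list.
// The real getChunk obtains the backing memory with unix.Mmap so the
// Go GC never tracks it; a plain make is used here as a stand-in.
func carveChunks() [][]byte {
	data := make([]byte, chunkSize*chunksPerAlloc)
	freeChunks := make([][]byte, 0, chunksPerAlloc)
	for len(data) > 0 {
		// Full-slice expression caps capacity so chunks can't overlap on append.
		freeChunks = append(freeChunks, data[:chunkSize:chunkSize])
		data = data[chunkSize:]
	}
	return freeChunks
}

func main() {
	chunks := carveChunks()
	fmt.Println(len(chunks), len(chunks[0])) // 1024 65536
}
```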

Now the read path: again, hash the key and locate the bucket, then use the offset stored in the map to find the entry, decode the key and value lengths from the 4-byte header, and copy out the data.

```go
// Abridged: locking, generation and key-equality checks are omitted.
func (b *bucket) Get(dst, k []byte, h uint64, returnDst bool) ([]byte, bool) {
	v := b.m[h]
	idx := v & ((1 << bucketSizeBits) - 1)
	chunk := b.chunks[idx/chunkSize]
	kvLenBuf := chunk[idx : idx+4]
	keyLen := (uint64(kvLenBuf[0]) << 8) | uint64(kvLenBuf[1])
	valLen := (uint64(kvLenBuf[2]) << 8) | uint64(kvLenBuf[3])
	idx += 4 // skip the header; the value starts right after the key
	dst = append(dst, chunk[idx+keyLen:idx+keyLen+valLen]...)
	return dst, true
}
```
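The map value read above packs two fields into one uint64: the low bits hold the ring-buffer offset and the high bits hold the generation counter used to detect overwritten entries. A minimal sketch of the packing and unpacking, with bucketSizeBits = 40 as in fastcache:

```go
package main

import "fmt"

const bucketSizeBits = 40 // low 40 bits hold the ring-buffer offset

// pack stores an offset and a generation counter in one uint64, exactly
// as bucket.Set does with b.m[h] = idx | (b.gen << bucketSizeBits).
func pack(idx, gen uint64) uint64 {
	return idx | (gen << bucketSizeBits)
}

// unpack recovers both fields, as bucket.Get does before validating
// that the entry still belongs to a live generation.
func unpack(v uint64) (idx, gen uint64) {
	return v & ((1 << bucketSizeBits) - 1), v >> bucketSizeBits
}

func main() {
	v := pack(123456, 7)
	idx, gen := unpack(v)
	fmt.Println(idx, gen) // 123456 7
}
```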

Note that deletion only removes the map entry; the data itself stays in the ring buffer until it is overwritten.

```go
func (b *bucket) Del(h uint64) {
	b.mu.Lock()
	delete(b.m, h)
	b.mu.Unlock()
}
```

bigcache.go in the same repository also provides GetBig and SetBig for entries larger than a chunk; the idea is the same, with large values split across multiple ordinary entries plus a metavalue describing them.

```go
func (c *Cache) GetBig(dst, k []byte) (r []byte)
```

```go
func (c *Cache) SetBig(k, v []byte)
```
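As a rough sketch of the splitting arithmetic behind SetBig: a value larger than one chunk is stored as a sequence of subvalues under derived keys. The maxSubvalueLen parameter here is a simplification of mine; the real limit is slightly below chunkSize to leave room for the per-entry header:

```go
package main

import "fmt"

const chunkSize = 64 * 1024

// subvalueCount computes how many ordinary entries a big value needs:
// ceil(valueLen / maxSubvalueLen). SetBig stores these subvalues plus a
// metavalue recording the total length and a hash used for validation.
func subvalueCount(valueLen, maxSubvalueLen int) int {
	return (valueLen + maxSubvalueLen - 1) / maxSubvalueLen
}

func main() {
	// A 200KB value split into subvalues of up to 64KB each.
	fmt.Println(subvalueCount(200*1024, chunkSize)) // 4
}
```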
