golang源码分析:freecache

2023-09-06 19:27:52 浏览数 (2)

github.com/coocood/freecache采用分片(shard)设计,划分了256个segments;每个分片都有自己独立的锁。实现类似于bigcache的索引操作,slotsData存放索引,RingBuf存放具体数据;只存在 512 个指针(每个segments两个指针,slotsData和RingBuf切片;存储空间都是预先分配好的,所以一开始就会占用较大的内存;环形数组RingBuffer的内存是预先分配的,数据过期不会清理存储空间,只是做标志。

它还提供了一个兼容Redis协议的server服务(server/main.go),支持get/set/del等常用kv命令,这样就给远程服务提供接口操作本地cache了。虽然实际用途不大,但是对于学习Redis协议和TCP服务有参考意义。

每个 segment 包含 256 个索引槽,用来对 key 进行快速检索,通过 uint8(hashVal >> 8) 运算获取到 key 对应索引槽的 slotId。每个槽中又有 n 个索引,每个槽中的索引数量是统一的,由 slotCap 进行控制,当某个槽中的索引的数量大于 slotCap 时,就会触发整个索引的扩容。每个槽的索引数据是存储在 slotsData 这个索引切片中的,256 个槽共用同一个索引切片,每个槽在索引切片中都是按照 hash16 顺序排列的。

当对 key 进行 set、get、del 等操作时,freecache 使用 xxhash 这个 hash 方法,对 key 计算得到一个64位的 hashValue。通过 hashVal & 255 得到 segId,将 key 定位到具体的 segment,并对 segment 加锁。由于每次只对单个 segment 加锁,不同 segment 之间可以并发进行 key 操作。可以看出 freecache 的底层只有 512 个指针(256 个 segment ,每个 segment 包含一个 slotsData 切片和一个 RingBuf),所以 freecache 的对GC开销几乎为0。freecache set 时,先查询当前 key 是否已存在,根据 segId 和 slotId 快速定位到 entry 所在的索引槽。由于索引槽中的索引都是按照 hash16 顺序排列的,可以用二分查找法检索(复杂度 O(log2n))。首先看下如何使用,然后看下它的源码。

代码语言:javascript复制
package main

import (
  "fmt"
  "runtime/debug"

  "github.com/coocood/freecache"
)

func main() {
  // In bytes, where 1024 * 1024 represents a single Megabyte, and 100 * 1024*1024 represents 100 Megabytes.
  cacheSize := 100 * 1024 * 1024
  cache := freecache.NewCache(cacheSize)
  debug.SetGCPercent(20)
  key := []byte("abc")
  val := []byte("def")
  expire := 60 // expire in 60 seconds
  cache.Set(key, val, expire)
  got, err := cache.Get(key)
  if err != nil {
    fmt.Println(err)
  } else {
    fmt.Printf("%sn", got)
  }
  affected := cache.Del(key)
  fmt.Println("deleted key ", affected)
  fmt.Println("entry count ", cache.EntryCount())
}

运行结果如下

代码语言:javascript复制
 % go run ./test/cache/exp5/main.go
def
deleted key  true
entry count  0

server/main.go定义了一个简单的server,实现了类似redis的协议,限制只有新分配的内存超过现有分配内存的10%的时候触发GC,它用到了debug包的函数func SetGCPercent(percent int) int ,如果这边比例是负数,可以不触发GC 。

代码语言:javascript复制
func main() {
  runtime.GOMAXPROCS(runtime.NumCPU() - 1)
  server := NewServer(256 * 1024 * 1024)
  debug.SetGCPercent(10)
  go func() {
    log.Println(http.ListenAndServe("localhost:6060", nil))
  }()
  server.Start(":7788")
}
代码语言:javascript复制
type Server struct {
  cache *freecache.Cache
}
代码语言:javascript复制
func NewServer(cacheSize int) (server *Server) {
  server = new(Server)
  server.cache = freecache.NewCache(cacheSize)
  return
}

server启动后会监听连接,每新建立一个连接,创建一个新的session,分配两个协程分别负责读写,读协程负责解析请求命令,将结果通过chanel传递给写协程最后返回给客户端。

代码语言:javascript复制
func (server *Server) Start(addr string) error {
      l, err := net.Listen("tcp", addr)
        for {
            tcpListener := l.(*net.TCPListener)
            tcpListener.SetDeadline(time.Now().Add(time.Second))
            conn, err := l.Accept()
            session := new(Session)
            session.conn = conn
            session.replyChan = make(chan *bytes.Buffer, 100)
            session.addr = conn.RemoteAddr().String()
            session.server = server
            session.reader = bufio.NewReader(conn)
            go session.readLoop()
            go session.writeLoop()
代码语言:javascript复制
type Session struct {
  server    *Server
  conn      net.Conn
  addr      string
  reader    *bufio.Reader
  replyChan chan *bytes.Buffer
}

读协程会解析它的协议:

代码语言:javascript复制
func (down *Session) readLoop() {
  var req = new(Request)
  req.buf = new(bytes.Buffer)
  for {
    req.Reset()
    err := down.server.ReadClient(down.reader, req)
      if len(req.args) == 4 && bytes.Equal(req.args[0], SETEX) {
      expire, err := btoi(req.args[2])
      if err != nil {
        reply.Write(ERROR_UNSUPPORTED)
      } else {
        down.server.cache.Set(req.args[1], req.args[3], expire)
        reply.Write(OK)
      }
      } else if len(req.args) == 2 {
      if bytes.Equal(req.args[0], GET) {
        value, err := down.server.cache.Get(req.args[1])
代码语言:javascript复制
func (down *Session) writeLoop() {
  var buffer = bytes.NewBuffer(nil)
  var replies = make([]*bytes.Buffer, 1)
  for {
    buffer.Reset()
    select {
    case reply, ok := <-down.replyChan:
      for _, reply := range replies {
        if reply == nil {
          buffer.Write(NIL)
          continue
        }
        buffer.Write(reply.Bytes())

cache的核心逻辑位于cache.go

代码语言:javascript复制
func NewCache(size int) (cache *Cache) {
  return NewCacheCustomTimer(size, defaultTimer{})
}
代码语言:javascript复制
func NewCacheCustomTimer(size int, timer Timer) (cache *Cache) {
  cache = new(Cache)
  for i := 0; i < segmentCount; i   {
    cache.segments[i] = newSegment(size/segmentCount, i, timer)
  }

它包含256个分段,每个分段都有自己的锁,可以尽可能避免锁竞争。读写都是先通过hash定位到对应的分段,然后在分段上进行操作的。

代码语言:javascript复制
type Cache struct {
  locks    [segmentCount]sync.Mutex
  segments [segmentCount]segment
}
代码语言:javascript复制
  segmentCount = 256
代码语言:javascript复制
func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
      err = cache.segments[segID].set(key, value, hashVal, expireSeconds)
代码语言:javascript复制
func (cache *Cache) Get(key []byte) (value []byte, err error) {
      value, _, err = cache.segments[segID].get(key, nil, hashVal, false)

分段的详细定义位于segment.go,它的rb属性 seg.rb是一个环形缓冲区

代码语言:javascript复制
type segment struct {
  rb            RingBuf // ring buffer that stores data
  segId         int
  _             uint32
  missCount     int64
  hitCount      int64
  entryCount    int64
  totalCount    int64      // number of entries in ring buffer, including deleted entries.
  totalTime     int64      // used to calculate least recent used entry.
  timer         Timer      // Timer giving current time
  totalEvacuate int64      // used for debug
  totalExpired  int64      // used for debug
  overwrites    int64      // used for debug
  touched       int64      // used for debug
  vacuumLen     int64      // up to vacuumLen, new data can be written without overwriting old data.
  slotLens      [256]int32 // The actual length for every slot.
  slotCap       int32      // max number of entry pointers a slot can hold.
  slotsData     []entryPtr // shared by all 256 slots
}
代码语言:javascript复制
func newSegment(bufSize int, segId int, timer Timer) (seg segment) {
  seg.rb = NewRingBuf(bufSize, 0)
  seg.segId = segId
  seg.timer = timer
  seg.vacuumLen = int64(bufSize)
  seg.slotCap = 1
  seg.slotsData = make([]entryPtr, 256*seg.slotCap)
  return
}

读写过程中先通过hashkey的最低16位定位到对应的slot,然后使用中间16位,找到具体的槽,在槽上进行写入、查找和删除标记操作。每次写入值的时候,先写key,然后写value,最后跳过每个slot的cap-len空白区域后,写入过期时间和引用计数。

代码语言:javascript复制
func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) {
        slotId := uint8(hashVal >> 8)
  hash16 := uint16(hashVal >> 16)
  slot := seg.getSlot(slotId)
  idx, match := seg.lookup(slot, hash16, key)
  seg.insertEntryPtr(slotId, hash16, newOff, idx, hdr.keyLen)
  seg.rb.Write(hdrBuf[:])
  seg.rb.Write(key)
  seg.rb.Write(value)
  seg.rb.Skip(int64(hdr.valCap - hdr.valLen))
  atomic.AddInt64(&seg.totalTime, int64(now))
  atomic.AddInt64(&seg.totalCount, 1)
代码语言:javascript复制
 func (seg *segment) getSlot(slotId uint8) []entryPtr {
  slotOff := int32(slotId) * seg.slotCap
  return seg.slotsData[slotOff : slotOff seg.slotLens[slotId] : slotOff seg.slotCap]
}
代码语言:javascript复制
func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) {
      hdr, ptr, err := seg.locate(key, hashVal, peek)
      seg.rb.ReadAt(value, ptr.offset ENTRY_HDR_SIZE int64(hdr.keyLen))
代码语言:javascript复制
func (seg *segment) del(key []byte, hashVal uint64) (affected bool) {
    slot := seg.getSlot(slotId)
    idx, match := seg.lookup(slot, hash16, key)
    seg.delEntryPtr(slotId, slot, idx)
代码语言:javascript复制
func (seg *segment) delEntryPtr(slotId uint8, slot []entryPtr, idx int) {
      entryHdr.deleted = true

ringbuf.go

代码语言:javascript复制
type RingBuf struct {
  begin int64 // beginning offset of the data stream.
  end   int64 // ending offset of the data stream.
  data  []byte
  index int //range from '0' to 'len(rb.data)-1'
}

0 人点赞