golang源码分析:etcd(16)

2023-09-20 08:29:55 浏览数 (3)

server/storage/backend/backend.go定义了后端存储的核心接口和具体实现,本质上是对boltdb的相关接口的一个封装

代码语言:javascript复制
// Backend is the storage backend abstraction: it hands out read and batch
// transactions, produces snapshots, reports size, and exposes maintenance
// operations.
type Backend interface {
  // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
  ReadTx() ReadTx
  // BatchTx returns the backend's batch (write) transaction.
  BatchTx() BatchTx
  // ConcurrentReadTx returns a non-blocking read transaction.
  ConcurrentReadTx() ReadTx


  // Snapshot returns a Snapshot of the backend.
  Snapshot() Snapshot
  // Hash returns a 32-bit hash of the backend contents. ignores is called
  // with a bucket name and a key name.
  // NOTE(review): confirm against the implementation exactly which entries
  // are skipped, in particular when ignores is nil.
  Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
  // Size returns the current size of the backend physically allocated.
  // The backend can hold DB space that is not utilized at the moment,
  // since it can conduct pre-allocation or spare unused space for recycling.
  // Use SizeInUse() instead for the actual DB size.
  Size() int64
  // SizeInUse returns the current size of the backend logically in use.
  // Since the backend can manage free space in a non-byte unit such as
  // number of pages, the returned value can be not exactly accurate in bytes.
  SizeInUse() int64
  // OpenReadTxN returns the number of currently open read transactions in the backend.
  OpenReadTxN() int64
  // Defrag presumably defragments the backend storage — TODO confirm.
  Defrag() error
  // ForceCommit presumably commits the pending batch transaction
  // immediately — TODO confirm.
  ForceCommit()
  // Close presumably stops the backend and releases the underlying
  // database — TODO confirm.
  Close() error


  // SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
  SetTxPostLockInsideApplyHook(func())
}

snapshot接口核心方法只有计算大小、写、关闭三个:

代码语言:javascript复制
// Snapshot is a sized view of backend data that can be streamed to a writer
// and must be closed when no longer needed.
type Snapshot interface {
  // Size gets the size of the snapshot.
  Size() int64
  // WriteTo writes the snapshot into the given writer.
  WriteTo(w io.Writer) (n int64, err error)
  // Close closes the snapshot.
  Close() error
}

读事务的buffer缓存需要通过互斥锁保护

代码语言:javascript复制
// txReadBufferCache holds a cached *txReadBuffer together with the buffer
// version it was recorded at.
type txReadBufferCache struct {
  // mu presumably guards buf and bufVersion — TODO confirm against the
  // code that reads and refreshes this cache.
  mu         sync.Mutex
  // buf is the cached read buffer.
  buf        *txReadBuffer
  // bufVersion is the version stored alongside buf; presumably compared
  // with txReadBuffer.bufVersion to detect a stale cache — TODO confirm.
  bufVersion uint64
}

backend是Backend接口的具体实现:

代码语言:javascript复制
// backend is the bolt-backed storage state: it keeps the open database
// handle, the batch and read transactions, size/commit counters, and the
// channels used to stop the background run loop.
type backend struct {
  // size and commits are used with atomic operations so they must be
  // 64-bit aligned, otherwise 32-bit tests will crash


  // size is the number of bytes allocated in the backend
  size int64
  // sizeInUse is the number of bytes actually used in the backend
  sizeInUse int64
  // commits counts number of commits since start
  commits int64
  // openReadTxN is the number of currently open read transactions in the backend
  openReadTxN int64
  // mlock prevents backend database file to be swapped
  mlock bool


  // mu presumably guards db and the transaction fields below — TODO confirm.
  mu    sync.RWMutex
  // bopts are the bolt options the database was opened with.
  bopts *bolt.Options
  // db is the underlying bolt database handle.
  db    *bolt.DB


  // batchInterval and batchLimit presumably mirror
  // BackendConfig.BatchInterval and BackendConfig.BatchLimit — TODO confirm.
  batchInterval time.Duration
  batchLimit    int
  // batchTx is the buffered batch transaction returned by BatchTx().
  batchTx       *batchTxBuffered


  // readTx is the backend's read transaction state.
  readTx *readTx
  // txReadBufferCache mirrors "txReadBuffer" within "readTx" -- readTx.baseReadTx.buf.
  // When creating "concurrentReadTx":
  // - if the cache is up-to-date, "readTx.baseReadTx.buf" copy can be skipped
  // - if the cache is empty or outdated, "readTx.baseReadTx.buf" copy is required
  txReadBufferCache txReadBufferCache


  // stopc is received from in the run loop; on receipt the loop calls
  // batchTx.CommitAndStop().
  stopc chan struct{}
  // donec presumably signals that the run loop has exited — TODO confirm.
  donec chan struct{}


  // hooks are the lifecycle hooks for the backend's transactions.
  hooks Hooks


  // txPostLockInsideApplyHook is called each time right after locking the tx.
  txPostLockInsideApplyHook func()


  // lg is the logger used by the backend.
  lg *zap.Logger
}

相应的config定义了初始化backend对象需要的属性

代码语言:javascript复制
// BackendConfig carries the settings passed to newBackend when constructing
// a backend.
type BackendConfig struct {
  // Path is the file path to the backend file.
  Path string
  // BatchInterval is the maximum time before flushing the BatchTx.
  BatchInterval time.Duration
  // BatchLimit is the maximum puts before flushing the BatchTx.
  BatchLimit int
  // BackendFreelistType is the backend boltdb's freelist type.
  BackendFreelistType bolt.FreelistType
  // MmapSize is the number of bytes to mmap for the backend.
  MmapSize uint64
  // Logger logs backend-side operations.
  Logger *zap.Logger
  // UnsafeNoFsync disables all uses of fsync.
  UnsafeNoFsync bool `json:"unsafe-no-fsync"`
  // Mlock prevents backend database file to be swapped
  Mlock bool


  // Hooks are getting executed during lifecycle of Backend's transactions.
  Hooks Hooks
}

newBackend就是初始化一个bolt对象,设置读事务和批量事务的buffer,最后启动一个协程提供存储服务:

代码语言:javascript复制
func newBackend(bcfg BackendConfig) *backend {
      db, err := bolt.Open(bcfg.Path, 0600, bopts)
      b := &backend{
    bopts: bopts,
    db:    db,
readTx: &readTx{
      baseReadTx:baseReadTx{
        buf: txReadBuffer{
      b.batchTx = newBatchTxBuffered(b)
      go b.run()		
代码语言:javascript复制
// BatchTx returns the backend's buffered batch transaction as a BatchTx.
func (b *backend) BatchTx() BatchTx {
	tx := b.batchTx
	return tx
}
代码语言:javascript复制
func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {

Snapshot返回一个Snapshot对象,里面包含一个boltdb的只读事务

代码语言:javascript复制
func (b *backend) Snapshot() Snapshot {
      b.batchTx.Commit()
      tx, err := b.db.Begin(false)
      go func() {
              case <-stopc:
        snapshotTransferSec.Observe(time.Since(start).Seconds())
      return &snapshot{tx, stopc, donec}

Hash方法会计算对应的crc,遍历所有bucket里面的k,v然后计算所有内容的crc

代码语言:javascript复制
func (b *backend) Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error) {


      h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
      err := b.db.View(func(tx *bolt.Tx) error {
          c := tx.Cursor()
    for next, _ := c.First(); next != nil; next, _ = c.Next() {
      b := tx.Bucket(next)
        h.Write(next)
      b.ForEach(func(k, v []byte) error {
        if ignores != nil && !ignores(next, k) {
          h.Write(k)
          h.Write(v)
      return h.Sum32(), nil
}

run方法会做定时的事务提交和系统关闭时候的提交

代码语言:javascript复制
func (b *backend) run() {
      for {
    select {
    case <-t.C:
    case <-b.stopc:
      b.batchTx.CommitAndStop()
        if b.batchTx.safePending() != 0 {
      b.batchTx.Commit()
    }
    t.Reset(b.batchInterval)

defrag会进行碎片整理操作:

代码语言:javascript复制
  func (b *backend) defrag() error {
      tmpdb, err := bolt.Open(tdbp, 0600, &options)
      err = defragdb(b.db, tmpdb, defragLimit)

会整理bolt的所有k,v

代码语言:javascript复制
func defragdb(odb, tmpdb *bolt.DB, limit int) error {
      tx, err := odb.Begin(false)
      for next, _ := c.First(); next != nil; next, _ = c.Next() {
        b := tx.Bucket(next)
        if err = b.ForEach(func(k, v []byte) error {
          tmptx, err = tmpdb.Begin(true)
          tmpb = tmptx.Bucket(next)
代码语言:javascript复制
func (b *backend) begin(write bool) *bolt.Tx {
        size := tx.Size()
  db := tx.DB()
  stats := db.Stats()
  atomic.StoreInt64(&b.size, size)

server/storage/backend/batch_tx.go 定义了Bucket和事务相关接口,这些都是和bolt对应的。

代码语言:javascript复制
// Bucket identifies a storage bucket by an in-memory ID and by the byte name
// that is handed to bolt when creating or looking up the bucket.
type Bucket interface {
  // ID returns a unique identifier of a bucket.
  // The id must NOT be persisted and can be used as lightweight identificator
  // in the in-memory maps.
  ID() BucketID
  // Name returns the bucket name as bytes.
  Name() []byte
  // String implements Stringer (human readable name).
  String() string


  // IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
  // overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
  // is known to never overwrite any key so range is safe.
  IsSafeRangeBucket() bool
}
代码语言:javascript复制
// BatchTx is a write transaction interface. It includes every ReadTx method
// and adds bucket/key mutation and commit operations.
// NOTE(review): the "Unsafe" prefix presumably means the caller must already
// hold the transaction lock — confirm against the implementations.
type BatchTx interface {
  ReadTx
  // UnsafeCreateBucket creates the given bucket.
  UnsafeCreateBucket(bucket Bucket)
  // UnsafeDeleteBucket deletes the given bucket.
  UnsafeDeleteBucket(bucket Bucket)
  // UnsafePut stores value under key in the given bucket.
  UnsafePut(bucket Bucket, key []byte, value []byte)
  // UnsafeSeqPut is a put variant; presumably intended for keys written in
  // increasing order — TODO confirm.
  UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
  // UnsafeDelete removes key from the given bucket.
  UnsafeDelete(bucket Bucket, key []byte)
  // Commit commits a previous tx and begins a new writable one.
  Commit()
  // CommitAndStop commits the previous tx and does not create a new one.
  CommitAndStop()
  // LockInsideApply and LockOutsideApply are the two locking entry points;
  // presumably chosen by whether the caller runs inside the apply path —
  // TODO confirm.
  LockInsideApply()
  LockOutsideApply()
}

batchTx的具体实现内嵌了sync.Mutex,并通过tx字段持有bolt的Tx

代码语言:javascript复制
// batchTx pairs a bolt transaction with the backend that owns it. The
// embedded sync.Mutex gives the type its Lock/Unlock methods.
type batchTx struct {
  sync.Mutex
  // tx is the current bolt transaction.
  tx      *bolt.Tx
  // backend is the owning backend.
  backend *backend


  // pending presumably counts operations not yet committed — TODO confirm.
  pending int
}
代码语言:javascript复制
func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
      _, err := t.tx.CreateBucket(bucket.Name())  
代码语言:javascript复制
func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {

Put就是bolt的k/v的Put操作

代码语言:javascript复制
func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
      bucket := t.tx.Bucket(bucketType.Name())
      if err := bucket.Put(key, value); err != nil {
代码语言:javascript复制
func (t *batchTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
代码语言:javascript复制
func (t *batchTx) commit(stop bool) {
      err := t.tx.Commit()

batchTxBuffered仅仅是加了个buffer而已

代码语言:javascript复制
// batchTxBuffered embeds batchTx and adds a txWriteBuffer.
type batchTxBuffered struct {
  batchTx
  // buf is the write buffer kept alongside the embedded batchTx.
  buf txWriteBuffer
}

server/storage/backend/config_default.go里定义了配置

代码语言:javascript复制
// mmapSize returns the configured MmapSize converted to int.
func (bcfg *BackendConfig) mmapSize() int {
	size := int(bcfg.MmapSize)
	return size
}

server/storage/backend/config_linux.go每种平台的配置都有一定的差别

代码语言:javascript复制
// boltOpenOptions holds the bolt options set for this platform variant.
var boltOpenOptions = &bolt.Options{
  // MAP_POPULATE asks the kernel to pre-fault the mapping (see mmap(2)).
  MmapFlags:      syscall.MAP_POPULATE,
  // NOTE(review): per bbolt docs this skips syncing the freelist to disk —
  // confirm against the vendored bbolt version.
  NoFreelistSync: true,
}
代码语言:javascript复制
// mmapSize returns the configured MmapSize converted to int.
func (bcfg *BackendConfig) mmapSize() int {
	size := int(bcfg.MmapSize)
	return size
}

server/storage/backend/hooks.go里定义了Hooks的接口,方便我们使用Backend的时候设置前置Hook操作

代码语言:javascript复制
// HookFunc is a callback that receives a BatchTx.
type HookFunc func(tx BatchTx)
代码语言:javascript复制
// Hooks is the set of callbacks invoked around transaction commits.
type Hooks interface {
  // OnPreCommitUnsafe is executed before Commit of transactions.
  // The given transaction is already locked.
  OnPreCommitUnsafe(tx BatchTx)
}

并提供了具体实现

代码语言:javascript复制
// hooks stores a single HookFunc to run as the pre-commit callback.
type hooks struct {
  // onPreCommitUnsafe is the callback invoked by OnPreCommitUnsafe.
  onPreCommitUnsafe HookFunc
}
代码语言:javascript复制
// OnPreCommitUnsafe forwards tx to the stored onPreCommitUnsafe callback.
func (h hooks) OnPreCommitUnsafe(tx BatchTx) {
	callback := h.onPreCommitUnsafe
	callback(tx)
}

server/storage/backend/metrics.go里定义对应的监控指标

代码语言:javascript复制
prometheus.MustRegister(commitSec)
prometheus.MustRegister(rebalanceSec)

server/storage/backend/read_tx.go定义了读事务接口和实现

代码语言:javascript复制
// ReadTx is a read transaction interface: lock/unlock methods plus range
// and iteration reads over a bucket.
type ReadTx interface {
  // Lock/Unlock and RLock/RUnlock are the exclusive and shared lock pairs.
  Lock()
  Unlock()
  RLock()
  RUnlock()


  // UnsafeRange returns keys and their values from the bucket for the span
  // described by key and endKey, bounded by limit.
  // NOTE(review): exact endKey/limit semantics (nil endKey, limit <= 0)
  // are not visible here — confirm against the implementation.
  UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
  // UnsafeForEach calls visitor with key/value pairs from the bucket and
  // returns an error.
  UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}

具体实现中也是对bolt的事务的一个封装

代码语言:javascript复制
// baseReadTx is the state embedded by both readTx and concurrentReadTx: a
// read buffer plus a bolt transaction and its bucket handles.
type baseReadTx struct {
  // mu protects accesses to the txReadBuffer
  mu  sync.RWMutex
  buf txReadBuffer


  // TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
  // txMu protects accesses to buckets and tx on Range requests.
  txMu    *sync.RWMutex
  // tx is the underlying bolt transaction.
  tx      *bolt.Tx
  // buckets maps a BucketID to its bolt bucket handle.
  buckets map[BucketID]*bolt.Bucket
  // txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
  txWg *sync.WaitGroup
}
代码语言:javascript复制
func (baseReadTx *baseReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
代码语言:javascript复制
func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
      keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)

读事务分两种实现,一个是普通读,一个是并发读

代码语言:javascript复制
// readTx is the read transaction type held by backend.readTx; it adds
// nothing beyond the embedded baseReadTx in this declaration.
type readTx struct {
  baseReadTx
}
代码语言:javascript复制
// concurrentReadTx is the concurrent read transaction type; it adds nothing
// beyond the embedded baseReadTx in this declaration.
type concurrentReadTx struct {
  baseReadTx
}

server/storage/backend/tx_buffer.go事务buffer是一个map,把每个bucket分开单独处理

代码语言:javascript复制
// txBuffer holds one bucketBuffer per bucket.
type txBuffer struct {
  // buckets maps a BucketID to that bucket's buffer.
  buckets map[BucketID]*bucketBuffer
}

写buffer也是一样的

代码语言:javascript复制
// txWriteBuffer is a txBuffer for writes that also records, per bucket,
// whether the bucket is being edited sequentially.
type txWriteBuffer struct {
  txBuffer
  // Map from bucket ID into information whether this bucket is edited
  // sequentially (i.e. keys are growing monotonically).
  bucket2seq map[BucketID]bool
}
代码语言:javascript复制
func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) {
代码语言:javascript复制
// txReadBuffer is a txBuffer for reads that carries a version number.
type txReadBuffer struct {
  txBuffer
  // bufVersion is used to check if the buffer is modified recently
  bufVersion uint64
}
代码语言:javascript复制
func (txr *txReadBuffer) Range(bucket Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
代码语言:javascript复制
func (txr *txReadBuffer) ForEach(bucket Bucket, visitor func(k, v []byte) error) error {

kv里的key和val都是[]byte这里和bolt是一致的减少了编解码的烦恼

代码语言:javascript复制
// kv is a single buffered key/value pair, both stored as raw bytes.
type kv struct {
  key []byte
  val []byte
}  
代码语言:javascript复制
// bucketBuffer buffers the key/value pairs of one bucket.
type bucketBuffer struct {
  // buf is the backing slice of entries; only the first `used` are live.
  buf []kv
  // used tracks number of elements in use so buf can be reused without reallocation.
  used int
}
代码语言:javascript复制
func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {

server/storage/backend/verify.go会对环境变量进行一系列验证

代码语言:javascript复制
func ValidateCalledInsideApply(lg *zap.Logger) {

1 人点赞