golang源码分析:etcd(18)

2023-09-20 08:30:30 浏览数 (3)

介绍完一个个函数实现后,我们分析下完整的etcd的读写流程。有没有觉得很奇怪既然bolt是采用b 树存储的持久化存储来存储kv,为什还需要一个Btree结构来存储key的信息?

其实在etcd内部,从一个可以找到一个value分为两个步骤:1,通过key找到所有的版本号,从版本号里筛选需要查找的版本。2,根据版本号到bolt里面查找对应的k/v对,从而获得value值。Revision 中定义了一个全局递增的主版本号main,发生 put、txn、del 操作会递增,一个事务内的 main 版本号是唯一的;事务内的子版本号定义为sub,事务发生 put 和 del 操作时,从 0 开始递增。

这样存储的好处是在btree里面key是唯一的,通过key可以找到所有版本号;在bolt里面版本号是唯一的,查找过程和key完全解耦了,再加上写过程中版本号的递增特性,可以实现近乎顺序写,整个写的过程非常迅速。

我们先看下读的过程,在server/storage/mvcc/kvstore_txn.go中有个Read函数

代码语言:javascript复制
func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
  var tx backend.ReadTx
  if mode == ConcurrentReadTxMode {
  tx = s.b.ConcurrentReadTx()
  } else {
  tx = s.b.ReadTx()
  }
  return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})

返沪TxRead对象,对象有一个Range方法

代码语言:javascript复制
func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
  return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
}

其核心代码如下:

代码语言:javascript复制
func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
      if ro.Count {
    total := tr.s.kvindex.CountRevisions(key, end, rev)
      revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
      for i, revpair := range revpairs[:len(kvs)] {
          revToBytes(revpair, revBytes)
    _, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)

先通过kvindex.Revisions获取所有的版本号,然后再版本里筛选出需要的版本号到bolt里面查询,主要依赖readTx属性

代码语言:javascript复制
type storeTxnRead struct {
  s  *store
  tx backend.ReadTx

首先看第一步,通过key找到revision,调用的Revisions方法位于server/storage/mvcc/index.go,它通过unsafeVisit获取所有版本号,采用访问者模式,传入的函数用于筛选需要的版本,其中使用了get函数来从索引btree里面获取最终需要的版本列表 rev, _, _, err := ki.get(ti.lg, atRev),核心逻辑如下:

代码语言:javascript复制
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) {
      if end == nil {
    rev, _, _, err := ti.unsafeGet(key, atRev)
        ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
    if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
      if limit <= 0 || len(revs) < limit {
        revs = append(revs, rev)

btree的初始化方法如下

代码语言:javascript复制
func newTreeIndex(lg *zap.Logger) index {
        return &treeIndex{
    tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool {
      return aki.Less(bki)
    }),

查找用到的unsafeVisit方法如下,最终调用了BTree的AscendGreaterOrEqual方法

代码语言:javascript复制
func (ti *treeIndex) unsafeVisit(key, end []byte, f func(ki *keyIndex) bool) {
        ti.tree.AscendGreaterOrEqual(keyi, func(item *keyIndex) bool {
    if len(endi.key) > 0 && !item.Less(endi) {
      return false
    }
    if !f(item) {
      return false
    }
    return true
  })

server/storage/mvcc/key_index.go里定义了keyIndex,它实现了Less接口因此可以用BTree。

代码语言:javascript复制
func (ki *keyIndex) Less(bki *keyIndex) bool {
  return bytes.Compare(ki.key, bki.key) == -1
}

它的核心字段有三个,key,最近修改的版本号,历史上所有的版本号。btree里存储的元素,比较的时候比较key,一个key里存了多个reversion。

代码语言:javascript复制
type keyIndex struct {
  key         []byte
  modified    revision // the main rev of the last modification
  generations []generation
}

完成在b树中找出keyIndex信息,然后就在generations里面找出对应版本的信息调用了get方法

代码语言:javascript复制
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
      g := ki.findGeneration(atRev)
      n := g.walk(func(rev revision) bool { return rev.main > atRev })
代码语言:javascript复制
func (ki *keyIndex) findGeneration(rev int64) *generation {
      for cg >= 0 {
      g := ki.generations[cg]

以上就是完整的索引查找对应版本号的流程,索引的初始化代码位于server/storage/mvcc/kvstore.go

代码语言:javascript复制
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
        s := &store{
    cfg:     cfg,
    b:       b,
    kvindex: newTreeIndex(lg),

得到版本号后,可以根据版本号到bolt对应的bucket里查找我们的value值,用到了baseReadTx,其定义位于server/storage/backend/read_tx.go

代码语言:javascript复制
type baseReadTx struct {

通过rversion查找kv的过程是先缓存里找不到再到bolt里面找,最后缓存到buf

代码语言:javascript复制
func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
      keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
                  bucket = baseReadTx.tx.Bucket(bucketType.Name())
    baseReadTx.buckets[bn] = bucket
        c := bucket.Cursor()
      k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))

server/storage/backend/batch_tx.go里会调用bolt对应的Seek方法

代码语言:javascript复制
func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
        for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {

分析完查的流程后,我们分析下写的流程,可以从Put这个函数作为入开来进行追踪

代码语言:javascript复制
func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
  tw.put(key, value, lease)
return tw.beginRev   1
}

详细的put方法如下,先通过索引找到key最近修改的版本,然后创建用于存在在btree里面的key和用于存储在bolt里面的kv,然后使用UnsafeSeqPut存入bolt,使用kvindex.Put存入kv

代码语言:javascript复制
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
      _, created, ver, err := tw.s.kvindex.Get(key, rev)
      idxRev := revision{main: rev, sub: int64(len(tw.changes))}
        kv := mvccpb.KeyValue{
    Key:            key,
    Value:          value,
    CreateRevision: c,
    ModRevision:    rev,
    Version:        ver,
    Lease:          int64(leaseID),
  }
      d, err := kv.Marshal()
      tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
      tw.s.kvindex.Put(key, idxRev)

将数据存入bolt的函数定义如下

代码语言:javascript复制
func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
  t.unsafePut(bucket, key, value, true)
}

最终也是调用bolt的Put方法来存储的:

代码语言:javascript复制
func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {

      bucket := t.tx.Bucket(bucketType.Name())
      if err := bucket.Put(key, value); err != nil {

readTx的初始化位于server/storage/backend/backend.go

代码语言:javascript复制
func (b *backend) ReadTx() ReadTx { return b.readTx }
代码语言:javascript复制
func newBackend(bcfg BackendConfig) *backend {
      b := &backend{
    bopts: bopts,
    db:    db,
          readTx: &readTx{
      baseReadTx: baseReadTx{
        buf: txReadBuffer{
          txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
          bufVersion: 0,
        },
        buckets: make(map[BucketID]*bolt.Bucket),
        txWg:    new(sync.WaitGroup),
        txMu:    new(sync.RWMutex),
      },
    },

它的详细初始化链路追溯如下:server/storage/mvcc/watchable_store.go

代码语言:javascript复制
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
      s := &watchableStore{
    store:    NewStore(lg, b, le, cfg),
代码语言:javascript复制
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
  return newWatchableStore(lg, b, le, cfg)
}

server/etcdserver/server.go

代码语言:javascript复制
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
      srv.be = b.storage.backend.be
      srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)

最后看下etcd在bolt里面是如何分桶的,桶的bucket定义如下server/storage/schema/bucket.go

代码语言:javascript复制
  var (
  Key     = backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true})
  Meta    = backend.Bucket(bucket{id: 2, name: metaBucketName, safeRangeBucket: false})
  Lease   = backend.Bucket(bucket{id: 3, name: leaseBucketName, safeRangeBucket: false})
  Alarm   = backend.Bucket(bucket{id: 4, name: alarmBucketName, safeRangeBucket: false})
  Cluster = backend.Bucket(bucket{id: 5, name: clusterBucketName, safeRangeBucket: false})
代码语言:javascript复制
Members        = backend.Bucket(bucket{id: 10, name: membersBucketName, safeRangeBucket: false})
  MembersRemoved = backend.Bucket(bucket{id: 11, name: membersRemovedBucketName, safeRangeBucket: false})  
代码语言:javascript复制
Auth      = backend.Bucket(bucket{id: 20, name: authBucketName, safeRangeBucket: false})
  AuthUsers = backend.Bucket(bucket{id: 21, name: authUsersBucketName, safeRangeBucket: false})
  AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false})


  Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false})
)

etcd里面每种类型一个有一个单独的bucket,但是所有的key都使用的是同一个bucket

1 人点赞