介绍完一个个函数实现后,我们分析下完整的etcd的读写流程。有没有觉得很奇怪既然bolt是采用b 树存储的持久化存储来存储kv,为什还需要一个Btree结构来存储key的信息?
其实在etcd内部,从一个可以找到一个value分为两个步骤:1,通过key找到所有的版本号,从版本号里筛选需要查找的版本。2,根据版本号到bolt里面查找对应的k/v对,从而获得value值。Revision 中定义了一个全局递增的主版本号main,发生 put、txn、del 操作会递增,一个事务内的 main 版本号是唯一的;事务内的子版本号定义为sub,事务发生 put 和 del 操作时,从 0 开始递增。
这样存储的好处是在btree里面key是唯一的,通过key可以找到所有版本号;在bolt里面版本号是唯一的,查找过程和key完全解耦了,再加上写过程中版本号的递增特性,可以实现近乎顺序写,整个写的过程非常迅速。
我们先看下读的过程,在server/storage/mvcc/kvstore_txn.go中有个Read函数
代码语言:javascript复制func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
var tx backend.ReadTx
if mode == ConcurrentReadTxMode {
tx = s.b.ConcurrentReadTx()
} else {
tx = s.b.ReadTx()
}
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})
返沪TxRead对象,对象有一个Range方法
代码语言:javascript复制func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
}
其核心代码如下:
代码语言:javascript复制func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
if ro.Count {
total := tr.s.kvindex.CountRevisions(key, end, rev)
revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
for i, revpair := range revpairs[:len(kvs)] {
revToBytes(revpair, revBytes)
_, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
先通过kvindex.Revisions获取所有的版本号,然后再版本里筛选出需要的版本号到bolt里面查询,主要依赖readTx属性
代码语言:javascript复制type storeTxnRead struct {
s *store
tx backend.ReadTx
首先看第一步,通过key找到revision,调用的Revisions方法位于server/storage/mvcc/index.go,它通过unsafeVisit获取所有版本号,采用访问者模式,传入的函数用于筛选需要的版本,其中使用了get函数来从索引btree里面获取最终需要的版本列表 rev, _, _, err := ki.get(ti.lg, atRev),核心逻辑如下:
代码语言:javascript复制func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) {
if end == nil {
rev, _, _, err := ti.unsafeGet(key, atRev)
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
if limit <= 0 || len(revs) < limit {
revs = append(revs, rev)
btree的初始化方法如下
代码语言:javascript复制func newTreeIndex(lg *zap.Logger) index {
return &treeIndex{
tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool {
return aki.Less(bki)
}),
查找用到的unsafeVisit方法如下,最终调用了BTree的AscendGreaterOrEqual方法
代码语言:javascript复制func (ti *treeIndex) unsafeVisit(key, end []byte, f func(ki *keyIndex) bool) {
ti.tree.AscendGreaterOrEqual(keyi, func(item *keyIndex) bool {
if len(endi.key) > 0 && !item.Less(endi) {
return false
}
if !f(item) {
return false
}
return true
})
server/storage/mvcc/key_index.go里定义了keyIndex,它实现了Less接口因此可以用BTree。
代码语言:javascript复制func (ki *keyIndex) Less(bki *keyIndex) bool {
return bytes.Compare(ki.key, bki.key) == -1
}
它的核心字段有三个,key,最近修改的版本号,历史上所有的版本号。btree里存储的元素,比较的时候比较key,一个key里存了多个reversion。
代码语言:javascript复制type keyIndex struct {
key []byte
modified revision // the main rev of the last modification
generations []generation
}
完成在b树中找出keyIndex信息,然后就在generations里面找出对应版本的信息调用了get方法
代码语言:javascript复制func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
g := ki.findGeneration(atRev)
n := g.walk(func(rev revision) bool { return rev.main > atRev })
代码语言:javascript复制func (ki *keyIndex) findGeneration(rev int64) *generation {
for cg >= 0 {
g := ki.generations[cg]
以上就是完整的索引查找对应版本号的流程,索引的初始化代码位于server/storage/mvcc/kvstore.go
代码语言:javascript复制func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
s := &store{
cfg: cfg,
b: b,
kvindex: newTreeIndex(lg),
得到版本号后,可以根据版本号到bolt对应的bucket里查找我们的value值,用到了baseReadTx,其定义位于server/storage/backend/read_tx.go
代码语言:javascript复制type baseReadTx struct {
通过rversion查找kv的过程是先缓存里找不到再到bolt里面找,最后缓存到buf
代码语言:javascript复制func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
bucket = baseReadTx.tx.Bucket(bucketType.Name())
baseReadTx.buckets[bn] = bucket
c := bucket.Cursor()
k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
server/storage/backend/batch_tx.go里会调用bolt对应的Seek方法
代码语言:javascript复制func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
分析完查的流程后,我们分析下写的流程,可以从Put这个函数作为入开来进行追踪
代码语言:javascript复制func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
tw.put(key, value, lease)
return tw.beginRev 1
}
详细的put方法如下,先通过索引找到key最近修改的版本,然后创建用于存在在btree里面的key和用于存储在bolt里面的kv,然后使用UnsafeSeqPut存入bolt,使用kvindex.Put存入kv
代码语言:javascript复制func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
_, created, ver, err := tw.s.kvindex.Get(key, rev)
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal()
tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
将数据存入bolt的函数定义如下
代码语言:javascript复制func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
t.unsafePut(bucket, key, value, true)
}
最终也是调用bolt的Put方法来存储的:
代码语言:javascript复制func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
bucket := t.tx.Bucket(bucketType.Name())
if err := bucket.Put(key, value); err != nil {
readTx的初始化位于server/storage/backend/backend.go
代码语言:javascript复制func (b *backend) ReadTx() ReadTx { return b.readTx }
代码语言:javascript复制func newBackend(bcfg BackendConfig) *backend {
b := &backend{
bopts: bopts,
db: db,
readTx: &readTx{
baseReadTx: baseReadTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
bufVersion: 0,
},
buckets: make(map[BucketID]*bolt.Bucket),
txWg: new(sync.WaitGroup),
txMu: new(sync.RWMutex),
},
},
它的详细初始化链路追溯如下:server/storage/mvcc/watchable_store.go
代码语言:javascript复制func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
s := &watchableStore{
store: NewStore(lg, b, le, cfg),
代码语言:javascript复制func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
return newWatchableStore(lg, b, le, cfg)
}
server/etcdserver/server.go
代码语言:javascript复制func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
srv.be = b.storage.backend.be
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
最后看下etcd在bolt里面是如何分桶的,桶的bucket定义如下server/storage/schema/bucket.go
代码语言:javascript复制 var (
Key = backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true})
Meta = backend.Bucket(bucket{id: 2, name: metaBucketName, safeRangeBucket: false})
Lease = backend.Bucket(bucket{id: 3, name: leaseBucketName, safeRangeBucket: false})
Alarm = backend.Bucket(bucket{id: 4, name: alarmBucketName, safeRangeBucket: false})
Cluster = backend.Bucket(bucket{id: 5, name: clusterBucketName, safeRangeBucket: false})
代码语言:javascript复制Members = backend.Bucket(bucket{id: 10, name: membersBucketName, safeRangeBucket: false})
MembersRemoved = backend.Bucket(bucket{id: 11, name: membersRemovedBucketName, safeRangeBucket: false})
代码语言:javascript复制Auth = backend.Bucket(bucket{id: 20, name: authBucketName, safeRangeBucket: false})
AuthUsers = backend.Bucket(bucket{id: 21, name: authUsersBucketName, safeRangeBucket: false})
AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false})
Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false})
)
etcd里面每种类型一个有一个单独的bucket,但是所有的key都使用的是同一个bucket