golang源码分析:etcd(12)

2023-09-20 08:28:20 浏览数 (3)

etcd后端存储用的是bolt,在分析完server如何初始化raftNode流程后,我们看下后端存储bolt-db的初始化流程。

在server初始化函数里,有个bootstrap函数server/etcdserver/server.go

代码语言:javascript复制
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
      b, err := bootstrap(cfg)
      v2store:               b.storage.st,

向下跟下它的调用路径server/etcdserver/bootstrap.go

代码语言:javascript复制
func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
      backend, err := bootstrapBackend(cfg, haveWAL, st, ss)

调用了backend包的OpenBackend函数:

代码语言:javascript复制
func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, ss *snap.Snapshotter) (backend *bootstrappedBackend, err error) {
      be := serverstorage.OpenBackend(cfg, beHooks)
        return &bootstrappedBackend{
    beHooks:  beHooks,
    be:       be,
    ci:       ci,
    beExist:  beExist,
    snapshot: snapshot,
  }, nil

server/storage/backend.go

代码语言:javascript复制
func OpenBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
        go func() {
    beOpened <- newBackend(cfg, hooks)
  }()
        select {
  case be := <-beOpened:
    return be
代码语言:javascript复制
    func newBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
      return backend.New(bcfg)
    

该函数同时也被OpenSnapshotBackend调用了

代码语言:javascript复制
  func OpenSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks *BackendHooks) (backend.Backend, error) {
      return OpenBackend(cfg, hooks), nil

再往下跟一层,终于发现了bolt的初始化函数server/storage/backend/backend.go

代码语言:javascript复制
type backend struct {
        bopts *bolt.Options
  db    *bolt.DB
代码语言:javascript复制
func newBackend(bcfg BackendConfig) *backend {
      bopts := &bolt.Options{}
      db, err := bolt.Open(bcfg.Path, 0600, bopts)
      go b.run()
代码语言:javascript复制
func (b *backend) defrag() error {
      b.db, err = bolt.Open(dbp, 0600, b.bopts)

其核心函数是run函数:

代码语言:javascript复制
func (b *backend) run() {
        for {
    select {
    case <-t.C:
    case <-b.stopc:
      b.batchTx.CommitAndStop()
      return
    }
    if b.batchTx.safePending() != 0 {
      b.batchTx.Commit()
    }
    t.Reset(b.batchInterval)
  }
代码语言:javascript复制
func (b *backend) begin(write bool) *bolt.Tx {
      tx := b.unsafeBegin(write)
      db := tx.DB()
代码语言:javascript复制
func New(bcfg BackendConfig) Backend {
  return newBackend(bcfg)
}

除了核心流程,其它的工具流程也使用了bolt-db比如tools/etcd-dump-db/main.go

代码语言:javascript复制
listBucketCommand = &cobra.Command{
    Use:   "list-bucket [data dir or db file path]",
    Short: "bucket lists all buckets.",
    Run:   listBucketCommandFunc,
  }
代码语言:javascript复制
func listBucketCommandFunc(cmd *cobra.Command, args []string) {
      bts, err := getBuckets(dp)

tools/etcd-dump-db/backend.go

代码语言:javascript复制
func getBuckets(dbPath string) (buckets []string, err error) {
      db, derr := bolt.Open(dbPath, 0600, &bolt.Options{Timeout: flockTimeout})
      err = db.View(func(tx *bolt.Tx) error {
    return tx.ForEach(func(b []byte, _ *bolt.Bucket) error {
      buckets = append(buckets, string(b))
      return nil
    })
  })

etcdutl/snapshot/v3_snapshot.go

代码语言:javascript复制
func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
      db, err := bolt.Open(dbPath, 0400, &bolt.Options{ReadOnly: true})

tools/benchmark/cmd/mvcc.go

代码语言:javascript复制
func initMVCC() {
      be := backend.New(bcfg)

bolt作为etcd后端的kv存储,可以持久化,支持事务mvcc,从而保证了etcd的存储的可靠性。获取到bolt的实例后,会在etcdserver的初始化函数里,将它传递给mvcc

代码语言:javascript复制
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)

调用的是server/storage/mvcc/watchable_store.go的New方法,并将bolt对象传入

代码语言:javascript复制
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
  return newWatchableStore(lg, b, le, cfg)
}
代码语言:javascript复制
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
        s := &watchableStore{
    store:    NewStore(lg, b, le, cfg),
        s.wg.Add(2)
  go s.syncWatchersLoop()
  go s.syncVictimsLoop()

server/storage/mvcc/kvstore.go

代码语言:javascript复制
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
      s := &store{
    cfg:     cfg,
    b:       b,
    kvindex: newTreeIndex(lg),


    le: le,


    currentRev:     1,
    compactMainRev: -1,


    fifoSched: schedule.NewFIFOScheduler(lg),


    stopc: make(chan struct{}),


    lg: lg,
  }
  

至此完成了mvcc的初始化

1 人点赞