golang源码分析:etcd(11)

2023-09-09 08:27:50 浏览数 (3)

我们继续在文件 server/etcdserver/server.go 中分析EtcdServer的初始化流程:它会先调用bootstrap函数初始化后端存储bolt-db,然后初始化raftNode,最后初始化transport,并调用其Start方法开始raft协议的网络传输。具体实现如下:

代码语言:javascript复制
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
      b, err := bootstrap(cfg)
      srv = &EtcdServer{
    readych:               make(chan struct{}),
    Cfg:                   cfg,
    lgMu:                  new(sync.RWMutex),
    lg:                    cfg.Logger,
    errorc:                make(chan error, 1),
    v2store:               b.storage.st,
    snapshotter:           b.ss,
    r:                     *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
    memberId:              b.cluster.nodeID,
    attributes:            membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
    cluster:               b.cluster.cl,
    stats:                 sstats,
    lstats:                lstats,
    SyncTicker:            time.NewTicker(500 * time.Millisecond),
    peerRt:                b.prt,
    reqIDGen:              idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
    AccessController:      &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
    consistIndex:          b.storage.backend.ci,
    firstCommitInTerm:     notify.NewNotifier(),
    clusterVersionChanged: notify.NewNotifier(),
  }
      srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
      tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
      srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
      tr := &rafthttp.Transport{
    Logger:      cfg.Logger,
    TLSInfo:     cfg.PeerTLSInfo,
    DialTimeout: cfg.PeerDialTimeout(),
    ID:          b.cluster.nodeID,
    URLs:        cfg.PeerURLs,
    ClusterID:   b.cluster.cl.ID(),
    Raft:        srv,
    Snapshotter: b.ss,
    ServerStats: sstats,
    LeaderStats: lstats,
    ErrorC:      srv.errorc,
  }
      if err = tr.Start(); err != nil {

其中EtcdServer的定义如下:

代码语言:javascript复制
// EtcdServer bundles the state of one etcd server member: the raft node, the
// storage handles (v2 store, backend, MVCC KV, lessor), the channels used for
// lifecycle signalling, and the statistics shared with the raft transport.
//
// The leading integer fields are accessed atomically and must stay 64-bit
// aligned, so the field order at the top of the struct must not be changed.
type EtcdServer struct {
  // inflightSnapshots holds count the number of snapshots currently inflight.
  inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
  appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
  committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
  term              uint64 // must use atomic operations to access; keep 64-bit aligned.
  lead              uint64 // must use atomic operations to access; keep 64-bit aligned.


  consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
  r            raftNode                 // uses 64-bit atomics; keep 64-bit aligned.


  // readych is created as an unbuffered channel in NewServer.
  // NOTE(review): presumably closed once the server is ready — confirm.
  readych chan struct{}
  // Cfg is the server configuration passed to NewServer.
  Cfg     config.ServerConfig


  // lg is initialised from the configuration's Logger in NewServer.
  // NOTE(review): lgMu presumably guards lg — confirm.
  lgMu *sync.RWMutex
  lg   *zap.Logger


  // NOTE(review): w is presumably used to wait for the result of in-flight
  // requests — confirm against its users.
  w wait.Wait


  readMu sync.RWMutex
  // read routine notifies etcd server that it waits for reading by sending an empty struct to
  // readwaitC
  readwaitc chan struct{}
  // readNotifier is used to notify the read routine that it can process the request
  // when there is no error
  readNotifier *notifier


  // stop signals the run goroutine should shutdown.
  stop chan struct{}
  // stopping is closed by run goroutine on shutdown.
  stopping chan struct{}
  // done is closed when all goroutines from start() complete.
  done chan struct{}
  // leaderChanged is used to notify the linearizable read loop to drop the old read requests.
  leaderChanged *notify.Notifier


  // errorc is created with a buffer of one in NewServer and is also handed
  // to the raft transport as its ErrorC.
  errorc     chan error
  // memberId is this member's ID, taken from the bootstrapped cluster's nodeID.
  memberId   types.ID
  // attributes carries the member name and client URLs from the configuration.
  attributes membership.Attributes


  // cluster is the raft cluster membership obtained from bootstrap.
  cluster *membership.RaftCluster


  // v2store and snapshotter are both taken from the bootstrap result in NewServer.
  v2store     v2store.Store
  snapshotter *snap.Snapshotter


  applyV2 ApplierV2


  uberApply apply.UberApplier


  applyWait wait.WaitTime


  // kv and lessor are constructed in NewServer on top of be (lessor via
  // lease.NewLessor, kv via mvcc.New).
  // NOTE(review): bemu presumably guards be — confirm.
  kv         mvcc.WatchableKV
  lessor     lease.Lessor
  bemu       sync.RWMutex
  be         backend.Backend
  beHooks    *serverstorage.BackendHooks
  authStore  auth.AuthStore
  alarmStore *v3alarm.AlarmStore


  // stats and lstats are also passed to the raft transport as ServerStats
  // and LeaderStats in NewServer.
  stats  *stats.ServerStats
  lstats *stats.LeaderStats


  // SyncTicker is created with a 500ms period in NewServer.
  SyncTicker *time.Ticker
  // compactor is used to auto-compact the KV.
  compactor v3compactor.Compactor


  // peerRt used to send requests (version, lease) to peers.
  peerRt   http.RoundTripper
  // reqIDGen is seeded in NewServer from the member's node ID and the current time.
  reqIDGen *idutil.Generator


  // wgMu blocks concurrent waitgroup mutation while server stopping
  wgMu sync.RWMutex
  // wg is used to wait for the goroutines that depends on the server state
  // to exit when stopping the server.
  wg sync.WaitGroup


  // ctx is used for etcd-initiated requests that may need to be canceled
  // on etcd server shutdown.
  ctx    context.Context
  cancel context.CancelFunc


  leadTimeMu      sync.RWMutex
  leadElectedTime time.Time


  // Both notifiers are created in NewServer via notify.NewNotifier.
  firstCommitInTerm     *notify.Notifier
  clusterVersionChanged *notify.Notifier


  // AccessController is embedded; NewServer populates it with the CORS and
  // HostWhitelist settings from the configuration.
  *AccessController
  // forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
  // Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
  forceSnapshot     bool
  corruptionChecker CorruptionChecker
}

这里我们重点关注下它的属性r,它代表了一个raftNode节点,通过bootstrap阶段得到的bootstrappedRaft的newRaftNode方法来进行初始化:

代码语言:javascript复制
r:                     *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),

server的Start方法定义如下:

代码语言:javascript复制
func (s *EtcdServer) Start() {
        s.start()
  s.GoAttach(func() { s.adjustTicks() })
  s.GoAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) })
  s.GoAttach(s.purgeFile)
  s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
  s.GoAttach(s.monitorClusterVersions)
  s.GoAttach(s.monitorStorageVersion)
  s.GoAttach(s.linearizableReadLoop)
  s.GoAttach(s.monitorKVHash)
  s.GoAttach(s.monitorCompactHash)
  s.GoAttach(s.monitorDowngrade)
代码语言:javascript复制
func (s *EtcdServer) start() {
      go s.run()

start方法会启动goroutine执行run方法,run方法里面定义了一部分raft处理流程:

代码语言:javascript复制
func (s *EtcdServer) run() {
      sn, err := s.r.raftStorage.Snapshot()
        sched := schedule.NewFIFOScheduler(lg)
      rh := &raftReadyHandler{
    getLead:    func() (lead uint64) { return s.getLead() },
    updateLead: func(lead uint64) { s.setLead(lead) },
    updateLeadership: func(newLeader bool) {
      s.r.start(rh)
      s.r.stop()
        for {
    select {
    case ap := <-s.r.apply():
      f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
      sched.Schedule(f)
    case leases := <-expiredLeaseC:
      s.revokeExpiredLeases(leases)

raft的Step方法被包裹在Process函数中:

代码语言:javascript复制
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
      return s.r.Step(ctx, m)
代码语言:javascript复制
// ReportSnapshot forwards the snapshot status reported for the member with
// the given id to the raft node embedded in the server's raftNode.
func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
  s.r.Node.ReportSnapshot(id, status)
}
代码语言:javascript复制
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
        <-apply.notifyc


  s.triggerSnapshot(ep)
  select {
  // snapshot requested via send()
  case m := <-s.r.msgSnapC:
    merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
    s.sendMergedSnap(merged)

在applySnapshot函数里会调用transport的AddPeer方法来增加节点:

代码语言:javascript复制
func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
        if raft.IsEmptySnap(toApply.snapshot) {
    return
  }
        // recover raft transport
  s.r.transport.RemoveAllPeers()
        for _, m := range s.cluster.Members() {
    if m.ID == s.MemberId() {
      continue
    }
    s.r.transport.AddPeer(m.ID, m.PeerURLs)
  }
代码语言:javascript复制
func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
      resp, err := s.promoteMember(ctx, id)
      for _, url := range leader.PeerURLs {
      resp, err := promoteMemberHTTP(cctx, url, id, s.peerRt)
代码语言:javascript复制
func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
        cc := raftpb.ConfChange{
    Type:    raftpb.ConfChangeAddNode,
    NodeID:  id,
    Context: b,
  }


  return s.configure(ctx, cc)
代码语言:javascript复制
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) {
      cc.NodeID = raft.None
代码语言:javascript复制
// raftStatus returns the current status reported by the server's underlying
// raft node.
func (s *EtcdServer) raftStatus() raft.Status {
  node := s.r.Node
  return node.Status()
}

pkg/schedule/schedule.go文件里定义了一个先进先出(FIFO)的调度器:创建时会起一个goroutine执行run方法,Schedule提交任务后通过resume通道唤醒它,由它按顺序依次执行待处理的任务

代码语言:javascript复制
func NewFIFOScheduler(lg *zap.Logger) Scheduler {
          f := &fifo{
    resume: make(chan struct{}, 1),
    donec:  make(chan struct{}, 1),
    lg:     lg,
  }
        go f.run()
代码语言:javascript复制
func (f *fifo) Schedule(j Job) {
        select {
    case f.resume <- struct{}{}:
代码语言:javascript复制
func (f *fifo) run() {
                for _, todo := range pendings {
          f.executeJob(todo, true)
        }

真正执行任务的方法是executeJob:

代码语言:javascript复制
func (f *fifo) executeJob(todo Job, updatedFinishedStats bool) {
        todo.Do(f.ctx)

server/etcdserver/bootstrap.go文件里面定义了raftNode的初始化方法,它会根据是否存在peers,最终调用raft库的RestartNode或者StartNode方法:

代码语言:javascript复制
func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *membership.RaftCluster) *raftNode {
        if len(b.peers) == 0 {
    n = raft.RestartNode(b.config)
  } else {
    n = raft.StartNode(b.config, b.peers)
  }

transport的Start方法位于server/etcdserver/api/rafthttp/transport.go

代码语言:javascript复制
func (t *Transport) Start() error {
        t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
  if err != nil {
    return err
  }
  t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)

raftNode的start方法定义于server/etcdserver/raft.go,与之相关的有两个核心的结构体:

代码语言:javascript复制
  type raftNode struct {

代码语言:javascript复制
type raftNodeConfig struct {
      raft.Node
  raftStorage *raft.MemoryStorage
代码语言:javascript复制
func (r *raftNode) start(rh *raftReadyHandler) {
    go func() {
      for {
      select {
      case <-r.ticker.C:
        r.tick()
      case rd := <-r.Ready():

1 人点赞