我们继续在文件 server/etcdserver/server.go 中分析 EtcdServer 的初始化流程:NewServer 会先调用 bootstrap 函数初始化后端存储 bolt-db,然后初始化 raftNode,最后初始化 transport,并调用 transport 的 Start 方法开启 raft 协议的网络传输。具体实现如下:
代码语言:javascript复制func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
b, err := bootstrap(cfg)
srv = &EtcdServer{
readych: make(chan struct{}),
Cfg: cfg,
lgMu: new(sync.RWMutex),
lg: cfg.Logger,
errorc: make(chan error, 1),
v2store: b.storage.st,
snapshotter: b.ss,
r: *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
memberId: b.cluster.nodeID,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: b.cluster.cl,
stats: sstats,
lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: b.prt,
reqIDGen: idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: b.storage.backend.ci,
firstCommitInTerm: notify.NewNotifier(),
clusterVersionChanged: notify.NewNotifier(),
}
srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
tr := &rafthttp.Transport{
Logger: cfg.Logger,
TLSInfo: cfg.PeerTLSInfo,
DialTimeout: cfg.PeerDialTimeout(),
ID: b.cluster.nodeID,
URLs: cfg.PeerURLs,
ClusterID: b.cluster.cl.ID(),
Raft: srv,
Snapshotter: b.ss,
ServerStats: sstats,
LeaderStats: lstats,
ErrorC: srv.errorc,
}
if err = tr.Start(); err != nil {
其中EtcdServer的定义如下:
代码语言:javascript复制type EtcdServer struct {
// EtcdServer is the server object assembled by NewServer; it bundles the raft
// node (r), storage handles, cluster membership and the channels used to
// coordinate its goroutines.
// inflightSnapshots holds count the number of snapshots currently inflight.
inflightSnapshots int64 // must use atomic operations to access; keep 64-bit aligned.
appliedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
committedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
term uint64 // must use atomic operations to access; keep 64-bit aligned.
lead uint64 // must use atomic operations to access; keep 64-bit aligned.
consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
r raftNode // uses 64-bit atomics; keep 64-bit aligned.
// readych is allocated (unbuffered) by NewServer.
// NOTE(review): presumably signals that the server is ready — confirm against its users.
readych chan struct{}
Cfg config.ServerConfig
// lgMu is allocated together with lg in NewServer.
// NOTE(review): presumably guards lg — confirm.
lgMu *sync.RWMutex
lg *zap.Logger
w wait.Wait
readMu sync.RWMutex
// read routine notifies etcd server that it waits for reading by sending an empty struct to
// readwaitC
readwaitc chan struct{}
// readNotifier is used to notify the read routine that it can process the request
// when there is no error
readNotifier *notifier
// stop signals the run goroutine should shutdown.
stop chan struct{}
// stopping is closed by run goroutine on shutdown.
stopping chan struct{}
// done is closed when all goroutines from start() complete.
done chan struct{}
// leaderChanged is used to notify the linearizable read loop to drop the old read requests.
leaderChanged *notify.Notifier
// errorc is created by NewServer with a buffer of 1 and is also handed to the
// rafthttp Transport as its ErrorC.
errorc chan error
// memberId is set from the bootstrapped cluster's nodeID in NewServer.
memberId types.ID
attributes membership.Attributes
cluster *membership.RaftCluster
v2store v2store.Store
snapshotter *snap.Snapshotter
applyV2 ApplierV2
uberApply apply.UberApplier
applyWait wait.WaitTime
// kv is built by mvcc.New in NewServer on top of be and lessor.
kv mvcc.WatchableKV
// lessor is built by lease.NewLessor in NewServer.
lessor lease.Lessor
// NOTE(review): bemu presumably guards be — confirm.
bemu sync.RWMutex
be backend.Backend
beHooks *serverstorage.BackendHooks
authStore auth.AuthStore
alarmStore *v3alarm.AlarmStore
stats *stats.ServerStats
lstats *stats.LeaderStats
// SyncTicker is created by NewServer with a 500ms period.
SyncTicker *time.Ticker
// compactor is used to auto-compact the KV.
compactor v3compactor.Compactor
// peerRt used to send requests (version, lease) to peers.
peerRt http.RoundTripper
reqIDGen *idutil.Generator
// wgMu blocks concurrent waitgroup mutation while server stopping
wgMu sync.RWMutex
// wg is used to wait for the goroutines that depends on the server state
// to exit when stopping the server.
wg sync.WaitGroup
// ctx is used for etcd-initiated requests that may need to be canceled
// on etcd server shutdown.
ctx context.Context
cancel context.CancelFunc
leadTimeMu sync.RWMutex
leadElectedTime time.Time
firstCommitInTerm *notify.Notifier
clusterVersionChanged *notify.Notifier
*AccessController
// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
forceSnapshot bool
corruptionChecker CorruptionChecker
}
这里我们重点关注它的属性 r:它代表一个 raftNode 节点,由 bootstrappedRaft 的 newRaftNode 方法创建(该方法内部最终会调用 raft 包的 StartNode 或 RestartNode 来完成初始化):
代码语言:javascript复制r: *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
server的Start方法定义如下:
代码语言:javascript复制func (s *EtcdServer) Start() {
s.start()
s.GoAttach(func() { s.adjustTicks() })
s.GoAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) })
s.GoAttach(s.purgeFile)
s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
s.GoAttach(s.monitorClusterVersions)
s.GoAttach(s.monitorStorageVersion)
s.GoAttach(s.linearizableReadLoop)
s.GoAttach(s.monitorKVHash)
s.GoAttach(s.monitorCompactHash)
s.GoAttach(s.monitorDowngrade)
代码语言:javascript复制func (s *EtcdServer) start() {
go s.run()
start 方法会启动一个 goroutine 来执行 run 方法,run 方法里面定义了一部分 raft 处理流程:
代码语言:javascript复制func (s *EtcdServer) run() {
sn, err := s.r.raftStorage.Snapshot()
sched := schedule.NewFIFOScheduler(lg)
rh := &raftReadyHandler{
getLead: func() (lead uint64) { return s.getLead() },
updateLead: func(lead uint64) { s.setLead(lead) },
updateLeadership: func(newLeader bool) {
s.r.start(rh)
s.r.stop()
for {
select {
case ap := <-s.r.apply():
f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
sched.Schedule(f)
case leases := <-expiredLeaseC:
s.revokeExpiredLeases(leases)
raft 的 Step 方法被包裹在 Process 函数中:
代码语言:javascript复制func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
return s.r.Step(ctx, m)
代码语言:javascript复制func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
// ReportSnapshot forwards the snapshot status for the given id to the
// server's raftNode (s.r) unchanged.
s.r.ReportSnapshot(id, status)
}
代码语言:javascript复制func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
<-apply.notifyc
s.triggerSnapshot(ep)
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
s.sendMergedSnap(merged)
在 applySnapshot 函数里,会先调用 transport 的 RemoveAllPeers 方法清空已有节点,然后对集群中除自身以外的每个成员调用 AddPeer 方法来增加节点:
代码语言:javascript复制func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
if raft.IsEmptySnap(toApply.snapshot) {
return
}
// recover raft transport
s.r.transport.RemoveAllPeers()
for _, m := range s.cluster.Members() {
if m.ID == s.MemberId() {
continue
}
s.r.transport.AddPeer(m.ID, m.PeerURLs)
}
代码语言:javascript复制func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
resp, err := s.promoteMember(ctx, id)
for _, url := range leader.PeerURLs {
resp, err := promoteMemberHTTP(cctx, url, id, s.peerRt)
代码语言:javascript复制func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: id,
Context: b,
}
return s.configure(ctx, cc)
代码语言:javascript复制func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) {
cc.NodeID = raft.None
代码语言:javascript复制func (s *EtcdServer) raftStatus() raft.Status {
// raftStatus returns the raft.Status reported by the Node held in s.r.
return s.r.Node.Status()
}
pkg/schedule/schedule.go 里定义了一个先进先出(FIFO)的任务调度器:NewFIFOScheduler 在创建时就会启动一个后台 goroutine 执行 run 方法;任务通过 Schedule 提交,并通过 resume 通道通知 run,再由 run 按顺序依次执行待处理的任务:
代码语言:javascript复制func NewFIFOScheduler(lg *zap.Logger) Scheduler {
f := &fifo{
resume: make(chan struct{}, 1),
donec: make(chan struct{}, 1),
lg: lg,
}
go f.run()
代码语言:javascript复制func (f *fifo) Schedule(j Job) {
select {
case f.resume <- struct{}{}:
代码语言:javascript复制func (f *fifo) run() {
for _, todo := range pendings {
f.executeJob(todo, true)
}
真正执行任务的方法是 executeJob:
代码语言:javascript复制func (f *fifo) executeJob(todo Job, updatedFinishedStats bool) {
todo.Do(f.ctx)
server/etcdserver/bootstrap.go 文件里定义了 raftNode 的初始化方法 newRaftNode:当没有 peers 时调用 raft 的 RestartNode 方法,否则调用 raft 的 StartNode 方法:
代码语言:javascript复制func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *membership.RaftCluster) *raftNode {
if len(b.peers) == 0 {
n = raft.RestartNode(b.config)
} else {
n = raft.StartNode(b.config, b.peers)
}
transport的Start方法位于server/etcdserver/api/rafthttp/transport.go
代码语言:javascript复制func (t *Transport) Start() error {
t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
if err != nil {
return err
}
t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
raftNode 的 start 方法定义于 server/etcdserver/raft.go,与之相关的有两个核心结构体:
代码语言:javascript复制 type raftNode struct {
和
代码语言:javascript复制type raftNodeConfig struct {
raft.Node
raftStorage *raft.MemoryStorage
代码语言:javascript复制func (r *raftNode) start(rh *raftReadyHandler) {
go func() {
for {
select {
case <-r.ticker.C:
r.tick()
case rd := <-r.Ready():