Startup Flow
Tendermint's startup flow is fairly clear. Each subsystem's startup logic lives in its own implementation: the main startup path loads the required configuration, and each startup implementation then brings up its own component. Node startup, for example, lives in nodeImpl, while consensus startup is handled in state.
The flow is roughly:
- Load the configuration: node.NewDefault
- Start the node: Start
- Start each concrete implementation: OnStart
Let's look at the startup flow first.
The startup entry code uses the command-line library cobra.
Code location: cmd/tendermint/main.go
func main() {
	// ... some code omitted

	// NOTE:
	// Users wishing to:
	// * Use an external signer for their validators
	// * Supply an in-proc abci app
	// * Supply a genesis doc file from another source
	// * Provide their own DB implementation
	// can copy this file and use something other than the
	// node.NewDefault function

	// The default node constructor. This is a function reference, not a call;
	// it is invoked later inside cmd.NewRunNodeCmd.
	nodeFunc := node.NewDefault

	// Create & start node
	// The key call is cmd.NewRunNodeCmd
	rootCmd.AddCommand(cmd.NewRunNodeCmd(nodeFunc))

	cmd := cli.PrepareBaseCmd(rootCmd, "TM", os.ExpandEnv(filepath.Join("$HOME", config.DefaultTendermintDir)))
	if err := cmd.Execute(); err != nil {
		panic(err)
	}
}
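For readers unfamiliar with cobra, here is a minimal, self-contained sketch (illustrative only, not Tendermint's actual helpers) showing how a root command, a subcommand with RunE, and Execute fit together; it is the same wiring pattern used above:

// A minimal cobra example (illustrative only; not Tendermint code).
package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	rootCmd := &cobra.Command{Use: "demo"}

	startCmd := &cobra.Command{
		Use:   "start",
		Short: "Run the demo service",
		RunE: func(cmd *cobra.Command, args []string) error {
			fmt.Println("service started")
			return nil
		},
	}

	rootCmd.AddCommand(startCmd)

	if err := rootCmd.Execute(); err != nil {
		os.Exit(1)
	}
}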
Creating the run-node command
// NewRunNodeCmd returns the command that allows the CLI to start a node.
// It can be used with a custom PrivValidator and in-process ABCI application.
func NewRunNodeCmd(nodeProvider cfg.ServiceProvider) *cobra.Command {
	// Register the "start" command with the CLI
	cmd := &cobra.Command{
		Use:     "start",
		Aliases: []string{"node", "run"},
		Short:   "Run the tendermint node",
		RunE: func(cmd *cobra.Command, args []string) error {
			if err := checkGenesisHash(config); err != nil {
				return err
			}

			// This is where node.NewDefault is actually invoked:
			// it creates the node.
			n, err := nodeProvider(config, logger)
			if err != nil {
				return fmt.Errorf("failed to create node: %w", err)
			}

			// Start the service
			if err := n.Start(); err != nil {
				return fmt.Errorf("failed to start node: %w", err)
			}

			logger.Info("started node", "node", n.String())

			// Stop upon receiving SIGTERM or CTRL-C.
			tmos.TrapSignal(logger, func() {
				if n.IsRunning() {
					if err := n.Stop(); err != nil {
						logger.Error("unable to stop the node", "error", err)
					}
				}
			})

			// Run forever.
			select {}
		},
	}

	AddNodeFlags(cmd)
	return cmd
}
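The RunE above blocks forever with select {} and relies on tmos.TrapSignal to shut the node down on SIGTERM or CTRL-C. A rough, self-contained sketch of that pattern using only the standard library (simplified; not the actual tmos implementation) looks like this:

// Simplified signal-trap pattern (illustrative; not the real tmos.TrapSignal).
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func trapSignal(cb func()) {
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	go func() {
		sig := <-c
		fmt.Println("captured signal:", sig)
		cb() // e.g. stop the node
		os.Exit(0)
	}()
}

func main() {
	trapSignal(func() { fmt.Println("stopping node...") })
	// Run forever, just like the RunE above.
	select {}
}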
The default configuration needed at startup is easy to read at a glance:
// DefaultConfig returns a default configuration for a Tendermint node
func DefaultConfig() *Config {
	return &Config{
		BaseConfig:      DefaultBaseConfig(),
		RPC:             DefaultRPCConfig(),
		P2P:             DefaultP2PConfig(),
		Mempool:         DefaultMempoolConfig(),
		StateSync:       DefaultStateSyncConfig(),
		BlockSync:       DefaultBlockSyncConfig(),
		Consensus:       DefaultConsensusConfig(),
		TxIndex:         DefaultTxIndexConfig(),
		Instrumentation: DefaultInstrumentationConfig(),
		PrivValidator:   DefaultPrivValidatorConfig(),
	}
}
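If you build a node programmatically (as the NOTE in main.go suggests), you would typically start from DefaultConfig and override individual fields before handing the config to the node provider. A hedged sketch, assuming the usual github.com/tendermint/tendermint/config import path and only the field names that appear elsewhere in this article (details may vary by version):

// Sketch: start from the defaults and tweak a few fields.
// Field names (Mode, RPC.ListenAddress, P2P.ListenAddress) are the ones that
// appear in the code walked through in this article; other details may differ.
package main

import (
	"fmt"

	cfg "github.com/tendermint/tendermint/config"
)

func main() {
	conf := cfg.DefaultConfig()
	conf.Mode = cfg.ModeValidator                  // full / validator / seed
	conf.RPC.ListenAddress = "tcp://0.0.0.0:26657" // where the RPC server listens
	conf.P2P.ListenAddress = "tcp://0.0.0.0:26656" // where the p2p transport listens

	fmt.Println(conf.RPC.ListenAddress)
}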
Starting the service
The service startup interface is Service; its main implementation is BaseService.
service.go
// Service defines a service that can be started, stopped, and reset.
type Service interface {
	// Start the service.
	// If it's already started or stopped, will return an error.
	// If OnStart() returns an error, it's returned by Start()
	Start() error

	OnStart() error
}
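The relationship between Start and OnStart is a classic template-method pattern: the base type owns the started/stopped bookkeeping, and each concrete service only supplies OnStart. A self-contained sketch of that pattern (mimicking Service/BaseService, not the actual library code):

// Template-method sketch modelled on Service/BaseService (illustrative only).
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var ErrAlreadyStarted = errors.New("already started")

type impl interface {
	OnStart() error
}

type baseService struct {
	started uint32
	impl    impl
}

// Start flips the started flag exactly once and delegates to the concrete
// OnStart, just like BaseService.Start does below.
func (bs *baseService) Start() error {
	if !atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
		return ErrAlreadyStarted
	}
	if err := bs.impl.OnStart(); err != nil {
		atomic.StoreUint32(&bs.started, 0) // revert flag on failure
		return err
	}
	return nil
}

type myNode struct{ baseService }

func (n *myNode) OnStart() error {
	fmt.Println("node-specific startup work")
	return nil
}

func main() {
	n := &myNode{}
	n.impl = n             // wire the concrete impl into the base
	fmt.Println(n.Start()) // <nil>
	fmt.Println(n.Start()) // already started
}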
Starting the node
// Start implements Service by calling OnStart (if defined). An error will be
// returned if the service is already running or stopped. Not to start the
// stopped service, you need to call Reset.
func (bs *BaseService) Start() error {
	if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
		if atomic.LoadUint32(&bs.stopped) == 1 {
			bs.Logger.Error("not starting service; already stopped", "service", bs.name, "impl", bs.impl.String())
			atomic.StoreUint32(&bs.started, 0)
			return ErrAlreadyStopped
		}

		bs.Logger.Info("starting service", "service", bs.name, "impl", bs.impl.String())

		// Start the concrete implementation. BaseService has many implementations,
		// each providing its own OnStart:
		//   node startup:      node.go  OnStart
		//   consensus startup: state.go OnStart
		if err := bs.impl.OnStart(); err != nil {
			// revert flag
			atomic.StoreUint32(&bs.started, 0)
			return err
		}
		return nil
	}

	bs.Logger.Debug("not starting service; already started", "service", bs.name, "impl", bs.impl.String())
	return ErrAlreadyStarted
}
nodeImpl implements the node's startup flow; overall it is fairly easy to follow.
// OnStart starts the Node. It implements service.Service.
func (n *nodeImpl) OnStart() error {
	now := tmtime.Now()
	genTime := n.genesisDoc.GenesisTime
	if genTime.After(now) {
		n.Logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
		time.Sleep(genTime.Sub(now))
	}

	// Start the RPC server before the P2P server
	// so we can eg. receive txs for the first block
	// As an aside, Tendermint has three node modes:
	//   ModeFull      = "full"      relays data
	//   ModeValidator = "validator" validates data
	//   ModeSeed      = "seed"      used for peer discovery
	if n.config.RPC.ListenAddress != "" && n.config.Mode != config.ModeSeed {
		// Start the RPC server
		listeners, err := n.startRPC()
		if err != nil {
			return err
		}
		n.rpcListeners = listeners
	}

	if n.config.Instrumentation.Prometheus &&
		n.config.Instrumentation.PrometheusListenAddr != "" {
		n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
	}

	// Start the transport.
	addr, err := types.NewNetAddressString(n.nodeKey.ID.AddressString(n.config.P2P.ListenAddress))
	if err != nil {
		return err
	}

	if err := n.transport.Listen(p2p.NewEndpoint(addr)); err != nil {
		return err
	}

	n.isListening = true

	// Start the p2p router
	if err = n.router.Start(); err != nil {
		return err
	}

	if n.config.Mode != config.ModeSeed {
		if n.config.BlockSync.Enable {
			// Start block sync
			if err := n.bcReactor.Start(); err != nil {
				return err
			}
		}

		// Start the real consensus reactor separately since the switch uses the shim.
		if err := n.consensusReactor.Start(); err != nil {
			return err
		}

		// Start the real state sync reactor separately since the switch uses the shim.
		if err := n.stateSyncReactor.Start(); err != nil {
			return err
		}

		// Start the real mempool reactor separately since the switch uses the shim.
		if err := n.mempoolReactor.Start(); err != nil {
			return err
		}

		// Start the real evidence reactor separately since the switch uses the shim.
		if err := n.evidenceReactor.Start(); err != nil {
			return err
		}
	}

	if err := n.pexReactor.Start(); err != nil {
		return err
	}

	// Run state sync
	// TODO: We shouldn't run state sync if we already have state that has a
	// LastBlockHeight that is not InitialHeight
	if n.stateSync {
		bcR, ok := n.bcReactor.(consensus.BlockSyncReactor)
		if !ok {
			return fmt.Errorf("this blockchain reactor does not support switching from state sync")
		}

		// we need to get the genesis state to get parameters such as
		state, err := sm.MakeGenesisState(n.genesisDoc)
		if err != nil {
			return fmt.Errorf("unable to derive state: %w", err)
		}

		// TODO: we may want to move these events within the respective
		// reactors.
		// At the beginning of the statesync start, we use the initialHeight as the event height
		// because of the statesync doesn't have the concreate state height before fetched the snapshot.
		d := types.EventDataStateSyncStatus{Complete: false, Height: state.InitialHeight}
		if err := n.eventBus.PublishEventStateSyncStatus(d); err != nil {
			n.eventBus.Logger.Error("failed to emit the statesync start event", "err", err)
		}

		// FIXME: We shouldn't allow state sync to silently error out without
		// bubbling up the error and gracefully shutting down the rest of the node
		go func() {
			n.Logger.Info("starting state sync")
			state, err := n.stateSyncReactor.Sync(context.TODO())
			if err != nil {
				n.Logger.Error("state sync failed; shutting down this node", "err", err)
				// stop the node
				if err := n.Stop(); err != nil {
					n.Logger.Error("failed to shut down node", "err", err)
				}
				return
			}

			n.consensusReactor.SetStateSyncingMetrics(0)

			d := types.EventDataStateSyncStatus{Complete: true, Height: state.LastBlockHeight}
			if err := n.eventBus.PublishEventStateSyncStatus(d); err != nil {
				n.eventBus.Logger.Error("failed to emit the statesync start event", "err", err)
			}

			// TODO: Some form of orchestrator is needed here between the state
			// advancing reactors to be able to control which one of the three
			// is running
			if n.config.BlockSync.Enable {
				// FIXME Very ugly to have these metrics bleed through here.
				n.consensusReactor.SetBlockSyncingMetrics(1)
				if err := bcR.SwitchToBlockSync(state); err != nil {
					n.Logger.Error("failed to switch to block sync", "err", err)
					return
				}

				d := types.EventDataBlockSyncStatus{Complete: false, Height: state.LastBlockHeight}
				if err := n.eventBus.PublishEventBlockSyncStatus(d); err != nil {
					n.eventBus.Logger.Error("failed to emit the block sync starting event", "err", err)
				}
			} else {
				n.consensusReactor.SwitchToConsensus(state, true)
			}
		}()
	}

	return nil
}
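The last block is worth restating: state sync runs in its own goroutine, and only once it finishes does the node hand over either to block sync (which later switches to consensus) or directly to consensus; on failure the node is stopped. A simplified, self-contained sketch of that orchestration (the function names here are placeholders, not the real reactor APIs):

// Simplified hand-off between state sync, block sync and consensus
// (placeholder functions; the real node uses reactors and an event bus).
package main

import (
	"fmt"
	"time"
)

// runStateSync stands in for stateSyncReactor.Sync: it returns the height the
// node caught up to, or an error.
func runStateSync() (height int64, err error) {
	time.Sleep(100 * time.Millisecond) // pretend to fetch and apply a snapshot
	return 1000, nil
}

// startNode mirrors the tail of OnStart: kick off state sync in a goroutine,
// then pick the next state-advancing mechanism.
func startNode(blockSyncEnabled bool, stop func(error)) {
	go func() {
		h, err := runStateSync()
		if err != nil {
			stop(err) // state sync failed; shut the node down
			return
		}
		if blockSyncEnabled {
			fmt.Println("switching to block sync from height", h)
			// block sync hands over to consensus once it catches up
		} else {
			fmt.Println("switching straight to consensus at height", h)
		}
	}()
}

func main() {
	startNode(true, func(err error) { fmt.Println("stopping node:", err) })
	time.Sleep(200 * time.Millisecond) // give the goroutine time to finish in this demo
}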