Tendermint 启动流程

2023-10-23 08:55:09 浏览数 (1)

启动流程

Tendermint 的启动流程比较清晰明了:各业务的启动逻辑都在对应的实现代码中,主启动流程负责加载所需配置,再由各启动实现类启动各自对应的业务。例如,节点启动相关逻辑在 nodeImpl 中,共识相关处理在 state 中。

流程大致:

  1. 加载配置 node.NewDefault
  2. 启动运行 Start
  3. 启动相关实现的 OnStart

先看启动流程

启动入口代码,这里使用到了一个命令行工具:cobra。 代码位置:cmd/tendermint/main.go

代码语言:javascript复制
// main is the entry point of the tendermint binary (excerpt: part of the body
// is elided in this listing). It registers the node start command on rootCmd,
// wraps rootCmd with cli.PrepareBaseCmd, and executes the result, panicking if
// execution returns an error.
func main() {
    ...省略部份代码
	// NOTE:
	// Users wishing to:
	//	* Use an external signer for their validators
	//	* Supply an in-proc abci app
	//	* Supply a genesis doc file from another source
	//	* Provide their own DB implementation
	// can copy this file and use something other than the
	// node.NewDefault function
	//
	// The node is created as the default runnable node. This is only a
	// function reference; nothing is executed here. The function is invoked
	// later from the command built by cmd.NewRunNodeCmd.
	nodeFunc := node.NewDefault

	// Create & start node
	// The key call here is cmd.NewRunNodeCmd.
	rootCmd.AddCommand(cmd.NewRunNodeCmd(nodeFunc))

	// From here on the local cmd shadows the imported cmd package.
	cmd := cli.PrepareBaseCmd(rootCmd, "TM", os.ExpandEnv(filepath.Join("$HOME", config.DefaultTendermintDir)))
	if err := cmd.Execute(); err != nil {
		panic(err)
	}
}

创建运行节点

代码语言:javascript复制
// NewRunNodeCmd builds the "start" CLI command (aliases "node" and "run")
// that creates a node through nodeProvider and runs it. Supplying a custom
// nodeProvider allows a custom PrivValidator or an in-process ABCI
// application to be used.
//
// The command's RunE returns early only on failure (genesis hash check, node
// creation, or node start); once the node is running it blocks forever.
func NewRunNodeCmd(nodeProvider cfg.ServiceProvider) *cobra.Command {
	// runNode is the body of the command; it is wired in as RunE below.
	runNode := func(_ *cobra.Command, _ []string) error {
		if err := checkGenesisHash(config); err != nil {
			return err
		}

		// Build the node. When wired up from main, nodeProvider is
		// node.NewDefault.
		n, err := nodeProvider(config, logger)
		if err != nil {
			return fmt.Errorf("failed to create node: %w", err)
		}

		// Start the service.
		if err = n.Start(); err != nil {
			return fmt.Errorf("failed to start node: %w", err)
		}

		logger.Info("started node", "node", n.String())

		// Stop the node on SIGTERM or CTRL-C, but only if it is still running.
		tmos.TrapSignal(logger, func() {
			if !n.IsRunning() {
				return
			}
			if stopErr := n.Stop(); stopErr != nil {
				logger.Error("unable to stop the node", "error", stopErr)
			}
		})

		// Block forever; shutdown is driven by the signal handler above.
		select {}
	}

	// Register the command on the command line.
	startCmd := &cobra.Command{
		Use:     "start",
		Aliases: []string{"node", "run"},
		Short:   "Run the tendermint node",
		RunE:    runNode,
	}

	AddNodeFlags(startCmd)
	return startCmd
}

启动所需的默认配置一目了然

代码语言:javascript复制
// DefaultConfig returns a freshly allocated Config for a Tendermint node in
// which every section is filled in by its matching Default*Config
// constructor.
func DefaultConfig() *Config {
	c := new(Config)

	c.BaseConfig = DefaultBaseConfig()
	c.RPC = DefaultRPCConfig()
	c.P2P = DefaultP2PConfig()
	c.Mempool = DefaultMempoolConfig()
	c.StateSync = DefaultStateSyncConfig()
	c.BlockSync = DefaultBlockSyncConfig()
	c.Consensus = DefaultConsensusConfig()
	c.TxIndex = DefaultTxIndexConfig()
	c.Instrumentation = DefaultInstrumentationConfig()
	c.PrivValidator = DefaultPrivValidatorConfig()

	return c
}

启动服务

启动服务的接口是 Service,其主要实现类是 BaseService。

service.go

代码语言:javascript复制
// Service defines a service that can be started, stopped, and reset.
//
// NOTE: only the start-related methods appear in this excerpt.
type Service interface {
	// Start the service.
	// If it's already started or stopped, will return an error.
	// If OnStart() returns an error, it's returned by Start()
	Start() error

	// OnStart is the implementation-specific startup hook. BaseService.Start
	// invokes it on the wrapped implementation and returns its error, if any.
	OnStart() error
}

启动 node

代码语言:javascript复制
// Start implements Service by delegating to the wrapped implementation's
// OnStart.
//
// It returns ErrAlreadyStarted when the started flag is already set,
// ErrAlreadyStopped when the stopped flag is set, and otherwise whatever
// OnStart returns. On the ErrAlreadyStopped and OnStart-error paths the
// started flag is cleared again. (The upstream comment states that a stopped
// service must be Reset before it can be started again; Reset is not part of
// this excerpt.)
func (bs *BaseService) Start() error {
	// Atomically claim the started flag; if it was already set, bail out.
	if !atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
		bs.Logger.Debug("not starting service; already started", "service", bs.name, "impl", bs.impl.String())
		return ErrAlreadyStarted
	}

	if atomic.LoadUint32(&bs.stopped) == 1 {
		bs.Logger.Error("not starting service; already stopped", "service", bs.name, "impl", bs.impl.String())
		// Release the flag claimed above.
		atomic.StoreUint32(&bs.started, 0)
		return ErrAlreadyStopped
	}

	bs.Logger.Info("starting service", "service", bs.name, "impl", bs.impl.String())

	// Hand over to the concrete service. BaseService has many
	// implementations, each providing its own OnStart: node startup lives in
	// node.go's OnStart, consensus startup in state.go's OnStart.
	startErr := bs.impl.OnStart()
	if startErr != nil {
		// Revert the flag so the start attempt is not recorded.
		atomic.StoreUint32(&bs.started, 0)
	}
	return startErr
}

nodeImpl 实现启动流程,总的来说还是比较清晰。

代码语言:javascript复制

// OnStart starts the Node. It implements service.Service.
//
// Startup proceeds in this order, returning the first error encountered:
//  1. If the genesis time is in the future, sleep until it is reached.
//  2. Start the RPC server when an RPC listen address is configured and the
//     node is not in seed mode.
//  3. Start the Prometheus server when instrumentation is enabled and a
//     listen address is configured.
//  4. Make the P2P transport listen on the configured P2P address, then start
//     the router.
//  5. For non-seed nodes, start the block sync reactor (if enabled) followed
//     by the consensus, state sync, mempool and evidence reactors.
//  6. Start the PEX reactor.
//  7. If state sync is requested, publish the "state sync started" event and
//     run the sync in a background goroutine, which afterwards hands over to
//     block sync or consensus.
//
// Errors raised inside the state sync goroutine are not returned from OnStart;
// they are logged, and a failed sync stops the node.
func (n *nodeImpl) OnStart() error {
	now := tmtime.Now()
	genTime := n.genesisDoc.GenesisTime
	if genTime.After(now) {
		n.Logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
		time.Sleep(genTime.Sub(now))
	}

	// Start the RPC server before the P2P server
	// so we can eg. receive txs for the first block
	//
	// As an aside, Tendermint has three node modes:
	//	ModeFull      = "full"       relays data
	//	ModeValidator = "validator"  validates data
	//	ModeSeed      = "seed"       used for peer discovery
	if n.config.RPC.ListenAddress != "" && n.config.Mode != config.ModeSeed {
		// Start RPC.
		listeners, err := n.startRPC()
		if err != nil {
			return err
		}
		n.rpcListeners = listeners
	}

	if n.config.Instrumentation.Prometheus &&
		n.config.Instrumentation.PrometheusListenAddr != "" {
		n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
	}

	// Start the transport.
	addr, err := types.NewNetAddressString(n.nodeKey.ID.AddressString(n.config.P2P.ListenAddress))
	if err != nil {
		return err
	}
	if err := n.transport.Listen(p2p.NewEndpoint(addr)); err != nil {
		return err
	}

	n.isListening = true

	// Start the P2P router.
	if err = n.router.Start(); err != nil {
		return err
	}

	if n.config.Mode != config.ModeSeed {
		if n.config.BlockSync.Enable {
			// Start block sync.
			if err := n.bcReactor.Start(); err != nil {
				return err
			}
		}

		// Start the real consensus reactor separately since the switch uses the shim.
		if err := n.consensusReactor.Start(); err != nil {
			return err
		}

		// Start the real state sync reactor separately since the switch uses the shim.
		if err := n.stateSyncReactor.Start(); err != nil {
			return err
		}

		// Start the real mempool reactor separately since the switch uses the shim.
		if err := n.mempoolReactor.Start(); err != nil {
			return err
		}

		// Start the real evidence reactor separately since the switch uses the shim.
		if err := n.evidenceReactor.Start(); err != nil {
			return err
		}
	}

	if err := n.pexReactor.Start(); err != nil {
		return err
	}

	// Run state sync
	// TODO: We shouldn't run state sync if we already have state that has a
	// LastBlockHeight that is not InitialHeight
	if n.stateSync {
		bcR, ok := n.bcReactor.(consensus.BlockSyncReactor)
		if !ok {
			return fmt.Errorf("this blockchain reactor does not support switching from state sync")
		}

		// We need the genesis state to get parameters such as the initial
		// height, which is used for the start event below.
		state, err := sm.MakeGenesisState(n.genesisDoc)
		if err != nil {
			return fmt.Errorf("unable to derive state: %w", err)
		}

		// TODO: we may want to move these events within the respective
		// reactors.
		// At the beginning of the statesync start, we use the initialHeight as the event height
		// because statesync doesn't have a concrete state height before the snapshot is fetched.
		d := types.EventDataStateSyncStatus{Complete: false, Height: state.InitialHeight}
		if err := n.eventBus.PublishEventStateSyncStatus(d); err != nil {
			n.eventBus.Logger.Error("failed to emit the statesync start event", "err", err)
		}

		// FIXME: We shouldn't allow state sync to silently error out without
		// bubbling up the error and gracefully shutting down the rest of the node
		go func() {
			n.Logger.Info("starting state sync")
			// state and err deliberately shadow the outer variables: from here
			// on state is the synced state, not the genesis state.
			state, err := n.stateSyncReactor.Sync(context.TODO())
			if err != nil {
				n.Logger.Error("state sync failed; shutting down this node", "err", err)
				// stop the node
				if err := n.Stop(); err != nil {
					n.Logger.Error("failed to shut down node", "err", err)
				}
				return
			}

			n.consensusReactor.SetStateSyncingMetrics(0)

			// Announce completion. The message names the completion event so
			// that a failure here is not confused with the start event above.
			d := types.EventDataStateSyncStatus{Complete: true, Height: state.LastBlockHeight}
			if err := n.eventBus.PublishEventStateSyncStatus(d); err != nil {
				n.eventBus.Logger.Error("failed to emit the statesync complete event", "err", err)
			}

			// TODO: Some form of orchestrator is needed here between the state
			// advancing reactors to be able to control which one of the three
			// is running
			if n.config.BlockSync.Enable {
				// FIXME Very ugly to have these metrics bleed through here.
				n.consensusReactor.SetBlockSyncingMetrics(1)
				if err := bcR.SwitchToBlockSync(state); err != nil {
					n.Logger.Error("failed to switch to block sync", "err", err)
					return
				}

				d := types.EventDataBlockSyncStatus{Complete: false, Height: state.LastBlockHeight}
				if err := n.eventBus.PublishEventBlockSyncStatus(d); err != nil {
					n.eventBus.Logger.Error("failed to emit the block sync starting event", "err", err)
				}

			} else {
				n.consensusReactor.SwitchToConsensus(state, true)
			}
		}()
	}

	return nil
}

0 人点赞