Tendermint 共识源码分析

2023-10-20 10:01:17 浏览数 (2)

简述

Tendermint的共识算法可以看成是POS BFT,Tendermint在进行BFT共识算法确认区块前,首先使用POS算法从Validators中选举出Proposer。 然后由Proposer进行提案,最后使用BFT算法生成区块。Tendermint 的共识协议使用的gossip协议。

另外,源码分析部分由于代码篇幅太长,会省略部分源码,不影响阅读。

角色

Tendermint 共识网络中有两个重要角色

  1. Validator: 网络的参与者,也是区块的验证者(预先配置的网络中的一般验证者账户们)
  2. Proposer: 从validator中选举出一个节点成为出块人(选举出的出块人),选举不需要网络通信,这点后面从源码中说明。

共识5个阶段

  1. NewHeight
  2. propose
  3. prevote
  4. precommit
  5. commit
代码语言:javascript复制
                         -------------------------------------- 
                        v                                      |(Wait til `CommmitTime timeoutCommit`)
                    -----------                           ----- ----- 
       ----------> |  Propose   --------------           | NewHeight |
      |             -----------               |           ----------- 
      |                                       |                ^
      |(Else, after timeoutPrecommit)         v                |
 ----- -----                             -----------           |
| Precommit |  <------------------------   Prevote  |          |
 ----- -----                             -----------           |
      |(When  2/3 Precommits for block found)                  |
      v                                                        |
 -------------------------------------------------------------------- 
|  Commit                                                            |
|                                                                    |
|  * Set CommitTime = now;                                           |
|  * Wait for block, then stage/save/commit block;                   |
 -------------------------------------------------------------------- 

阶段:Propose阶段、Prevote阶段、Precommit阶段 投票种类:prevote、precommit、commit

round-based 协议

整个Tendermint区块链网络需要通过Round-based协议来决定下一个区块,在区块链中共识的直接目的就是确定下一个区块内容、链接下一个区块

round-based 协议是一个状态机,主要有:

NewHeigh -> Propose -> Prevote -> Precommit -> Commit

上述每个状态都被称为一个 Step。首尾的 NewHeigh 和 Commit ,这两个 Steps 被称为特殊的 Step。

Round

而中间循环三个 Steps则被称为一个 Round(Propose -> Prevote -> Precommit),是共识阶段,也是算法的核心原理所在。

一个块的最终提交(Commit)可能需要多个 Round 过程,这是因为有许多原因可能会导致当前 Round 不成功(比如出块节点 Offline,提出的块是无效块,收到的 Prevote 或者 Precommit 票数不够 2/3 等等)。

共识失败怎么办

如果出块节点 Offline,提出的块是无效块,收到的 Prevote 或者 Precommit 票数不够 2/3 出现这些情况的话,解决方案就是移步到下一轮,或者增加 timeout 时间。

共识流程

  1. NewHeight 阶段:当区块链达到一个新的高度时进入 NewHeight 阶段。
  2. propose 阶段:接下来 Propose 阶段会提交一个 proposal ,
  3. prevote 阶段:prevote 阶段会对收到的 proposal 进行 prevote 投票。
  4. precommit 阶段:在 precommit 阶段收集到 ⅔ prevote 投票后,对 block 进行 precommit 投票。
  5. commit阶段:
  • 如果收集到 ⅔ precommit 投票后则进入 commit 阶段,
  • 如果没有收集到 ⅔ precommit 投票,会再次进入 propose 段。 在共识阶段期间如果收到 ⅔ commit 投票那么直接进入 commit 阶段。 以上就是算法运行的整体过程,接下来分阶段来阐述各个阶段。

Round0 首轮共识分析

服务刚启动时,节点进入第一轮状态共识,Tendenmint 称之为Round0。 启动流程如下

Round0 是什么做用?做了哪些事

主要是通过监听消息,来处理对应消息类型携带的事件。

流程
  1. OnStart
  2. receiveRoutine 启动接收程序 go cs.receiveRoutine
  3. scheduleRound0: 注意,这个发送的是内部消息,不是 peer消息
  4. scheduleTimeout 发送round0 的事件,事件类型:receiveRoutine
  5. receiveRoutine: 处理事件
共识核心方法说明

receiveRoutine 核心方法 这个函数就比较重要了,它处理了可能导致状态转换的消息。 其中超时消息、完成一个提案和超过2/3的投票都会导致状态转换。 通过监听各种 Queue 的消息类型来处理

state.go 源码分析

代码语言:javascript复制
// OnStart loads the latest state via the WAL, and starts the timeout and
// receive routines.
// OnStart通过WAL加载最新状态,并启动超时和接收程序。
func (cs *State) OnStart() error {
	// We may set the WAL in testing before calling Start, so only OpenWAL if its
	// still the nilWAL.
	// 在测试中,我们可能会在调用Start之前设置WAL,所以只有在其仍然是nilWAL的情况下才会打开WAL。
	if _, ok := cs.wal.(nilWAL); ok {
		if err := cs.loadWalFile(); err != nil {
			return err
		}
	}

	// We may have lost some votes if the process crashed reload from consensus
	// log to catchup.
	// 如果从共识日志到追赶的过程中崩溃重新加载,我们可能会失去一些票数。
	if cs.doWALCatchup {
		repairAttempted := false

	LOOP:
		for {
			err := cs.catchupReplay(cs.Height)
			switch {
			case err == nil:
				break LOOP

			case !IsDataCorruptionError(err):
				cs.Logger.Error("error on catchup replay; proceeding to start state anyway", "err", err)
				break LOOP

			case repairAttempted:
				return err
			}

			cs.Logger.Error("the WAL file is corrupted; attempting repair", "err", err)

			// 1) prep work
			if err := cs.wal.Stop(); err != nil {
				return err
			}

			repairAttempted = true

			// 2) backup original WAL file
			corruptedFile := fmt.Sprintf("%s.CORRUPTED", cs.config.WalFile())
			if err := tmos.CopyFile(cs.config.WalFile(), corruptedFile); err != nil {
				return err
			}

			cs.Logger.Debug("backed up WAL file", "src", cs.config.WalFile(), "dst", corruptedFile)

			// 3) try to repair (WAL file will be overwritten!)
			if err := repairWalFile(corruptedFile, cs.config.WalFile()); err != nil {
				cs.Logger.Error("the WAL repair failed", "err", err)
				return err
			}

			cs.Logger.Info("successful WAL repair")

			// reload WAL file
			if err := cs.loadWalFile(); err != nil {
				return err
			}
		}
	}

	// EventSwitch 只监听 EventNewRoundStep、EventValidBlock和EventVote 这三种事件
	if err := cs.evsw.Start(); err != nil {
		return err
	}

	// we need the timeoutRoutine for replay so
	// we don't block on the tick chan.
	// NOTE: we will get a build up of garbage go routines
	// firing on the tockChan until the receiveRoutine is started
	// to deal with them (by that point, at most one will be valid)
	// 我们需要重放的timeoutRoutine,这样我们就不会在tick chan上阻塞。
	// 注意:我们将得到大量的垃圾程序
	// 直到receiveRoutine开始处理它们(到那时,最多只有一个是有效的)来处理它们(到那时,最多只有一个是有效的)。
	if err := cs.timeoutTicker.Start(); err != nil {
		return err
	}

	// Double Signing Risk Reduction
	// 检查双重验签
	if err := cs.checkDoubleSigningRisk(cs.Height); err != nil {
		return err
	}

	// now start the receiveRoutine
	// 启动接收程序
	go cs.receiveRoutine(0)

	// schedule the first round!
	// use GetRoundState so we don't race the receiveRoutine for access
	// 安排第一轮!
	// 使用GetRoundState,这样我们就不会和receiveRoutine争夺访问权了。
	cs.scheduleRound0(cs.GetRoundState())

	return nil
}

发送内部消息 cs.scheduleRound0

scheduleRound0 的作用是将消息发送到内部的 chan 当中,receiveRoutine 负责监听不同类型事件,会监听到这个事件。

代码语言:javascript复制
// enterNewRound(height, 0) at cs.StartTime.
func (cs *State) scheduleRound0(rs *cstypes.RoundState) {
	// cs.Logger.Info("scheduleRound0", "now", tmtime.Now(), "startTime", cs.StartTime)
	sleepDuration := rs.StartTime.Sub(tmtime.Now())

	// 这一轮是发送了 cstypes.RoundStepNewHeight 事件类型
	cs.scheduleTimeout(sleepDuration, rs.Height, 0, cstypes.RoundStepNewHeight)
}

发送内部消息,最终将消息发送到 chan

代码语言:javascript复制
// ScheduleTimeout schedules a new timeout by sending on the internal tickChan.
// The timeoutRoutine is always available to read from tickChan, so this won't block.
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step.
// ScheduleTimeout通过在内部tickChan上发送来安排一个新的超时。
// timeoutRoutine总是可以从tickChan中读取,所以这不会阻塞。
// 如果timeoutRoutine已经为以后的高度/轮次/步长安排了一个超时,则调度可能会失败。
func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) {
	t.tickChan <- ti
}

核心处理逻辑

主要是 receiveRoutine 处理

receiveRoutine 处理消息类型

  1. peerMsgQueue 来自节点的消息
  2. internalMsgQueue 内部消息
  3. timeoutTicker 超时的消息 timeoutTicker 这个消息,就是被上面流程中scheduleRound0发出的消息

receiveRoutine 处理流程

  1. 根据类型选把处理器 cs.timeoutTicker.Chan()
  2. handleTimeout 处理 处理具体类型

handleTimeout 处理具体事件类型

  1. 监听RoundStepNewHeight事件 case cstypes.RoundStepNewHeight:
  2. 进入 Round 流程 这个就重点了,概据官方文档说明,每一次共识都有 5 个步骤,这个是第一步。 cs.enterNewRound(ti.Height, 0)
代码语言:javascript复制
//-----------------------------------------
// the main go routines

// receiveRoutine handles messages which may cause state transitions.
// it's argument (n) is the number of messages to process before exiting - use 0 to run forever
// It keeps the RoundState and is the only thing that updates it.
// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities.
// State must be locked before any internal state is updated.
// receiveRoutine处理可能导致状态转换的消息。
// 它的参数(n)是退出前要处理的消息的数量--用0表示永远运行。
// 它保持RoundState,并且是唯一能更新它的东西。
// 更新(状态转换)发生在超时、完整提案和2/3多数的情况下。
// 在任何内部状态被更新之前,状态必须被锁定。
func (cs *State) receiveRoutine(maxSteps int) {
        ...
		// 拿到当前链状态
		rs := cs.RoundState

		// 注意,这个是接收的 reactor 的消息
		var mi msgInfo

		// 处理三种类型的消息
		// 1.peerMsgQueue 来自节点的消息
		// 2.internalMsgQueue 内部消息
		// 3.timeoutTicker 超时的消息
		select {
		case <-cs.txNotifier.TxsAvailable():
			cs.handleTxsAvailable()

			// peer 节点消息
		case mi = <-cs.peerMsgQueue:
			if err := cs.wal.Write(mi); err != nil {
				cs.Logger.Error("failed writing to WAL", "err", err)
			}

			// handles proposals, block parts, votes
			// may generate internal events (votes, complete proposals, 2/3 majorities)
			cs.handleMsg(mi)

			//监听内部队列消息
		case mi = <-cs.internalMsgQueue:
			err := cs.wal.WriteSync(mi) // NOTE: fsync
			if err != nil {
				panic(fmt.Sprintf(
					"failed to write %v msg to consensus WAL due to %v; check your file system and restart the node",
					mi, err,
				))
			}

			// handles proposals, block parts, votes
			// 核心的状态逻辑处理,处理 proposals, block parts, votes
			cs.handleMsg(mi)

		// 注意这个监听,ScheduleTimeout 的 channel
		case ti := <-cs.timeoutTicker.Chan(): // tockChan:
			if err := cs.wal.Write(ti); err != nil {
				cs.Logger.Error("failed writing to WAL", "err", err)
			}

			// if the timeout is relevant to the rs
			// go to the next step
			cs.handleTimeout(ti, rs)

		case <-cs.Quit():
			onExit(cs)
			return
		}
	}
}

handleTimeout 分析

代码语言:javascript复制
// 进入新一轮
func (cs *State) enterNewRound(height int64, round int32) {
	logger := cs.Logger.With("height", height, "round", round)

	if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != cstypes.RoundStepNewHeight) {
		logger.Debug(
			"entering new round with invalid args",
			"current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step),
		)
		return
	}

	if now := tmtime.Now(); cs.StartTime.After(now) {
		logger.Debug("need to set a buffer and log message here for sanity", "start_time", cs.StartTime, "now", now)
	}

	logger.Debug("entering new round", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step))

	// increment validators if necessary
	// 必要时增加 validator
	validators := cs.Validators
	if cs.Round < round {
		validators = validators.Copy()
		validators.IncrementProposerPriority(tmmath.SafeSubInt32(round, cs.Round))
	}

	// Setup new round
	// we don't fire newStep for this step,
	// but we fire an event, so update the round step first
	// 只是 set 值,并没有接口调用
	cs.updateRoundStep(round, cstypes.RoundStepNewRound)
    ...//省略部分代码
	cs.Votes.SetRound(tmmath.SafeAddInt32(round, 1)) // also track next round (round 1) to allow round-skipping
	cs.TriggeredTimeoutPrecommit = false

	// 发布事件??
	if err := cs.eventBus.PublishEventNewRound(cs.NewRoundEvent()); err != nil {
		cs.Logger.Error("failed publishing new round", "err", err)
	}

	cs.metrics.Rounds.Set(float64(round))

	// Wait for txs to be available in the mempool
	// before we enterPropose in round 0. If the last block changed the app hash,
	// we may need an empty "proof" block, and enterPropose immediately.
	// 进入 round0 之前,等待mempool中的txs可用。
	// 如果最后一个区块改变了应用程序的哈希值,我们可能需要一个空的 "证明 "区块,并立即输入Propose。
	waitForTxs := cs.config.WaitForTxs() && round == 0 && !cs.needProofBlock(height)
	if waitForTxs {
		if cs.config.CreateEmptyBlocksInterval > 0 {
			// 构建空块证明,进入下一个阶段
			cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round,
				cstypes.RoundStepNewRound)
		}
	} else {
		// 进入 propose 阶段
		cs.enterPropose(height, round)
	}
}

0 人点赞