简述
Tendermint的共识算法可以看成是POS BFT,Tendermint在进行BFT共识算法确认区块前,首先使用POS算法从Validators中选举出Proposer。 然后由Proposer进行提案,最后使用BFT算法生成区块。Tendermint 的共识协议使用的gossip协议。
另外,源码分析部分由于代码篇幅太长,会省略部分源码,不影响阅读。
角色
Tendermint 共识网络中有两个重要角色
- Validator: 网络的参与者,也是区块的验证者(预先配置的网络中的一般验证者账户们)
- Proposer: 从validator中选举出一个节点成为出块人(选举出的出块人),选举不需要网络通信,这点后面从源码中说明。
共识5个阶段
- NewHeight
- propose
- prevote
- precommit
- commit
--------------------------------------
v |(Wait til `CommmitTime timeoutCommit`)
----------- ----- -----
----------> | Propose -------------- | NewHeight |
| ----------- | -----------
| | ^
|(Else, after timeoutPrecommit) v |
----- ----- ----------- |
| Precommit | <------------------------ Prevote | |
----- ----- ----------- |
|(When 2/3 Precommits for block found) |
v |
--------------------------------------------------------------------
| Commit |
| |
| * Set CommitTime = now; |
| * Wait for block, then stage/save/commit block; |
--------------------------------------------------------------------
阶段:Propose阶段、Prevote阶段、Precommit阶段 投票种类:prevote、precommit、commit
round-based 协议
整个Tendermint区块链网络需要通过Round-based协议来决定下一个区块,在区块链中共识的直接目的就是确定下一个区块内容、链接下一个区块
round-based 协议是一个状态机,主要有:
NewHeigh -> Propose -> Prevote -> Precommit -> Commit
上述每个状态都被称为一个 Step。首尾的 NewHeigh 和 Commit ,这两个 Steps 被称为特殊的 Step。
Round
而中间循环三个 Steps则被称为一个 Round
(Propose
-> Prevote
-> Precommit
),是共识阶段
,也是算法的核心原理所在。
一个块的最终提交(Commit)可能需要多个 Round 过程,这是因为有许多原因可能会导致当前 Round 不成功(比如出块节点 Offline,提出的块是无效块,收到的 Prevote 或者 Precommit 票数不够 2/3 等等)。
共识失败怎么办
如果出块节点 Offline,提出的块是无效块,收到的 Prevote 或者 Precommit 票数不够 2/3 出现这些情况的话,解决方案就是移步到下一轮,或者增加 timeout 时间。
共识流程
- NewHeight 阶段:当区块链达到一个新的高度时进入 NewHeight 阶段。
- propose 阶段:接下来 Propose 阶段会提交一个 proposal ,
- prevote 阶段:prevote 阶段会对收到的 proposal 进行 prevote 投票。
- precommit 阶段:在 precommit 阶段收集到 ⅔ prevote 投票后,对 block 进行 precommit 投票。
- commit阶段:
- 如果收集到 ⅔ precommit 投票后则进入 commit 阶段,
- 如果没有收集到 ⅔ precommit 投票,会再次进入 propose 段。 在共识阶段期间如果收到 ⅔ commit 投票那么直接进入 commit 阶段。 以上就是算法运行的整体过程,接下来分阶段来阐述各个阶段。
Round0 首轮共识分析
服务刚启动时,节点进入第一轮状态共识,Tendenmint 称之为Round0
。
启动流程如下
Round0 是什么做用?做了哪些事
主要是通过监听消息,来处理对应消息类型携带的事件。
流程
- OnStart
- receiveRoutine 启动接收程序 go cs.receiveRoutine
- scheduleRound0: 注意,这个发送的是内部消息,不是 peer消息
- scheduleTimeout 发送
round0
的事件,事件类型:receiveRoutine
- receiveRoutine: 处理事件
共识核心方法说明
receiveRoutine 核心方法 这个函数就比较重要了,它处理了可能导致状态转换的消息。 其中超时消息、完成一个提案和超过2/3的投票都会导致状态转换。 通过监听各种 Queue 的消息类型来处理
state.go 源码分析
代码语言:javascript复制// OnStart loads the latest state via the WAL, and starts the timeout and
// receive routines.
// OnStart通过WAL加载最新状态,并启动超时和接收程序。
func (cs *State) OnStart() error {
// We may set the WAL in testing before calling Start, so only OpenWAL if its
// still the nilWAL.
// 在测试中,我们可能会在调用Start之前设置WAL,所以只有在其仍然是nilWAL的情况下才会打开WAL。
if _, ok := cs.wal.(nilWAL); ok {
if err := cs.loadWalFile(); err != nil {
return err
}
}
// We may have lost some votes if the process crashed reload from consensus
// log to catchup.
// 如果从共识日志到追赶的过程中崩溃重新加载,我们可能会失去一些票数。
if cs.doWALCatchup {
repairAttempted := false
LOOP:
for {
err := cs.catchupReplay(cs.Height)
switch {
case err == nil:
break LOOP
case !IsDataCorruptionError(err):
cs.Logger.Error("error on catchup replay; proceeding to start state anyway", "err", err)
break LOOP
case repairAttempted:
return err
}
cs.Logger.Error("the WAL file is corrupted; attempting repair", "err", err)
// 1) prep work
if err := cs.wal.Stop(); err != nil {
return err
}
repairAttempted = true
// 2) backup original WAL file
corruptedFile := fmt.Sprintf("%s.CORRUPTED", cs.config.WalFile())
if err := tmos.CopyFile(cs.config.WalFile(), corruptedFile); err != nil {
return err
}
cs.Logger.Debug("backed up WAL file", "src", cs.config.WalFile(), "dst", corruptedFile)
// 3) try to repair (WAL file will be overwritten!)
if err := repairWalFile(corruptedFile, cs.config.WalFile()); err != nil {
cs.Logger.Error("the WAL repair failed", "err", err)
return err
}
cs.Logger.Info("successful WAL repair")
// reload WAL file
if err := cs.loadWalFile(); err != nil {
return err
}
}
}
// EventSwitch 只监听 EventNewRoundStep、EventValidBlock和EventVote 这三种事件
if err := cs.evsw.Start(); err != nil {
return err
}
// we need the timeoutRoutine for replay so
// we don't block on the tick chan.
// NOTE: we will get a build up of garbage go routines
// firing on the tockChan until the receiveRoutine is started
// to deal with them (by that point, at most one will be valid)
// 我们需要重放的timeoutRoutine,这样我们就不会在tick chan上阻塞。
// 注意:我们将得到大量的垃圾程序
// 直到receiveRoutine开始处理它们(到那时,最多只有一个是有效的)来处理它们(到那时,最多只有一个是有效的)。
if err := cs.timeoutTicker.Start(); err != nil {
return err
}
// Double Signing Risk Reduction
// 检查双重验签
if err := cs.checkDoubleSigningRisk(cs.Height); err != nil {
return err
}
// now start the receiveRoutine
// 启动接收程序
go cs.receiveRoutine(0)
// schedule the first round!
// use GetRoundState so we don't race the receiveRoutine for access
// 安排第一轮!
// 使用GetRoundState,这样我们就不会和receiveRoutine争夺访问权了。
cs.scheduleRound0(cs.GetRoundState())
return nil
}
发送内部消息 cs.scheduleRound0
scheduleRound0 的作用是将消息发送到内部的 chan 当中,receiveRoutine 负责监听不同类型事件,会监听到这个事件。
代码语言:javascript复制// enterNewRound(height, 0) at cs.StartTime.
func (cs *State) scheduleRound0(rs *cstypes.RoundState) {
// cs.Logger.Info("scheduleRound0", "now", tmtime.Now(), "startTime", cs.StartTime)
sleepDuration := rs.StartTime.Sub(tmtime.Now())
// 这一轮是发送了 cstypes.RoundStepNewHeight 事件类型
cs.scheduleTimeout(sleepDuration, rs.Height, 0, cstypes.RoundStepNewHeight)
}
发送内部消息,最终将消息发送到 chan
代码语言:javascript复制// ScheduleTimeout schedules a new timeout by sending on the internal tickChan.
// The timeoutRoutine is always available to read from tickChan, so this won't block.
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step.
// ScheduleTimeout通过在内部tickChan上发送来安排一个新的超时。
// timeoutRoutine总是可以从tickChan中读取,所以这不会阻塞。
// 如果timeoutRoutine已经为以后的高度/轮次/步长安排了一个超时,则调度可能会失败。
func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) {
t.tickChan <- ti
}
核心处理逻辑
主要是 receiveRoutine 处理
receiveRoutine 处理消息类型
- peerMsgQueue 来自节点的消息
- internalMsgQueue 内部消息
- timeoutTicker 超时的消息
timeoutTicker 这个消息,就是被上面流程中
scheduleRound0
发出的消息
receiveRoutine 处理流程
- 根据类型选把处理器 cs.timeoutTicker.Chan()
- handleTimeout 处理 处理具体类型
handleTimeout 处理具体事件类型
- 监听
RoundStepNewHeight
事件 case cstypes.RoundStepNewHeight: - 进入 Round 流程 这个就重点了,概据官方文档说明,每一次共识都有 5 个步骤,这个是第一步。 cs.enterNewRound(ti.Height, 0)
//-----------------------------------------
// the main go routines
// receiveRoutine handles messages which may cause state transitions.
// it's argument (n) is the number of messages to process before exiting - use 0 to run forever
// It keeps the RoundState and is the only thing that updates it.
// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities.
// State must be locked before any internal state is updated.
// receiveRoutine处理可能导致状态转换的消息。
// 它的参数(n)是退出前要处理的消息的数量--用0表示永远运行。
// 它保持RoundState,并且是唯一能更新它的东西。
// 更新(状态转换)发生在超时、完整提案和2/3多数的情况下。
// 在任何内部状态被更新之前,状态必须被锁定。
func (cs *State) receiveRoutine(maxSteps int) {
...
// 拿到当前链状态
rs := cs.RoundState
// 注意,这个是接收的 reactor 的消息
var mi msgInfo
// 处理三种类型的消息
// 1.peerMsgQueue 来自节点的消息
// 2.internalMsgQueue 内部消息
// 3.timeoutTicker 超时的消息
select {
case <-cs.txNotifier.TxsAvailable():
cs.handleTxsAvailable()
// peer 节点消息
case mi = <-cs.peerMsgQueue:
if err := cs.wal.Write(mi); err != nil {
cs.Logger.Error("failed writing to WAL", "err", err)
}
// handles proposals, block parts, votes
// may generate internal events (votes, complete proposals, 2/3 majorities)
cs.handleMsg(mi)
//监听内部队列消息
case mi = <-cs.internalMsgQueue:
err := cs.wal.WriteSync(mi) // NOTE: fsync
if err != nil {
panic(fmt.Sprintf(
"failed to write %v msg to consensus WAL due to %v; check your file system and restart the node",
mi, err,
))
}
// handles proposals, block parts, votes
// 核心的状态逻辑处理,处理 proposals, block parts, votes
cs.handleMsg(mi)
// 注意这个监听,ScheduleTimeout 的 channel
case ti := <-cs.timeoutTicker.Chan(): // tockChan:
if err := cs.wal.Write(ti); err != nil {
cs.Logger.Error("failed writing to WAL", "err", err)
}
// if the timeout is relevant to the rs
// go to the next step
cs.handleTimeout(ti, rs)
case <-cs.Quit():
onExit(cs)
return
}
}
}
handleTimeout 分析
代码语言:javascript复制// 进入新一轮
func (cs *State) enterNewRound(height int64, round int32) {
logger := cs.Logger.With("height", height, "round", round)
if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != cstypes.RoundStepNewHeight) {
logger.Debug(
"entering new round with invalid args",
"current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step),
)
return
}
if now := tmtime.Now(); cs.StartTime.After(now) {
logger.Debug("need to set a buffer and log message here for sanity", "start_time", cs.StartTime, "now", now)
}
logger.Debug("entering new round", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step))
// increment validators if necessary
// 必要时增加 validator
validators := cs.Validators
if cs.Round < round {
validators = validators.Copy()
validators.IncrementProposerPriority(tmmath.SafeSubInt32(round, cs.Round))
}
// Setup new round
// we don't fire newStep for this step,
// but we fire an event, so update the round step first
// 只是 set 值,并没有接口调用
cs.updateRoundStep(round, cstypes.RoundStepNewRound)
...//省略部分代码
cs.Votes.SetRound(tmmath.SafeAddInt32(round, 1)) // also track next round (round 1) to allow round-skipping
cs.TriggeredTimeoutPrecommit = false
// 发布事件??
if err := cs.eventBus.PublishEventNewRound(cs.NewRoundEvent()); err != nil {
cs.Logger.Error("failed publishing new round", "err", err)
}
cs.metrics.Rounds.Set(float64(round))
// Wait for txs to be available in the mempool
// before we enterPropose in round 0. If the last block changed the app hash,
// we may need an empty "proof" block, and enterPropose immediately.
// 进入 round0 之前,等待mempool中的txs可用。
// 如果最后一个区块改变了应用程序的哈希值,我们可能需要一个空的 "证明 "区块,并立即输入Propose。
waitForTxs := cs.config.WaitForTxs() && round == 0 && !cs.needProofBlock(height)
if waitForTxs {
if cs.config.CreateEmptyBlocksInterval > 0 {
// 构建空块证明,进入下一个阶段
cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round,
cstypes.RoundStepNewRound)
}
} else {
// 进入 propose 阶段
cs.enterPropose(height, round)
}
}