看过之前几期的朋友们应该知道在1号第1期最初的时候就实现过一次raft,但之前实现基本是基于python实现的,这次可结合着PBFT,用golang实现了raft。
准备工作:
(1)go编译软件(本人用goland);
(2)客户端
原理如下:
实现的功能如下:
- 节点状态分为Leader(领导者)、Follower(追随者)、Candidate(候选人)
- 节点间随机成为candidate状态并选举出Leader,且同时仅存在一个Leader Leader节点定时发送心跳检测至其他Follower节点
- Follower节点们超过一定时间未收到心跳检测,则Follower节点们重新开启选举
- 客户端通过http发送消息到节点A,如果A不是Leader则转发至Leader节点
- Leader收到客户端的消息后向Follower节点进行广播
- Follower节点收到消息,反馈给Leader,等待Leader确认
- Leader收到全网超过二分之一的反馈后,本地进行打印,然后将确认收到反馈的信息提交到Follower节点
- Follower节点收到确认提交信息后,打印消息
操作如下:
(1)下载、编译(或是在已保存的代码包中找到程序位置)
代码语言:javascript复制git clone https://github.com/corgi-kx/blockchain_consensus_algorithm.git
(2)构建raft.exe
代码语言:javascript复制go build -o raft.exe
(3)开启三个端口,并分别执行raft.exe A 、raft.exe B 、 raft.exe C,代表开启三个节点(初始状态为追随者)
建立第一个节点进行投票,由于其余节点未开启,单个节点投票数不够,一直处于候选状态。
开启三个节点以后B先超时得2个选票成为leader,其余两个节点自动成为跟随节点。
主节点向其他跟随节点发送心跳
领导节点故障,重新开始选票:
源码如下:
http.go
代码语言:javascript复制package main
import (
"crypto/rand"
"fmt"
"log"
"math/big"
"net/http"
"net/rpc"
)
//等待节点访问
func (rf *Raft) getRequest(writer http.ResponseWriter, request *http.Request) {
request.ParseForm()
//http://localhost:8080/req?message=ohmygod
if len(request.Form["message"]) > 0 && rf.currentLeader != "-1" {
message := request.Form["message"][0]
m := new(Message)
m.MsgID = getRandom()
m.Msg = message
//接收到消息后,直接转发到领导者
fmt.Println("http监听到了消息,准备发送给领导者,消息id:", m.MsgID)
port := nodeTable[rf.currentLeader]
rp, err := rpc.DialHTTP("tcp", "127.0.0.1" port)
if err != nil {
log.Panic(err)
}
b := false
err = rp.Call("Raft.LeaderReceiveMessage", m, &b)
if err != nil {
log.Panic(err)
}
fmt.Println("消息是否已发送到领导者:", b)
writer.Write([]byte("ok!!!"))
}
}
func (rf *Raft) httpListen() {
//创建getRequest()回调方法
http.HandleFunc("/req", rf.getRequest)
fmt.Println("监听8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
fmt.Println(err)
return
}
}
//返回一个十位数的随机数,作为消息idgit
func getRandom() int {
x := big.NewInt(10000000000)
for {
result, err := rand.Int(rand.Reader, x)
if err != nil {
log.Panic(err)
}
if result.Int64() > 1000000000 {
return int(result.Int64())
}
}
}
main.go
代码语言:javascript复制package main
import (
"fmt"
"log"
"os"
"time"
)
//定义节点数量
var raftCount = 3
//节点池
var nodeTable map[string]string
//选举超时时间(单位:秒)
var timeout = 3
//心跳检测超时时间
var heartBeatTimeout = 7
//心跳检测频率(单位:秒)
var heartBeatTimes = 3
//用于存储消息
var MessageStore = make(map[int]string)
func main() {
//定义三个节点 节点编号 - 监听端口号
nodeTable = map[string]string{
"A": ":9000",
"B": ":9001",
"C": ":9002",
}
//运行程序时候 指定节点编号
if len(os.Args) < 1 {
log.Fatal("程序参数不正确")
}
id := os.Args[1]
//传入节点编号,端口号,创建raft实例
raft := NewRaft(id, nodeTable[id])
//启用RPC,注册raft
go rpcRegister(raft)
//开启心跳检测
go raft.heartbeat()
//开启一个Http监听
if id == "A" {
go raft.httpListen()
}
Circle:
//开启选举
go func() {
for {
//成为候选人节点
if raft.becomeCandidate() {
//成为后选人节点后 向其他节点要选票来进行选举
if raft.election() {
break
} else {
continue
}
} else {
break
}
}
}()
//进行心跳检测
for {
//0.5秒检测一次
time.Sleep(time.Millisecond * 5000)
if raft.lastHeartBeartTime != 0 && (millisecond()-raft.lastHeartBeartTime) > int64(raft.timeout*1000) {
fmt.Printf("心跳检测超时,已超过%d秒n", raft.timeout)
fmt.Println("即将重新开启选举")
raft.reDefault()
raft.setCurrentLeader("-1")
raft.lastHeartBeartTime = 0
goto Circle
}
}
}
raft.go
代码语言:javascript复制package raft
import (
"fmt"
"math/rand"
"sync"
"time"
)
//声明raft节点类型
type Raft struct {
node *NodeInfo
//本节点获得的投票数
vote int
//线程锁
lock sync.Mutex
//节点编号
me string
//当前任期
currentTerm int
//为哪个节点投票
votedFor string
//当前节点状态
//0 follower 1 candidate 2 leader
state int
//发送最后一条消息的时间
lastMessageTime int64
//发送最后一条消息的时间
lastHeartBeartTime int64
//当前节点的领导
currentLeader string
//心跳超时时间(单位:秒)
timeout int
//接收投票成功通道
voteCh chan bool
//心跳信号
heartBeat chan bool
}
type NodeInfo struct {
ID string
Port string
}
type Message struct {
Msg string
MsgID int
}
func NewRaft(id, port string) *Raft {
node := new(NodeInfo)
node.ID = id
node.Port = port
rf := new(Raft)
//节点信息
rf.node = node
//当前节点获得票数
rf.setVote(0)
//编号
rf.me = id
//给0 1 2三个节点投票,给谁都不投
rf.setVoteFor("-1")
//0 follower
rf.setStatus(0)
//最后一次心跳检测时间
rf.lastHeartBeartTime = 0
rf.timeout = heartBeatTimeout
//最初没有领导
rf.setCurrentLeader("-1")
//设置任期
rf.setTerm(0)
//投票通道
rf.voteCh = make(chan bool)
//心跳通道
rf.heartBeat = make(chan bool)
return rf
}
//修改节点为候选人状态
func (rf *Raft) becomeCandidate() bool {
r := randRange(1500, 5000)
//休眠随机时间后,再开始成为候选人
time.Sleep(time.Duration(r) * time.Millisecond)
//如果发现本节点已经投过票,或者已经存在领导者,则不用变身候选人状态
if rf.state == 0 && rf.currentLeader == "-1" && rf.votedFor == "-1" {
//将节点状态变为1
rf.setStatus(1)
//设置为哪个节点投票
rf.setVoteFor(rf.me)
//节点任期加1
rf.setTerm(rf.currentTerm 1)
//当前没有领导
rf.setCurrentLeader("-1")
//为自己投票
rf.voteAdd()
fmt.Println("本节点已变更为候选人状态")
fmt.Printf("当前得票数:%dn", rf.vote)
//开启选举通道
return true
} else {
return false
}
}
//进行选举
func (rf *Raft) election() bool {
fmt.Println("开始进行领导者选举,向其他节点进行广播")
go rf.broadcast("Raft.Vote", rf.node, func(ok bool) {
rf.voteCh <- ok
})
for {
select {
case <-time.After(time.Second * time.Duration(timeout)):
fmt.Println("领导者选举超时,节点变更为追随者状态n")
rf.reDefault()
return false
case ok := <-rf.voteCh:
if ok {
rf.voteAdd()
fmt.Printf("获得来自其他节点的投票,当前得票数:%dn", rf.vote)
}
if rf.vote > raftCount/2 && rf.currentLeader == "-1" {
fmt.Println("获得超过网络节点二分之一的得票数,本节点被选举成为了leader")
//节点状态变为2,代表leader
rf.setStatus(2)
//当前领导者为自己
rf.setCurrentLeader(rf.me)
fmt.Println("向其他节点进行广播...")
go rf.broadcast("Raft.ConfirmationLeader", rf.node, func(ok bool) {
fmt.Println(ok)
})
//开启心跳检测通道
rf.heartBeat <- true
return true
}
}
}
}
//心跳检测方法
func (rf *Raft) heartbeat() {
//如果收到通道开启的信息,将会向其他节点进行固定频率的心跳检测
if <-rf.heartBeat {
for {
fmt.Println("本节点开始发送心跳检测...")
rf.broadcast("Raft.HeartbeatRe", rf.node, func(ok bool) {
fmt.Println("收到回复:", ok)
})
rf.lastHeartBeartTime = millisecond()
time.Sleep(time.Second * time.Duration(heartBeatTimes))
}
}
}
//产生随机值
func randRange(min, max int64) int64 {
//用于心跳信号的时间
rand.Seed(time.Now().UnixNano())
return rand.Int63n(max-min) min
}
//获取当前时间的毫秒数
func millisecond() int64 {
return time.Now().UnixNano() / int64(time.Millisecond)
}
//设置任期
func (rf *Raft) setTerm(term int) {
rf.lock.Lock()
rf.currentTerm = term
rf.lock.Unlock()
}
//设置为谁投票
func (rf *Raft) setVoteFor(id string) {
rf.lock.Lock()
rf.votedFor = id
rf.lock.Unlock()
}
//设置当前领导者
func (rf *Raft) setCurrentLeader(leader string) {
rf.lock.Lock()
rf.currentLeader = leader
rf.lock.Unlock()
}
//设置当前领导者
func (rf *Raft) setStatus(state int) {
rf.lock.Lock()
rf.state = state
rf.lock.Unlock()
}
//投票累加
func (rf *Raft) voteAdd() {
rf.lock.Lock()
rf.vote
rf.lock.Unlock()
}
//设置投票数量
func (rf *Raft) setVote(num int) {
rf.lock.Lock()
rf.vote = num
rf.lock.Unlock()
}
//恢复默认设置
func (rf *Raft) reDefault() {
rf.setVote(0)
//rf.currentLeader = "-1"
rf.setVoteFor("-1")
rf.setStatus(0)
}
rpc.go
代码语言:javascript复制package main
import (
"fmt"
"log"
"net/http"
"net/rpc"
"time"
)
//rpc服务注册
func rpcRegister(raft *Raft) {
//注册一个服务器
err := rpc.Register(raft)
if err != nil {
log.Panic(err)
}
port := raft.node.Port
//把服务绑定到http协议上
rpc.HandleHTTP()
//监听端口
err = http.ListenAndServe(port, nil)
if err != nil {
fmt.Println("注册rpc服务失败", err)
}
}
func (rf *Raft) broadcast(method string, args interface{}, fun func(ok bool)) {
//设置不要自己给自己广播
for nodeID, port := range nodeTable {
if nodeID == rf.node.ID {
continue
}
//连接远程rpc
rp, err := rpc.DialHTTP("tcp", "127.0.0.1" port)
if err != nil {
fun(false)
continue
}
var bo = false
err = rp.Call(method, args, &bo)
if err != nil {
fun(false)
continue
}
fun(bo)
}
}
//投票
func (rf *Raft) Vote(node NodeInfo, b *bool) error {
if rf.votedFor != "-1" || rf.currentLeader != "-1" {
*b = false
} else {
rf.setVoteFor(node.ID)
fmt.Printf("投票成功,已投%s节点n", node.ID)
*b = true
}
return nil
}
//确认领导者
func (rf *Raft) ConfirmationLeader(node NodeInfo, b *bool) error {
rf.setCurrentLeader(node.ID)
*b = true
fmt.Println("已发现网络中的领导节点,", node.ID, "成为了领导者!n")
rf.reDefault()
return nil
}
//心跳检测回复
func (rf *Raft) HeartbeatRe(node NodeInfo, b *bool) error {
rf.setCurrentLeader(node.ID)
rf.lastHeartBeartTime = millisecond()
fmt.Printf("接收到来自领导节点%s的心跳检测n", node.ID)
fmt.Printf("当前时间为:%dnn", millisecond())
*b = true
return nil
}
//追随者节点用来接收消息,然后存储到消息池中,待领导者确认后打印
func (rf *Raft) ReceiveMessage(message Message, b *bool) error {
fmt.Printf("接收到领导者节点发来的信息,id为:%dn", message.MsgID)
MessageStore[message.MsgID] = message.Msg
*b = true
fmt.Println("已回复接收到消息,待领导者确认后打印")
return nil
}
//追随者节点的反馈得到领导者节点的确认,开始打印消息
func (rf *Raft) ConfirmationMessage(message Message, b *bool) error {
go func() {
for {
if _, ok := MessageStore[message.MsgID]; ok {
fmt.Printf("raft验证通过,可以打印消息,id为:%dn", message.MsgID)
fmt.Println("消息为:", MessageStore[message.MsgID], "n")
rf.lastMessageTime = millisecond()
break
} else {
//如果没有此消息,等一会看看!!!
time.Sleep(time.Millisecond * 10)
}
}
}()
*b = true
return nil
}
//领导者接收到,追随者节点转发过来的消息
func (rf *Raft) LeaderReceiveMessage(message Message, b *bool) error {
fmt.Printf("领导者节点接收到转发过来的消息,id为:%dn", message.MsgID)
MessageStore[message.MsgID] = message.Msg
*b = true
fmt.Println("准备将消息进行广播...")
num := 0
go rf.broadcast("Raft.ReceiveMessage", message, func(ok bool) {
if ok {
num
}
})
for {
//自己默认收到了消息,所以减去一
if num > raftCount/2-1 {
fmt.Printf("全网已超过半数节点接收到消息id:%dnraft验证通过,可以打印消息,id为:%dn", message.MsgID, message.MsgID)
fmt.Println("消息为:", MessageStore[message.MsgID], "n")
rf.lastMessageTime = millisecond()
fmt.Println("准备将消息提交信息发送至客户端...")
go rf.broadcast("Raft.ConfirmationMessage", message, func(ok bool) {
})
break
} else {
//休息会儿
time.Sleep(time.Millisecond * 100)
}
}
return nil
}
参考:
- http://thesecretlivesofdata.com/raft/
- https://www.cnblogs.com/mindwind/p/5231986.html
- https://blog.csdn.net/s15738841819/article/details/84286276
【共识算法】-“PBFT的实现”
2021-11-26
【共识算法(6)】-“DPOS与POS的区别与实现”
2021-11-24