【共识算法】--“raft的实现”

2022-04-26 19:50:23 浏览数 (2)

看过之前几期的朋友们应该知道在1号第1期最初的时候就实现过一次raft,但之前实现基本是基于python实现的,这次可结合着PBFT,用golang实现了raft。

准备工作:

(1)go编译软件(本人用goland);

(2)客户端

原理如下:

实现的功能如下:

  • 节点状态分为Leader(领导者)、Follower(追随者)、Candidate(候选人)
  • 节点间随机成为candidate状态并选举出Leader,且同时仅存在一个Leader Leader节点定时发送心跳检测至其他Follower节点
  • Follower节点们超过一定时间未收到心跳检测,则Follower节点们重新开启选举
  • 客户端通过http发送消息到节点A,如果A不是Leader则转发至Leader节点
  • Leader收到客户端的消息后向Follower节点进行广播
  • Follower节点收到消息,反馈给Leader,等待Leader确认
  • Leader收到全网超过二分之一的反馈后,本地进行打印,然后将确认收到反馈的信息提交到Follower节点
  • Follower节点收到确认提交信息后,打印消息

操作如下:

(1)下载、编译(或是在已保存的代码包中找到程序位置)

代码语言:javascript复制
git clone https://github.com/corgi-kx/blockchain_consensus_algorithm.git

(2)构建raft.exe

代码语言:javascript复制
go build -o raft.exe

(3)开启三个端口,并分别执行raft.exe A 、raft.exe B 、 raft.exe C,代表开启三个节点(初始状态为追随者)

建立第一个节点进行投票,由于其余节点未开启,单个节点投票数不够,一直处于候选状态。

开启三个节点以后B先超时得2个选票成为leader,其余两个节点自动成为跟随节点。

主节点向其他跟随节点发送心跳

领导节点故障,重新开始选票:

源码如下:

http.go

代码语言:javascript复制
package main

import (
  "crypto/rand"
  "fmt"
  "log"
  "math/big"
  "net/http"
  "net/rpc"
)

//等待节点访问
func (rf *Raft) getRequest(writer http.ResponseWriter, request *http.Request) {
  request.ParseForm()
  //http://localhost:8080/req?message=ohmygod
  if len(request.Form["message"]) > 0 && rf.currentLeader != "-1" {
    message := request.Form["message"][0]
    m := new(Message)
    m.MsgID = getRandom()
    m.Msg = message
    //接收到消息后,直接转发到领导者
    fmt.Println("http监听到了消息,准备发送给领导者,消息id:", m.MsgID)
    port := nodeTable[rf.currentLeader]
    rp, err := rpc.DialHTTP("tcp", "127.0.0.1" port)
    if err != nil {
      log.Panic(err)
    }
    b := false
    err = rp.Call("Raft.LeaderReceiveMessage", m, &b)
    if err != nil {
      log.Panic(err)
    }
    fmt.Println("消息是否已发送到领导者:", b)
    writer.Write([]byte("ok!!!"))
  }
}

func (rf *Raft) httpListen() {
  //创建getRequest()回调方法
  http.HandleFunc("/req", rf.getRequest)
  fmt.Println("监听8080")
  if err := http.ListenAndServe(":8080", nil); err != nil {
    fmt.Println(err)
    return
  }
}

//返回一个十位数的随机数,作为消息idgit
func getRandom() int {
  x := big.NewInt(10000000000)
  for {
    result, err := rand.Int(rand.Reader, x)
    if err != nil {
      log.Panic(err)
    }
    if result.Int64() > 1000000000 {
      return int(result.Int64())
    }
  }
}

main.go

代码语言:javascript复制
package main

import (
  "fmt"
  "log"
  "os"
  "time"
)

//定义节点数量
var raftCount = 3

//节点池
var nodeTable map[string]string

//选举超时时间(单位:秒)
var timeout = 3

//心跳检测超时时间
var heartBeatTimeout = 7

//心跳检测频率(单位:秒)
var heartBeatTimes = 3

//用于存储消息
var MessageStore = make(map[int]string)

func main() {
  //定义三个节点  节点编号 - 监听端口号
  nodeTable = map[string]string{
    "A": ":9000",
    "B": ":9001",
    "C": ":9002",
  }
  //运行程序时候 指定节点编号
  if len(os.Args) < 1 {
    log.Fatal("程序参数不正确")
  }

  id := os.Args[1]
  //传入节点编号,端口号,创建raft实例
  raft := NewRaft(id, nodeTable[id])
  //启用RPC,注册raft
  go rpcRegister(raft)
  //开启心跳检测
  go raft.heartbeat()
  //开启一个Http监听
  if id == "A" {
    go raft.httpListen()
  }

Circle:
  //开启选举
  go func() {
    for {
      //成为候选人节点
      if raft.becomeCandidate() {
        //成为后选人节点后 向其他节点要选票来进行选举
        if raft.election() {
          break
        } else {
          continue
        }
      } else {
        break
      }
    }
  }()

  //进行心跳检测
  for {
    //0.5秒检测一次
    time.Sleep(time.Millisecond * 5000)
    if raft.lastHeartBeartTime != 0 && (millisecond()-raft.lastHeartBeartTime) > int64(raft.timeout*1000) {
      fmt.Printf("心跳检测超时,已超过%d秒n", raft.timeout)
      fmt.Println("即将重新开启选举")
      raft.reDefault()
      raft.setCurrentLeader("-1")
      raft.lastHeartBeartTime = 0
      goto Circle
    }
  }
}

raft.go

代码语言:javascript复制
package raft

import (
  "fmt"
  "math/rand"
  "sync"
  "time"
)

//声明raft节点类型
type Raft struct {
  node *NodeInfo
  //本节点获得的投票数
  vote int
  //线程锁
  lock sync.Mutex
  //节点编号
  me string
  //当前任期
  currentTerm int
  //为哪个节点投票
  votedFor string
  //当前节点状态
  //0 follower  1 candidate  2 leader
  state int
  //发送最后一条消息的时间
  lastMessageTime int64
  //发送最后一条消息的时间
  lastHeartBeartTime int64
  //当前节点的领导
  currentLeader string
  //心跳超时时间(单位:秒)
  timeout int
  //接收投票成功通道
  voteCh chan bool
  //心跳信号
  heartBeat chan bool
}

type NodeInfo struct {
  ID   string
  Port string
}

type Message struct {
  Msg   string
  MsgID int
}

func NewRaft(id, port string) *Raft {
  node := new(NodeInfo)
  node.ID = id
  node.Port = port

  rf := new(Raft)
  //节点信息
  rf.node = node
  //当前节点获得票数
  rf.setVote(0)
  //编号
  rf.me = id
  //给0  1  2三个节点投票,给谁都不投
  rf.setVoteFor("-1")
  //0 follower
  rf.setStatus(0)
  //最后一次心跳检测时间
  rf.lastHeartBeartTime = 0
  rf.timeout = heartBeatTimeout
  //最初没有领导
  rf.setCurrentLeader("-1")
  //设置任期
  rf.setTerm(0)
  //投票通道
  rf.voteCh = make(chan bool)
  //心跳通道
  rf.heartBeat = make(chan bool)
  return rf
}

//修改节点为候选人状态
func (rf *Raft) becomeCandidate() bool {
  r := randRange(1500, 5000)
  //休眠随机时间后,再开始成为候选人
  time.Sleep(time.Duration(r) * time.Millisecond)
  //如果发现本节点已经投过票,或者已经存在领导者,则不用变身候选人状态
  if rf.state == 0 && rf.currentLeader == "-1" && rf.votedFor == "-1" {
    //将节点状态变为1
    rf.setStatus(1)
    //设置为哪个节点投票
    rf.setVoteFor(rf.me)
    //节点任期加1
    rf.setTerm(rf.currentTerm   1)
    //当前没有领导
    rf.setCurrentLeader("-1")
    //为自己投票
    rf.voteAdd()
    fmt.Println("本节点已变更为候选人状态")
    fmt.Printf("当前得票数:%dn", rf.vote)
    //开启选举通道
    return true
  } else {
    return false
  }
}

//进行选举
func (rf *Raft) election() bool {
  fmt.Println("开始进行领导者选举,向其他节点进行广播")
  go rf.broadcast("Raft.Vote", rf.node, func(ok bool) {
    rf.voteCh <- ok
  })
  for {
    select {
    case <-time.After(time.Second * time.Duration(timeout)):
      fmt.Println("领导者选举超时,节点变更为追随者状态n")
      rf.reDefault()
      return false
    case ok := <-rf.voteCh:
      if ok {
        rf.voteAdd()
        fmt.Printf("获得来自其他节点的投票,当前得票数:%dn", rf.vote)
      }
      if rf.vote > raftCount/2 && rf.currentLeader == "-1" {
        fmt.Println("获得超过网络节点二分之一的得票数,本节点被选举成为了leader")
        //节点状态变为2,代表leader
        rf.setStatus(2)
        //当前领导者为自己
        rf.setCurrentLeader(rf.me)
        fmt.Println("向其他节点进行广播...")
        go rf.broadcast("Raft.ConfirmationLeader", rf.node, func(ok bool) {
          fmt.Println(ok)
        })
        //开启心跳检测通道
        rf.heartBeat <- true
        return true
      }
    }
  }
}

//心跳检测方法
func (rf *Raft) heartbeat() {
  //如果收到通道开启的信息,将会向其他节点进行固定频率的心跳检测
  if <-rf.heartBeat {
    for {
      fmt.Println("本节点开始发送心跳检测...")
      rf.broadcast("Raft.HeartbeatRe", rf.node, func(ok bool) {
        fmt.Println("收到回复:", ok)
      })
      rf.lastHeartBeartTime = millisecond()
      time.Sleep(time.Second * time.Duration(heartBeatTimes))
    }
  }
}

//产生随机值
func randRange(min, max int64) int64 {
  //用于心跳信号的时间
  rand.Seed(time.Now().UnixNano())
  return rand.Int63n(max-min)   min
}

//获取当前时间的毫秒数
func millisecond() int64 {
  return time.Now().UnixNano() / int64(time.Millisecond)
}

//设置任期
func (rf *Raft) setTerm(term int) {
  rf.lock.Lock()
  rf.currentTerm = term
  rf.lock.Unlock()
}

//设置为谁投票
func (rf *Raft) setVoteFor(id string) {
  rf.lock.Lock()
  rf.votedFor = id
  rf.lock.Unlock()
}

//设置当前领导者
func (rf *Raft) setCurrentLeader(leader string) {
  rf.lock.Lock()
  rf.currentLeader = leader
  rf.lock.Unlock()
}

//设置当前领导者
func (rf *Raft) setStatus(state int) {
  rf.lock.Lock()
  rf.state = state
  rf.lock.Unlock()
}

//投票累加
func (rf *Raft) voteAdd() {
  rf.lock.Lock()
  rf.vote  
  rf.lock.Unlock()
}

//设置投票数量
func (rf *Raft) setVote(num int) {
  rf.lock.Lock()
  rf.vote = num
  rf.lock.Unlock()
}

//恢复默认设置
func (rf *Raft) reDefault() {
  rf.setVote(0)
  //rf.currentLeader = "-1"
  rf.setVoteFor("-1")
  rf.setStatus(0)
}

rpc.go

代码语言:javascript复制
package main

import (
  "fmt"
  "log"
  "net/http"
  "net/rpc"
  "time"
)

//rpc服务注册
func rpcRegister(raft *Raft) {
  //注册一个服务器
  err := rpc.Register(raft)
  if err != nil {
    log.Panic(err)
  }
  port := raft.node.Port
  //把服务绑定到http协议上
  rpc.HandleHTTP()
  //监听端口
  err = http.ListenAndServe(port, nil)
  if err != nil {
    fmt.Println("注册rpc服务失败", err)
  }
}

func (rf *Raft) broadcast(method string, args interface{}, fun func(ok bool)) {
  //设置不要自己给自己广播
  for nodeID, port := range nodeTable {
    if nodeID == rf.node.ID {
      continue
    }
    //连接远程rpc
    rp, err := rpc.DialHTTP("tcp", "127.0.0.1" port)
    if err != nil {
      fun(false)
      continue
    }

    var bo = false
    err = rp.Call(method, args, &bo)
    if err != nil {
      fun(false)
      continue
    }
    fun(bo)
  }
}

//投票
func (rf *Raft) Vote(node NodeInfo, b *bool) error {
  if rf.votedFor != "-1" || rf.currentLeader != "-1" {
    *b = false
  } else {
    rf.setVoteFor(node.ID)
    fmt.Printf("投票成功,已投%s节点n", node.ID)
    *b = true
  }
  return nil
}

//确认领导者
func (rf *Raft) ConfirmationLeader(node NodeInfo, b *bool) error {
  rf.setCurrentLeader(node.ID)
  *b = true
  fmt.Println("已发现网络中的领导节点,", node.ID, "成为了领导者!n")
  rf.reDefault()
  return nil
}

//心跳检测回复
func (rf *Raft) HeartbeatRe(node NodeInfo, b *bool) error {
  rf.setCurrentLeader(node.ID)
  rf.lastHeartBeartTime = millisecond()
  fmt.Printf("接收到来自领导节点%s的心跳检测n", node.ID)
  fmt.Printf("当前时间为:%dnn", millisecond())
  *b = true
  return nil
}

//追随者节点用来接收消息,然后存储到消息池中,待领导者确认后打印
func (rf *Raft) ReceiveMessage(message Message, b *bool) error {
  fmt.Printf("接收到领导者节点发来的信息,id为:%dn", message.MsgID)
  MessageStore[message.MsgID] = message.Msg
  *b = true
  fmt.Println("已回复接收到消息,待领导者确认后打印")
  return nil
}

//追随者节点的反馈得到领导者节点的确认,开始打印消息
func (rf *Raft) ConfirmationMessage(message Message, b *bool) error {
  go func() {
    for {
      if _, ok := MessageStore[message.MsgID]; ok {
        fmt.Printf("raft验证通过,可以打印消息,id为:%dn", message.MsgID)
        fmt.Println("消息为:", MessageStore[message.MsgID], "n")
        rf.lastMessageTime = millisecond()
        break
      } else {
        //如果没有此消息,等一会看看!!!
        time.Sleep(time.Millisecond * 10)
      }

    }
  }()
  *b = true
  return nil
}

//领导者接收到,追随者节点转发过来的消息
func (rf *Raft) LeaderReceiveMessage(message Message, b *bool) error {
  fmt.Printf("领导者节点接收到转发过来的消息,id为:%dn", message.MsgID)
  MessageStore[message.MsgID] = message.Msg
  *b = true
  fmt.Println("准备将消息进行广播...")
  num := 0
  go rf.broadcast("Raft.ReceiveMessage", message, func(ok bool) {
    if ok {
      num  
    }
  })

  for {
    //自己默认收到了消息,所以减去一
    if num > raftCount/2-1 {
      fmt.Printf("全网已超过半数节点接收到消息id:%dnraft验证通过,可以打印消息,id为:%dn", message.MsgID, message.MsgID)
      fmt.Println("消息为:", MessageStore[message.MsgID], "n")
      rf.lastMessageTime = millisecond()
      fmt.Println("准备将消息提交信息发送至客户端...")
      go rf.broadcast("Raft.ConfirmationMessage", message, func(ok bool) {
      })
      break
    } else {
      //休息会儿
      time.Sleep(time.Millisecond * 100)
    }
  }
  return nil
}

参考:

  • http://thesecretlivesofdata.com/raft/
  • https://www.cnblogs.com/mindwind/p/5231986.html
  • https://blog.csdn.net/s15738841819/article/details/84286276

【共识算法】-“PBFT的实现”

2021-11-26

【共识算法(6)】-“DPOS与POS的区别与实现”

2021-11-24

0 人点赞