golang源码分析:raft(11)

2023-09-07 09:05:45 浏览数 (3)

前面提到transport将远程对象分为两类:remote和peer,分别代表新建立的连接和已经加入集群的节点,下面简单分析下它们的核心逻辑:

代码语言:javascript复制
type remote struct {
  lg       *zap.Logger
  localID  types.ID
  id       types.ID
  status   *peerStatus
  pipeline *pipeline
}

初始化remote的时候会启动一个pipline对象,里面会通过channel将消息分发给handler协程,然后走http协议发送出去:

代码语言:javascript复制
func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
   pipeline := &pipeline{
    peerID: id,
    tr:     tr,
    picker: picker,
    status: status,
    raft:   tr.Raft,
    errorc: tr.ErrorC,
  }
  pipeline.start()
      return &remote{
    lg:       tr.Logger,
    localID:  tr.ID,
    id:       id,
    status:   status,
    pipeline: pipeline,
  }
代码语言:javascript复制
  func (g *remote) send(m raftpb.Message) {
    select {
  case g.pipeline.msgc <- m:
  default:
    if g.status.isActive() {

stop方法用来关闭pipeline。

代码语言:javascript复制
func (g *remote) stop() {
  g.pipeline.stop()
}
代码语言:javascript复制
func (g *remote) Pause() {
  g.stop()
}
代码语言:javascript复制
func (g *remote) Resume() {
  g.pipeline.start()
}

Peer是一个接口,定义了集群内部节点之间通信的核心方法,peer实现了这个接口:

代码语言:javascript复制
type Peer interface {
  // send sends the message to the remote peer. The function is non-blocking
  // and has no promise that the message will be received by the remote.
  // When it fails to send message out, it will report the status to underlying
  // raft.
  send(m raftpb.Message)


  // sendSnap sends the merged snapshot message to the remote peer. Its behavior
  // is similar to send.
  sendSnap(m snap.Message)


  // update updates the urls of remote peer.
  update(urls types.URLs)


  // attachOutgoingConn attaches the outgoing connection to the peer for
  // stream usage. After the call, the ownership of the outgoing
  // connection hands over to the peer. The peer will close the connection
  // when it is no longer used.
  attachOutgoingConn(conn *outgoingConn)
  // activeSince returns the time that the connection with the
  // peer becomes active.
  activeSince() time.Time
  // stop performs any necessary finalization and terminates the peer
  // elegantly.
  stop()
}

peer的属性包括raft对象和前面提到的pipeline,以及stream

代码语言:javascript复制
type peer struct {
  lg *zap.Logger


  localID types.ID
  // id of the remote raft peer node
  id types.ID


  r Raft


  status *peerStatus


  picker *urlPicker


  msgAppV2Writer *streamWriter
  writer         *streamWriter
  pipeline       *pipeline
  snapSender     *snapshotSender // snapshot sender to send v3 snapshot messages
  msgAppV2Reader *streamReader
  msgAppReader   *streamReader


  recvc chan raftpb.Message
  propc chan raftpb.Message


  mu     sync.Mutex
  paused bool


  cancel context.CancelFunc // cancel pending works in go routine created by peer.
  stopc  chan struct{}
}

peer 代表了一个远程的raft节点,有两种方式把消息发送出去,stream和pipeline。stream是通过 long-polling 连接的。除了一般的流还有一个优化的流用来发送msgApp,发送大消息的分片。只有raft的leader会使用这个优化流来把消息发送给follower。

pipeline是一系列的http客户端,用来发送请求到remote,只是用在stream还没有被建立阶段。

代码语言:javascript复制
func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
   pipeline := &pipeline{
    peerID:        peerID,
    tr:            t,
    picker:        picker,
    status:        status,
    followerStats: fs,
    raft:          r,
    errorc:        errorc,
  }
  pipeline.start()
      p := &peer{
    lg:             t.Logger,
    localID:        t.ID,
    id:             peerID,
    r:              r,
    status:         status,
    picker:         picker,
    msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
    writer:         startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
    pipeline:       pipeline,
    snapSender:     newSnapshotSender(t, picker, peerID, status),
    recvc:          make(chan raftpb.Message, recvBufSize),
    propc:          make(chan raftpb.Message, maxPendingProposals),
    stopc:          make(chan struct{}),
  }
    msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
    writer:         startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
   go func() {
    for {
      select {
      case mm := <-p.recvc:
        if err := r.Process(ctx, mm); err != nil {
          if t.Logger != nil {
            t.Logger.Warn("failed to process Raft message", zap.Error(err))
          }
        }
      case <-p.stopc:
        return
      }
    }
  }()
   go func() {
    for {
      select {
      case mm := <-p.propc:
        if err := r.Process(ctx, mm); err != nil {
          if t.Logger != nil {
            t.Logger.Warn("failed to process Raft message", zap.Error(err))
          }
        }
      case <-p.stopc:
        return
      }
    }
  }()
    p.msgAppV2Reader = &streamReader{
    p.msgAppReader = &streamReader{
    p.msgAppV2Reader.start()
    p.msgAppReader.start()

然后是send方法,负责选取节点,把消息写入对应的channel里面:

代码语言:javascript复制
func (p *peer) send(m raftpb.Message) {
  writec, name := p.pick(m)
  select {
  case writec <- m:

发送快照类似

代码语言:javascript复制
func (p *peer) sendSnap(m snap.Message) {
  go p.snapSender.send(m)
}

有节点变化的时候,会更新

代码语言:javascript复制
func (p *peer) update(urls types.URLs) {
  p.picker.update(urls)
}
代码语言:javascript复制
func (p *peer) attachOutgoingConn(conn *outgoingConn) {
   switch conn.t {
  case streamTypeMsgAppV2:
    ok = p.msgAppV2Writer.attach(conn)
  case streamTypeMessage:
    ok = p.writer.attach(conn)

接受其他节点消息的http服务接口定义如下:

代码语言:javascript复制
func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    conn := &outgoingConn{
    t:       t,
    Writer:  w,
    Flusher: w.(http.Flusher),
    Closer:  c,
    localID: h.tr.ID,
    peerID:  from,
  }
 // 初始化 stream writer
p.attachOutgoingConn(conn)
代码语言:javascript复制
func (p *peer) activeSince() time.Time { return p.status.activeSince() }

暂停、关闭方法也是类似的

代码语言:javascript复制
func (p *peer) Pause() {
  p.msgAppReader.pause()
  p.msgAppV2Reader.pause()
代码语言:javascript复制
func (p *peer) Resume() {
  p.msgAppReader.resume()
  p.msgAppV2Reader.resume()
代码语言:javascript复制
func (p *peer) stop() {
  p.msgAppV2Writer.stop()
  p.writer.stop()
  p.pipeline.stop()
  p.snapSender.stop()
  p.msgAppV2Reader.stop()
  p.msgAppReader.stop()

pick会根据消息类型选择不同的消息前缀

代码语言:javascript复制
func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
   if isMsgSnap(m) {
    return p.pipeline.msgc, pipelineMsg
  } else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
    return writec, streamAppV2
  } else if writec, ok = p.writer.writec(); ok {
    return writec, streamMsg
  }
代码语言:javascript复制
func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp }
代码语言:javascript复制
func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }

至此消息发送方法定义完毕。

0 人点赞