golang源码分析:raft(12)

2023-09-07 09:06:08 浏览数 (2)

下面我们来到更底一层,分别分析下pipeline和stream的实现,pipeline通过固定数量的goroutine 来分发处理消息:

代码语言:javascript复制
type pipeline struct {
  peerID types.ID


  tr     *Transport
  picker *urlPicker
  status *peerStatus
  raft   Raft
  errorc chan error
  // deprecate when we depercate v2 API
  followerStats *stats.FollowerStats


  msgc chan raftpb.Message
  // wait for the handling routines
  wg    sync.WaitGroup
  stopc chan struct{}
}

核心逻辑位于start和handle方法中

代码语言:javascript复制
func (p *pipeline) start() {
      for i := 0; i < connPerPipeline; i   {
    go p.handle()
  }
代码语言:javascript复制
func (p *pipeline) stop() {
    close(p.stopc)
    p.wg.Wait()

当channel 里面有消息可以消费的时候,调用http 的post方法通过protobuf协议将消息发送出去:

代码语言:javascript复制
func (p *pipeline) handle() {
    defer p.wg.Done()
    for {
    select {
    case m := <-p.msgc:
      start := time.Now()
      err := p.post(pbutil.MustMarshal(&m))
代码语言:javascript复制
func (p *pipeline) post(data []byte) (err error) {
   req := createPostRequest(p.tr.Logger, u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
   resp, err := p.tr.pipelineRt.RoundTrip(req)
  done <- struct{}{}

stream分为writer和reader两类:

代码语言:javascript复制
type streamWriter struct {
  lg *zap.Logger


  localID types.ID
  peerID  types.ID


  status *peerStatus
  fs     *stats.FollowerStats
  r      Raft


  mu      sync.Mutex // guard field working and closer
  closer  io.Closer
  working bool


  msgc  chan raftpb.Message
  connc chan *outgoingConn
  stopc chan struct{}
  done  chan struct{}
}

初始化的时候会起一个协程

代码语言:javascript复制
func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
      w := &streamWriter{
      go w.run()

在协程内部根据消息类型进行分发处理:

代码语言:javascript复制
func (cw *streamWriter) run() {
      flusher    http.Flusher
      for {
    select {
    case <-heartbeatc:
              if err == nil {
        flusher.Flush()
      case m := <-msgc:
      err := enc.encode(&m)
                if len(msgc) == 0 || batched > streamBufSize/2 {
          flusher.Flush()
          case conn := <-cw.connc:
      cw.mu.Lock()
      closed := cw.closeUnlocked()
      t = conn.t
      switch conn.t {
        flusher = conn.Flusher
        case <-cw.stopc:
      if cw.close() {
代码语言:javascript复制
func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
代码语言:javascript复制
func (cw *streamWriter) close() bool {
代码语言:javascript复制
func (cw *streamWriter) closeUnlocked() bool {

当新连接建立的时候会通过attach方法将连接建立消息传递进来,这是消息循环的入口

代码语言:javascript复制
func (cw *streamWriter) attach(conn *outgoingConn) bool {
  select {
  case cw.connc <- conn:
代码语言:javascript复制
func (cw *streamWriter) stop() {

reader的实现类似

代码语言:javascript复制
type streamReader struct {
  lg *zap.Logger


  peerID types.ID
  typ    streamType


  tr     *Transport
  picker *urlPicker
  status *peerStatus
  recvc  chan<- raftpb.Message
  propc  chan<- raftpb.Message


  rl *rate.Limiter // alters the frequency of dial retrial attempts


  errorc chan<- error


  mu     sync.Mutex
  paused bool
  closer io.Closer


  ctx    context.Context
  cancel context.CancelFunc
  done   chan struct{}
}

也是在start的时候启动goroutine

代码语言:javascript复制
func (cr *streamReader) start() {
      go cr.run()
代码语言:javascript复制
func (cr *streamReader) run() {
        for {
    rc, err := cr.dial(t)
      cr.status.activate()
      err = cr.decodeLoop(rc, t)
      err = cr.rl.Wait(cr.ctx)
      close(cr.done)

不同的是,它是根据不同的消息类型进行不同的处理

代码语言:javascript复制
func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
  switch t {
  case streamTypeMsgAppV2:
    dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
  case streamTypeMessage:
    dec = &messageDecoder{r: rc}
        for {
    m, err := dec.decode()
      receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
      select {
    case recvc <- m:
    default:
代码语言:javascript复制
    func (cr *streamReader) stop() {

dial方法会发起http请求

代码语言:javascript复制
    func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
      uu.Path = path.Join(t.endpoint(cr.lg), cr.tr.ID.String())
      req, err := http.NewRequest(http.MethodGet, uu.String(), nil)
      req = req.WithContext(cr.ctx)
        switch resp.StatusCode {
  case http.StatusGone:
代码语言:javascript复制
func (cr *streamReader) close() {
代码语言:javascript复制
func (cr *streamReader) pause() {
代码语言:javascript复制
func (cr *streamReader) resume() {
代码语言:javascript复制
func checkStreamSupport(v *semver.Version, t streamType) bool {

0 人点赞