下面我们来到更底一层,分别分析下pipeline和stream的实现,pipeline通过固定数量的goroutine 来分发处理消息:
代码语言:javascript复制type pipeline struct {
peerID types.ID
tr *Transport
picker *urlPicker
status *peerStatus
raft Raft
errorc chan error
// deprecate when we depercate v2 API
followerStats *stats.FollowerStats
msgc chan raftpb.Message
// wait for the handling routines
wg sync.WaitGroup
stopc chan struct{}
}
核心逻辑位于start和handle方法中
代码语言:javascript复制func (p *pipeline) start() {
for i := 0; i < connPerPipeline; i {
go p.handle()
}
代码语言:javascript复制func (p *pipeline) stop() {
close(p.stopc)
p.wg.Wait()
当channel 里面有消息可以消费的时候,调用http 的post方法通过protobuf协议将消息发送出去:
代码语言:javascript复制func (p *pipeline) handle() {
defer p.wg.Done()
for {
select {
case m := <-p.msgc:
start := time.Now()
err := p.post(pbutil.MustMarshal(&m))
代码语言:javascript复制func (p *pipeline) post(data []byte) (err error) {
req := createPostRequest(p.tr.Logger, u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
resp, err := p.tr.pipelineRt.RoundTrip(req)
done <- struct{}{}
stream分为writer和reader两类:
代码语言:javascript复制type streamWriter struct {
lg *zap.Logger
localID types.ID
peerID types.ID
status *peerStatus
fs *stats.FollowerStats
r Raft
mu sync.Mutex // guard field working and closer
closer io.Closer
working bool
msgc chan raftpb.Message
connc chan *outgoingConn
stopc chan struct{}
done chan struct{}
}
初始化的时候会起一个协程
代码语言:javascript复制func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
w := &streamWriter{
go w.run()
在协程内部根据消息类型进行分发处理:
代码语言:javascript复制func (cw *streamWriter) run() {
flusher http.Flusher
for {
select {
case <-heartbeatc:
if err == nil {
flusher.Flush()
case m := <-msgc:
err := enc.encode(&m)
if len(msgc) == 0 || batched > streamBufSize/2 {
flusher.Flush()
case conn := <-cw.connc:
cw.mu.Lock()
closed := cw.closeUnlocked()
t = conn.t
switch conn.t {
flusher = conn.Flusher
case <-cw.stopc:
if cw.close() {
代码语言:javascript复制func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
代码语言:javascript复制func (cw *streamWriter) close() bool {
代码语言:javascript复制func (cw *streamWriter) closeUnlocked() bool {
当新连接建立的时候会通过attach方法将连接建立消息传递进来,这是消息循环的入口
代码语言:javascript复制func (cw *streamWriter) attach(conn *outgoingConn) bool {
select {
case cw.connc <- conn:
代码语言:javascript复制func (cw *streamWriter) stop() {
reader的实现类似
代码语言:javascript复制type streamReader struct {
lg *zap.Logger
peerID types.ID
typ streamType
tr *Transport
picker *urlPicker
status *peerStatus
recvc chan<- raftpb.Message
propc chan<- raftpb.Message
rl *rate.Limiter // alters the frequency of dial retrial attempts
errorc chan<- error
mu sync.Mutex
paused bool
closer io.Closer
ctx context.Context
cancel context.CancelFunc
done chan struct{}
}
也是在start的时候启动goroutine
代码语言:javascript复制func (cr *streamReader) start() {
go cr.run()
代码语言:javascript复制func (cr *streamReader) run() {
for {
rc, err := cr.dial(t)
cr.status.activate()
err = cr.decodeLoop(rc, t)
err = cr.rl.Wait(cr.ctx)
close(cr.done)
不同的是,它是根据不同的消息类型进行不同的处理
代码语言:javascript复制func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
switch t {
case streamTypeMsgAppV2:
dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
case streamTypeMessage:
dec = &messageDecoder{r: rc}
for {
m, err := dec.decode()
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
select {
case recvc <- m:
default:
代码语言:javascript复制 func (cr *streamReader) stop() {
dial方法会发起http请求
代码语言:javascript复制 func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
uu.Path = path.Join(t.endpoint(cr.lg), cr.tr.ID.String())
req, err := http.NewRequest(http.MethodGet, uu.String(), nil)
req = req.WithContext(cr.ctx)
switch resp.StatusCode {
case http.StatusGone:
代码语言:javascript复制func (cr *streamReader) close() {
代码语言:javascript复制func (cr *streamReader) pause() {
代码语言:javascript复制func (cr *streamReader) resume() {
代码语言:javascript复制func checkStreamSupport(v *semver.Version, t streamType) bool {