前面提到transport将远程对象分为两类:remote和peer,分别代表新建立的连接和已经加入集群的节点,下面简单分析下它们的核心逻辑:
代码语言:javascript复制type remote struct {
// Summary: state kept for one remote object. startRemote fills every field
// and starts the pipeline before returning the value.

lg *zap.Logger
// localID is copied from the transport's ID in startRemote.
localID types.ID
// id is the ID that was passed to startRemote.
id types.ID
// status is consulted by send (isActive) when the pipeline channel is full.
status *peerStatus
// pipeline carries outgoing messages: send pushes into its msgc channel
// without blocking, and stop shuts it down.
pipeline *pipeline
}
初始化remote的时候会启动一个pipeline对象,里面会通过channel将消息分发给handler协程,然后走http协议发送出去:
代码语言:javascript复制func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
pipeline := &pipeline{
peerID: id,
tr: tr,
picker: picker,
status: status,
raft: tr.Raft,
errorc: tr.ErrorC,
}
pipeline.start()
return &remote{
lg: tr.Logger,
localID: tr.ID,
id: id,
status: status,
pipeline: pipeline,
}
代码语言:javascript复制 func (g *remote) send(m raftpb.Message) {
select {
case g.pipeline.msgc <- m:
default:
if g.status.isActive() {
stop方法用来关闭pipeline。
代码语言:javascript复制func (g *remote) stop() {
// stop shuts the remote down by stopping its pipeline.
g.pipeline.stop()
}
代码语言:javascript复制func (g *remote) Pause() {
// Pause is implemented as a full stop: it simply delegates to stop, which
// stops the pipeline.
g.stop()
}
代码语言:javascript复制func (g *remote) Resume() {
// Resume starts the pipeline again; Pause is what stops it (via stop).
g.pipeline.start()
}
Peer是一个接口,定义了集群内部节点之间通信的核心方法,peer实现了这个接口:
代码语言:javascript复制type Peer interface {
// Summary: Peer lists the operations available on a remote raft peer.
// The peer type in this file provides all of these methods.

// send sends the message to the remote peer. The function is non-blocking
// and makes no promise that the message will be received by the remote.
// When it fails to send the message out, it reports the status to the
// underlying raft.
send(m raftpb.Message)
// sendSnap sends the merged snapshot message to the remote peer. Its behavior
// is similar to send.
sendSnap(m snap.Message)
// update updates the urls of the remote peer.
update(urls types.URLs)
// attachOutgoingConn attaches the outgoing connection to the peer for
// stream usage. After the call, ownership of the outgoing connection is
// handed over to the peer. The peer will close the connection when it is
// no longer used.
attachOutgoingConn(conn *outgoingConn)
// activeSince returns the time at which the connection with the peer
// became active.
activeSince() time.Time
// stop performs any necessary finalization and terminates the peer
// gracefully.
stop()
}
peer的属性包括raft对象和前面提到的pipeline,以及stream的读写对象(streamWriter和streamReader):
代码语言:javascript复制type peer struct {
// Summary: everything startPeer sets up for one remote raft node: two stream
// writers, two stream readers, a pipeline, a snapshot sender, and the
// channels that feed received messages into Raft.

lg *zap.Logger
// localID is copied from the transport's ID in startPeer.
localID types.ID
// id of the remote raft peer node
id types.ID
// r is the Raft handle; startPeer's goroutines call r.Process on every
// message taken from recvc and propc.
r Raft
status *peerStatus
// picker receives URL changes through update.
picker *urlPicker
// msgAppV2Writer and writer are the two stream writers. pick uses
// msgAppV2Writer for MsgApp messages when its channel is available, and
// falls back to writer otherwise.
msgAppV2Writer *streamWriter
writer *streamWriter
// pipeline is what pick selects for snapshot (MsgSnap) messages.
pipeline *pipeline
snapSender *snapshotSender // snapshot sender to send v3 snapshot messages
// msgAppV2Reader and msgAppReader are the stream readers that startPeer
// creates and starts.
msgAppV2Reader *streamReader
msgAppReader *streamReader
// recvc and propc are buffered channels (sized recvBufSize and
// maxPendingProposals respectively), each drained by its own goroutine
// started in startPeer.
recvc chan raftpb.Message
propc chan raftpb.Message
// NOTE(review): mu presumably guards paused; the code that uses them is not
// visible in this excerpt, so confirm against the full source.
mu sync.Mutex
paused bool
cancel context.CancelFunc // cancels pending work in goroutines created by peer.
// stopc is watched by the goroutines started in startPeer; each returns as
// soon as a receive on stopc succeeds.
stopc chan struct{}
}
peer 代表了一个远程的raft节点,有两种方式把消息发送出去:stream和pipeline。stream是由接收方发起的long-polling长连接,建立后一直保持打开,用来传输消息。除了一般的stream之外,还有一个专门用来发送msgApp的优化stream,因为msgApp在所有消息中占了很大一部分。只有raft的leader会使用这个优化stream把msgApp发送给follower。
pipeline是一系列的http客户端,用来向remote发送http请求,只在stream尚未建立的阶段使用(另外,从下文的pick方法可以看到,快照消息MsgSnap始终走pipeline)。
代码语言:javascript复制func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
pipeline := &pipeline{
peerID: peerID,
tr: t,
picker: picker,
status: status,
followerStats: fs,
raft: r,
errorc: errorc,
}
pipeline.start()
p := &peer{
lg: t.Logger,
localID: t.ID,
id: peerID,
r: r,
status: status,
picker: picker,
msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
pipeline: pipeline,
snapSender: newSnapshotSender(t, picker, peerID, status),
recvc: make(chan raftpb.Message, recvBufSize),
propc: make(chan raftpb.Message, maxPendingProposals),
stopc: make(chan struct{}),
}
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel
go func() {
for {
select {
case mm := <-p.recvc:
if err := r.Process(ctx, mm); err != nil {
if t.Logger != nil {
t.Logger.Warn("failed to process Raft message", zap.Error(err))
}
}
case <-p.stopc:
return
}
}
}()
go func() {
for {
select {
case mm := <-p.propc:
if err := r.Process(ctx, mm); err != nil {
if t.Logger != nil {
t.Logger.Warn("failed to process Raft message", zap.Error(err))
}
}
case <-p.stopc:
return
}
}
}()
p.msgAppV2Reader = &streamReader{
p.msgAppReader = &streamReader{
p.msgAppV2Reader.start()
p.msgAppReader.start()
然后是send方法,负责通过pick选取发送通道(writec),并把消息写入对应的channel里面:
代码语言:javascript复制func (p *peer) send(m raftpb.Message) {
writec, name := p.pick(m)
select {
case writec <- m:
发送快照也类似,只是交给snapSender在一个新的goroutine中异步发送:
代码语言:javascript复制func (p *peer) sendSnap(m snap.Message) {
// sendSnap hands the snapshot message to snapSender on a new goroutine, so
// the caller does not wait for the send to finish.
go p.snapSender.send(m)
}
节点的URL发生变化时,会通过update把新的地址更新到picker中:
代码语言:javascript复制func (p *peer) update(urls types.URLs) {
// update forwards the new URL set to the peer's picker.
p.picker.update(urls)
}
代码语言:javascript复制func (p *peer) attachOutgoingConn(conn *outgoingConn) {
switch conn.t {
case streamTypeMsgAppV2:
ok = p.msgAppV2Writer.attach(conn)
case streamTypeMessage:
ok = p.writer.attach(conn)
接收其他节点stream连接请求的http服务接口定义如下,它会把连接封装成outgoingConn,再通过attachOutgoingConn交给对应的peer:
代码语言:javascript复制func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
conn := &outgoingConn{
t: t,
Writer: w,
Flusher: w.(http.Flusher),
Closer: c,
localID: h.tr.ID,
peerID: from,
}
// 初始化 stream writer
p.attachOutgoingConn(conn)
代码语言:javascript复制func (p *peer) activeSince() time.Time { return p.status.activeSince() } // delegates to the peer's status object
暂停、恢复和关闭方法也是类似的:
代码语言:javascript复制func (p *peer) Pause() {
p.msgAppReader.pause()
p.msgAppV2Reader.pause()
代码语言:javascript复制func (p *peer) Resume() {
p.msgAppReader.resume()
p.msgAppV2Reader.resume()
代码语言:javascript复制func (p *peer) stop() {
p.msgAppV2Writer.stop()
p.writer.stop()
p.pipeline.stop()
p.snapSender.stop()
p.msgAppV2Reader.stop()
p.msgAppReader.stop()
pick会根据消息类型选择不同的发送通道,并返回所选通道的名称:
代码语言:javascript复制func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
if isMsgSnap(m) {
return p.pipeline.msgc, pipelineMsg
} else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
return writec, streamAppV2
} else if writec, ok = p.writer.writec(); ok {
return writec, streamMsg
}
代码语言:javascript复制func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp } // true only when m.Type is raftpb.MsgApp
代码语言:javascript复制func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap } // true only when m.Type is raftpb.MsgSnap
至此消息发送方法定义完毕。