声明
关于解读gobgp断更了好久,再次回来进行填坑。
本章主要讲解gobgp中的邻居连接处理。首先讲一下怎么从哪里开始学习gobgp的代码。
1 代码入口
gobgp的代码入口可以查从./cmd/gobgpd/main.go
中的main方法开始走读:
func main() {
// 开始是一些命令行解析以及证书、日志和grpc之类的设置
logger.Info("gobgpd started")
// 这里是创建一个BgpServer的对象,注意这里面只是启动了grpc server,并没有发起bgp server的监听
bgpServer := server.NewBgpServer(server.GrpcListenAddress(opts.GrpcHosts), server.GrpcOption(grpcOpts), server.LoggerOption(&builtinLogger{logger: logger}))
// 新的版本已经增加了prometheus的支持
prometheus.MustRegister(metrics.NewBgpCollector(bgpServer))
go bgpServer.Serve()
if opts.UseSdNotify {
if status, err := daemon.SdNotify(false, daemon.SdNotifyReady); !status {
if err != nil {
logger.Warnf("Failed to send notification via sd_notify(): %s", err)
} else {
logger.Warnf("The socket sd_notify() isn't available")
}
}
}
if opts.ConfigFile == "" {
<-sigCh
stopServer(bgpServer, opts.UseSdNotify)
return
}
signal.Notify(sigCh, syscall.SIGHUP)
// 注意这里,在读取配置文件的时候会启动bgp server的监听(这里的设计不是很好,但是考虑到设计者可能是希望通过命令行配置启动bgp server)
initialConfig, err := config.ReadConfigFile(opts.ConfigFile, opts.ConfigType)
if err != nil {
logger.WithFields(logrus.Fields{
"Topic": "Config",
"Error": err,
}).Fatalf("Can't read config file %s", opts.ConfigFile)
}
logger.WithFields(logrus.Fields{
"Topic": "Config",
}).Info("Finished reading the config file")
currentConfig, err := config.InitialConfig(context.Background(), bgpServer, initialConfig, opts.GracefulRestart)
if err != nil {
logger.WithFields(logrus.Fields{
"Topic": "Config",
"Error": err,
}).Fatalf("Failed to apply initial configuration %s", opts.ConfigFile)
}
for sig := range sigCh {
if sig != syscall.SIGHUP {
stopServer(bgpServer, opts.UseSdNotify)
return
}
logger.WithFields(logrus.Fields{
"Topic": "Config",
}).Info("Reload the config file")
newConfig, err := config.ReadConfigFile(opts.ConfigFile, opts.ConfigType)
if err != nil {
logger.WithFields(logrus.Fields{
"Topic": "Config",
"Error": err,
}).Warningf("Can't read config file %s", opts.ConfigFile)
continue
}
currentConfig, err = config.UpdateConfig(context.Background(), bgpServer, currentConfig, newConfig)
if err != nil {
logrus.WithFields(logrus.Fields{
"Topic": "Config",
"Error": err,
}).Warningf("Failed to update config %s", opts.ConfigFile)
continue
}
}
}
2 启动BGP连接的监听
在上述加载配置时会调用./pkg/server/server.go
中的StartBGP
func (s *BgpServer) StartBgp(ctx context.Context, r *api.StartBgpRequest) error {
if r == nil || r.Global == nil {
return fmt.Errorf("nil request")
}
return s.mgmtOperation(func() error {
...
if c.Config.Port > 0 {
acceptCh := make(chan *net.TCPConn, 32)
for _, addr := range c.Config.LocalAddressList {
// 这里会启动BGP TCP的监听
l, err := newTCPListener(s.logger, addr, uint32(c.Config.Port), g.BindToDevice, acceptCh)
if err != nil {
return err
}
s.listeners = append(s.listeners, l)
}
s.acceptCh = acceptCh
}
...
return nil
}, false)
}
func newTCPListener(logger log.Logger, address string, port uint32, bindToDev string, ch chan *net.TCPConn) (*tcpListener, error) {
...
go func() error {
for {
conn, err := listener.AcceptTCP()
if err != nil {
close(closeCh)
if !errors.Is(err, net.ErrClosed) {
logger.Warn("Failed to AcceptTCP",
log.Fields{
"Topic": "Peer",
"Error": err,
})
}
return err
}
// 这里将accept的connection放到BgpServer的accecptCh中
ch <- conn
}
}()
return &tcpListener{
l: listener,
ch: closeCh,
}, nil
}
3 neighbor(peer) 和 conn 关联
BgpServer有一个Serve方法(./pkg/server/server.go),这个方法比较复杂:
代码语言:go复制func (s *BgpServer) Serve() {
...
for {
cases := make([]reflect.SelectCase, firstPeerCaseIndex len(s.incomings))
...
chosen, value, ok := reflect.Select(cases)
switch chosen {
case 0:
op := value.Interface().(*mgmtOp)
s.handleMGMTOp(op)
case 1:
conn := value.Interface().(*net.TCPConn)
// 这里会把conn交给peer
s.passConnToPeer(conn)
case 2:
ev := value.Interface().(*roaEvent)
s.roaManager.HandleROAEvent(ev)
default:
// in the case of dynamic peer, handleFSMMessage closed incoming channel so
// nil fsmMsg can happen here.
if ok {
e := value.Interface().(*fsmMsg)
handlefsmMsg(e)
}
}
}
}
然后会调用peer的PassConn(./pkg/server/peer.go):
代码语言:go复制func (peer *peer) PassConn(conn *net.TCPConn) {
select {
// 这里把conn先放到fsm的一个connCh里面
case peer.fsm.connCh <- conn:
default:
conn.Close()
peer.fsm.logger.Warn("accepted conn is closed to avoid be blocked",
log.Fields{
"Topic": "Peer",
"Key": peer.ID()})
}
}
然后推动peer fsm的状态机,如果状态正确,则会把connCh中的conn赋值给fsm的conn(./pkg/server/fsm.go),会话建立成功后会赋值给fsmHandler的conn(./pkg/server/fsm.go)
点评
gobgp中server、peer、fsm的逻辑有些混乱,代码都放在一个包里,很多方法都是调来调去,没有层次,fsm和fsmHandler使逻辑更加复杂了。