解读gobgp(二)

2023-05-04 16:11:37 浏览数 (1)

声明

关于解读gobgp断更了好久,再次回来进行填坑。

本章主要讲解gobgp中的邻居连接处理。首先讲一下怎么从哪里开始学习gobgp的代码。

1 代码入口

gobgp的代码入口可以查从./cmd/gobgpd/main.go中的main方法开始走读:

代码语言:go复制
func main() {
    // 开始是一些命令行解析以及证书、日志和grpc之类的设置

	logger.Info("gobgpd started")
    // 这里是创建一个BgpServer的对象,注意这里面只是启动了grpc server,并没有发起bgp server的监听
	bgpServer := server.NewBgpServer(server.GrpcListenAddress(opts.GrpcHosts), server.GrpcOption(grpcOpts), server.LoggerOption(&builtinLogger{logger: logger}))
    // 新的版本已经增加了prometheus的支持
	prometheus.MustRegister(metrics.NewBgpCollector(bgpServer))
	go bgpServer.Serve()

	if opts.UseSdNotify {
		if status, err := daemon.SdNotify(false, daemon.SdNotifyReady); !status {
			if err != nil {
				logger.Warnf("Failed to send notification via sd_notify(): %s", err)
			} else {
				logger.Warnf("The socket sd_notify() isn't available")
			}
		}
	}

	if opts.ConfigFile == "" {
		<-sigCh
		stopServer(bgpServer, opts.UseSdNotify)
		return
	}

	signal.Notify(sigCh, syscall.SIGHUP)

    // 注意这里,在读取配置文件的时候会启动bgp server的监听(这里的设计不是很好,但是考虑到设计者可能是希望通过命令行配置启动bgp server)
	initialConfig, err := config.ReadConfigFile(opts.ConfigFile, opts.ConfigType)
	if err != nil {
		logger.WithFields(logrus.Fields{
			"Topic": "Config",
			"Error": err,
		}).Fatalf("Can't read config file %s", opts.ConfigFile)
	}
	logger.WithFields(logrus.Fields{
		"Topic": "Config",
	}).Info("Finished reading the config file")

	currentConfig, err := config.InitialConfig(context.Background(), bgpServer, initialConfig, opts.GracefulRestart)
	if err != nil {
		logger.WithFields(logrus.Fields{
			"Topic": "Config",
			"Error": err,
		}).Fatalf("Failed to apply initial configuration %s", opts.ConfigFile)
	}

	for sig := range sigCh {
		if sig != syscall.SIGHUP {
			stopServer(bgpServer, opts.UseSdNotify)
			return
		}

		logger.WithFields(logrus.Fields{
			"Topic": "Config",
		}).Info("Reload the config file")
		newConfig, err := config.ReadConfigFile(opts.ConfigFile, opts.ConfigType)
		if err != nil {
			logger.WithFields(logrus.Fields{
				"Topic": "Config",
				"Error": err,
			}).Warningf("Can't read config file %s", opts.ConfigFile)
			continue
		}

		currentConfig, err = config.UpdateConfig(context.Background(), bgpServer, currentConfig, newConfig)
		if err != nil {
			logrus.WithFields(logrus.Fields{
				"Topic": "Config",
				"Error": err,
			}).Warningf("Failed to update config %s", opts.ConfigFile)
			continue
		}
	}
}

2 启动BGP连接的监听

在上述加载配置时会调用./pkg/server/server.go中的StartBGP

代码语言:go复制
func (s *BgpServer) StartBgp(ctx context.Context, r *api.StartBgpRequest) error {
	if r == nil || r.Global == nil {
		return fmt.Errorf("nil request")
	}
	return s.mgmtOperation(func() error {
        ...
		if c.Config.Port > 0 {
			acceptCh := make(chan *net.TCPConn, 32)
			for _, addr := range c.Config.LocalAddressList {
                // 这里会启动BGP TCP的监听
				l, err := newTCPListener(s.logger, addr, uint32(c.Config.Port), g.BindToDevice, acceptCh)
				if err != nil {
					return err
				}
				s.listeners = append(s.listeners, l)
			}
			s.acceptCh = acceptCh
		}
        ...
		return nil
	}, false)
}

func newTCPListener(logger log.Logger, address string, port uint32, bindToDev string, ch chan *net.TCPConn) (*tcpListener, error) {
    ...
	go func() error {
		for {
			conn, err := listener.AcceptTCP()
			if err != nil {
				close(closeCh)
				if !errors.Is(err, net.ErrClosed) {
					logger.Warn("Failed to AcceptTCP",
						log.Fields{
							"Topic": "Peer",
							"Error": err,
						})
				}
				return err
			}
            // 这里将accept的connection放到BgpServer的accecptCh中
			ch <- conn
		}
	}()
	return &tcpListener{
		l:  listener,
		ch: closeCh,
	}, nil
}

3 neighbor(peer) 和 conn 关联

BgpServer有一个Serve方法(./pkg/server/server.go),这个方法比较复杂:

代码语言:go复制
func (s *BgpServer) Serve() {
    ...

	for {
		cases := make([]reflect.SelectCase, firstPeerCaseIndex len(s.incomings))
		...

		chosen, value, ok := reflect.Select(cases)
		switch chosen {
		case 0:
			op := value.Interface().(*mgmtOp)
			s.handleMGMTOp(op)
		case 1:
			conn := value.Interface().(*net.TCPConn)
            // 这里会把conn交给peer
			s.passConnToPeer(conn)
		case 2:
			ev := value.Interface().(*roaEvent)
			s.roaManager.HandleROAEvent(ev)
		default:
			// in the case of dynamic peer, handleFSMMessage closed incoming channel so
			// nil fsmMsg can happen here.
			if ok {
				e := value.Interface().(*fsmMsg)
				handlefsmMsg(e)
			}
		}
	}
}

然后会调用peer的PassConn(./pkg/server/peer.go):

代码语言:go复制
func (peer *peer) PassConn(conn *net.TCPConn) {
	select {
    // 这里把conn先放到fsm的一个connCh里面
	case peer.fsm.connCh <- conn:
	default:
		conn.Close()
		peer.fsm.logger.Warn("accepted conn is closed to avoid be blocked",
			log.Fields{
				"Topic": "Peer",
				"Key":   peer.ID()})
	}
}

然后推动peer fsm的状态机,如果状态正确,则会把connCh中的conn赋值给fsm的conn(./pkg/server/fsm.go),会话建立成功后会赋值给fsmHandler的conn(./pkg/server/fsm.go)

点评

gobgp中server、peer、fsm的逻辑有些混乱,代码都放在一个包里,很多方法都是调来调去,没有层次,fsm和fsmHandler使逻辑更加复杂了。

0 人点赞