解读gobgp(三)

2023-05-04 16:52:18 浏览数 (1)

声明

本章主要介绍BGP消息的处理流。

1 BGP消息处理

当fsmHandler的状态走到establish(./pkg/server/fsm.go):

代码语言:go复制
func (h *fsmHandler) established(ctx context.Context) (bgp.FSMState, *fsmStateReason) {
    ...

	go h.sendMessageloop(ctx, &wg)
	h.msgCh = h.incoming // 这个incoming来自于fsm的incoming
    // 这里会循环接收conn来的BGP消息,这里面会调用recvMessageWithError
	go h.recvMessageloop(ctx, &wg)
    ...

进入到recvMessageWithError(./pkg/server/fsm.go),这里会对BGP消息进行反序列化:

代码语言:go复制
func (h *fsmHandler) recvMessageWithError() (*fsmMsg, error) {
	...
    // 这里反序列化BGP消息
	headerBuf, err := readAll(h.conn, bgp.BGP_HEADER_LENGTH)
	if err != nil {
		sendToStateReasonCh(fsmReadFailed, nil)
		return nil, err
	}

	hd := &bgp.BGPHeader{}
	err = hd.DecodeFromBytes(headerBuf)
	...

	bodyBuf, err := readAll(h.conn, int(hd.Len)-bgp.BGP_HEADER_LENGTH)
	...

	switch handling {
	case bgp.ERROR_HANDLING_AFISAFI_DISABLE:
		fmsg.MsgData = m
		return fmsg, nil
	case bgp.ERROR_HANDLING_SESSION_RESET:
		...
		fmsg.MsgData = err
		return fmsg, err
	default:
		...

		if establishedState {
			switch m.Header.Type {
			case bgp.BGP_MSG_ROUTE_REFRESH:
				fmsg.MsgType = fsmMsgRouteRefresh
			case bgp.BGP_MSG_UPDATE:
				...
				fallthrough
			case bgp.BGP_MSG_KEEPALIVE:
				// 注意ka消息不会被server处理
				if m.Header.Type == bgp.BGP_MSG_KEEPALIVE {
					return nil, nil
				}
			case bgp.BGP_MSG_NOTIFICATION:
				...
				return nil, nil
			}
		}
	}
    // 返回的fmsg会被送入fsmHandler的msgCh中
	return fmsg, nil
}

注意这里的msgCh在establish中被incoming赋值,而fsmHandler的incoming被fsm的incomingCh赋值,实际相当于把消息送到了fsm的incomingCh中

而fsm的incomingCh会被加入到BgpServer的incomings中,在BgpServer的Serve(./pkg/server/server.go)方法中会不断扫描incomings从中获取消息:

代码语言:go复制
func (s *BgpServer) Serve() {
    ...

	for {
        // 这里会扫描api接口消息和各个peer fsm的消息
		cases := make([]reflect.SelectCase, firstPeerCaseIndex len(s.incomings))
		...

		chosen, value, ok := reflect.Select(cases)
		switch chosen {
		case 0: // 处理api消息
			op := value.Interface().(*mgmtOp)
			s.handleMGMTOp(op)
		case 1: // 处理连接
			conn := value.Interface().(*net.TCPConn)
			s.passConnToPeer(conn)
		case 2: // 处理ROA事件
			ev := value.Interface().(*roaEvent)
			s.roaManager.HandleROAEvent(ev)
		default: // 这里会处理BGP消息
			if ok {
				e := value.Interface().(*fsmMsg)
				handlefsmMsg(e)
			}
		}
	}
}

点评

BGP消息的流转挺繁琐的,消息在channel中不断被转移,还是缺乏层次,gobgp的所有消息处理都需要经过server的Serve方法,这么设计可能是考虑api消息和BGP消息并发时的冲突,但也成为了消息处理的瓶颈。

0 人点赞