源码级分析Mongo升级断连问题

2023-08-19 09:30:43 浏览数 (1)

问题描述

代码语言:javascript复制
server selection error: server selection timeout, current topology: { Type: ReplicaSetNoPrimary, Servers: [] }

操作记录

集群信息:三个节点(一主两副) 当前版本4.2 升级到版本4.4

问题排查

通过google也没有找到任何有用的信息,只能从源码入手,下面先从各个部分分开看,最后会来一个总结。

源码版本:mongo-driver1.11.2

集群描述
代码语言:javascript复制
type Config struct {
   Mode MonitorMode
   ReplicaSetName string
   SeedList []string
   ServerOpts []ServerOption
   URI string
   ServerSelectionTimeout time.Duration
   ServerMonitor *event.ServerMonitor
   SRVMaxHosts int
   SRVServiceName string
   LoadBalanced bool
}

通过一个config对象来描述一个mongo集群信息,包括:

  • 集群名
  • 节点列表
  • 访问URI
初始化Client

mongo-driver通过topo概念来表示一个集群Client对象

代码语言:javascript复制
func NewConfig(co *options.ClientOptions, clock *session.ClusterClock) (*Config, error) {
  // Hosts
  cfgp.SeedList = []string{"localhost:27017"} // default host
  if len(co.Hosts) > 0 {
     cfgp.SeedList = co.Hosts
  }
}

上面是先初始化topo配置对象,其中就有初始化实例节点列表(从访问URI里面获取)

代码语言:javascript复制
func NewClient(opts ...*options.ClientOptions) (*Client, error) {
  if client.deployment == nil {
     client.deployment, err = topology.New(cfg)
     if err != nil {
      return nil, replaceErrors(err)
     }
  }
  return client, nil
}

然后通过配置对象初始化topo

代码语言:javascript复制
func New(cfg *Config) (*Topology, error) {
   if cfg == nil {
      var err error
      cfg, err = NewConfig(options.Client(), nil)
      if err != nil {
         return nil, err
      }
   }

   t := &Topology{
      cfg: cfg,
      done: make(chan struct{}),
      pollingDone: make(chan struct{}),
      rescanSRVInterval: 60 * time.Second,
      fsm: newFSM(),
      subscribers: make(map[uint64]chan description.Topology),
      servers: make(map[address.Address]*Server),
      dnsResolver: dns.DefaultResolver,
      id: primitive.NewObjectID(),
   }
   t.desc.Store(description.Topology{})
   t.updateCallback = func(desc description.Server) description.Server {
      return t.apply(context.TODO(), desc)
   }

   if t.cfg.URI != "" {
      t.pollingRequired = strings.HasPrefix(t.cfg.URI, "mongodb srv://") && !t.cfg.LoadBalanced
   }

   t.publishTopologyOpeningEvent()

   return t, nil
}
连接集群
代码语言:javascript复制
func (c *Client) Connect(ctx context.Context) error {
   if connector, ok := c.deployment.(driver.Connector); ok {
      err := connector.Connect()
      if err != nil {
         return replaceErrors(err)
      }
   }

   。。。

   var updateChan <-chan description.Topology
   if subscriber, ok := c.deployment.(driver.Subscriber); ok {
      sub, err := subscriber.Subscribe()
      if err != nil {
         return replaceErrors(err)
      }
      updateChan = sub.Updates
   }
   c.sessionPool = session.NewPool(updateChan)
   return nil
}

上面的大致逻辑是:

  • 进行topo集群连接,包括节点的连接信息等
  • 订阅topo变更
  • 将topo变更channel存入session池中
topo连接
代码语言:javascript复制
func (t *Topology) Connect() error {
   if !atomic.CompareAndSwapInt64(&amp;t.state, topologyDisconnected, topologyConnecting) {
      return ErrTopologyConnected
   }

   t.desc.Store(description.Topology{})
   var err error
   t.serversLock.Lock()

   for _, a := range t.cfg.SeedList {
      addr := address.Address(a).Canonicalize()
      t.fsm.Servers = append(t.fsm.Servers, description.NewDefaultServer(addr))
   }

   switch {
   。。。。。。
   default:
      // In non-LB mode, we only publish an initial TopologyDescriptionChanged event from Unknown with no servers to
      // the current state (e.g. Unknown with one or more servers if we're discovering or Single with one server if
      // we're connecting directly). Other events are published when state changes occur due to responses in the
      // server monitoring goroutines.

      newDesc := description.Topology{
         Kind: t.fsm.Kind,
         Servers: t.fsm.Servers,
         SessionTimeoutMinutes: t.fsm.SessionTimeoutMinutes,
      }
      t.desc.Store(newDesc)
      t.publishTopologyDescriptionChangedEvent(description.Topology{}, t.fsm.Topology)
      for _, a := range t.cfg.SeedList {
         addr := address.Address(a).Canonicalize()
         err = t.addServer(addr)
         if err != nil {
            t.serversLock.Unlock()
            return err
         }
      }
   }
}

这里只看non-LB的情况:

  • 增加节点server到topo
  • 初始化节点连接,构造节点连接池,描述信息等
Client使用

下面我们来看下具体的mongo数据操作过程,比如find方法中:

代码语言:javascript复制
func (coll *Collection) Find(ctx context.Context, filter interface{},
  opts ...*options.FindOptions) (cur *Cursor, err error) {
  if sess == nil &amp;&amp; coll.client.sessionPool != nil {
     sess = session.NewImplicitClientSession(coll.client.sessionPool, coll.client.id)
   
   if err = op.Execute(ctx); err != nil {
    return nil, replaceErrors(err)
  }

  bc, err := op.Result(cursorOpts)
  if err != nil {
    return nil, replaceErrors(err)
  }
  return newCursorWithSession(bc, coll.registry, sess)
}

通过层层调用,进入到获取节点连接的方法中

获取集群节点连接

代码语言:javascript复制
// getServerAndConnection should be used to retrieve a Server and Connection to execute an operation.
func (op Operation) getServerAndConnection(ctx context.Context) (Server, Connection, error) {
   server, err := op.selectServer(ctx)
   if err != nil {
      return nil, nil, err
   }

   // If the provided client session has a pinned connection, it should be used for the operation because this
   // indicates that we're in a transaction and the target server is behind a load balancer.
   if op.Client != nil &amp;&amp; op.Client.PinnedConnection != nil {
      return server, op.Client.PinnedConnection, nil
   }

   // Otherwise, default to checking out a connection from the server's pool.
   conn, err := server.Connection(ctx)
   if err != nil {
      return nil, nil, err
   }

   return server, conn, nil
}

上面的逻辑大致就是:

  • 获取一个可用的sever节点
  • 从该节点对象的连接池中获取一个可用的连接对象

下面来看下具体的获取server节点的逻辑

代码语言:javascript复制
func (t *Topology) SelectServer(ctx context.Context, ss description.ServerSelector) (driver.Server, error) {
   if atomic.LoadInt64(&amp;t.state) != topologyConnected {
      return nil, ErrTopologyClosed
   }
   var ssTimeoutCh <-chan time.Time

   if t.cfg.ServerSelectionTimeout > 0 {
      ssTimeout := time.NewTimer(t.cfg.ServerSelectionTimeout)
      ssTimeoutCh = ssTimeout.C
      defer ssTimeout.Stop()
   }

   var doneOnce bool
   var sub *driver.Subscription
   selectionState := newServerSelectionState(ss, ssTimeoutCh)
   for {
      var suitable []description.Server
      var selectErr error

      if !doneOnce {
         // for the first pass, select a server from the current description.
         // this improves selection speed for up-to-date topology descriptions.
         suitable, selectErr = t.selectServerFromDescription(t.Description(), selectionState)
         doneOnce = true
      } else {
         // if the first pass didn't select a server, the previous description did not contain a suitable server, so
         // we subscribe to the topology and attempt to obtain a server from that subscription
         if sub == nil {
            var err error
            sub, err = t.Subscribe()
            if err != nil {
               return nil, err
            }
            defer t.Unsubscribe(sub)
         }

         suitable, selectErr = t.selectServerFromSubscription(ctx, sub.Updates, selectionState)
      }
      if selectErr != nil {
         return nil, selectErr
      }

      if len(suitable) == 0 {
         // try again if there are no servers available
         continue
      }

      。。。。。。
   }
}

上面的逻辑大致就是,启动一个for死循环,进行如下逻辑:

  • 获取topo的订阅对象,包含topo变更channel
  • 从channel中获取一个可用的server节点
  • 如果发生异常则退出循环
  • 否则一致循环获取,直到超时channel到来

下面来看下获取server的逻辑:

代码语言:javascript复制
// selectServerFromSubscription loops until a topology description is available for server selection. It returns
// when the given context expires, server selection timeout is reached, or a description containing a selectable
// server is available.
func (t *Topology) selectServerFromSubscription(ctx context.Context, subscriptionCh <-chan description.Topology,
   selectionState serverSelectionState) ([]description.Server, error) {

   current := t.Description()
   for {
      select {
      case <-ctx.Done():
         return nil, ServerSelectionError{Wrapped: ctx.Err(), Desc: current}
      case <-selectionState.timeoutChan:
         return nil, ServerSelectionError{Wrapped: ErrServerSelectionTimeout, Desc: current}
      case current = <-subscriptionCh:
      }

      suitable, err := t.selectServerFromDescription(current, selectionState)
      if err != nil {
         return nil, err
      }

      if len(suitable) > 0 {
         return suitable, nil
      }
      t.RequestImmediateCheck()
   }
}

根据上面问题的错误信息,可以知道,问题就出在这一句

代码语言:javascript复制
case <-selectionState.timeoutChan:
   return nil, ServerSelectionError{Wrapped: ErrServerSelectionTimeout, Desc: current}

说明一致没有获取到可用的server,直到超时,那么问题来了:

  • 这个topo变更channel是谁在往里面发送数据(包括服务器节点)
  • 为啥发送的节点都是不可用的

下面来一一解答

topo变更channel

在上面最开始初始化Client的时候,有这样一个设置:

代码语言:javascript复制
t.updateCallback = func(desc description.Server) description.Server {
   return t.apply(context.TODO(), desc)
}

上面设置了topo变更回调方法

代码语言:javascript复制
func (t *Topology) apply(ctx context.Context, desc description.Server) description.Server {
   t.serversLock.Lock()
   defer t.serversLock.Unlock()

   ind, ok := t.fsm.findServer(desc.Addr)
   if t.serversClosed || !ok {
      return desc
   }

   prev := t.fsm.Topology
   oldDesc := t.fsm.Servers[ind]
   if oldDesc.TopologyVersion.CompareToIncoming(desc.TopologyVersion) > 0 {
      return oldDesc
   }

   var current description.Topology
   current, desc = t.fsm.apply(desc)

   if !oldDesc.Equal(desc) {
      t.publishServerDescriptionChangedEvent(oldDesc, desc)
   }

   diff := diffTopology(prev, current)

   for _, removed := range diff.Removed {
      if s, ok := t.servers[removed.Addr]; ok {
         go func() {
            cancelCtx, cancel := context.WithCancel(ctx)
            cancel()
            _ = s.Disconnect(cancelCtx)
         }()
         delete(t.servers, removed.Addr)
         t.publishServerClosedEvent(s.address)
      }
   }

   for _, added := range diff.Added {
      _ = t.addServer(added.Addr)
   }

   t.desc.Store(current)
   if !prev.Equal(current) {
      t.publishTopologyDescriptionChangedEvent(prev, current)
   }

   t.subLock.Lock()
   for _, ch := range t.subscribers {
      // We drain the description if there's one in the channel
      select {
      case <-ch:
      default:
      }
      ch <- current
   }
   t.subLock.Unlock()

   return desc
}

上面的大致逻辑就是:

  • 找到旧topo中的节点列表跟新的topo中的节点列表进行对比
  • 关闭已经不存在的节点连接
  • 创建新增节点的连接
  • 将新的topo发送到topo变更channel中
  • 。。。

这就回到了上面的第一个问题,

那么又是在哪里触发的topo更新回调的呢:

代码语言:javascript复制
func ConnectServer(addr address.Address, updateCallback updateTopologyCallback, topologyID primitive.ObjectID, opts ...ServerOption) (*Server, error) {
   srvr := NewServer(addr, topologyID, opts...)
   err := srvr.Connect(updateCallback)
   if err != nil {
      return nil, err
   }
   return srvr, nil
}

// This method must be called before a Server can be used.
func (s *Server) Connect(updateCallback updateTopologyCallback) error {
  if !atomic.CompareAndSwapInt64(&s.state, serverDisconnected, serverConnected) {
    return ErrServerConnected
  }

  desc := description.NewDefaultServer(s.address)
  if s.cfg.loadBalanced {
    // LBs automatically start off with kind LoadBalancer because there is no monitoring routine for state changes.
    desc.Kind = description.LoadBalancer
  }
  s.desc.Store(desc)
  s.updateTopologyCallback.Store(updateCallback)

  if !s.cfg.monitoringDisabled && !s.cfg.loadBalanced {
    s.rttMonitor.connect()
    s.closewg.Add(1)
    go s.update()
  }
}

在初始化sever连接的时候,这里启动了一个协程来调用update方法

代码语言:javascript复制
// update handles performing heartbeats and updating any subscribers of the
// newest description.Server retrieved.
func (s *Server) update() {

   timeoutCnt := 0
   for {
      // Check if the server is disconnecting. Even if waitForNextCheck has already read from the done channel, we
      // can safely read from it again because Disconnect closes the channel.
      select {
      case <-done:
         closeServer()
         return
      default:
      }

      previousDescription := s.Description()

      // Perform the next check.
      desc, err := s.check()
      if err == errCheckCancelled {
         if atomic.LoadInt64(&amp;s.state) != serverConnected {
            continue
         }

         // If the server is not disconnecting, the check was cancelled by an application operation after an error.
         // Wait before running the next check.
         waitUntilNextCheck()
         continue
      }

      if isShortcut := func() bool {
         s.updateDescription(desc)
         // Retry after the first timeout before clearing the pool in case of a FAAS pause as
         // described in GODRIVER-2577.
         if err := unwrapConnectionError(desc.LastError); err != nil &amp;&amp; timeoutCnt < 1 {
            if err == context.Canceled || err == context.DeadlineExceeded {
               timeoutCnt  
               // We want to immediately retry on timeout error. Continue to next loop.
               return true
            }
            if err, ok := err.(net.Error); ok &amp;&amp; err.Timeout() {
               timeoutCnt  
               // We want to immediately retry on timeout error. Continue to next loop.
               return true
            }
         }
         if err := desc.LastError; err != nil {
            // Clear the pool once the description has been updated to Unknown. Pass in a nil service ID to clear
            // because the monitoring routine only runs for non-load balanced deployments in which servers don't return
            // IDs.
            s.pool.clear(err, nil)
         }
         // We're either not handling a timeout error, or we just handled the 2nd consecutive
         // timeout error. In either case, reset the timeout count to 0 and return false to
         // continue the normal check process.
         timeoutCnt = 0
         return false
      }(); isShortcut {
         continue
      }
}

上面的逻辑大致就是:

  • 启动一个for死循环
  • 在循环里会不断轮训检查server连接,得到最新的server连接描述信息 如果连接异常,会将server的kind重置为Unknown
  • 更新server描述信息,如果发现连接异常则重试,超过重试后则直接清空连接池

下面来看下更新描述方法

代码语言:javascript复制
func (s *Server) updateDescription(desc description.Server) {
   // Use the updateTopologyCallback to update the parent Topology and get the description that should be stored.
   callback, ok := s.updateTopologyCallback.Load().(updateTopologyCallback)
   if ok &amp;&amp; callback != nil {
      desc = callback(desc)
   }
   s.desc.Store(desc)

   s.subLock.Lock()
   for _, c := range s.subscribers {
      select {
      // drain the channel if it isn't empty
      case <-c:
      default:
      }
      c <- desc
   }
   s.subLock.Unlock()
}

这里会执行server的更新回调方法,也就是上面说的apply方法

也就是说:是在初始化client的时候,针对每个server都会启动一个后台协程(常驻不退出)

  • 这个协程会不断定时检查server连接,
  • 然后将最新的serve连接描述对象回调给topo更新方法,
  • topo更新方法会那些这些信息,然后得到最新的topo描述对象,发送到topo变更channel中

下面来回答第二个问题

server筛选
代码语言:javascript复制
func (t *Topology) selectServerFromDescription(desc description.Topology,
   selectionState serverSelectionState) ([]description.Server, error) {

   // Unlike selectServerFromSubscription, this code path does not check ctx.Done or selectionState.timeoutChan because
   // selecting a server from a description is not a blocking operation.

   if desc.CompatibilityErr != nil {
      return nil, desc.CompatibilityErr
   }

   // If the topology kind is LoadBalanced, the LB is the only server and it is always considered selectable. The
   // selectors exported by the driver should already return the LB as a candidate, so this but this check ensures that
   // the LB is always selectable even if a user of the low-level driver provides a custom selector.
   if desc.Kind == description.LoadBalanced {
      return desc.Servers, nil
   }

   var allowed []description.Server
   for _, s := range desc.Servers {
      if s.Kind != description.Unknown {
         allowed = append(allowed, s)
      }
   }

   suitable, err := selectionState.selector.SelectServer(desc, allowed)
   if err != nil {
      return nil, ServerSelectionError{Wrapped: err, Desc: desc}
   }
   return suitable, nil
}

上面的代码就是:

  • 首先从topo变更描述对象中获取到不是unknown的server
  • 然后根据选择器选择可用的server

0 人点赞