名字服务Polaris中服务发现详解

2023-08-19 09:46:36 浏览数 (1)

源码地址:https://github.com/polarismesh/polaris-controller/blob/main/README-zh.md

通过mesh配置文件设置controller的配置管理对象 https://fankangbest.github.io/2017/10/12/kubernetes-client分析(一)-kubeconfig-v1-5-2/

下面就从源码开始分析polaris是怎么通过进行服务发现的

polaris通过k8s的扩展api机制自定义了controller实现,下面选取一些关键代码进行分析

初始化controller

对每个资源增加创建、更新、删除操作的监控回调方法:

代码语言:javascript复制
p := PolarisController{
   client: client,
   queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
      polarisControllerName),
   workerLoopPeriod: time.Second,
}

serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   AddFunc: p.onServiceAdd,
   UpdateFunc: p.onServiceUpdate,
   DeleteFunc: p.onServiceDelete,
})

endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   AddFunc: p.onEndpointAdd,
   UpdateFunc: p.onEndpointUpdate,
   DeleteFunc: p.onEndpointDelete,
})

namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   AddFunc: p.onNamespaceAdd,
   UpdateFunc: p.onNamespaceUpdate,
})

上面是典型的k8s资源监听代码

以service创建为例:

当k8s中一个service被创建成功之后,就会调用polaris的这个onServiceAdd方法:

代码语言:javascript复制
func (p *PolarisController) onServiceAdd(obj interface{}) {

   service := obj.(*v1.Service)

   if !util.IsPolarisService(service, p.config.PolarisController.SyncMode) {
      return
   }

   key, err := util.GenServiceQueueKey(service)
   if err != nil {
      log.Errorf("generate queue key for %s/%s error, %v", service.Namespace, service.Name, err)
      return
   }

   p.enqueueService(key, service, "Add")
}

逻辑如下:

  • 判断是否可以注册为北极星服务
  • 根据该service生成相应的key
  • 将该key放入workqueue

启动controller

代码语言:javascript复制
func (p *PolarisController) Run(workers int, stopCh <-chan struct{}) {
   defer runtime.HandleCrash()
   defer p.queue.ShutDown()
   defer p.consumer.Destroy()
   defer p.provider.Destroy()

   defer log.Infof("Shutting down polaris controller")

   if !cache.WaitForCacheSync(stopCh, p.podsSynced, p.servicesSynced, p.endpointsSynced, p.namespaceSynced) {
      return
   }

   p.CounterPolarisService()

   for i := 0; i < workers; i   {
      go wait.Until(p.worker, p.workerLoopPeriod, stopCh)
   }

   //定时任务
   go p.MetricTracker(stopCh)

   <-stopCh
}

逻辑如下:

  • 等待k8s资源cache同步完成
  • 统计k8s服务资源能够注册为北极星服务的数量:通过k8s接口获取所有k8s服务,对每个service判断是否可以转换为北极星service
  • 启动多个work协程,每个协程处理流程如下:
    • 从workqueue中获取元素key
    • 从key中解析出namespace、service名等信息
    • 根据namespace和service名从informer的cache中获取service
    • 根据service是否存在做不同的处理
    • 如果不存在,则调用北极星接口创建相应的namespace、service等
  • work工作协程会一直轮询中,直到收到stop信号,也就是说会一直轮询取消workqueue中的元素进行上述的处理工作

informer回调

从上面我们知道,在controller的初始化和启动方法中,分别对workqueue进行push和pop元素,那么workqueue的push操作所在的回调方法是什么时候触发的呢?

我们看到在polaris-controller-manager的run方法中,还有这样的逻辑:

代码语言:javascript复制
controllerContext.InformerFactory.Start(controllerContext.Stop)
controllerContext.GenericInformerFactory.Start(controllerContext.Stop)

答案就是在这里,具体我们来分析下: 经过层层调用,最终在这里,我们看到,

代码语言:javascript复制
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()

   fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

   cfg := &amp;Config{
      Queue: fifo,
      ListerWatcher: s.listerWatcher,
      ObjectType: s.objectType,
      FullResyncPeriod: s.resyncCheckPeriod,
      RetryOnError: false,
      ShouldResync: s.processor.shouldResync,

      Process: s.HandleDeltas,
   }

   func() {
      s.startedLock.Lock()
      defer s.startedLock.Unlock()

      s.controller = New(cfg)
      s.controller.(*controller).clock = s.clock
      s.started = true
   }()

   // Separate stop channel because Processor should be stopped strictly after controller
   processorStopCh := make(chan struct{})
   var wg wait.Group
   defer wg.Wait() // Wait for Processor to stop
   defer close(processorStopCh) // Tell Processor to stop
   wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
   wg.StartWithChannel(processorStopCh, s.processor.run)

   defer func() {
      s.startedLock.Lock()
      defer s.startedLock.Unlock()
      s.stopped = true // Don't want any new listeners
   }()
   s.controller.Run(stopCh)
}

上面的主要逻辑如下:

(1)调用NewDeltaFIFO,初始化DeltaFIFO; (2)构建Config结构体,这里留意下Process属性,赋值了s.HandleDeltas,后面会分析到该方法; (3)调用New,利用Config结构体来初始化controller; (4)调用s.processor.run,启动processor; (5)调用s.controller.Run,启动controller;

启动procesor

代码语言:javascript复制
func (p *processorListener) run() {
   // this call blocks until the channel is closed. When a panic happens during the notification
   // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
   // the next notification will be attempted. This is usually better than the alternative of never
   // delivering again.
   stopCh := make(chan struct{})
   wait.Until(func() {
      // this gives us a few quick retries before a long pause and then a few more quick retries
      err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
         for next := range p.nextCh {
            switch notification := next.(type) {
            case updateNotification:
               p.handler.OnUpdate(notification.oldObj, notification.newObj)
            case addNotification:
               p.handler.OnAdd(notification.newObj)
            case deleteNotification:
               p.handler.OnDelete(notification.oldObj)
            default:
               utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
            }
         }
         // the only way to get here is if the p.nextCh is empty and closed
         return true, nil
      })

      // the only way to get here is if the p.nextCh is empty and closed
      if err == nil {
         close(stopCh)
      }
   }, 1*time.Minute, stopCh)
}

启动informer的controller

代码语言:javascript复制
func (c *controller) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
   go func() {
      <-stopCh
      c.config.Queue.Close()
   }()
   r := NewReflector(
      c.config.ListerWatcher,
      c.config.ObjectType,
      c.config.Queue,
      c.config.FullResyncPeriod,
   )
   r.ShouldResync = c.config.ShouldResync
   r.clock = c.clock

   c.reflectorMutex.Lock()
   c.reflector = r
   c.reflectorMutex.Unlock()

   var wg wait.Group
   defer wg.Wait()

   wg.StartWithChannel(stopCh, r.Run)

   wait.Until(c.processLoop, time.Second, stopCh)
}

上面的主要逻辑是:

(1)调用NewReflector,初始化Reflector; (2)调用r.Run,实际上是调用了Reflector的启动方法来启动Reflector; (3)调用c.processLoop,开始controller的核心处理;

启动reflector
代码语言:javascript复制
func (r *Reflector) Run(stopCh <-chan struct{}) {
   klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
   wait.Until(func() {
      if err := r.ListAndWatch(stopCh); err != nil {
         utilruntime.HandleError(err)
      }
   }, r.period, stopCh)
}

主要就是从kube-apiserver处做list&watch操作,然后将得到的对象封装存储进DeltaFIFO中。

调用自定义eventhandler
代码语言:javascript复制
func (c *controller) processLoop() {
   for {
      obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
      if err != nil {
         if err == ErrFIFOClosed {
            return
         }
         if c.config.RetryOnError {
            // This is the safe way to re-enqueue.
            c.config.Queue.AddIfNotPresent(obj)
         }
      }
   }
}

controller的核心处理方法processLoop中,最重要的逻辑是循环调用c.config.Queue.Pop将DeltaFIFO中的队头元素给pop出来,然后调用c.config.Process方法来做处理,当处理出错时,再调用c.config.Queue.AddIfNotPresent将对象重新加入到DeltaFIFO中去。

c.config.Process其实就是sharedIndexInformer.HandleDeltas。 HandleDeltas方法中,将从DeltaFIFO中pop出来的对象以及类型,相应的在indexer中做添加、更新、删除操作,并调用s.processor.distribute通知自定义的ResourceEventHandler。

https://cloudsre.me/2020/03/client-go-0-informer/ https://qiankunli.github.io/2020/07/20/client_go.html https://jimmysong.io/kubernetes-handbook/develop/client-go-informer-sourcecode-analyse.html http://dockerone.com/article/2434596 https://www.cnblogs.com/lianngkyle/p/16244872.html

0 人点赞