源码地址:https://github.com/polarismesh/polaris-controller/blob/main/README-zh.md
通过mesh配置文件设置controller的配置管理对象 https://fankangbest.github.io/2017/10/12/kubernetes-client分析(一)-kubeconfig-v1-5-2/
下面就从源码开始分析polaris是怎么通过进行服务发现的
polaris通过k8s的扩展api机制自定义了controller实现,下面选取一些关键代码进行分析
初始化controller
对每个资源增加创建、更新、删除操作的监控回调方法:
代码语言:javascript复制p := PolarisController{
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
polarisControllerName),
workerLoopPeriod: time.Second,
}
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: p.onServiceAdd,
UpdateFunc: p.onServiceUpdate,
DeleteFunc: p.onServiceDelete,
})
endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: p.onEndpointAdd,
UpdateFunc: p.onEndpointUpdate,
DeleteFunc: p.onEndpointDelete,
})
namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: p.onNamespaceAdd,
UpdateFunc: p.onNamespaceUpdate,
})
上面是典型的k8s资源监听代码
以service创建为例:
当k8s中一个service被创建成功之后,就会调用polaris的这个onServiceAdd方法:
代码语言:javascript复制func (p *PolarisController) onServiceAdd(obj interface{}) {
service := obj.(*v1.Service)
if !util.IsPolarisService(service, p.config.PolarisController.SyncMode) {
return
}
key, err := util.GenServiceQueueKey(service)
if err != nil {
log.Errorf("generate queue key for %s/%s error, %v", service.Namespace, service.Name, err)
return
}
p.enqueueService(key, service, "Add")
}
逻辑如下:
- 判断是否可以注册为北极星服务
- 根据该service生成相应的key
- 将该key放入
workqueue
中
启动controller
代码语言:javascript复制func (p *PolarisController) Run(workers int, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
defer p.queue.ShutDown()
defer p.consumer.Destroy()
defer p.provider.Destroy()
defer log.Infof("Shutting down polaris controller")
if !cache.WaitForCacheSync(stopCh, p.podsSynced, p.servicesSynced, p.endpointsSynced, p.namespaceSynced) {
return
}
p.CounterPolarisService()
for i := 0; i < workers; i {
go wait.Until(p.worker, p.workerLoopPeriod, stopCh)
}
//定时任务
go p.MetricTracker(stopCh)
<-stopCh
}
逻辑如下:
- 等待k8s资源cache同步完成
- 统计k8s服务资源能够注册为北极星服务的数量:通过k8s接口获取所有k8s服务,对每个service判断是否可以转换为北极星service
- 启动多个work协程,每个协程处理流程如下:
- 从workqueue中获取元素key
- 从key中解析出namespace、service名等信息
- 根据namespace和service名从informer的cache中获取service
- 根据service是否存在做不同的处理
- 如果不存在,则调用北极星接口创建相应的namespace、service等
- work工作协程会一直轮询中,直到收到stop信号,也就是说会一直轮询取消workqueue中的元素进行上述的处理工作
informer回调
从上面我们知道,在controller的初始化和启动方法中,分别对workqueue进行push和pop元素,那么workqueue的push操作所在的回调方法是什么时候触发的呢?
我们看到在polaris-controller-manager的run方法中,还有这样的逻辑:
代码语言:javascript复制controllerContext.InformerFactory.Start(controllerContext.Stop)
controllerContext.GenericInformerFactory.Start(controllerContext.Stop)
答案就是在这里,具体我们来分析下: 经过层层调用,最终在这里,我们看到,
代码语言:javascript复制func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
s.controller.Run(stopCh)
}
上面的主要逻辑如下:
(1)调用NewDeltaFIFO,初始化DeltaFIFO; (2)构建Config结构体,这里留意下Process属性,赋值了s.HandleDeltas,后面会分析到该方法; (3)调用New,利用Config结构体来初始化controller; (4)调用s.processor.run,启动processor; (5)调用s.controller.Run,启动controller;
启动procesor
代码语言:javascript复制func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
启动informer的controller
代码语言:javascript复制func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
}
上面的主要逻辑是:
(1)调用NewReflector,初始化Reflector; (2)调用r.Run,实际上是调用了Reflector的启动方法来启动Reflector; (3)调用c.processLoop,开始controller的核心处理;
启动reflector
代码语言:javascript复制func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
}
主要就是从kube-apiserver处做list&watch操作,然后将得到的对象封装存储进DeltaFIFO中。
调用自定义eventhandler
代码语言:javascript复制func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
controller的核心处理方法processLoop中,最重要的逻辑是循环调用c.config.Queue.Pop将DeltaFIFO中的队头元素给pop出来,然后调用c.config.Process方法来做处理,当处理出错时,再调用c.config.Queue.AddIfNotPresent将对象重新加入到DeltaFIFO中去。
c.config.Process其实就是sharedIndexInformer.HandleDeltas。 HandleDeltas方法中,将从DeltaFIFO中pop出来的对象以及类型,相应的在indexer中做添加、更新、删除操作,并调用s.processor.distribute通知自定义的ResourceEventHandler。
https://cloudsre.me/2020/03/client-go-0-informer/ https://qiankunli.github.io/2020/07/20/client_go.html https://jimmysong.io/kubernetes-handbook/develop/client-go-informer-sourcecode-analyse.html http://dockerone.com/article/2434596 https://www.cnblogs.com/lianngkyle/p/16244872.html