【K8s】controller-manager 源码分析 01-01

2022-06-21 08:16:59 浏览数 (1)

【注】源码分析均以 k8s 的第一个 commit 代码分析;

controller-manager 的入口函数 main():

代码语言:go复制
cmd/controller-manager/controller-manager.go

启动 controller-manager 的参数:

代码语言:go复制
var (
	etcd_servers = flag.String("etcd_servers", "", "Servers for the etcd (http://ip:port).")
	master       = flag.String("master", "", "The address of the Kubernetes API server")
)
  • 指定 etcd 的 server 的节点 IP;
  • 指定 controller-manager 的监听 IP 地址;

初始化 controller-manager 的实例:

代码语言:go复制
controllerManager := registry.MakeReplicationManager()

函数返回 ReplicationManager 的实例;

代码语言:go复制
// ReplicationManager is responsible for synchronizing ReplicationController objects stored in etcd
// with actual running tasks.
// TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface
type ReplicationManager struct {
	etcdClient  *etcd.Client
	kubeClient  client.ClientInterface
	taskControl TaskControlInterface
	updateLock  sync.Mutex
}
  • etcd 的客户端
  • task/controller/service 的客户端
  • 任务控制接口
  • 更新自旋锁

这里讲下 client.ClientInterface 接口:

代码语言:go复制
pkg/client/client.go

// ClientInterface holds the methods for clients of Kubenetes, an interface to allow mock testing
type ClientInterface interface {
	ListTasks(labelQuery map[string]string) (api.TaskList, error)
	GetTask(name string) (api.Task, error)
	DeleteTask(name string) error
	CreateTask(api.Task) (api.Task, error)
	UpdateTask(api.Task) (api.Task, error)

	GetReplicationController(name string) (api.ReplicationController, error)
	CreateReplicationController(api.ReplicationController) (api.ReplicationController, error)
	UpdateReplicationController(api.ReplicationController) (api.ReplicationController, error)
	DeleteReplicationController(string) error

	GetService(name string) (api.Service, error)
	CreateService(api.Service) (api.Service, error)
	UpdateService(api.Service) (api.Service, error)
	DeleteService(string) error
}

通过接口可以看到,这里是 Task/Controller/Service 资源的增删改查;

最后启动两个 gorouting 来同步任务和监控任务;

代码语言:go复制
go util.Forever(func() { controllerManager.Synchronize() }, 20*time.Second)
go util.Forever(func() { controllerManager.WatchControllers() }, 20*time.Second)

1)Synchronize() :副本控制器

  • 定期同步副本数量
代码语言:go复制
func (rm *ReplicationManager) Synchronize() {}
  • 从 etcd 中获取 controller 的列表信息; -- 获取 json 信息,进行反序列化for _, value := range response.Node.Nodes {}

// 每 10s 同步一次

time.Sleep(10 * time.Second)遍历每个节点,获取每个节点上的 controller 列表信息;

代码语言:go复制
var controllerSpec ReplicationController
err = rm.syncReplicationController(controllerSpec)

这里对 syncReplicationController() 函数进行分析:

代码语言:go复制
func (rm *ReplicationManager) syncReplicationController(controllerSpec ReplicationController) error {}
代码语言:go复制
func (rm *ReplicationManager) syncReplicationController(controllerSpec ReplicationController) error {
	// 加锁,更新任务信息
	rm.updateLock.Lock()
	// 根据 label 标签,获取任务列表
	taskList, err := rm.kubeClient.ListTasks(controllerSpec.DesiredState.ReplicasInSet)
	if err != nil {
		return err
	}
	// 过滤所有退出状态的任务
	filteredList := rm.filterActiveTasks(taskList.Items)
	// 看期望的副本数和当前任务数的差值
	diff := len(filteredList) - controllerSpec.DesiredState.Replicas
	log.Printf("%#v", filteredList)
	if diff < 0 {
		// 将 负数 转换成 正数
		diff *= -1
		log.Printf("Too few replicas, creating %dn", diff)
		// 根据期望数,使用期望状态创建任务
		for i := 0; i < diff; i   {
			rm.taskControl.createReplica(controllerSpec)
		}
	} else if diff > 0 {
		// 任务数超过期望数,删除多余的任务
		log.Print("Too many replicas, deleting")
		for i := 0; i < diff; i   {
			rm.taskControl.deleteTask(filteredList[i].ID)
		}
	}
	// 释放锁
	rm.updateLock.Unlock()
	return nil
}

通过对上面的函数分析,根据 controller 中的期望状态和任务的当前状态进行对比,这个是副本数空值的协程, 每 10s 同步一次;

2)WatchControllers():监控 etcd 中的状态,控制副本数;

代码语言:go复制
func (rm *ReplicationManager) WatchControllers() {}

函数源码:

代码语言:go复制
func (rm *ReplicationManager) WatchControllers() {
	// 创建无缓冲的管道
	watchChannel := make(chan *etcd.Response)
	// 通过起协程来监控 etcd 中 key 的状态变化,将变化的信息同步给 controller
	go util.Forever(func() { rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, nil) }, 0)
	for {
		// 持续从管道中读取消息,如果没有变化,这里会阻塞
		watchResponse := <-watchChannel
		// 读出空值,直接休眠 10s,重新读取
		if watchResponse == nil {
			time.Sleep(time.Second * 10)
			continue
		}
		log.Printf("Got watch: %#v", watchResponse)
		// 监控 etcd 中 key 的变化, 对 set 的动作进行响应
		controller, err := rm.handleWatchResponse(watchResponse)
		if err != nil {
			log.Printf("Error handling data: %#v, %#v", err, watchResponse)
			continue
		}
		// 将变动后的信息,同步给副本控制器
		rm.syncReplicationController(*controller)
	}
}
  • 持续监控 etcd 中的值的变化,对于有变化的 key 进行操作;

0 人点赞