【注】源码分析均以 k8s 的第一个 commit 代码分析;
controller-manager 的入口函数 main():
代码语言:go复制cmd/controller-manager/controller-manager.go
启动 controller-manager 的参数:
代码语言:go复制var (
etcd_servers = flag.String("etcd_servers", "", "Servers for the etcd (http://ip:port).")
master = flag.String("master", "", "The address of the Kubernetes API server")
)
- 指定 etcd 的 server 的节点 IP;
- 指定 controller-manager 的监听 IP 地址;
初始化 controller-manager 的实例:
代码语言:go复制controllerManager := registry.MakeReplicationManager()
函数返回 ReplicationManager 的实例;
代码语言:go复制// ReplicationManager is responsible for synchronizing ReplicationController objects stored in etcd
// with actual running tasks.
// TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface
type ReplicationManager struct {
etcdClient *etcd.Client
kubeClient client.ClientInterface
taskControl TaskControlInterface
updateLock sync.Mutex
}
- etcd 的客户端
- task/controller/service 的客户端
- 任务控制接口
- 更新自旋锁
这里讲下 client.ClientInterface 接口:
代码语言:go复制pkg/client/client.go
// ClientInterface holds the methods for clients of Kubenetes, an interface to allow mock testing
type ClientInterface interface {
ListTasks(labelQuery map[string]string) (api.TaskList, error)
GetTask(name string) (api.Task, error)
DeleteTask(name string) error
CreateTask(api.Task) (api.Task, error)
UpdateTask(api.Task) (api.Task, error)
GetReplicationController(name string) (api.ReplicationController, error)
CreateReplicationController(api.ReplicationController) (api.ReplicationController, error)
UpdateReplicationController(api.ReplicationController) (api.ReplicationController, error)
DeleteReplicationController(string) error
GetService(name string) (api.Service, error)
CreateService(api.Service) (api.Service, error)
UpdateService(api.Service) (api.Service, error)
DeleteService(string) error
}
通过接口可以看到,这里是 Task/Controller/Service 资源的增删改查;
最后启动两个 gorouting 来同步任务和监控任务;
代码语言:go复制go util.Forever(func() { controllerManager.Synchronize() }, 20*time.Second)
go util.Forever(func() { controllerManager.WatchControllers() }, 20*time.Second)
1)Synchronize() :副本控制器
- 定期同步副本数量
func (rm *ReplicationManager) Synchronize() {}
- 从 etcd 中获取 controller 的列表信息; -- 获取 json 信息,进行反序列化for _, value := range response.Node.Nodes {}
// 每 10s 同步一次
time.Sleep(10 * time.Second)遍历每个节点,获取每个节点上的 controller 列表信息;
代码语言:go复制var controllerSpec ReplicationController
err = rm.syncReplicationController(controllerSpec)
这里对 syncReplicationController() 函数进行分析:
代码语言:go复制func (rm *ReplicationManager) syncReplicationController(controllerSpec ReplicationController) error {}
代码语言:go复制func (rm *ReplicationManager) syncReplicationController(controllerSpec ReplicationController) error {
// 加锁,更新任务信息
rm.updateLock.Lock()
// 根据 label 标签,获取任务列表
taskList, err := rm.kubeClient.ListTasks(controllerSpec.DesiredState.ReplicasInSet)
if err != nil {
return err
}
// 过滤所有退出状态的任务
filteredList := rm.filterActiveTasks(taskList.Items)
// 看期望的副本数和当前任务数的差值
diff := len(filteredList) - controllerSpec.DesiredState.Replicas
log.Printf("%#v", filteredList)
if diff < 0 {
// 将 负数 转换成 正数
diff *= -1
log.Printf("Too few replicas, creating %dn", diff)
// 根据期望数,使用期望状态创建任务
for i := 0; i < diff; i {
rm.taskControl.createReplica(controllerSpec)
}
} else if diff > 0 {
// 任务数超过期望数,删除多余的任务
log.Print("Too many replicas, deleting")
for i := 0; i < diff; i {
rm.taskControl.deleteTask(filteredList[i].ID)
}
}
// 释放锁
rm.updateLock.Unlock()
return nil
}
通过对上面的函数分析,根据 controller 中的期望状态和任务的当前状态进行对比,这个是副本数空值的协程, 每 10s 同步一次;
2)WatchControllers():监控 etcd 中的状态,控制副本数;
代码语言:go复制func (rm *ReplicationManager) WatchControllers() {}
函数源码:
代码语言:go复制func (rm *ReplicationManager) WatchControllers() {
// 创建无缓冲的管道
watchChannel := make(chan *etcd.Response)
// 通过起协程来监控 etcd 中 key 的状态变化,将变化的信息同步给 controller
go util.Forever(func() { rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, nil) }, 0)
for {
// 持续从管道中读取消息,如果没有变化,这里会阻塞
watchResponse := <-watchChannel
// 读出空值,直接休眠 10s,重新读取
if watchResponse == nil {
time.Sleep(time.Second * 10)
continue
}
log.Printf("Got watch: %#v", watchResponse)
// 监控 etcd 中 key 的变化, 对 set 的动作进行响应
controller, err := rm.handleWatchResponse(watchResponse)
if err != nil {
log.Printf("Error handling data: %#v, %#v", err, watchResponse)
continue
}
// 将变动后的信息,同步给副本控制器
rm.syncReplicationController(*controller)
}
}
- 持续监控 etcd 中的值的变化,对于有变化的 key 进行操作;