【K8s】kubelet 源码分析 01-01

2022-06-26 14:58:08 浏览数 (1)

【注】源码分析均以 k8s 的第一个 commit 代码分析;

kubelet 的入口函数 main()

代码语言:go复制
cmd/kubelet/kubelet.go

kubelet 通过本地套接字的方式与 docker 进行链接,可以认为 kubelet 是 docker 的客户端:

代码语言:go复制
endpoint := "unix:///var/run/docker.sock"
dockerClient, err := docker.NewClient(endpoint)

最后通过 RunKubelet() 函数来启动 kubelet:

代码语言:go复制
my_kubelet := kubelet.Kubelet{
	DockerClient:       dockerClient,
	FileCheckFrequency: *fileCheckFrequency,
	SyncFrequency:      *syncFrequency,
	HTTPCheckFrequency: *httpCheckFrequency,
}
my_kubelet.RunKubelet(*file, *manifest_url, *etcd_servers, *address, *port)

下面就主要讲解 kubelet 的启动函数:

代码语言:go复制
pkg/kubelet/kubelet.go

func (sl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) {}

在分析函数之前,可以看下早期的 kubelet 操作容器的接口:

代码语言:go复制
// Interface for testability
type DockerInterface interface {
	ListContainers(options docker.ListContainersOptions) ([]docker.APIContainers, error)
	InspectContainer(id string) (*docker.Container, error)
	CreateContainer(docker.CreateContainerOptions) (*docker.Container, error)
	StartContainer(id string, hostConfig *docker.HostConfig) error
	StopContainer(id string, timeout uint) error
}

可以看到,主要有 5 个方法:

  • 获取容器列表
  • 查看容器信息
  • 创建容器
  • 启动容器
  • 停止容器

在 RunKubelet() 方法中, 首先定义了四个 同步 channel:

代码语言:go复制
fileChannel := make(chan api.ContainerManifest)
etcdChannel := make(chan []api.ContainerManifest)
httpChannel := make(chan api.ContainerManifest)
serverChannel := make(chan api.ContainerManifest)

启动第一个 file 的 gorouting:

  • 主要监控目录下的资源配置清单的变更信息;func (sl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) {}

如果监控的资源配置清单有变更,就将资源配置信息写入 fileChannel 中;

如果资源清单的 URL 不为空,则开启监控 URL 的 gorouting:

代码语言:go复制
	if manifest_url != "" {
		go util.Forever(func() { sl.WatchHTTP(manifest_url, httpChannel) }, 20*time.Second)
	}
代码语言:go复制
func (sl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManifest) {}
  • 监控 URL 对应的资源配置清单的是否变更;

如果 etcd_server 不为空,则开启 etcd 的监控;

代码语言:go复制
go util.Forever(func() { sl.SyncAndSetupEtcdWatch(etcdChannel) }, 20*time.Second)
代码语言:go复制
unc (sl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerManifest) {}

-- 同步和监控 etcd 资源的变动;

如果 kubelet 的 server 地址不为空,则开启 kubelet 的端口监听:

代码语言:go复制
s.ListenAndServe()

最终通过对所有的 channel 的同步进行汇总:

代码语言:go复制
sl.RunSyncLoop(etcdChannel, fileChannel, serverChannel, httpChannel, sl)

至此,kubelet 则启动完成;

0 人点赞