【K8s】kubelet 源码分析 01-02

2022-06-26 15:57:11 浏览数 (1)

【注】源码分析均以 k8s 的第一个 commit 代码分析;

1)分析 WatchFile() 方法:

代码语言:go复制
func (sl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) {}

每隔一段时间,从监控的文件中读取数据:

代码语言:go复制
data, err := ioutil.ReadFile(file)

将读取的数据解析到 ContainerManifest 的 ymal 文件中:

代码语言:go复制
sl.ExtractYAMLData(data, &manifest)

再通过 byte.Equal 函数进行对比,对应的文件是否有变更:

代码语言:go复制
bytes.Equal(lastData, data)

如果对应的文件有更新,就将更新的数据写入 fileChannel 中;


2)分析 WatchHTTP() 方法:

  • 此方法和 WatchFile() 类似,只是这个方法是从 http 请求中获取数据;func (sl *Kubelet) SyncHTTP(client *http.Client, url string, config *api.ContainerManifest) ([]byte, error) {}此方法先发起 http 的 Get 请求,然后对比最后的一次的数据,如果数据有变更,则将变更后的数据写入 httpChannel 中;

3)分析 SyncAndSetupEtcdWatch() 方法:

  • 对于使用了 etcd 存储的集群来说,需要监控对应的 key 的变动;func (sl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerManifest) {}

-- 现获取当前 kubelet 运行的主机 hostname:

代码语言:go复制
hostname, err := exec.Command("hostname", "-f").Output()

对应 etcd 中的 key 为:

代码语言:go复制
key := "/registry/hosts/"   strings.TrimSpace(string(hostname))

第一次只读取最初的配置,wanch 只会监控变化:

代码语言:go复制
    // First fetch the initial configuration (watch only gives changes...)
	for {
		err = sl.getKubeletStateFromEtcd(key, changeChannel)
		if err == nil {
			// We got a successful response, etcd is up, set up the watch.
			break
		}
		time.Sleep(30 * time.Second)
	}

第一启动中,获取到 key 报错,则每隔 30s 进行重试;

获取 etcd 中 kubelet 的 state:

代码语言:go复制
func (sl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []api.ContainerManifest) error {}

0 人点赞