【注】源码分析均以 k8s 的第一个 commit 代码分析;
1)分析 WatchFile() 方法:
代码语言:go复制func (sl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) {}
每隔一段时间,从监控的文件中读取数据:
代码语言:go复制data, err := ioutil.ReadFile(file)
将读取的数据解析到 ContainerManifest 的 ymal 文件中:
代码语言:go复制sl.ExtractYAMLData(data, &manifest)
再通过 byte.Equal 函数进行对比,对应的文件是否有变更:
代码语言:go复制bytes.Equal(lastData, data)
如果对应的文件有更新,就将更新的数据写入 fileChannel 中;
2)分析 WatchHTTP() 方法:
- 此方法和 WatchFile() 类似,只是这个方法是从 http 请求中获取数据;func (sl *Kubelet) SyncHTTP(client *http.Client, url string, config *api.ContainerManifest) ([]byte, error) {}此方法先发起 http 的 Get 请求,然后对比最后的一次的数据,如果数据有变更,则将变更后的数据写入 httpChannel 中;
3)分析 SyncAndSetupEtcdWatch() 方法:
- 对于使用了 etcd 存储的集群来说,需要监控对应的 key 的变动;func (sl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerManifest) {}
-- 现获取当前 kubelet 运行的主机 hostname:
代码语言:go复制hostname, err := exec.Command("hostname", "-f").Output()
对应 etcd 中的 key 为:
代码语言:go复制key := "/registry/hosts/" strings.TrimSpace(string(hostname))
第一次只读取最初的配置,wanch 只会监控变化:
代码语言:go复制 // First fetch the initial configuration (watch only gives changes...)
for {
err = sl.getKubeletStateFromEtcd(key, changeChannel)
if err == nil {
// We got a successful response, etcd is up, set up the watch.
break
}
time.Sleep(30 * time.Second)
}
第一启动中,获取到 key 报错,则每隔 30s 进行重试;
获取 etcd 中 kubelet 的 state:
代码语言:go复制func (sl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []api.ContainerManifest) error {}