k8s first commit 源码分析之 Cloudcfg

2022-11-15 21:37:21 浏览数 (1)

上一次分析了api server的源码,这一次来分析 cloudcfg 的源码。

简介

cloudcfg 可以看做是 kubectl 的前身,负责与 API server 的交互,只存在于上古时代的 k8s 中,我们现在接触到的都是叫做 kubectl 的命令行工具了。该组件做的事情非常简单,就是将用户的命令行操作转化为对 API server 的 HTTP 请求。

命令行主体逻辑

从 cloudcfg 的命令行入口开始分析,命令行代码位于 cmd/apiserver/cloudcfg.go

代码语言:javascript复制
// The flag package provides a default help printer via -h switch
var versionFlag *bool = flag.Bool("v", false, "Print the version number.")
var httpServer *string = flag.String("h", "", "The host to connect to.")
var config *string = flag.String("c", "", "Path to the config file.")
var labelQuery *string = flag.String("l", "", "Label query to use for listing")
var updatePeriod *time.Duration = flag.Duration("u", 60*time.Second, "Update interarrival in seconds")
var portSpec *string = flag.String("p", "", "The port spec, comma-separated list of <external>:<internal>,...")
var servicePort *int = flag.Int("s", -1, "If positive, create and run a corresponding service on this port, only used with 'run'")
var authConfig *string = flag.String("auth", os.Getenv("HOME") "/.kubernetes_auth", "Path to the auth info file.  If missing, prompt the user")

最开始定义了 cloudcfg 命令行工具提供的所有参数。

main 函数中先从命令行中获取 method, api server 的 host,资源对象的类型以及鉴权参数。

代码语言:javascript复制
method := flag.Arg(0)
url := *httpServer   "/api/v1beta1"   flag.Arg(1)

auth, err := cloudcfg.LoadAuthInfo(*authConfig)
if err != nil {
    log.Fatalf("Error loading auth: %#v", err)
}

再根据 method 判断是对资源的何种操作方式。

对资源的 CURD 是通过拼接 method 和 labelQuery 作为 api server 的请求 URL 去发送 HTTP 请求,没有其他多余的逻辑。

代码语言:javascript复制
if method == "get" || method == "list" {
    if len(*labelQuery) > 0 && method == "list" {
        url = url   "?labels="   *labelQuery
    }
    request, err = http.NewRequest("GET", url, nil)
} else if method == "delete" {
    request, err = http.NewRequest("DELETE", url, nil)
} else if method == "create" {
    request, err = cloudcfg.RequestWithBody(*config, url, "POST")
} else if method == "update" {
    request, err = cloudcfg.RequestWithBody(*config, url, "PUT")
}

var body string
body, err = cloudcfg.DoRequest(request, auth.User, auth.Password)
if err != nil {
    log.Fatalf("Error: %#v", err)
}
fmt.Println(body)

还有对 rollingupdate 的操作和 controller 的操作,后面分别展开分析

rollingupdate操作

判断 method 为 rollingupdate 后通过 client 执行 update

代码语言:javascript复制
else if method == "rollingupdate" {
    client := &kube_client.Client{
        Host: *httpServer,
        Auth: &auth,
    }
    cloudcfg.Update(flag.Arg(1), client, *updatePeriod)
}

具体操作是通过pkg/client/client.go中实现的 client 进行操作, client 实现了以下接口

代码语言:javascript复制
// ClientInterface holds the methods for clients of Kubenetes, an interface to allow mock testing
type ClientInterface interface {
	ListTasks(labelQuery map[string]string) (api.TaskList, error)
	GetTask(name string) (api.Task, error)
	DeleteTask(name string) error
	CreateTask(api.Task) (api.Task, error)
	UpdateTask(api.Task) (api.Task, error)

	GetReplicationController(name string) (api.ReplicationController, error)
	CreateReplicationController(api.ReplicationController) (api.ReplicationController, error)
	UpdateReplicationController(api.ReplicationController) (api.ReplicationController, error)
	DeleteReplicationController(string) error

	GetService(name string) (api.Service, error)
	CreateService(api.Service) (api.Service, error)
	UpdateService(api.Service) (api.Service, error)
	DeleteService(string) error
}

可以看到就是对 tasks,RC 和 serveice 的操作。以 create tasks 为例

代码语言:javascript复制
// CreateTask takes the representation of a task.  Returns the server's representation of the task, and an error, if it occurs
func (client Client) CreateTask(task api.Task) (api.Task, error) {
	var result api.Task
	body, err := json.Marshal(task)
	if err == nil {
		_, err = client.rawRequest("POST", "tasks", bytes.NewBuffer(body), &result)
	}
	return result, err
}

可以看到同样是通过请求 api server 来完成操作的。

回到命令行的 main 函数,创建 client 后调用cloudcfg.Update(flag.Arg(1), client, *updatePeriod)

代码语言:javascript复制
// 代码路径:pkg/cloudcfg/cloudcfg.go
// Perform a rolling update of a collection of tasks.
// 'name' points to a replication controller.
// 'client' is used for updating tasks.
// 'updatePeriod' is the time between task updates.
func Update(name string, client client.ClientInterface, updatePeriod time.Duration) error {
	controller, err := client.GetReplicationController(name)
	if err != nil {
		return err
	}
	labels := controller.DesiredState.ReplicasInSet

	taskList, err := client.ListTasks(labels)
	if err != nil {
		return err
	}
	for _, task := range taskList.Items {
		_, err = client.UpdateTask(task)
		if err != nil {
			return err
		}
		time.Sleep(updatePeriod)
	}
	return nil
}

大致逻辑是通过 name 获取 RC 对象,再通过 RC 对象的期望状态获取 tasks label,再通过 task label 来 list 所有 task 后更新 task。不过笔者没太看懂这里的更新逻辑,看上去把遍历 task 的时候由原封不动传回去了,或许第一个版本还不支持滚动更新?待笔者后续完整深入看完所有组件逻辑再来补充。

controller 操作

判断 method 为 run 时,拿到 image,replicas, name 后执行 RunController

代码语言:javascript复制
if method == "run" {
    args := flag.Args()
    if len(args) < 4 {
        log.Fatal("usage: cloudcfg -h <host> run <image> <replicas> <name>")
    }
    image := args[1]
    replicas, err := strconv.Atoi(args[2])
    name := args[3]
    if err != nil {
        log.Fatalf("Error parsing replicas: %#v", err)
    }
    err = cloudcfg.RunController(image, name, replicas, kube_client.Client{Host: *httpServer, Auth: &auth}, *portSpec, *servicePort)
    if err != nil {
        log.Fatalf("Error: %#v", err)
    }
    return
}

进入到 cloudcfg.RunController 函数内部分析

代码语言:javascript复制
controller := api.ReplicationController{
    JSONBase: api.JSONBase{
        ID: name,
    },
    DesiredState: api.ReplicationControllerState{
        Replicas: replicas,
        ReplicasInSet: map[string]string{
            "name": name,
        },
        TaskTemplate: api.TaskTemplate{
            DesiredState: api.TaskState{
                Manifest: api.ContainerManifest{
                    Containers: []api.Container{
                        api.Container{
                            Image: image,
                            Ports: makePorts(portSpec),
                        },
                    },
                },
            },
            Labels: map[string]string{
                "name": name,
            },
        },
    },
    Labels: map[string]string{
        "name": name,
    },
}

controllerOut, err := client.CreateReplicationController(controller)
if err != nil {
	return err
}

根据外部传入的参数构造 RC 对象,然后调用 client 的 CreateReplicationController 函数,本质还是向 API Server 发起请求

代码语言:javascript复制
data, err := yaml.Marshal(controllerOut)
if err != nil {
    return err
}
fmt.Print(string(data))

if servicePort > 0 {
    svc, err := createService(name, servicePort, client)
    if err != nil {
        return err
    }
    data, err = yaml.Marshal(svc)
    if err != nil {
        return err
    }
    fmt.Printf(string(data))
}

再根据掺入的 servicePort 创建 service

代码语言:javascript复制
func createService(name string, port int, client client.ClientInterface) (api.Service, error) {
	svc := api.Service{
		JSONBase: api.JSONBase{ID: name},
		Port:     port,
		Labels: map[string]string{
			"name": name,
		},
	}
	svc, err := client.CreateService(svc)
	return svc, err
}

同样是向 API server 发起请求。

回到 cloudcfg 命令行中,判断 method 为 stop 时执行 StopController

代码语言:javascript复制
else if method == "stop" {
    err = cloudcfg.StopController(flag.Arg(1), kube_client.Client{Host: *httpServer, Auth: &auth})
    if err != nil {
        log.Fatalf("Error: %#v", err)
    }
    return
}

分析 cloudcfg.StopController

代码语言:javascript复制
// StopController stops a controller named 'name' by setting replicas to zero
func StopController(name string, client client.ClientInterface) error {
	controller, err := client.GetReplicationController(name)
	if err != nil {
		return err
	}
	controller.DesiredState.Replicas = 0
	controllerOut, err := client.UpdateReplicationController(controller)
	if err != nil {
		return err
	}
	data, err := yaml.Marshal(controllerOut)
	if err != nil {
		return err
	}
	fmt.Print(string(data))
	return nil
}

可以看到将 RC 的 Replicas 置为 0 后发送给了 API server。

Controller 的操作还有一个 rm

代码语言:javascript复制
lse if method == "rm" {
    err = cloudcfg.DeleteController(flag.Arg(1), kube_client.Client{Host: *httpServer, Auth: &auth})
    if err != nil {
        log.Fatalf("Error: %#v", err)
    }
    return
}

cloudcfg.DeleteController 也很简单,获取当前 RC 的副本数,如果副本数不为 0 则报错退出,否则请求 API server 删除

代码语言:javascript复制
// DeleteController deletes a replication controller named 'name', requires that the controller
// already be stopped
func DeleteController(name string, client client.ClientInterface) error {
	controller, err := client.GetReplicationController(name)
	if err != nil {
		return err
	}
	if controller.DesiredState.Replicas != 0 {
		return fmt.Errorf("controller has non-zero replicas (%d)", controller.DesiredState.Replicas)
	}
	return client.DeleteReplicationController(name)
}

我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=2td5ld5xvow0o

本文作者: Ifan Tsai  (菜菜)

本文链接: https://cloud.tencent.com/developer/article/2164603

版权声明: 本文采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!

0 人点赞