穿越回 2014 年,分析下 k8s 第一个提交的源码。
获取 first commit 源码
代码语言:javascript复制git clone https://github.com/kubernetes/kubernetes.git
cd kubernetes
git checkout `git rev-list --max-parents=0 HEAD`
简介
api-server 是 k8s 的核心组件之一,用于接收 kubelet 的请求,并将请求信息保存到后端存储 etcd 中。核心功能是提供 k8s 各类资源对象的 CURD 等操作。
源码分析
从 api-server 的命令行入口开始分析,命令行代码位于 cmd/apiserver/apiserver.go
var (
port = flag.Uint("port", 8080, "The port to listen on. Default 8080.")
address = flag.String("address", "127.0.0.1", "The address on the local server to listen to. Default 127.0.0.1")
apiPrefix = flag.String("api_prefix", "/api/v1beta1", "The prefix for API requests on the server. Default '/api/v1beta1'")
etcdServerList, machineList util.StringList
)
func init() {
flag.Var(&etcdServerList, "etcd_servers", "Servers for the etcd (http://ip:port), comma separated")
flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.")
}
最开始定义了 api-server 启动所需要的相关参数,上古版本的 k8s 使用了标准库自带的 flag 库,其中 util.StringList
实现了flag.Value
接口。
type StringList []string
func (sl *StringList) String() string {
return fmt.Sprint(*sl)
}
func (sl *StringList) Set(value string) error {
for _, s := range strings.Split(value, ",") {
if len(s) == 0 {
return fmt.Errorf("value should not be an empty string")
}
*sl = append(*sl, s)
}
return nil
}
可以看到util.StringList
用于将以逗号分割的字符串转为[]string
类型。各个命令行参数含义如下:
- port: api-server 监听的 port
- address: api-server 监听的 ip
- apiPrefix: 访问 api-server 的 URL 前缀
- etcdServerList: 后端存储的 etcd 节点列表
- machineList: 工作节点的列表
从 main 函数开始分析 api-server 的具体实现
代码语言:javascript复制var (
taskRegistry registry.TaskRegistry
controllerRegistry registry.ControllerRegistry
serviceRegistry registry.ServiceRegistry
)
if len(etcdServerList) > 0 {
log.Printf("Creating etcd client pointing to %v", etcdServerList)
etcdClient := etcd.NewClient(etcdServerList)
taskRegistry = registry.MakeEtcdRegistry(etcdClient, machineList)
controllerRegistry = registry.MakeEtcdRegistry(etcdClient, machineList)
serviceRegistry = registry.MakeEtcdRegistry(etcdClient, machineList)
} else {
taskRegistry = registry.MakeMemoryRegistry()
controllerRegistry = registry.MakeMemoryRegistry()
serviceRegistry = registry.MakeMemoryRegistry()
}
registry 是对具体资源对象的后端存储的抽象,这里定义了三个 registry,并根据命令行参数判断是使用 etcd 还是内存作为存储后端。
代码语言:javascript复制// 代码路径:pkg/registry/interfaces.go
// TaskRegistry is an interface implemented by things that know how to store Task objects
type TaskRegistry interface {
// ListTasks obtains a list of tasks that match query.
// Query may be nil in which case all tasks are returned.
ListTasks(query *map[string]string) ([]api.Task, error)
// Get a specific task
GetTask(taskId string) (*api.Task, error)
// Create a task based on a specification, schedule it onto a specific machine.
CreateTask(machine string, task api.Task) error
// Update an existing task
UpdateTask(task api.Task) error
// Delete an existing task
DeleteTask(taskId string) error
}
其中taskRegistry
是对 task 的存储抽象,task 可以当作 pod 的前身看待,实现了对 task 的 list,get,create, update, delete 的操作。
// 代码路径:pkg/registry/interfaces.go
// ControllerRegistry is an interface for things that know how to store Controllers
type ControllerRegistry interface {
ListControllers() ([]api.ReplicationController, error)
GetController(controllerId string) (*api.ReplicationController, error)
CreateController(controller api.ReplicationController) error
UpdateController(controller api.ReplicationController) error
DeleteController(controllerId string) error
}
而ControllerRegistry
是对 RC(Replication Controller) 的存储抽象,而我们现在使用的较多的是 RS(RepicateSet), RS 正是 RC 的升级,同样是实现了对 RC 的 list,get,create,update,delete 操作。
// 代码路径:pkg/registry/service_registry.go
type ServiceRegistry interface {
ListServices() (ServiceList, error)
CreateService(svc Service) error
GetService(name string) (*Service, error)
DeleteService(name string) error
UpdateService(svc Service) error
UpdateEndpoints(e Endpoints) error
}
ServiceRegistry
是对 service 的存储抽象
containerInfo := &kube_client.HTTPContainerInfo{
Client: http.DefaultClient,
Port: 10250,
}
storage := map[string]apiserver.RESTStorage{
"tasks": registry.MakeTaskRegistryStorage(taskRegistry, containerInfo, registry.MakeFirstFitScheduler(machineList, taskRegistry)),
"replicationControllers": registry.MakeControllerRegistryStorage(controllerRegistry),
"services": registry.MakeServiceRegistryStorage(serviceRegistry),
}
storge 是对所有资源的 registry 的统一抽象,被定义为 REST 风格的资源操作接口。
代码语言:javascript复制// 代码路径: pkg/apiserver/api_server.go
// RESTStorage is a generic interface for RESTful storage services
type RESTStorage interface {
List(*url.URL) (interface{}, error)
Get(id string) (interface{}, error)
Delete(id string) error
Extract(body string) (interface{}, error)
Create(interface{}) error
Update(interface{}) error
}
实例化所有资源的 storage 后放在 map 中维护,用于后面 handler 的处理。
代码语言:javascript复制s := &http.Server{
Addr: fmt.Sprintf("%s:%d", *address, *port),
Handler: apiserver.New(storage, *apiPrefix), // 使用 REST storage 创建请求的 handler
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
log.Fatal(s.ListenAndServe())
使用前面的 REST Storage map 和 api prefix 创建 handler,启动 HTTP 服务器等待接收请求。接下来转到 handler 分析源码
代码语言:javascript复制// 代码路径:pkg/apiserver/api_server.go
// New creates a new ApiServer object.
// 'storage' contains a map of handlers.
// 'prefix' is the hosting path prefix.
func New(storage map[string]RESTStorage, prefix string) *ApiServer {
return &ApiServer{
storage: storage,
prefix: prefix,
}
}
// HTTP Handler interface
func (server *ApiServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
log.Printf("%s %s", req.Method, req.RequestURI)
url, err := url.ParseRequestURI(req.RequestURI)
if err != nil {
server.error(err, w)
return
}
if url.Path == "/index.html" || url.Path == "/" || url.Path == "" {
server.handleIndex(w)
return
}
if !strings.HasPrefix(url.Path, server.prefix) {
server.notFound(req, w)
return
}
requestParts := strings.Split(url.Path[len(server.prefix):], "/")[1:]
if len(requestParts) < 1 {
server.notFound(req, w)
return
}
storage := server.storage[requestParts[0]]
if storage == nil {
server.notFound(req, w)
return
} else {
server.handleREST(requestParts, url, req, w, storage)
}
}
Golang HTTP 的标准库是通过实现 Handler 接口的 ServeHTTP 函数来实现处理请求,通过代码可以看出先对请求的 URL 进行解析获取具体的资源对象,再通过 REST storage map 拿到对应资源对象的 REST storage,最后调用 server.handleREST
来处理具体的请求。
// 代码路径:pkg/apiserver/api_server.go
func (server *ApiServer) handleREST(parts []string, url *url.URL, req *http.Request, w http.ResponseWriter, storage RESTStorage) {
switch req.Method {
case "GET":
switch len(parts) {
case 1:
controllers, err := storage.List(url)
if err != nil {
server.error(err, w)
return
}
server.write(200, controllers, w)
case 2:
task, err := storage.Get(parts[1])
if err != nil {
server.error(err, w)
return
}
if task == nil {
server.notFound(req, w)
return
}
server.write(200, task, w)
default:
server.notFound(req, w)
}
return
case "POST":
if len(parts) != 1 {
server.notFound(req, w)
return
}
body, err := server.readBody(req)
if err != nil {
server.error(err, w)
return
}
obj, err := storage.Extract(body)
if err != nil {
server.error(err, w)
return
}
storage.Create(obj)
server.write(200, obj, w)
return
case "DELETE":
if len(parts) != 2 {
server.notFound(req, w)
return
}
err := storage.Delete(parts[1])
if err != nil {
server.error(err, w)
return
}
server.write(200, Status{success: true}, w)
return
case "PUT":
if len(parts) != 2 {
server.notFound(req, w)
return
}
body, err := server.readBody(req)
if err != nil {
server.error(err, w)
}
obj, err := storage.Extract(body)
if err != nil {
server.error(err, w)
return
}
err = storage.Update(obj)
if err != nil {
server.error(err, w)
return
}
server.write(200, obj, w)
return
default:
server.notFound(req, w)
}
}
可以很清晰的看出,这段逻辑是根据请求方法和请求参数对实际的资源对象进行特定的 REST 的操作。
回到 main 函数,在启动 HTTP server 之前还启动了一个 goroutine 做定时任务
代码语言:javascript复制endpoints := registry.MakeEndpointController(serviceRegistry, taskRegistry)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
其中 util.Forever
就是周期性任务的封装
// 代码路径: pkg/util/util.go
// Loops forever running f every d. Catches any panics, and keeps going.
func Forever(f func(), period time.Duration) {
for {
func() {
defer HandleCrash()
f()
}()
time.Sleep(period)
}
}
任务实体endpoints.SyncServiceEndpoints
逻辑如下
// 代码路径: pkg/registry/endpoint.go
func (e *EndpointController) SyncServiceEndpoints() error {
services, err := e.serviceRegistry.ListServices()
if err != nil {
return err
}
var resultErr error
for _, service := range services.Items {
tasks, err := e.taskRegistry.ListTasks(&service.Labels)
if err != nil {
log.Printf("Error syncing service: %#v, skipping.", service)
resultErr = err
continue
}
endpoints := make([]string, len(tasks))
for ix, task := range tasks {
// TODO: Use port names in the service object, don't just use port #0
endpoints[ix] = fmt.Sprintf("%s:%d", task.CurrentState.Host, task.DesiredState.Manifest.Containers[0].Ports[0].HostPort)
}
err = e.serviceRegistry.UpdateEndpoints(Endpoints{
Name: service.ID,
Endpoints: endpoints,
})
if err != nil {
log.Printf("Error updating endpoints: %#v", err)
continue
}
}
return resultErr
}
可以看到主要逻辑就是定时获取所有 service 列表,再遍历 service 列表查询 service 下所有 task,最后根据 task 的 endpoint 来更新 service 的 endpoints。这一段逻辑其实就是为 kubeproxy 做负载均衡用的,让 kubeproxy 知道需要代理的 endpoint 有哪些。这一块逻辑在现在的 k8s 架构中已经从 api-server 中移除了。
笔者只分析了 api-server 主体的逻辑,后续会分析具体 registry 的逻辑。
本文作者: Ifan Tsai (菜菜)
本文链接: https://cloud.tencent.com/developer/article/2164602
版权声明: 本文采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!