mac 上学习k8s系列(15)kube-apiserver源码阅读

2022-08-02 19:30:12 浏览数 (1)

cmd/kube-apiserver目录下的文件比较多,目录结构如下:

代码语言:javascript复制
.
|____app
| |____options
| | |____globalflags_test.go
| | |____options.go
| | |____globalflags.go
| | |____globalflags_providers.go
| | |____globalflags_providerless.go
| | |____options_test.go
| | |____validation.go
| | |____validation_test.go
| |____server.go
| |____aggregator.go
| |____testing
| | |____testdata
| | | |____127.0.0.1_10.0.0.1_kubernetes.default.svc-kubernetes.default-kubernetes-localhost.key
| | | |____README.md
| | | |____127.0.0.1_10.0.0.1_kubernetes.default.svc-kubernetes.default-kubernetes-localhost.crt
| | |____testserver.go
| |____apiextensions.go
| |____server_test.go
|____apiserver.go
|____OWNERS

入口文件cmd/kube-apiserver/apiserver.go仅仅是一层封装:

代码语言:javascript复制
command := app.NewAPIServerCommand()
command.Execute()

它调用的是cmd/kube-apiserver/app/server.go里面的NewAPIServerCommand,返回一个cobra.Command。先创建对象,再注册方法,和kubectl是一个套路:

代码语言:javascript复制
func NewAPIServerCommand() *cobra.Command 
    s := options.NewServerRunOptions()
    err := checkNonZeroInsecurePort(fs)
    completedOptions, err := Complete(s)
     completedOptions.Validate()
     Run(completedOptions, genericapiserver.SetupSignalHandler())

在Complete中补全了运行参数,例如解析Service的IP网段,并根据鉴权(Authorization)配置调整认证(Authentication)选项:

代码语言:javascript复制
  apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, err := getServiceIPAndRanges(s.ServiceClusterIPRanges)
  s.Authentication.ApplyAuthorization(s.Authorization)

然后run

代码语言:javascript复制
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error 
    server, err := CreateServerChain(completeOptions, stopCh)
    prepared, err := server.PrepareRun()
    prepared.Run(stopCh)

重点看下CreateServerChain:

代码语言:javascript复制
nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)

其中proxyTransport是一个*http.Transport:

代码语言:javascript复制
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
    DialContext:     proxyDialerFn,
    TLSClientConfig: proxyTLSClientConfig,
  })

然后创建了kubeAPIServer,创建server,包括扩展的apiserver和原生的apiserver,调用方法为createAPIExtensionsServer和CreateKubeAPIServer

代码语言:javascript复制
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)

PrepareRun 主要是加了一些hook方法:

代码语言:javascript复制
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
    go s.openAPIAggregationController.Run(context.StopCh)
    return nil
    })
s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator)

同样apiserver的实现也在vendor里包装了下 vendor/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

代码语言:javascript复制
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error

创建Handler的方法实现在pkg/controlplane/instance.go

代码语言:javascript复制
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error)
  routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).
  Install(s.Handler.GoRestfulContainer)
  m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider);
  restStorageProviders := []RESTStorageProvider
  m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...);

InstallLegacyAPI里面注册了路由

代码语言:javascript复制
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo);

具体实现位置是pkg/registry/core/rest/storage_core.go

代码语言:javascript复制
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error)
    restStorageMap := map[string]rest.Storage{
      "pods":             podStorage.Pod,
      "pods/attach":      podStorage.Attach,
      "pods/status":      podStorage.Status,
      "pods/log":         podStorage.Log,
      "pods/exec":        podStorage.Exec,
      "pods/portforward": podStorage.PortForward,
      "pods/proxy":       podStorage.Proxy,
      "pods/binding":     podStorage.Binding,
      "bindings":         podStorage.LegacyBinding,
      "podTemplates": podTemplateStorage,
      "replicationControllers":        controllerStorage.Controller,
      "replicationControllers/status": controllerStorage.Status,
      "services":        serviceRest,
      "services/proxy":  serviceRestProxy,
      "services/status": serviceStatusStorage,
      "endpoints": endpointsStorage,
      "nodes":        nodeStorage.Node,
      "nodes/status": nodeStorage.Status,
      "nodes/proxy":  nodeStorage.Proxy,
      "events": eventStorage,
      "limitRanges":                   limitRangeStorage,
      "resourceQuotas":                resourceQuotaStorage,
      "resourceQuotas/status":         resourceQuotaStatusStorage,
      "namespaces":                    namespaceStorage,
      "namespaces/status":             namespaceStatusStorage,
      "namespaces/finalize":           namespaceFinalizeStorage,
      "secrets":                       secretStorage,
      "serviceAccounts":               serviceAccountStorage,
      "persistentVolumes":             persistentVolumeStorage,
      "persistentVolumes/status":      persistentVolumeStatusStorage,
      "persistentVolumeClaims":        persistentVolumeClaimStorage,
      "persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
      "configMaps":                    configMapStorage,
      "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
    restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
    restStorageMap["pods/eviction"] = podStorage.Eviction
    restStorageMap["serviceaccounts/token"] = serviceAccountStorage.Token
    restStorageMap["pods/ephemeralcontainers"] = podStorage.EphemeralContainers

将各个handler的路由方法注册到Container中去,完全遵循go-restful的设计模式,即将处理方法注册到Route中去,同一个根路径下的Route注册到WebService中去,WebService注册到Container中,Container负责分发。访问的过程为Container-->WebService-->Route。可以看到我们常用的接口,需要获取的资源路由都定义在这里。这里只是定义了一个map,真正注册成路由的地方在这里:

vendor/k8s.io/apiserver/pkg/server/genericapiserver.go

代码语言:javascript复制
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error
    s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels);
    r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
    apiResources, resourceInfos, ws, registrationErrors := installer.Install()
    for _, path := range paths {
        apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)

上面路由的map其实定义了一个路径到对应storage的映射,以pod为例看下storage的实现:pkg/registry/core/pod/storage/storage.go

代码语言:javascript复制
// PodStorage bundles the REST storage objects for the pod resource and its
// subresources. Its fields are the values the legacy route map registers under
// paths such as "pods", "pods/status", "pods/exec" and "bindings".
type PodStorage struct {
  Pod                 *REST
  Binding             *BindingREST
  LegacyBinding       *LegacyBindingREST
  Eviction            *EvictionREST
  Status              *StatusREST
  EphemeralContainers *EphemeralContainersREST
  Log                 *podrest.LogREST
  Proxy               *podrest.ProxyREST
  Exec                *podrest.ExecREST
  Attach              *podrest.AttachREST
  PortForward         *podrest.PortForwardREST
}

func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) 
      Pod:  &REST{store, proxyTransport},

其中pod是一个REST类型的对象,REST的定义如下:

代码语言:javascript复制
// REST is the storage object registered for the "pods" resource.
type REST struct {
  // Embedded, so the generic Store's methods are promoted onto REST.
  *genericregistry.Store
  // proxyTransport is the round tripper REST hands to ResourceLocation.
  proxyTransport http.RoundTripper
}
func (r *REST) ResourceLocation(ctx context.Context, name string) (*url.URL, http.RoundTripper, error)
  return registrypod.ResourceLocation(ctx, r, r.proxyTransport, name)

ResourceLocation定义在pkg/registry/core/pod/strategy.go

代码语言:javascript复制
func ResourceLocation(ctx context.Context, getter ResourceGetter, rt http.RoundTripper, id string) (*url.URL, http.RoundTripper, error) 
    pod, err := getPod(ctx, getter, name)
    podIP := getPodIP(pod)
     err := proxyutil.IsProxyableIP(podIP);
代码语言:javascript复制
func getPod(ctx context.Context, getter ResourceGetter, name string) (*api.Pod, error) 
    obj, err := getter.Get(ctx, name, &metav1.GetOptions{})
    pod := obj.(*api.Pod)

其中的Get方法由各资源的REST对象提供:pod的REST内嵌了*genericregistry.Store,因此直接继承了Store的Get;而有些资源(例如namespace)的REST则显式包装了一层,如下所示:

pkg/registry/core/namespace/storage/storage.go

代码语言:javascript复制
// Get delegates to the wrapped generic store's Get and returns its result
// unchanged.
func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
  return r.store.Get(ctx, name, options)
}

// REST wraps generic registry stores instead of embedding one, so methods such
// as Get have to be forwarded explicitly.
type REST struct {
  // store is the store that Get delegates to.
  store  *genericregistry.Store
  // status is a second store; presumably for the status subresource — confirm.
  status *genericregistry.Store
}

最终调用的是Store的Get方法,从底层存储(也就是etcd)里读取对象数据返回给用户,实现位于vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:

代码语言:javascript复制
func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) 
    err := e.Storage.Get(ctx, key, storage.GetOptions{ResourceVersion: options.ResourceVersion}, obj);

// Store is excerpted here down to the single field that Get reads from.
type Store struct {
  // Storage is the interface for the underlying storage for the
  // resource. It is wrapped into a "DryRunnableStorage" that will
  // either pass-through or simply dry-run.
  Storage DryRunnableStorage
}

vendor/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go

代码语言:javascript复制
func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error
    return s.Storage.Get(ctx, key, opts, objPtr)

apiserver本质上就是一个服务器,所有代码的核心就是如何配置这个server,包括路由、访问权限以及同数据库(etcd)的交互等。

最终调用etcd的路径是:staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go

代码语言:javascript复制
// store is the storage implementation backed by an etcd v3 client.
type store struct {
  // client is the etcd client; Get reads through client.KV.
  client              *clientv3.Client
  // codec and versioner are passed to decode when turning stored bytes into
  // an object.
  codec               runtime.Codec
  versioner           storage.Versioner
  // transformer converts the raw stored value before it is decoded.
  transformer         value.Transformer
  // pathPrefix is joined in front of every key before it is sent to etcd.
  pathPrefix          string
  groupResource       schema.GroupResource
  groupResourceString string
  watcher             *watcher
  pagingEnabled       bool
  leaseManager        *leaseManager
}
代码语言:javascript复制
// New returns a storage.Interface by forwarding all of its arguments, unchanged
// and in the same order, to newStore.
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
  return newStore(c, codec, newFunc, prefix, groupResource, transformer, pagingEnabled, leaseManagerConfig)
}

对应的Get方法如下

代码语言:javascript复制
// Get reads the value stored under key and decodes it into out.
//
// key is taken relative to the store's pathPrefix. When the key has no value,
// Get either resets out to its zero value (opts.IgnoreNotFound set) or returns
// a key-not-found error. Errors from the etcd request and from the resource
// version check are returned as-is; a transformer failure is reported as a
// storage internal error.
func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
  // Build the full etcd key by prepending the store-wide prefix.
  key = path.Join(s.pathPrefix, key)
  startTime := time.Now()
  getResp, err := s.client.KV.Get(ctx, key)
  // Recorded before the error check, so failed requests are measured too.
  metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
  if err != nil {
    return err
  }
  // Check the requested resource version against the revision etcd reported
  // in the response header.
  if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
    return err
  }

  // An empty result means the key does not exist.
  if len(getResp.Kvs) == 0 {
    if opts.IgnoreNotFound {
      return runtime.SetZeroValue(out)
    }
    return storage.NewKeyNotFoundError(key, 0)
  }
  kv := getResp.Kvs[0]

  // Run the stored bytes through the transformer; its second return value is
  // deliberately ignored here.
  // NOTE(review): presumably this undoes at-rest encryption — confirm.
  data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
  if err != nil {
    return storage.NewInternalError(err.Error())
  }

  // Decode into out, handing the key's ModRevision to decode together with
  // the versioner.
  return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}

0 人点赞