kube-apiserver目录下文件比较多
代码语言:javascript复制.
|____app
| |____options
| | |____globalflags_test.go
| | |____options.go
| | |____globalflags.go
| | |____globalflags_providers.go
| | |____globalflags_providerless.go
| | |____options_test.go
| | |____validation.go
| | |____validation_test.go
| |____server.go
| |____aggregator.go
| |____testing
| | |____testdata
| | | |____127.0.0.1_10.0.0.1_kubernetes.default.svc-kubernetes.default-kubernetes-localhost.key
| | | |____README.md
| | | |____127.0.0.1_10.0.0.1_kubernetes.default.svc-kubernetes.default-kubernetes-localhost.crt
| | |____testserver.go
| |____apiextensions.go
| |____server_test.go
|____apiserver.go
|____OWNERS
入口文件cmd/kube-apiserver/apiserver.go仅仅是一层封装:
代码语言:javascript复制command := app.NewAPIServerCommand()
command.Execute()
调用的是cmd/kube-apiserver/app/server.go里面的cobra.Command ,先创建对象,注册方法,和kubectl 一个套路:
代码语言:javascript复制func NewAPIServerCommand() *cobra.Command
s := options.NewServerRunOptions()
err := checkNonZeroInsecurePort(fs)
completedOptions, err := Complete(s)
completedOptions.Validate()
Run(completedOptions, genericapiserver.SetupSignalHandler())
在Complete中补全了运行参数,例如计算Service IP及其网段,并把鉴权(Authorization)配置应用到认证(Authentication)选项上:
代码语言:javascript复制 apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, err := getServiceIPAndRanges(s.ServiceClusterIPRanges)
s.Authentication.ApplyAuthorization(s.Authorization)
然后run
代码语言:javascript复制func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error
server, err := CreateServerChain(completeOptions, stopCh)
prepared, err := server.PrepareRun()
prepared.Run(stopCh)
重点看下CreateServerChain:
代码语言:javascript复制nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
其中proxyTransport是一个设置了默认值的http.Transport:
代码语言:javascript复制proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
DialContext: proxyDialerFn,
TLSClientConfig: proxyTLSClientConfig,
})
然后依次创建server,包括扩展的apiserver(apiExtensionsServer)和原生的apiserver(kubeAPIServer),对应的方法分别是createAPIExtensionsServer和CreateKubeAPIServer。kubeAPIServer最终通过下面的调用创建:
代码语言:javascript复制kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
PrepareRun 主要是加了一些hook方法:
代码语言:javascript复制s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
go s.openAPIAggregationController.Run(context.StopCh)
return nil
})
s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator)
同样apiserver的实现也在vendor里包装了下 vendor/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
代码语言:javascript复制func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error
创建Handler的方法实现在pkg/controlplane/instance.go
代码语言:javascript复制func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error)
routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).
Install(s.Handler.GoRestfulContainer)
m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider);
restStorageProviders := []RESTStorageProvider
m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...);
InstallLegacyAPI里面注册了路由
代码语言:javascript复制legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo);
具体实现位置是pkg/registry/core/rest/storage_core.go
代码语言:javascript复制func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error)
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.LegacyBinding,
"podTemplates": podTemplateStorage,
"replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status,
"services": serviceRest,
"services/proxy": serviceRestProxy,
"services/status": serviceStatusStorage,
"endpoints": endpointsStorage,
"nodes": nodeStorage.Node,
"nodes/status": nodeStorage.Status,
"nodes/proxy": nodeStorage.Proxy,
"events": eventStorage,
"limitRanges": limitRangeStorage,
"resourceQuotas": resourceQuotaStorage,
"resourceQuotas/status": resourceQuotaStatusStorage,
"namespaces": namespaceStorage,
"namespaces/status": namespaceStatusStorage,
"namespaces/finalize": namespaceFinalizeStorage,
"secrets": secretStorage,
"serviceAccounts": serviceAccountStorage,
"persistentVolumes": persistentVolumeStorage,
"persistentVolumes/status": persistentVolumeStatusStorage,
"persistentVolumeClaims": persistentVolumeClaimStorage,
"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
"configMaps": configMapStorage,
"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
restStorageMap["pods/eviction"] = podStorage.Eviction
restStorageMap["serviceaccounts/token"] = serviceAccountStorage.Token
restStorageMap["pods/ephemeralcontainers"] = podStorage.EphemeralContainers
将各个handler的路由方法注册到Container中去,完全遵循go-restful的设计模式,即将处理方法注册到Route中去,同一个根路径下的Route注册到WebService中去,WebService注册到Container中,Container负责分发。访问的过程为Container-->WebService-->Route。可以看到我们常用的接口,需要获取的资源路由都定义在这里。这里只是定义了一个map,真正注册成路由的地方在这里:
vendor/k8s.io/apiserver/pkg/server/genericapiserver.go
代码语言:javascript复制func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error
s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels);
r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
apiResources, resourceInfos, ws, registrationErrors := installer.Install()
for _, path := range paths {
apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
上面路由的map其实定义了一个路径到对应storage的映射,以pod为例看下storage的实现:pkg/registry/core/pod/storage/storage.go
代码语言:javascript复制type PodStorage struct {
// PodStorage bundles the storage objects for pods and their subresources.
// Each field is wired to a path in the legacy REST storage map shown earlier.

// Pod is registered under "pods".
Pod *REST
// Binding is registered under "pods/binding".
Binding *BindingREST
// LegacyBinding is registered under the top-level "bindings" path.
LegacyBinding *LegacyBindingREST
// Eviction is registered under "pods/eviction".
Eviction *EvictionREST
// Status is registered under "pods/status".
Status *StatusREST
// EphemeralContainers is registered under "pods/ephemeralcontainers".
EphemeralContainers *EphemeralContainersREST
// The remaining fields use types from the podrest package.
// Log is registered under "pods/log".
Log *podrest.LogREST
// Proxy is registered under "pods/proxy".
Proxy *podrest.ProxyREST
// Exec is registered under "pods/exec".
Exec *podrest.ExecREST
// Attach is registered under "pods/attach".
Attach *podrest.AttachREST
// PortForward is registered under "pods/portforward".
PortForward *podrest.PortForwardREST
}
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error)
Pod: &REST{store, proxyTransport},
其中pod是一个REST类型的对象,REST的定义如下:
代码语言:javascript复制type REST struct {
// Embedded generic store: its methods are promoted onto REST, so REST
// can be used wherever the store's methods are expected.
*genericregistry.Store
// proxyTransport is the round tripper that REST.ResourceLocation passes
// on to registrypod.ResourceLocation.
proxyTransport http.RoundTripper
}
func (r *REST) ResourceLocation(ctx context.Context, name string) (*url.URL, http.RoundTripper, error)
return registrypod.ResourceLocation(ctx, r, r.proxyTransport, name)
ResourceLocation定义在pkg/registry/core/pod/strategy.go
代码语言:javascript复制func ResourceLocation(ctx context.Context, getter ResourceGetter, rt http.RoundTripper, id string) (*url.URL, http.RoundTripper, error)
pod, err := getPod(ctx, getter, name)
podIP := getPodIP(pod)
err := proxyutil.IsProxyableIP(podIP);
代码语言:javascript复制func getPod(ctx context.Context, getter ResourceGetter, name string) (*api.Pod, error)
obj, err := getter.Get(ctx, name, &metav1.GetOptions{})
pod := obj.(*api.Pod)
其中的Get方法是REST实现的一个接口方法。pod的REST直接内嵌了*genericregistry.Store,Get由Store提供;下面以namespace的REST为例,它显式地把调用转发给store:
pkg/registry/core/namespace/storage/storage.go
代码语言:javascript复制func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
// Pure delegation: the lookup is forwarded unchanged to the wrapped
// generic store, and its result and error are returned as-is.
return r.store.Get(ctx, name, options)
}
// REST is the namespace storage type; unlike an embedding wrapper it holds
// its generic stores as named fields and forwards calls explicitly.
type REST struct {
// store is the generic store that Get delegates to.
store *genericregistry.Store
// status is a second generic store.
// NOTE(review): presumably backs the "namespaces/status" subresource — confirm against the upstream file.
status *genericregistry.Store
}
最终调用的是Store的Get方法,从存储(也就是etcd)里读取对象数据返回给用户。vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
代码语言:javascript复制func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
err := e.Storage.Get(ctx, key, storage.GetOptions{ResourceVersion: options.ResourceVersion}, obj);
// Store is the generic registry store; its Get method reads objects
// through the Storage field.
type Store struct {
// Storage is the interface for the underlying storage for the
// resource. It is wrapped into a "DryRunnableStorage" that will
// either pass-through or simply dry-run.
Storage DryRunnableStorage
}
vendor/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go
代码语言:javascript复制func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error
return s.Storage.Get(ctx, key, opts, objPtr)
apiserver本质上就是一个HTTP服务器,所有代码的核心就是如何配置这个server,包括路由、访问权限以及同数据库(etcd)的交互等。
最终调用etcd的路径是:staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go
代码语言:javascript复制type store struct {
// store is the etcd3-backed storage implementation; New returns it as a
// storage.Interface.

// client is the etcd v3 client; Get issues its reads through client.KV.
client *clientv3.Client
// codec and versioner are handed to decode when turning stored bytes
// back into an object.
codec runtime.Codec
versioner storage.Versioner
// transformer converts the raw stored value before it is decoded
// (see TransformFromStorage in Get).
transformer value.Transformer
// pathPrefix is joined in front of every key before it is sent to etcd.
pathPrefix string
groupResource schema.GroupResource
groupResourceString string
watcher *watcher
// pagingEnabled is supplied by the caller of New.
pagingEnabled bool
leaseManager *leaseManager
}
代码语言:javascript复制func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
// Thin exported constructor: every argument is passed through to newStore
// unchanged and the result is returned as a storage.Interface.
return newStore(c, codec, newFunc, prefix, groupResource, transformer, pagingEnabled, leaseManagerConfig)
}
对应的Get方法如下
代码语言:javascript复制func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
// Get reads the object stored under key from etcd and decodes it into out.
// It returns an error if the etcd read, the resource-version check, the
// value transformation or the decoding fails, or if the key is absent and
// opts.IgnoreNotFound is false.

// Keys are stored under the configured prefix.
key = path.Join(s.pathPrefix, key)
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
// Latency is recorded before the error check, so failed requests are
// measured as well.
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil {
return err
}
// Check the requested resource version against the revision reported in
// the etcd response header; the rule itself lives in
// validateMinimumResourceVersion.
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
// No key/value pairs returned: the key does not exist.
if len(getResp.Kvs) == 0 {
if opts.IgnoreNotFound {
// Caller asked for a zero-valued object instead of an error.
return runtime.SetZeroValue(out)
}
return storage.NewKeyNotFoundError(key, 0)
}
kv := getResp.Kvs[0]
// Transform the stored bytes, passing the key as the authenticated-data
// argument; the second return value is deliberately ignored here.
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
if err != nil {
// Transformation failures are reported as internal errors.
return storage.NewInternalError(err.Error())
}
// Decode into out, supplying the key's ModRevision to decode.
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}