正文
1、k8s源码中针对pod的增删改查是在源码包/pkg/kubelet/kubelet.go中的syncLoop()进行。如下所示:
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
// syncLoop是处理变更的主循环。它监听来自三个channel(file、apiserver、http)的pod变化,并将它们聚合。一旦发现任何变化,就针对期望状态和运行状态执行一次同步;如果配置没有变化,则每隔sync-frequency秒同步一次最后已知的期望状态。该函数永不返回。
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
klog.InfoS("Starting kubelet main sync loop")
在syncLoop()中,通过kl.syncLoopIteration()对pod执行具体的操作:
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
2、syncLoopIteration有几个重要的参数,如下所示:
// Arguments:
// 1. configCh: a channel to read config events from
// 2. handler: the SyncHandler to dispatch pods to
// 3. syncCh: a channel to read periodic sync events from
// 4. housekeepingCh: a channel to read housekeeping events from
// 5. plegCh: a channel to read PLEG updates from
// * configCh: dispatch the pods for the config change to the appropriate
// handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * housekeepingCh: trigger cleanup of pods
// * health manager: sync pods that have failed or in which one or more
// containers have failed health checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
return false
}
SyncHandler是一个interface,包含对pod常见操作的几个方法。该接口由kubelet来实现。如下所示:
// SyncHandler is an interface implemented by Kubelet, for testability
// pod创建、更新、删除...
type SyncHandler interface {
// HandlePodAdditions receives the pods carried by a config update whose
// operation is ADD; syncLoopIteration calls it with u.Pods in that case.
HandlePodAdditions(pods []*v1.Pod)
// HandlePodUpdates takes a slice of pods.
// NOTE(review): presumably the callback for the UPDATE operation; the
// dispatch is not visible in this excerpt, confirm in syncLoopIteration.
HandlePodUpdates(pods []*v1.Pod)
// HandlePodRemoves takes a slice of pods.
// NOTE(review): presumably the callback for the REMOVE operation; confirm.
HandlePodRemoves(pods []*v1.Pod)
// HandlePodReconcile takes a slice of pods.
// NOTE(review): presumably the callback for the RECONCILE operation; confirm.
HandlePodReconcile(pods []*v1.Pod)
// HandlePodSyncs takes a slice of pods.
// NOTE(review): likely driven by the periodic sync channel (syncCh); confirm.
HandlePodSyncs(pods []*v1.Pod)
// HandlePodCleanups takes no pod list and is the only method that reports
// an error to its caller.
// NOTE(review): likely driven by the housekeeping channel; confirm.
HandlePodCleanups() error
}
3、针对pod可进行的操作如下,每个操作都有对应的方法。比如ADD,就会去执行HandlePodAdditions方法
// These constants identify the PodOperations that can be made on a pod configuration.
const (
// SET is the current pod configuration.
// It is the first entry of the block, so iota assigns it the value 0; each
// following constant repeats the expression and gets the next integer.
SET PodOperation = iota
// ADD signifies pods that are new to this source (value 1).
ADD
// DELETE signifies pods that are gracefully deleted from this source (value 2).
DELETE
// REMOVE signifies pods that have been removed from this source (value 3).
REMOVE
// UPDATE signifies pods have been updated in this source (value 4).
UPDATE
// RECONCILE signifies pods that have unexpected status in this source,
// kubelet should reconcile status with this source (value 5).
RECONCILE
)
switch u.Op {
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
4、HandlePodAdditions又是如何去执行创建pod的呢?主要有以下几个操作:
1. 根据pod的创建时间进行排序
sort.Sort(sliceutils.PodsByCreationTime(pods))
2. 将pod添加到pod manager中。kubelet依赖pod manager作为期望状态的唯一可信来源(source of truth)。
如果一个pod在pod manager中查询不到,就意味着它已经在apiserver中被删除,除清理之外不再需要其他操作
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
3. 获取该pod对应的mirror pod(只有静态pod才会有mirror pod)
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
4. 通过dispatchWork分发任务
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
5. 将pod加入到probe manager,即健康检查.包括startup probe、liveness probe、readiness probe。
kl.probeManager.AddPod(pod)
dispatchWork又做了哪些事情呢?如下:
// Run the sync in an async worker. 在一个异步worker中执行同步
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: func(err error) {
if err != nil {
metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
}
},
})
那么UpdatePod()又做哪些事情呢?
// Creating a new pod worker either means this is a new pod, or that the
// kubelet just restarted. In either case the kubelet is willing to believe
// the status of the pod for the first pod worker sync. See corresponding
// comment in syncPod.
// 创建一个新的pod worker,意味着这是一个新的pod,或者kubelet刚刚重启
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
managePodLoop()去执行同步。
for update := range podUpdates {
err := func() error {
podUID := update.Pod.UID
// This is a blocking call that would return only if the cache
// has an entry for the pod that is newer than minRuntimeCache
// Time. This ensures the worker doesn't start syncing until
// after the cache is at least newer than the finished time of
// the previous sync.
status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
if err != nil {
// This is the legacy event thrown by manage pod loop
// all other events are now dispatched from syncPodFn
p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
return err
}
// 这里去做同步
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
lastSyncTime = time.Now()
return err
}()
5、最终调用到pkg/kubelet/kuberuntime/kuberuntime_manager.go中SyncPod()进行pod的创建
// SyncPod syncs the running pod into the desired pod by executing following steps:
// 执行以下的步骤将运行的pod同步到期望的状态
// 1. Compute sandbox and container changes.
// 计算sandbox和container的变化
// 2. Kill pod sandbox if necessary.
// 如果有必要就删除pod sandbox
// 3. Kill any containers that should not be running.
// 删除不需要运行的容器
// 4. Create sandbox if necessary.
// 需要的情况下创建sandbox
// 5. Create ephemeral containers.
// 创建临时容器
// 6. Create init containers.
// 创建初始化容器
// 7. Create normal containers.
// 创建普通容器
func (m *kubeGenericRuntimeManager) SyncPod()
// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(pod, podStatus)
klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
if podContainerChanges.CreateSandbox {
ref, err := ref.GetReference(legacyscheme.Scheme, pod)
if err != nil {
klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
}
if podContainerChanges.SandboxID != "" {
m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
} else {
klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
}
}
// Step 2: Kill the pod if the sandbox has changed.
if podContainerChanges.KillPod {
// Step 3: kill any running containers in this pod which are not to keep.
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
result.AddSyncResult(killContainerResult)
if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
return
}
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
var msg string
var err error
klog.V(4).InfoS("Creating PodSandbox for pod", "pod", klog.KObj(pod))
createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
result.AddSyncResult(createSandboxResult)
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
// Step 5: start ephemeral containers
// These are started "prior" to init containers to allow running ephemeral containers even when there
// are errors starting an init container. In practice init containers will start first since ephemeral
// containers cannot be specified on pod creation.
if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
for _, idx := range podContainerChanges.EphemeralContainersToStart {
start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
}
}
// Step 6: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
// Start the next init container.
if err := start("init container", containerStartSpec(container)); err != nil {
return
}
// Successfully started the container; clear the entry in the failure
klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
}
// Step 7: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
start("container", containerStartSpec(&pod.Spec.Containers[idx]))
}
6、另外,pod worker还要做以下事情:
# 创建pod数据目录、挂载volume、获取image pull secrets……
newPodWorkers(klet.syncPod --->pkg/kubelet/kubelet.go) //通过syncPod
kubetypes.SyncPodKill
kubetypes.SyncPodCreate
podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
runnable.Admit
kubetypes.IsStaticPod(pod)
kl.makePodDataDirs(pod)
kl.volumeManager.WaitForAttachAndMount(pod)
kl.getPullSecretsForPod(pod)
kl.containerRuntime.SyncPod(pkg/kubelet/container/runtime.go)