《一起读 kubernetes 源码》deployment 滚动更新是如何实现的

2024-07-13 15:00:54 浏览数 (3)

前言

这一节终于来到了我们最为熟悉的一个对象 deployment,通常这可能是我们学习 k8s 接触的第一个大对象了,我们一般的应用也是以 deployment 来进行部署的,那么对于熟悉的它来说,我们应该从源码里面去找什么目标来看呢?对于我来说,deployment 的更新是我最好奇的,在我重新修改镜像版本之后,deployment 是如何一步步控制已有的 pod 进行更新的呢?这一节我们就从源码中揭秘这个过程。

前置知识

  • deployment 的基础使用
  • 滚动更新

心路历程

在我看来其他的属性与 pod 类似,而 deployment 作为一个 pod 的集合。那,为什么 deployment 要让 pod 的有多个副本呢?从最初的角度角度来说肯定是高可用了,所以 deployment 中最为关键的就是对 pod 的控制,也就是当 pod 的数量变化的时候,它是如何操作的。

码前提问

  1. deployment 是由哪个对象控制的?
  2. 应用更新的时候 deployment 是如何控制更新过程的?

源码分析

寻码过程

像 deployment 这样的源码比起其他就好找很多了,毕竟命名比较直接。在看来前几节之后,我不知道你是否发现了一个规律。通常看源码的正向思路可以被总结为:

  1. 找到对应实现的数据结构,通常是一个或多个结构体
  2. 看它的初始化,初始化能告诉你其中哪些必要的准备步骤,和具体一些字段的基础能力
  3. 看它的方法,通常就能知道你想要具体实现原理了

Deployment 结构

话不多说,先找到它的数据结构

代码语言:javascript复制
// vendor/k8s.io/api/apps/v1/types.go:355
type Deployment struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	//  optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Specification of the desired behavior of the Deployment.
	//  optional
	Spec DeploymentSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

	// Most recently observed status of the Deployment.
	//  optional
	Status DeploymentStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

可以看到,数据结构的定义和我们平常使用的 yaml 文件的定义是一一对应的,非常容易理解,可以简单浏览一下 Spec 的属性。

那么关键的问题来了,是哪个结构在控制 deployment?于是开始寻找 Deployment 的引用,看哪些位置在使用这个数据结构,引用很多,但是你只需要按文件去看就可以了。

阅读其他源码时,如果一个对象的引用,不要去寻找每一个代码引用的位置,而应该先从文件入手,如果引用的文件还是很多,可以从包的角度入手,一个包下通常能力方向也是类似的

于是,我找到了 DeploymentController 这个关键的对象(看命名也应该是它了,控制器嘛),今天我们后面就是围绕着它展开的。注意哦,

DeploymentController

DeploymentController 结构

还是类似的,先来看看结构

代码语言:javascript复制
// pkg/controller/deployment/deployment_controller.go:66
type DeploymentController struct {
	// rsControl is used for adopting/releasing replica sets.
	rsControl controller.RSControlInterface
	client    clientset.Interface

	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// To allow injection of syncDeployment for testing.
	syncHandler func(ctx context.Context, dKey string) error
	// used for unit testing
	enqueueDeployment func(deployment *apps.Deployment)

	// dLister can list/get deployments from the shared informer's store
	dLister appslisters.DeploymentLister
	// rsLister can list/get replica sets from the shared informer's store
	rsLister appslisters.ReplicaSetLister
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister

	// dListerSynced returns true if the Deployment store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dListerSynced cache.InformerSynced
	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsListerSynced cache.InformerSynced
	// podListerSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podListerSynced cache.InformerSynced

	// Deployments that need to be synced
	queue workqueue.RateLimitingInterface
}

注意两个点就好,一个是 syncHandler 还有一个是 queue 看到这两个字段我心里其实已经有个大概的思路了。下面就要用到我们在第一节提到的 informer 机制了。

NewDeploymentController

初始化是在 NewDeploymentController 方法中,我省略了其中一些部分,留下了重要的几个例子

代码语言:javascript复制
// pkg/controller/deployment/deployment_controller.go:101
func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	//....
	dc := &DeploymentController{
		//....
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	//....

	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			dc.addDeployment(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			dc.updateDeployment(logger, oldObj, newObj)
		},
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: func(obj interface{}) {
			dc.deleteDeployment(logger, obj)
		},
	})
	//....

	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	//....
	return dc, nil
}

有了前面的知识,这里的代码我们就很容易理解了,关键是在于注册了有个 ResourceEvent 处理的各种能力,比如当 Add 事件来的时候,调用 addDeployment。先留心注意下面的两个部分 syncHandlerenqueueDeployment 后面会用到。接下来我们肯定会好奇,addDeployment 究竟是如何处理这个事件的,所以我们继续深入看里面的实现。

addDeployment

这里面的调用链路很清晰:addDeployment -> enqueueDeployment -> enqueue -> dc.queue.Add(key)

代码语言:javascript复制
// pkg/controller/deployment/deployment_controller.go:391
func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
	key, err := controller.KeyFunc(deployment)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
		return
	}

	dc.queue.Add(key)
}

其实这些处理的工作,将 deployment 对应的 key 丢到队列里面去,所以下面我们只需要找到哪里在处理队列中的消息就可以了

Run

地方也很好找,是在 Run 里面,运行的时候启动了一定数量的 worker,然后每个 worker 循环去取消息。

代码语言:javascript复制
// pkg/controller/deployment/deployment_controller.go:157
// Run begins watching and syncing.
func (dc *DeploymentController) Run(ctx context.Context, workers int) {
	//...

	for i := 0; i < workers; i   {
		go wait.UntilWithContext(ctx, dc.worker, time.Second)
	}

	<-ctx.Done()
}
代码语言:javascript复制
// pkg/controller/deployment/deployment_controller.go:473
func (dc *DeploymentController) worker(ctx context.Context) {
	for dc.processNextWorkItem(ctx) {
	}
}

func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	err := dc.syncHandler(ctx, key.(string))
	dc.handleErr(ctx, err, key)

	return true
}

整个路径就是:Run -> worker -> processNextWorkItem -> syncHandler

可以看到就是一个标准的生产者消费者模型。然后关键就来到了 syncHandler 变量,还记得 dc.syncHandler = dc.syncDeployment 吗?对的,它在初始化时候被赋值为了 syncDeployment 这就到了我们这一节的重点方法了,注意看。

syncDeployment

这里我不想省略太多的代码,因为它本身是一个顺序结构,很容易理解。

代码语言:javascript复制
// pkg/controller/deployment/deployment_controller.go:581
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
	//...

	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	//...

	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
	// through adoption/orphaning.
	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
	if err != nil {
		return err
	}
	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
	// Current uses of the podMap are:
	//
	// * check if a Pod is labeled correctly with the pod-template-hash label.
	// * check that no old Pods are running in the middle of Recreate Deployments.
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	//...

	if d.Spec.Paused {
		return dc.sync(ctx, d, rsList)
	}

	// rollback is not re-entrant in case the underlying replica sets are updated with a new
	// revision so we should ensure that we won't proceed to update replica sets until we
	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
	if getRollbackTo(d) != nil {
		return dc.rollback(ctx, d, rsList)
	}

	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(ctx, d, rsList)
	}

	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(ctx, d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(ctx, d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
  1. 第一步就是找到 deployment
  2. 第二步是找到 rsList 也是我们说的 RS
  3. 然后找到 podMap

寻找完了之后就开始根据状态进行操作。有哪些操作呢?

  • rollback 回滚
  • scaling 判断现在是不是在调整大小
  • rollout 关键来了,这就是更新,有两种模式
    • Recreate 重建
    • Rolling 滚动更新

这里我们最关心的策略终于暴露出来了,那就是滚动更新了,我们赶快来看看里面是怎么实现的。

rolloutRolling

代码语言:javascript复制
// pkg/controller/deployment/rolling.go:31
// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up, if we can.
	scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Scale down, if we can.
	scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	if deploymentutil.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

步骤其实比我想的要简单:

  1. 得到新旧的 RS 进行比较
  2. 先看要不要扩副本数,如果要则直接扩,并且就直接 sync 了不继续了
  3. 然后才轮到缩容,能操作就直接操作了。

然后,我们来回忆一下 pod 数量在实际更新中的变动过程,如果目前的 pod 是 3/3(目标/现有),那么扩容之后就会变成 3/4,此时下一次进来就不能扩了,只能变成缩了变成 3/3 然后不断往复,直到所以 pod 都满足期望要求的版本。想想真的蛮奇妙的,就是利用了简单的状态管理就实现了整个滚动更新过程,慢慢的就靠近了目标。这可能就是状态机的优雅吧,你只管改状态,剩下的协调交给我。

码后解答

  1. deployment 是由哪个对象控制的?
    1. DeploymentController
  2. 应用更新的时候 deployment 是如何控制更新过程的?
    1. 关键其实就在于:rolloutRolling ,将目标态的 pod 添加,打破平衡(状态变化),将不满足的旧状态移除,从而慢慢协调到最终状态。再说的简单一点:先尝试 scaledUp 然后尝试 scaledDown

总结提升

设计上

deployment 这里我们能学到哪些设计上的提升点呢?我个人有下面几个

  1. 首先就是 NewDeploymentController 里面对于 Informer 机制的运用
  2. 利用策略模式 RollingUpdateRecreate 两种不同实现很清晰
  3. 利用状态的管理来构建当前 rolloutRolling 的操作,对于编码来说清晰

熟悉了这部分的实现,那么对于其他对象类似的功能,我觉得你应该也能有自己的把握了。

0 人点赞