前言
这一节终于来到了我们最为熟悉的一个对象 deployment,通常这可能是我们学习 k8s 接触的第一个大对象了,我们一般的应用也是以 deployment 来进行部署的,那么对于熟悉的它来说,我们应该从源码里面去找什么目标来看呢?对于我来说,deployment 的更新是我最好奇的,在我重新修改镜像版本之后,deployment 是如何一步步控制已有的 pod 进行更新的呢?这一节我们就从源码中揭秘这个过程。
前置知识
- deployment 的基础使用
- 滚动更新
心路历程
在我看来其他的属性与 pod 类似,而 deployment 作为一个 pod 的集合。那,为什么 deployment 要让 pod 的有多个副本呢?从最初的角度角度来说肯定是高可用了,所以 deployment 中最为关键的就是对 pod 的控制,也就是当 pod 的数量变化的时候,它是如何操作的。
码前提问
- deployment 是由哪个对象控制的?
- 应用更新的时候 deployment 是如何控制更新过程的?
源码分析
寻码过程
像 deployment 这样的源码比起其他就好找很多了,毕竟命名比较直接。在看来前几节之后,我不知道你是否发现了一个规律。通常看源码的正向思路可以被总结为:
- 找到对应实现的数据结构,通常是一个或多个结构体
- 看它的初始化,初始化能告诉你其中哪些必要的准备步骤,和具体一些字段的基础能力
- 看它的方法,通常就能知道你想要具体实现原理了
Deployment 结构
话不多说,先找到它的数据结构
代码语言:javascript复制// vendor/k8s.io/api/apps/v1/types.go:355
type Deployment struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
// optional
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// Specification of the desired behavior of the Deployment.
// optional
Spec DeploymentSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
// Most recently observed status of the Deployment.
// optional
Status DeploymentStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}
可以看到,数据结构的定义和我们平常使用的 yaml 文件的定义是一一对应的,非常容易理解,可以简单浏览一下 Spec 的属性。
那么关键的问题来了,是哪个结构在控制 deployment?于是开始寻找 Deployment 的引用,看哪些位置在使用这个数据结构,引用很多,但是你只需要按文件去看就可以了。
阅读其他源码时,如果一个对象的引用,不要去寻找每一个代码引用的位置,而应该先从文件入手,如果引用的文件还是很多,可以从包的角度入手,一个包下通常能力方向也是类似的
于是,我找到了 DeploymentController
这个关键的对象(看命名也应该是它了,控制器嘛),今天我们后面就是围绕着它展开的。注意哦,
DeploymentController
DeploymentController 结构
还是类似的,先来看看结构
代码语言:javascript复制// pkg/controller/deployment/deployment_controller.go:66
type DeploymentController struct {
// rsControl is used for adopting/releasing replica sets.
rsControl controller.RSControlInterface
client clientset.Interface
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
// To allow injection of syncDeployment for testing.
syncHandler func(ctx context.Context, dKey string) error
// used for unit testing
enqueueDeployment func(deployment *apps.Deployment)
// dLister can list/get deployments from the shared informer's store
dLister appslisters.DeploymentLister
// rsLister can list/get replica sets from the shared informer's store
rsLister appslisters.ReplicaSetLister
// podLister can list/get pods from the shared informer's store
podLister corelisters.PodLister
// dListerSynced returns true if the Deployment store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
dListerSynced cache.InformerSynced
// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
rsListerSynced cache.InformerSynced
// podListerSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podListerSynced cache.InformerSynced
// Deployments that need to be synced
queue workqueue.RateLimitingInterface
}
注意两个点就好,一个是 syncHandler
还有一个是 queue
看到这两个字段我心里其实已经有个大概的思路了。下面就要用到我们在第一节提到的 informer 机制了。
NewDeploymentController
初始化是在 NewDeploymentController
方法中,我省略了其中一些部分,留下了重要的几个例子
// pkg/controller/deployment/deployment_controller.go:101
func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
//....
dc := &DeploymentController{
//....
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
}
//....
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dc.addDeployment(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dc.updateDeployment(logger, oldObj, newObj)
},
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: func(obj interface{}) {
dc.deleteDeployment(logger, obj)
},
})
//....
dc.syncHandler = dc.syncDeployment
dc.enqueueDeployment = dc.enqueue
//....
return dc, nil
}
有了前面的知识,这里的代码我们就很容易理解了,关键是在于注册了有个 ResourceEvent
处理的各种能力,比如当 Add 事件来的时候,调用 addDeployment
。先留心注意下面的两个部分 syncHandler
和 enqueueDeployment
后面会用到。接下来我们肯定会好奇,addDeployment
究竟是如何处理这个事件的,所以我们继续深入看里面的实现。
addDeployment
这里面的调用链路很清晰:addDeployment
-> enqueueDeployment
-> enqueue
-> dc.queue.Add(key)
// pkg/controller/deployment/deployment_controller.go:391
func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
key, err := controller.KeyFunc(deployment)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
return
}
dc.queue.Add(key)
}
其实这些处理的工作,将 deployment 对应的 key 丢到队列里面去,所以下面我们只需要找到哪里在处理队列中的消息就可以了
Run
地方也很好找,是在 Run
里面,运行的时候启动了一定数量的 worker,然后每个 worker 循环去取消息。
// pkg/controller/deployment/deployment_controller.go:157
// Run begins watching and syncing.
func (dc *DeploymentController) Run(ctx context.Context, workers int) {
//...
for i := 0; i < workers; i {
go wait.UntilWithContext(ctx, dc.worker, time.Second)
}
<-ctx.Done()
}
代码语言:javascript复制// pkg/controller/deployment/deployment_controller.go:473
func (dc *DeploymentController) worker(ctx context.Context) {
for dc.processNextWorkItem(ctx) {
}
}
func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
key, quit := dc.queue.Get()
if quit {
return false
}
defer dc.queue.Done(key)
err := dc.syncHandler(ctx, key.(string))
dc.handleErr(ctx, err, key)
return true
}
整个路径就是:Run
-> worker
-> processNextWorkItem
-> syncHandler
。
可以看到就是一个标准的生产者消费者模型。然后关键就来到了 syncHandler
变量,还记得 dc.syncHandler = dc.syncDeployment
吗?对的,它在初始化时候被赋值为了 syncDeployment
这就到了我们这一节的重点方法了,注意看。
syncDeployment
这里我不想省略太多的代码,因为它本身是一个顺序结构,很容易理解。
代码语言:javascript复制// pkg/controller/deployment/deployment_controller.go:581
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
//...
deployment, err := dc.dLister.Deployments(namespace).Get(name)
if errors.IsNotFound(err) {
logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
return nil
}
if err != nil {
return err
}
// Deep-copy otherwise we are mutating our cache.
// TODO: Deep-copy only when needed.
d := deployment.DeepCopy()
//...
// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
// through adoption/orphaning.
rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
if err != nil {
return err
}
// List all Pods owned by this Deployment, grouped by their ReplicaSet.
// Current uses of the podMap are:
//
// * check if a Pod is labeled correctly with the pod-template-hash label.
// * check that no old Pods are running in the middle of Recreate Deployments.
podMap, err := dc.getPodMapForDeployment(d, rsList)
if err != nil {
return err
}
//...
if d.Spec.Paused {
return dc.sync(ctx, d, rsList)
}
// rollback is not re-entrant in case the underlying replica sets are updated with a new
// revision so we should ensure that we won't proceed to update replica sets until we
// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
if getRollbackTo(d) != nil {
return dc.rollback(ctx, d, rsList)
}
scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
if err != nil {
return err
}
if scalingEvent {
return dc.sync(ctx, d, rsList)
}
switch d.Spec.Strategy.Type {
case apps.RecreateDeploymentStrategyType:
return dc.rolloutRecreate(ctx, d, rsList, podMap)
case apps.RollingUpdateDeploymentStrategyType:
return dc.rolloutRolling(ctx, d, rsList)
}
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
- 第一步就是找到 deployment
- 第二步是找到 rsList 也是我们说的 RS
- 然后找到 podMap
寻找完了之后就开始根据状态进行操作。有哪些操作呢?
- rollback 回滚
- scaling 判断现在是不是在调整大小
- rollout 关键来了,这就是更新,有两种模式
- Recreate 重建
- Rolling 滚动更新
这里我们最关心的策略终于暴露出来了,那就是滚动更新了,我们赶快来看看里面是怎么实现的。
rolloutRolling
代码语言:javascript复制// pkg/controller/deployment/rolling.go:31
// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
if err != nil {
return err
}
allRSs := append(oldRSs, newRS)
// Scale up, if we can.
scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
if err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
// Scale down, if we can.
scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
if deploymentutil.DeploymentComplete(d, &d.Status) {
if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
return err
}
}
// Sync deployment status
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
步骤其实比我想的要简单:
- 得到新旧的 RS 进行比较
- 先看要不要扩副本数,如果要则直接扩,并且就直接
sync
了不继续了 - 然后才轮到缩容,能操作就直接操作了。
然后,我们来回忆一下 pod 数量在实际更新中的变动过程,如果目前的 pod 是 3/3(目标/现有),那么扩容之后就会变成 3/4,此时下一次进来就不能扩了,只能变成缩了变成 3/3 然后不断往复,直到所以 pod 都满足期望要求的版本。想想真的蛮奇妙的,就是利用了简单的状态管理就实现了整个滚动更新过程,慢慢的就靠近了目标。这可能就是状态机的优雅吧,你只管改状态,剩下的协调交给我。
码后解答
- deployment 是由哪个对象控制的?
DeploymentController
- 应用更新的时候 deployment 是如何控制更新过程的?
- 关键其实就在于:
rolloutRolling
,将目标态的 pod 添加,打破平衡(状态变化),将不满足的旧状态移除,从而慢慢协调到最终状态。再说的简单一点:先尝试scaledUp
然后尝试scaledDown
- 关键其实就在于:
总结提升
设计上
deployment 这里我们能学到哪些设计上的提升点呢?我个人有下面几个
- 首先就是
NewDeploymentController
里面对于 Informer 机制的运用 - 利用策略模式
RollingUpdate
和Recreate
两种不同实现很清晰 - 利用状态的管理来构建当前
rolloutRolling
的操作,对于编码来说清晰
熟悉了这部分的实现,那么对于其他对象类似的功能,我觉得你应该也能有自己的把握了。