K8s源码分析(9)-codec的decode和encode操作

2021-11-12 11:05:30 浏览数 (1)

上一篇文章中,我们主要去介绍了 codec 和 codec factory 对象的创建过程,包括利用支持各种不同协议格式(json, yaml, prtotbuf)的 serializer 对象来构建 codec factory, 以及利用 codec factory 去创建最终用来完成 decode 和 encode 操作的 codec 对象。在这里我们主要来介绍 codec 对象是如何完成 decode 和 encode 操作的。

codec 的 decode 操作

codec 对象的 Decode() 方法完成 decode 操作,即将请求中的数据转化成相应目标版本的 kuberbenes resource, 然后会将其转化为相关 resource 的内部版本来管理, 例如对于一个创建资源的请求来说, 我们以常见的 apps/v1/deployment 资源为例。 decode 操作首先会将 v1 版本的 deployment 对象在请求数据中 decode 出来,然后在将其转化为 apps group 下内部版本的 deployment 对象。在后续处理中,一般对于资源创建请求就是将这个内部版本的资源持久化在 etcd 集群里, 其核心逻辑如下:

codec 对象的 Decode() 方法核心代码如下:

代码语言:javascript复制
// staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go
func (c *codec) Decode(data []byte, defaultGVK *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
  // If the into object is unstructured and expresses an opinion about its group/version,
  // create a new instance of the type so we always exercise the conversion path (skips short-circuiting on `into == obj`)
  decodeInto := into
  if into != nil {
    if _, ok := into.(runtime.Unstructured); ok && !into.GetObjectKind().GroupVersionKind().GroupVersion().Empty() {
      decodeInto = reflect.New(reflect.TypeOf(into).Elem()).Interface().(runtime.Object)
    }
  }

  obj, gvk, err := c.decoder.Decode(data, defaultGVK, decodeInto)
  if err != nil {
    return nil, gvk, err
  }

  if d, ok := obj.(runtime.NestedObjectDecoder); ok {
    if err := d.DecodeNestedObjects(runtime.WithoutVersionDecoder{c.decoder}); err != nil {
      return nil, gvk, err
    }
  }

  // if we specify a target, use generic conversion.
  if into != nil {
    // perform defaulting if requested
    if c.defaulter != nil {
      c.defaulter.Default(obj)
    }

    // Short-circuit conversion if the into object is same object
    if into == obj {
      return into, gvk, nil
    }

    if err := c.convertor.Convert(obj, into, c.decodeVersion); err != nil {
      return nil, gvk, err
    }

    return into, gvk, nil
  }

  // perform defaulting if requested
  if c.defaulter != nil {
    c.defaulter.Default(obj)
  }

  out, err := c.convertor.ConvertToVersion(obj, c.decodeVersion)
  if err != nil {
    return nil, gvk, err
  }
  return out, gvk, nil
}

从 end to end 角度看,资源 decode 流程如下所示:

codec 的 encode 操作

codec 对象的 doEncode() 方法完成 decode 核心操作,即将目标对象转化成相应版本的对象, 然后序列化到响应中去。例如对于一个获取资源的请求来说(一般为资源的查询请求), 我们以常见的 apps/v1/deployment 资源为例。 encode 操作会首先将目标对象转化成相应版本的对象,这个目标对象一般是在 etcd 集群中获取的内部版本对象(根据以前文章, kubenetes 各种资源永远会以内部版本的形式存储在 etcd 集群中)。然后在 convert 转化成请求中的 v1 版本的对象,最后序列化数据到 response 数据流中, 其核心逻辑如下:

codec 对象的 doEncode() 方法核心代码如下:

代码语言:javascript复制
// staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go
func (c *codec) doEncode(obj runtime.Object, w io.Writer) error {
  switch obj := obj.(type) {
  case *runtime.Unknown:
    return c.encoder.Encode(obj, w)
  case runtime.Unstructured:
    // An unstructured list can contain objects of multiple group version kinds. don't short-circuit just
    // because the top-level type matches our desired destination type. actually send the object to the converter
    // to give it a chance to convert the list items if needed.
    if _, ok := obj.(*unstructured.UnstructuredList); !ok {
      // avoid conversion roundtrip if GVK is the right one already or is empty (yes, this is a hack, but the old behaviour we rely on in kubectl)
      objGVK := obj.GetObjectKind().GroupVersionKind()
      if len(objGVK.Version) == 0 {
        return c.encoder.Encode(obj, w)
      }
      targetGVK, ok := c.encodeVersion.KindForGroupVersionKinds([]schema.GroupVersionKind{objGVK})
      if !ok {
        return runtime.NewNotRegisteredGVKErrForTarget(c.originalSchemeName, objGVK, c.encodeVersion)
      }
      if targetGVK == objGVK {
        return c.encoder.Encode(obj, w)
      }
    }
  }

  gvks, isUnversioned, err := c.typer.ObjectKinds(obj)
  if err != nil {
    return err
  }

  objectKind := obj.GetObjectKind()
  old := objectKind.GroupVersionKind()
  // restore the old GVK after encoding
  defer objectKind.SetGroupVersionKind(old)

  if c.encodeVersion == nil || isUnversioned {
    if e, ok := obj.(runtime.NestedObjectEncoder); ok {
      if err := e.EncodeNestedObjects(runtime.WithVersionEncoder{Encoder: c.encoder, ObjectTyper: c.typer}); err != nil {
        return err
      }
    }
    objectKind.SetGroupVersionKind(gvks[0])
    return c.encoder.Encode(obj, w)
  }

  // Perform a conversion if necessary
  out, err := c.convertor.ConvertToVersion(obj, c.encodeVersion)
  if err != nil {
    return err
  }

  if e, ok := out.(runtime.NestedObjectEncoder); ok {
    if err := e.EncodeNestedObjects(runtime.WithVersionEncoder{Version: c.encodeVersion, Encoder: c.encoder, ObjectTyper: c.typer}); err != nil {
      return err
    }
  }

  // Conversion is responsible for setting the proper group, version, and kind onto the outgoing object
  return c.encoder.Encode(out, w)
}

从 end to end 角度看,资源 encode 流程如下所示:

目前先我们写到这里,在下一篇文章中我们继续来介绍 kubernetest 资源相关的另一个比较重要概念, schema。

0 人点赞