上一篇文章中,我们主要去介绍了 codec 和 codec factory 对象的创建过程,包括利用支持各种不同协议格式(json, yaml, prtotbuf)的 serializer 对象来构建 codec factory, 以及利用 codec factory 去创建最终用来完成 decode 和 encode 操作的 codec 对象。在这里我们主要来介绍 codec 对象是如何完成 decode 和 encode 操作的。
codec 的 decode 操作
codec 对象的 Decode() 方法完成 decode 操作,即将请求中的数据转化成相应目标版本的 kuberbenes resource, 然后会将其转化为相关 resource 的内部版本来管理, 例如对于一个创建资源的请求来说, 我们以常见的 apps/v1/deployment 资源为例。 decode 操作首先会将 v1 版本的 deployment 对象在请求数据中 decode 出来,然后在将其转化为 apps group 下内部版本的 deployment 对象。在后续处理中,一般对于资源创建请求就是将这个内部版本的资源持久化在 etcd 集群里, 其核心逻辑如下:
codec 对象的 Decode() 方法核心代码如下:
代码语言:javascript复制// staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go
func (c *codec) Decode(data []byte, defaultGVK *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
// If the into object is unstructured and expresses an opinion about its group/version,
// create a new instance of the type so we always exercise the conversion path (skips short-circuiting on `into == obj`)
decodeInto := into
if into != nil {
if _, ok := into.(runtime.Unstructured); ok && !into.GetObjectKind().GroupVersionKind().GroupVersion().Empty() {
decodeInto = reflect.New(reflect.TypeOf(into).Elem()).Interface().(runtime.Object)
}
}
obj, gvk, err := c.decoder.Decode(data, defaultGVK, decodeInto)
if err != nil {
return nil, gvk, err
}
if d, ok := obj.(runtime.NestedObjectDecoder); ok {
if err := d.DecodeNestedObjects(runtime.WithoutVersionDecoder{c.decoder}); err != nil {
return nil, gvk, err
}
}
// if we specify a target, use generic conversion.
if into != nil {
// perform defaulting if requested
if c.defaulter != nil {
c.defaulter.Default(obj)
}
// Short-circuit conversion if the into object is same object
if into == obj {
return into, gvk, nil
}
if err := c.convertor.Convert(obj, into, c.decodeVersion); err != nil {
return nil, gvk, err
}
return into, gvk, nil
}
// perform defaulting if requested
if c.defaulter != nil {
c.defaulter.Default(obj)
}
out, err := c.convertor.ConvertToVersion(obj, c.decodeVersion)
if err != nil {
return nil, gvk, err
}
return out, gvk, nil
}
从 end to end 角度看,资源 decode 流程如下所示:
codec 的 encode 操作
codec 对象的 doEncode() 方法完成 decode 核心操作,即将目标对象转化成相应版本的对象, 然后序列化到响应中去。例如对于一个获取资源的请求来说(一般为资源的查询请求), 我们以常见的 apps/v1/deployment 资源为例。 encode 操作会首先将目标对象转化成相应版本的对象,这个目标对象一般是在 etcd 集群中获取的内部版本对象(根据以前文章, kubenetes 各种资源永远会以内部版本的形式存储在 etcd 集群中)。然后在 convert 转化成请求中的 v1 版本的对象,最后序列化数据到 response 数据流中, 其核心逻辑如下:
codec 对象的 doEncode() 方法核心代码如下:
代码语言:javascript复制// staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go
func (c *codec) doEncode(obj runtime.Object, w io.Writer) error {
switch obj := obj.(type) {
case *runtime.Unknown:
return c.encoder.Encode(obj, w)
case runtime.Unstructured:
// An unstructured list can contain objects of multiple group version kinds. don't short-circuit just
// because the top-level type matches our desired destination type. actually send the object to the converter
// to give it a chance to convert the list items if needed.
if _, ok := obj.(*unstructured.UnstructuredList); !ok {
// avoid conversion roundtrip if GVK is the right one already or is empty (yes, this is a hack, but the old behaviour we rely on in kubectl)
objGVK := obj.GetObjectKind().GroupVersionKind()
if len(objGVK.Version) == 0 {
return c.encoder.Encode(obj, w)
}
targetGVK, ok := c.encodeVersion.KindForGroupVersionKinds([]schema.GroupVersionKind{objGVK})
if !ok {
return runtime.NewNotRegisteredGVKErrForTarget(c.originalSchemeName, objGVK, c.encodeVersion)
}
if targetGVK == objGVK {
return c.encoder.Encode(obj, w)
}
}
}
gvks, isUnversioned, err := c.typer.ObjectKinds(obj)
if err != nil {
return err
}
objectKind := obj.GetObjectKind()
old := objectKind.GroupVersionKind()
// restore the old GVK after encoding
defer objectKind.SetGroupVersionKind(old)
if c.encodeVersion == nil || isUnversioned {
if e, ok := obj.(runtime.NestedObjectEncoder); ok {
if err := e.EncodeNestedObjects(runtime.WithVersionEncoder{Encoder: c.encoder, ObjectTyper: c.typer}); err != nil {
return err
}
}
objectKind.SetGroupVersionKind(gvks[0])
return c.encoder.Encode(obj, w)
}
// Perform a conversion if necessary
out, err := c.convertor.ConvertToVersion(obj, c.encodeVersion)
if err != nil {
return err
}
if e, ok := out.(runtime.NestedObjectEncoder); ok {
if err := e.EncodeNestedObjects(runtime.WithVersionEncoder{Version: c.encodeVersion, Encoder: c.encoder, ObjectTyper: c.typer}); err != nil {
return err
}
}
// Conversion is responsible for setting the proper group, version, and kind onto the outgoing object
return c.encoder.Encode(out, w)
}
从 end to end 角度看,资源 encode 流程如下所示:
目前先我们写到这里,在下一篇文章中我们继续来介绍 kubernetest 资源相关的另一个比较重要概念, schema。