K8s源码分析(26)-Queue组件和DeltaFIFO组件

2022-10-30 13:27:15 浏览数 (1)

上一篇文章里,我们主要介绍了和对象存储相关的组件 Store 接口以及它的实现结构体 cache,本质上说该接口和它的实现是对以前文章中介绍的 ThreadSafeStore 接口和它具体实现的更高级抽象,即 ThreadSafeStore 接口的操作需要针对资源对象以及对象的 key, 而 Store 接口有能力获取资源对象的 key, 所以该接口只针对资源对象操作。当然,两种组件针对资源对象的操作在底层上都是并发安全的。本篇文章中我们主要来介绍 Queue 和 DeltaFIFO 组件 ,也是资源对象存储组件。

Queue 接口

Queue 是接口,图解和源码如下:

代码语言:javascript复制
//staging/src/k8s.io/client-go/tools/cache/fifo.go
type Queue interface {
  Store

  Pop(PopProcessFunc) (interface{}, error)

  AddIfNotPresent(interface{}) error

  HasSynced() bool

  Close()
}
  • 该接口是对于上一篇文章中介绍的 Store 接口的扩展,当然本质上也是并发安全的,因为 Store 就是并发安全的。
  • 该接口中扩展了一系列额外方法,例如 Pop/AddIfNotPresent/HasSynced/Close 等等,其中的 Pop 方法就是典型的 Queue 数据结构相关操作。

Delta 结构体

Delta 结构体定义资源对象的创建,更新,删除等操作的元数据信息,图解和源码如下:

代码语言:javascript复制
//src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaType string

type Delta struct {
  Type   DeltaType
  Object interface{}
}

type Deltas []Delta

const (
  Added   DeltaType = "Added"
  Updated DeltaType = "Updated"
  Deleted DeltaType = "Deleted"
  Replaced DeltaType = "Replaced"
  Sync DeltaType = "Sync"
)
  • Delta 结构体定义中封装了 DelteType 和 Object,代表针对资源对象的操作类型以及资源对象本身。
  • Added/Updated/Deleted/Replaced/Sync 等针对资源的不同操作类型被定义在 DelteType 中,其本质上就是 String 的别名。
  • 从上面图解中看,一个资源对象会有其对应的 key,而针对同一个资源对象的操作可能会有多个,例如创建,更新,删除等等, 所以一个 key 会对应多个 Delta 对象。

DeltaFIFO 结构体

DeltaFIFI 结构体实现了上面介绍的 Queue 接口,针对的元素都是 Delta 类型的对象,其图解和源码如下:

代码语言:javascript复制
//src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
  lock sync.RWMutex
  cond sync.Cond

  items map[string]Deltas

  queue []string

  populated bool

  initialPopulationCount int

  keyFunc KeyFunc

  knownObjects KeyListerGetter

  closed bool

  emitDeltaTypeReplaced bool
}

func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error){...}
func (f *DeltaFIFO) Add(obj interface{}) error{...}
func (f *DeltaFIFO) Update(obj interface{}) error{...}
func (f *DeltaFIFO) Delete(obj interface{}){...}
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error{...} 
......//other methods impl defined in Store interface
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  f.lock.Lock()
  defer f.lock.Unlock()
  for {
    for len(f.queue) == 0 {
      if f.closed {
        return nil, ErrFIFOClosed
      }

      f.cond.Wait()
    }
    id := f.queue[0]
    f.queue = f.queue[1:]
    depth := len(f.queue)
    if f.initialPopulationCount > 0 {
      f.initialPopulationCount--
    }
    item, ok := f.items[id]
    if !ok {
      klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
      continue
    }
    delete(f.items, id)
    if depth > 10 {
      trace := utiltrace.New("DeltaFIFO Pop Process",
        utiltrace.Field{Key: "ID", Value: id},
        utiltrace.Field{Key: "Depth", Value: depth},
        utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
      defer trace.LogIfLong(100 * time.Millisecond)
    }
    err := process(item)
    if e, ok := err.(ErrRequeue); ok {
      f.addIfNotPresent(id, item)
      err = e.Err
    }
    return item, err
  }
}
  • 该结构体中有 map[string]Deltas 类型的属性,针对的元素都是 Deltas 类型,所以本质上该结构体就是存储资源对象操作元数据的一个队列。
  • 该结构体实现了上面介绍的 Queue 接口。
  • 在 Pop 方法的实现中,我们看到的就是典型队列的列操作定义,取出第一个元素,然后用传来的 PopProcessFunc 处理该元素。

DeltaFIFO 结构体的创建

DeltaFIFI 结构体创建的源码如下:

代码语言:javascript复制
//src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFOOptions struct {

  KeyFunction KeyFunc

  KnownObjects KeyListerGetter

  EmitDeltaTypeReplaced bool
}

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
  return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
    KeyFunction:  keyFunc,
    KnownObjects: knownObjects,
  })
}

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
  if opts.KeyFunction == nil {
    opts.KeyFunction = MetaNamespaceKeyFunc
  }

  f := &DeltaFIFO{
    items:        map[string]Deltas{},
    queue:        []string{},
    keyFunc:      opts.KeyFunction,
    knownObjects: opts.KnownObjects,

    emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
  }
  f.cond.L = &f.lock
  return f
}

// k8s.io/client-go/tools/cache/store.go
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
  if key, ok := obj.(ExplicitKey); ok {
    return string(key), nil
  }
  meta, err := meta.Accessor(obj)
  if err != nil {
    return "", fmt.Errorf("object has no meta: %v", err)
  }
  if len(meta.GetNamespace()) > 0 {
    return meta.GetNamespace()   "/"   meta.GetName(), nil
  }
  return meta.GetName(), nil
}
  • 结构体 DeltaFIFOOptions 封装定义了创建 DeltaFIFO 对象所需要的一些关键参数,例如 KeyFunc 等。
  • DeltaFIFO 可以通过 DeltaFIFOOptions 来创建,也可以直接用相关参数进行,例如直接传入 KeyFunc 和 KeyListerGetter。
  • 对于没有指定 KeyFunc 的时候,默认会使用 MetaNamespaceKeyFunc 来作为资源对象的 key 生成函数。
  • 在 MetaNamespaceKeyFunc 的定义中,如果资源是基于 namespace 的,那么 key 为 {namespace}/{resource-name} 。如果资源对象不是基于 namespace 的,那么 key 的值为

目前我们先写到这里,在下一篇文章中我们继续来介绍 kubernetes 资源对象的 list and watch 机制。

0 人点赞