上一篇文章里,我们主要介绍了和对象存储相关的组件 Store 接口以及它的实现结构体 cache,本质上说该接口和它的实现是对以前文章中介绍的 ThreadSafeStore 接口和它具体实现的更高级抽象,即 ThreadSafeStore 接口的操作需要针对资源对象以及对象的 key, 而 Store 接口有能力获取资源对象的 key, 所以该接口只针对资源对象操作。当然,两种组件针对资源对象的操作在底层上都是并发安全的。本篇文章中我们主要来介绍 Queue 和 DeltaFIFO 组件 ,也是资源对象存储组件。
Queue 接口
Queue 是接口,图解和源码如下:
代码语言:javascript复制//staging/src/k8s.io/client-go/tools/cache/fifo.go
type Queue interface {
Store
Pop(PopProcessFunc) (interface{}, error)
AddIfNotPresent(interface{}) error
HasSynced() bool
Close()
}
- 该接口是对于上一篇文章中介绍的 Store 接口的扩展,当然本质上也是并发安全的,因为 Store 就是并发安全的。
- 该接口中扩展了一系列额外方法,例如 Pop/AddIfNotPresent/HasSynced/Close 等等,其中的 Pop 方法就是典型的 Queue 数据结构相关操作。
Delta 结构体
Delta 结构体定义资源对象的创建,更新,删除等操作的元数据信息,图解和源码如下:
代码语言:javascript复制//src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaType string
type Delta struct {
Type DeltaType
Object interface{}
}
type Deltas []Delta
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Replaced DeltaType = "Replaced"
Sync DeltaType = "Sync"
)
- Delta 结构体定义中封装了 DelteType 和 Object,代表针对资源对象的操作类型以及资源对象本身。
- Added/Updated/Deleted/Replaced/Sync 等针对资源的不同操作类型被定义在 DelteType 中,其本质上就是 String 的别名。
- 从上面图解中看,一个资源对象会有其对应的 key,而针对同一个资源对象的操作可能会有多个,例如创建,更新,删除等等, 所以一个 key 会对应多个 Delta 对象。
DeltaFIFO 结构体
DeltaFIFI 结构体实现了上面介绍的 Queue 接口,针对的元素都是 Delta 类型的对象,其图解和源码如下:
代码语言:javascript复制//src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond
items map[string]Deltas
queue []string
populated bool
initialPopulationCount int
keyFunc KeyFunc
knownObjects KeyListerGetter
closed bool
emitDeltaTypeReplaced bool
}
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error){...}
func (f *DeltaFIFO) Add(obj interface{}) error{...}
func (f *DeltaFIFO) Update(obj interface{}) error{...}
func (f *DeltaFIFO) Delete(obj interface{}){...}
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error{...}
......//other methods impl defined in Store interface
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
delete(f.items, id)
if depth > 10 {
trace := utiltrace.New("DeltaFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: depth},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
- 该结构体中有 map[string]Deltas 类型的属性,针对的元素都是 Deltas 类型,所以本质上该结构体就是存储资源对象操作元数据的一个队列。
- 该结构体实现了上面介绍的 Queue 接口。
- 在 Pop 方法的实现中,我们看到的就是典型队列的列操作定义,取出第一个元素,然后用传来的 PopProcessFunc 处理该元素。
DeltaFIFO 结构体的创建
DeltaFIFI 结构体创建的源码如下:
代码语言:javascript复制//src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFOOptions struct {
KeyFunction KeyFunc
KnownObjects KeyListerGetter
EmitDeltaTypeReplaced bool
}
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: keyFunc,
KnownObjects: knownObjects,
})
}
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
opts.KeyFunction = MetaNamespaceKeyFunc
}
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
}
f.cond.L = &f.lock
return f
}
// k8s.io/client-go/tools/cache/store.go
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
}
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() "/" meta.GetName(), nil
}
return meta.GetName(), nil
}
- 结构体 DeltaFIFOOptions 封装定义了创建 DeltaFIFO 对象所需要的一些关键参数,例如 KeyFunc 等。
- DeltaFIFO 可以通过 DeltaFIFOOptions 来创建,也可以直接用相关参数进行,例如直接传入 KeyFunc 和 KeyListerGetter。
- 对于没有指定 KeyFunc 的时候,默认会使用 MetaNamespaceKeyFunc 来作为资源对象的 key 生成函数。
- 在 MetaNamespaceKeyFunc 的定义中,如果资源是基于 namespace 的,那么 key 为 {namespace}/{resource-name} 。如果资源对象不是基于 namespace 的,那么 key 的值为
目前我们先写到这里,在下一篇文章中我们继续来介绍 kubernetes 资源对象的 list and watch 机制。