K8s源码分析(25)-Store组件和Indexer组件

2022-10-30 13:25:54 浏览数 (1)

上一篇文章里,我们主要介绍了和对象存储相关的组件 ThreadSafeStore 接口以及它的实现结构体 threadSafeMap,本质上来说该接口是并发安全的资源对象存储数据结构。在本篇文章里我们主要来介绍 Store 和 Indexer ,它们同样也是资源对象存储组件。

Store 接口

Store 是接口,图解和源码如下:

代码语言:javascript复制
//k8s.io/client-go/tools/cache/store.go
type Store interface {
  Add(obj interface{}) error
  Update(obj interface{}) error
  Delete(obj interface{}) error
  List() []interface{}
  ListKeys() []string
  Get(obj interface{}) (item interface{}, exists bool, err error)
  GetByKey(key string) (item interface{}, exists bool, err error)
  Replace([]interface{}, string) error
  Resync() error
}
  • 该接口定义了对于资源增删改查的方法,例如 Add/Update/Delete/Get/List 等等。
  • 和上一篇中介绍的 ThreadSafeStore 接口有所不同,这个接口的操作只是针对象本身,而 ThreadSafeStore 的操作则是针对对象的 key 和对象本身。就此来说 Store 是对 ThreadSafeStore 的更高一层的抽象,或者说 Store 的实现里一定有能力获取对象的 key,进而用 ThreadSafeStore 的能力来操作对象。

Indexer 接口

Indexer 是接口,图解和源码如下:

代码语言:javascript复制
//k8s.io/client-go/tools/cache/store.go
type Indexer interface {
  Store
  Index(indexName string, obj interface{}) ([]interface{}, error)
  IndexKeys(indexName, indexedValue string) ([]string, error)
  ListIndexFuncValues(indexName string) []string
  ByIndex(indexName, indexedValue string) ([]interface{}, error)
  GetIndexers() Indexers
  AddIndexers(newIndexers Indexers) error
}
  • 该接口内部有上部分介绍的 Store 类型属性,所以是对于 Store 更高层的抽象定义。
  • 该接口还定义了和索引相关的方法, 例如 Index/IndexKeys 等等,所以对于该接口来说是在 Store 的基础上拥有索引的能力。

cache 结构体

cache 是一个结构体,该结构体实现了上面介绍的 Indexer 接口,其相关图解和源码如下:

代码语言:javascript复制
//k8s.io/client-go/tools/cache/store.go
type KeyFunc func(obj interface{}) (string, error)
type cache struct {
  cacheStorage ThreadSafeStore
  keyFunc KeyFunc
}

func (c *cache) Add(obj interface{}) error {
  key, err := c.keyFunc(obj)
  if err != nil {
    return KeyError{obj, err}
  }
  c.cacheStorage.Add(key, obj)
  return nil
}
func (c *cache) Update(obj interface{}) error
func (c *cache) Delete(obj interface{}) error
func (c *cache) List() []interface{}
func (c *cache) ListKeys() []string 
func (c *cache) GetIndexers() Indexers  
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error)
func (c *cache) IndexKeys(indexName, indexKey string) ([]string, error)
func (c *cache) ListIndexFuncValues(indexName string) []string 
func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error)
func (c *cache) AddIndexers(newIndexers Indexers) error
.......
  • 结构体中有 cacheStore 属性,其类型为上一篇文章中介绍的 ThreadSafeStore 组件,说明该结构体也是并发安全的,并且借助 ThreadSafeStore 来实现资源的并发安全操作。
  • 结构体中有 KeyFunc 属性,主要用来计算资源对象的 key 值,然后利用上一篇文章中介绍的 ThreadSafeStore 组件来对资源操作。这样间接表明了 Store 接口和实现本质上是对 ThreadSafeStore 接口和实现的更高级抽象。
  • 结构体实现了 Indexer 接口中对于资源对象进行增删改查的一系列相关方法,例如 Add/Update/Delete/Get/List 等等。

cache 结构体的创建

cache 结构体创建的源码如下:

代码语言:javascript复制
//k8s.io/client-go/tools/cache/store.go
func NewStore(keyFunc KeyFunc) Store {
  return &cache{
    cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
    keyFunc:      keyFunc,
  }
}

// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
  return &cache{
    cacheStorage: NewThreadSafeStore(indexers, Indices{}),
    keyFunc:      keyFunc,
  }
}
  • 结构体 cache 的创建必须有 KeyFunc 作为参数,其作用是用来生成资源对象的 key,从而内部操作资源对象。
  • 结构体 cache 的创建也可以接受以前文章中介绍的 Indexers 类型参数,本质上是用来创建内部的 ThreadSafeStore 组件。
  • NewStore 方法和 NewIndexer 方法虽然实现上都是去创建上面介绍的 cache 类型对象,但本质上 NewStore 是返回 Store 类型来提供存储能力,而 NewIndexer 是返回 Indexer 类型来提供索引能力。

目前我们先写到这里,在下一篇文章中我们继续介绍 Queue 组件以及 DeltaFIFO 组件。

0 人点赞