k8s Indexer索引器实现

1.Indexer是什么?
Indexer是client-go用来存储资源对象并自带索引功能的本地存储(内存),Reflector从DeltaFIFO中将消费出来的资源对象存储到Indexer。Indexer中的数据与Etcd集群中的数据保持完全一致。client-go可以很方便的从本地存储(内存)读取相应的资源对象数据,而不用每次都远程从Etcd集群中读取数据,减轻kube-apiserver和Etcd的压力。

2.Indexer涉及到的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// staging/src/k8s.io/client-go/tools/cache/index.go

// 存储缓存数据
type Index map[string]sets.String
// 索引器,key为索引器名称;value为索引器的实现函数
type Indexers map[string]IndexFunc
// 索引器实现函数,接受一个资源对象返回一个字符串列表
type IndexFunc func(obj interface{}) ([]string, error)

// key这索引器名称,value为Index
type Indices map[string]Index

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
// cache.NewIndexer返回一个Indexer对象,为cache{}
type cache struct {
// 可看作一个并发安全的Map,并实现了ByIndex方法(下文会用到),
cacheStorage ThreadSafeStore
keyFunc KeyFunc
}

// ThreadSafeStore的实现
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
indexers Indexers
indices Indices
}

3.通过一个例子了解Indexer的使用和实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 代码来自于<<kubernetes源码剖析>>
package main
import (
"fmt"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"strings"
)

func UsersIndexFunc(obj interface{}) ([]string, error) {
pod := obj.(*v1.Pod)
userString := pod.Annotations["users"]
return strings.Split(userString, ","), nil
}

func main() {
// 实例化一个Indexer对象,第一个参数为计算对象key值的函数,第二个参数用于定义索引器,其中key为索引器名称(byUser),值为索引器函数
index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
"byUser": UsersIndexFunc,
})
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one",
Annotations: map[string]string{"users": "ernie,bert"}}}

// 添加pod对象到index
index.Add(pod1)
// 执行索引器得到索引结果
erniePods, err := index.ByIndex("byUser", "ernie")
if err != nil {
panic(err)
}
for _, erniePod := range erniePods {
fmt.Println(erniePod.(*v1.Pod).Name)
}
}

4.程序执行步骤
初始化后的index对象,items和indices都为空
image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// index.Add(pod1)
// 将资源对象添加到缓存
func (c *cache) Add(obj interface{}) error {
// 计算对象的key
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Add(key, obj)
return nil
}

// 真正将资源对象添加到缓存中的操作
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
// 添加数据到缓存中
c.items[key] = obj
// 更新indices
c.updateIndices(oldObject, obj, key)
}

func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// if we got an old object, we need to remove it before we add it again
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
}
// 遍历索引器
for name, indexFunc := range c.indexers {
// indexValues = ['ernie','bert']
indexValues, err := indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
index := c.indices[name]
// 判断Index是否存在,不存在则新建
if index == nil {
index = Index{}
c.indices[name] = index
}

// 遍历indexValues(['ernie','bert']),构建一个反向映射,从indexValue指向key
// 类似于构建成{"ernie":"one","bert":"one"}字典结构,用于快速通过索引找到对应的资源对象
for _, indexValue := range indexValues {
set := index[indexValue]
if set == nil {
set = sets.String{}
index[indexValue] = set
}
set.Insert(key)
}
}
}

执行完Add操作后的对象
image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// index.ByIndex 通过索引查询资源对象
func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
return c.cacheStorage.ByIndex(indexName, indexKey)
}

//
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()

// 通过索引器名称(此例为:byUser),获取索引器函数
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}

// 通过索引器名称,获取对应的缓存数据
index := c.indices[indexName]

// 获取index对应的值,此例indexedValue=ernie,set={"one"}
set := index[indexedValue]
list := make([]interface{}, 0, set.Len())

// 遍历set,从缓存中取出对象并返回
for key := range set {
list = append(list, c.items[key])
}

return list, nil
}

5.小结
Indexer通过索引器函数来实现索引的可定制化,用户可以定制自己的索引器实现函数,实现自定义条件查询。Index构建了一个类似于map的结构(只不过值为k8s实现的set类型),键为索引,值为资源对象通过keyFunc计算出来的key,可提供快速查询的功能。要想更好理解的话,最好自己调试一下。


1.Kubernetes源码剖析