k8s list请求源码分析

我们知道,在k8s中List请求是很常见的一种操作,用于获取某一种资源对象的列表,例如pod的List方法用于获取k8s集群中的pod资源列表。

在深入理解List操作之前,我们需要了解resourceVersion这个参数。

在每个k8s资源对象中都有一个resourceVersion字段,用于标识当前资源的版本。它既可用于乐观并发控制,也会影响apiserver获取资源的方式。

k8s中有三种操作(get、list、watch)支持resourceVersion参数。apiserver会根据不同的请求方法和不同的参数取值,对resourceVersion做出不同的解释。对于get和list请求(未设置resourceVersionMatch时),其语义如下:

| resourceVersion unset | resourceVersion="0" | resourceVersion="{value other than 0}" |
|---|---|---|
| Most Recent | Any | Not older than |

Most Recent: 从etcd中取数据
Any: 优先用最新的,但不保证一定是最新的
Not older than: 不低于某个版本号

v1.19开始,List方法开始支持resourceVersionMatch,如果你设置了resourceVersionMatch参数,将会决定apiserver如何解析resourceVersion
官方文档建议:在List请求中如果使用了resourceVersion,就应该同时带上resourceVersionMatch参数。当然,客户端也需要处理好一种特殊情况:响应请求的apiserver不认识resourceVersionMatch参数,会直接忽略它。
除非你有很强的一致性要求,建议使用resourceVersionMatch=NotOlderThan以获得更好的性能。
如果你设置了resourceVersionMatch而没设置resourceVersion将会是无效的。

resourceVersion与resourceVersionMatch的关系可以查看官方文档中的表格


List请求入口(Store.List与Store.ListPredicate)的源码如下:
// staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
// List returns the list of resource objects that match the label and field
// selectors carried in options. It is the entry point for list requests.
// Per the original author's note, field selectors only work for the
// officially supported fields, not for custom fields defined in a CRD.
func (e *Store) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
	// Start from match-everything selectors and narrow them only when the
	// caller supplied explicit ones.
	labelSel, fieldSel := labels.Everything(), fields.Everything()
	if options != nil {
		if options.LabelSelector != nil {
			labelSel = options.LabelSelector
		}
		if options.FieldSelector != nil {
			fieldSel = options.FieldSelector
		}
	}

	result, err := e.ListPredicate(ctx, e.PredicateFunc(labelSel, fieldSel), options)
	if err != nil {
		return nil, err
	}
	// Give the optional decorator a chance to post-process the list.
	if decorate := e.Decorator; decorate != nil {
		decorate(result)
	}
	return result, nil
}

// ListPredicate returns a list of all the items matching the given
// SelectionPredicate.
//
// A nil options is treated as a request with an empty ResourceVersion. When
// the predicate matches exactly one object and a key can be derived for it,
// the list is issued non-recursively against that key; otherwise it is issued
// recursively against the key root.
func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate, options *metainternalversion.ListOptions) (runtime.Object, error) {
	if options == nil {
		// By default we should serve the request from etcd.
		options = &metainternalversion.ListOptions{ResourceVersion: ""}
	}
	p.Limit, p.Continue = options.Limit, options.Continue

	result := e.NewListFunc()
	// Resolved before ctx is possibly narrowed to a namespace below.
	resource := e.qualifiedResourceFromContext(ctx)

	// The predicate carries the label and field selectors down to storage.
	listOpts := storage.ListOptions{
		ResourceVersion:      options.ResourceVersion,
		ResourceVersionMatch: options.ResourceVersionMatch,
		Predicate:            p,
		Recursive:            true,
	}

	// if we're not already namespace-scoped, see if the field selector narrows the scope of the watch
	ctxNamespace, _ := genericapirequest.NamespaceFrom(ctx)
	if len(ctxNamespace) == 0 {
		selNamespace, single := p.MatchesSingleNamespace()
		if single && len(validation.ValidateNamespaceName(selNamespace, false)) == 0 {
			ctx = genericapirequest.WithNamespace(ctx, selNamespace)
		}
	}

	// Does the predicate select exactly one object?
	if name, single := p.MatchesSingle(); single {
		key, keyErr := e.KeyFunc(ctx, name)
		if keyErr == nil {
			// A concrete key is known, so list just that key.
			listOpts.Recursive = false
			listErr := e.Storage.GetList(ctx, key, listOpts, result)
			return result, storeerr.InterpretListError(listErr, resource)
		}
		// if we cannot extract a key based on the current context, the optimization is skipped
	}

	// General case: list everything under the resource's key root.
	listErr := e.Storage.GetList(ctx, e.KeyRootFunc(ctx), listOpts, result)
	return result, storeerr.InterpretListError(listErr, resource)
}
存储层GetList(DryRunnableStorage、Cacher)以及shouldDelegateList的源码如下:
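// staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go (DryRunnableStorage)
// staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go (Cacher.GetList, shouldDelegateList)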
// GetList forwards the list request, with all arguments unchanged, to the
// wrapped Storage and returns the error produced by that call.
func (s *DryRunnableStorage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
return s.Storage.GetList(ctx, key, opts, listObj)
}
// GetList implements storage.Interface.
//
// The request is handed to the underlying storage when shouldDelegateList
// reports true, or when the parsed resource version is 0 and the cache is not
// ready. Otherwise it waits for the cache to become ready, lists the items
// from the cache, keeps those accepted by the key/predicate filter, and
// stores them into the Items slice of listObj.
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
recursive := opts.Recursive
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
// Decide whether the request must bypass the cache and be served by the
// underlying storage.
if shouldDelegateList(opts) {
return c.storage.GetList(ctx, key, opts, listObj)
}

listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return err
}
// The parsed resource version is 0 and the apiserver cache is not ready yet.
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.GetList(ctx, key, opts, listObj)
}
if listRV == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
// Fetch the current resource version from storage and use it as listRV.
listRV, err = c.getCurrentResourceVersionFromStorage(ctx)
if err != nil {
return err
}
}

ctx, span := tracing.Start(ctx, "cacher list",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.Stringer("type", c.groupResource))
defer span.End(500 * time.Millisecond)

// Block until the cache is ready; a failure here is reported as
// ServiceUnavailable.
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
span.AddEvent("Ready")


// List elements with at least 'listRV' from cache.
// From here on the data is read from the cache.

// Get a pointer to the Items field of listObj.
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
// Ensure the pointer is valid and obtain the value it points to (a reflect.Value).
listVal, err := conversion.EnforcePtr(listPtr)
if err != nil {
return err
}
// The kind of listVal must be slice.
if listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
// Build the filter function from key and pred.
filter := filterWithAttrsFunction(key, pred)

objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, key, pred, recursive)
if err != nil {
return err
}
span.AddEvent("Listed items from cache", attribute.Int("count", len(objs)))
// store pointer of eligible objects,
// Why not directly put object in the items of listObj?
// the elements in ListObject are Struct type, making slice will bring excessive memory consumption.
// so we try to delay this action as much as possible
var selectedObjects []runtime.Object
for _, obj := range objs {
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
if filter(elem.Key, elem.Labels, elem.Fields) {
selectedObjects = append(selectedObjects, elem.Object)
}
}
if len(selectedObjects) == 0 {
// Ensure that we never return a nil Items pointer in the result for consistency.
listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0))
} else {
// Resize the slice appropriately, since we already know that size of result set
listVal.Set(reflect.MakeSlice(listVal.Type(), len(selectedObjects), len(selectedObjects)))
span.AddEvent("Resized result")
// Copy each selected object's value into its slot of the result slice.
for i, o := range selectedObjects {
listVal.Index(i).Set(reflect.ValueOf(o).Elem())
}
}
span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
// Record the resource version the items were read at on the list object.
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
return err
}
}
metrics.RecordListCacheMetrics(c.resourcePrefix, indexUsed, len(objs), listVal.Len())
return nil
}

// shouldDelegateList reports whether a list request has to be served by the
// underlying storage (etcd) instead of the watch cache.
func shouldDelegateList(opts storage.ListOptions) bool {
	// Both feature gates are read up front, regardless of which rule fires.
	chunking := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
	consistentFromCache := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)

	rv, p := opts.ResourceVersion, opts.Predicate

	switch {
	case rv == "" && !consistentFromCache:
		// Serve consistent reads from storage if ConsistentListFromCache is disabled.
		return true
	case chunking && len(p.Continue) > 0:
		// Watch cache doesn't support continuations, so serve them from etcd.
		return true
	case chunking && p.Limit > 0 && rv != "0":
		// Paginated requests go to etcd, except those about revision "0",
		// which stay on the watch cache to avoid overwhelming etcd.
		return true
	case opts.ResourceVersionMatch != "" && opts.ResourceVersionMatch != metav1.ResourceVersionMatchNotOlderThan:
		// Watch cache only supports ResourceVersionMatchNotOlderThan (default).
		return true
	}
	return false
}

REF:

  1. http://arthurchiao.art/blog/k8s-reliability-list-data-zh/#33-%E6%8C%87%E5%AE%9A-specnodenamenode1resourceversion0-vs-specnodenamenode1
  2. https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter