kube-controller: resourceclaim

Dynamic Resource Allocation

Dynamic resource allocation is a new API for requesting and sharing resources between Pods and between containers inside a Pod. It is a generalization of the persistent volume API for generic resources. Third-party resource drivers are responsible for tracking and allocating resources. Different kinds of resources support arbitrary parameters for defining requirements and initialization.

The resource.k8s.io/v1alpha2 API group provides four new types:

ResourceClass
Defines which resource driver handles a certain kind of resource and provides common parameters for it. ResourceClasses are created by a cluster administrator when installing a resource driver.
ResourceClaim
Defines a particular resource instance that is required by a workload. Created either by a user (manually managed lifecycle, can be shared between different Pods) or by the control plane for an individual Pod based on a ResourceClaimTemplate (automatically managed lifecycle, typically used by only one Pod).
ResourceClaimTemplate
Defines the spec and some metadata for creating ResourceClaims. Created by a user when deploying a workload.
PodSchedulingContext
Used internally by the control plane and resource drivers to coordinate pod scheduling when ResourceClaims need to be allocated for a Pod.
Parameters for ResourceClass and ResourceClaim are stored in separate objects, typically using the type defined by a CRD that was created when installing a resource driver.

The core/v1 PodSpec defines the ResourceClaims that a Pod needs in a new resourceClaims field. Entries in that list reference either a ResourceClaim or a ResourceClaimTemplate. When a ResourceClaim is referenced, all Pods using this PodSpec (for example, inside a Deployment or StatefulSet) share the same ResourceClaim instance. When a ResourceClaimTemplate is referenced, each Pod gets its own instance.

The resources.claims list of a container defines which of these resource instances the container gets access to, which makes it possible to share a resource between one or more containers.

Below is an example for a fictional resource driver. Two ResourceClaim objects will get created for this Pod, and each container gets access to one of them.

apiVersion: resource.k8s.io/v1alpha2
kind: ResourceClass
name: resource.example.com
driverName: resource-driver.example.com
---
apiVersion: cats.resource.example.com/v1
kind: ClaimParameters
name: large-black-cat-claim-parameters
spec:
  color: black
  size: large
---
apiVersion: resource.k8s.io/v1alpha2
kind: ResourceClaimTemplate
metadata:
  name: large-black-cat-claim-template
spec:
  spec:
    resourceClassName: resource.example.com
    parametersRef:
      apiGroup: cats.resource.example.com
      kind: ClaimParameters
      name: large-black-cat-claim-parameters
---
apiVersion: v1
kind: Pod
metadata:
  name: pod-with-cats
spec:
  containers:
  - name: container0
    image: ubuntu:20.04
    command: ["sleep", "9999"]
    resources:
      claims:
      - name: cat-0
  - name: container1
    image: ubuntu:20.04
    command: ["sleep", "9999"]
    resources:
      claims:
      - name: cat-1
  resourceClaims:
  - name: cat-0
    source:
      resourceClaimTemplateName: large-black-cat-claim-template
  - name: cat-1
    source:
      resourceClaimTemplateName: large-black-cat-claim-template

resourceClaim Controller

The job of the resourceClaim controller is to create the corresponding ResourceClaims for the ResourceClaimTemplates referenced in a pod's spec.

resourceClaim source code analysis
// pkg/controller/resourceclaim/controller.go
type Controller struct {
    // kubeClient is the kube API client used to communicate with the API
    // server.
    kubeClient clientset.Interface

    // claimLister is the shared ResourceClaim lister used to fetch and store ResourceClaim
    // objects from the API server. It is shared with other controllers and
    // therefore the ResourceClaim objects in its store should be treated as immutable.
    claimLister  resourcev1alpha2listers.ResourceClaimLister
    claimsSynced cache.InformerSynced

    // podLister is the shared Pod lister used to fetch Pod
    // objects from the API server. It is shared with other controllers and
    // therefore the Pod objects in its store should be treated as immutable.
    podLister v1listers.PodLister
    podSynced cache.InformerSynced

    // templateLister is the shared ResourceClaimTemplate lister used to
    // fetch template objects from the API server. It is shared with other
    // controllers and therefore the objects in its store should be treated
    // as immutable.
    templateLister  resourcev1alpha2listers.ResourceClaimTemplateLister
    templatesSynced cache.InformerSynced

    // podIndexer has the common PodResourceClaim indexer installed to
    // limit iteration over pods to those of interest.
    podIndexer cache.Indexer

    // recorder is used to record events in the API server.
    recorder record.EventRecorder

    queue workqueue.RateLimitingInterface

    // The deletedObjects cache keeps track of Pods for which we know that
    // they have existed and have been removed. For those we can be sure
    // that a ReservedFor entry needs to be removed.
    deletedObjects *uidCache
}

// NewController creates the resourceClaim controller and registers event
// handlers for Pod and ResourceClaim objects.
func NewController(
    kubeClient clientset.Interface,
    podInformer v1informers.PodInformer,
    claimInformer resourcev1alpha2informers.ResourceClaimInformer,
    templateInformer resourcev1alpha2informers.ResourceClaimTemplateInformer) (*Controller, error) {

    ec := &Controller{
        kubeClient:      kubeClient,
        podLister:       podInformer.Lister(),
        podIndexer:      podInformer.Informer().GetIndexer(),
        podSynced:       podInformer.Informer().HasSynced,
        claimLister:     claimInformer.Lister(),
        claimsSynced:    claimInformer.Informer().HasSynced,
        templateLister:  templateInformer.Lister(),
        templatesSynced: templateInformer.Informer().HasSynced,
        queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_claim"),
        deletedObjects:  newUIDCache(maxUIDCacheEntries),
    }

    metrics.RegisterMetrics()

    if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            ec.enqueuePod(obj, false)
        },
        UpdateFunc: func(old, updated interface{}) {
            ec.enqueuePod(updated, false)
        },
        DeleteFunc: func(obj interface{}) {
            ec.enqueuePod(obj, true)
        },
    }); err != nil {
        return nil, err
    }
    if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: ec.onResourceClaimAddOrUpdate,
        UpdateFunc: func(old, updated interface{}) {
            ec.onResourceClaimAddOrUpdate(updated)
        },
        DeleteFunc: ec.onResourceClaimDelete,
    }); err != nil {
        return nil, err
    }
    if err := ec.podIndexer.AddIndexers(cache.Indexers{podResourceClaimIndex: podResourceClaimIndexFunc}); err != nil {
        return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
    }

    return ec, nil
}
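
Both informers feed a single work queue. The enqueuePod and onResourceClaimAddOrUpdate helpers registered above are not part of this excerpt; they build the usual namespace/name keys and tag them with a prefix so that syncHandler (shown further down) can tell pod items and claim items apart. Below is a minimal sketch of the pod side, assuming the prefix values "pod:" and "claim:" that the dispatch in syncHandler implies; the helper name and the simplified filtering are mine, not the upstream code.

const (
    podKeyPrefix   = "pod:"   // assumed value, consistent with the switch in syncHandler
    claimKeyPrefix = "claim:" // assumed value, consistent with the switch in syncHandler
)

// enqueuePodSketch illustrates what the real enqueuePod does; the upstream
// helper additionally unwraps informer tombstones and does more bookkeeping.
func (ec *Controller) enqueuePodSketch(obj interface{}, deleted bool) {
    pod, ok := obj.(*v1.Pod)
    if !ok {
        return
    }
    if len(pod.Spec.ResourceClaims) == 0 {
        // Pods without resource claims are of no interest to this controller.
        return
    }
    if deleted {
        // Remember the UID so that syncClaim can later prove the pod is
        // really gone without asking the API server.
        ec.deletedObjects.Add(pod.UID)
    }
    // The prefix lets syncHandler distinguish pod keys from claim keys on
    // the shared queue.
    ec.queue.Add(podKeyPrefix + pod.Namespace + "/" + pod.Name)
}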


func (ec *Controller) Run(ctx context.Context, workers int) {
    defer runtime.HandleCrash()
    defer ec.queue.ShutDown()

    logger := klog.FromContext(ctx)
    logger.Info("Starting ephemeral volume controller")
    defer logger.Info("Shutting down ephemeral volume controller")

    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ec.kubeClient.CoreV1().Events("")})
    ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "resource_claim"})
    defer eventBroadcaster.Shutdown()

    if !cache.WaitForNamedCacheSync("ephemeral", ctx.Done(), ec.podSynced, ec.claimsSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, ec.runWorker, time.Second)
    }

    <-ctx.Done()
}

func (ec *Controller) runWorker(ctx context.Context) {
    for ec.processNextWorkItem(ctx) {
    }
}

func (ec *Controller) processNextWorkItem(ctx context.Context) bool {
    key, shutdown := ec.queue.Get()
    if shutdown {
        return false
    }
    defer ec.queue.Done(key)

    err := ec.syncHandler(ctx, key.(string))
    if err == nil {
        ec.queue.Forget(key)
        return true
    }

    runtime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
    ec.queue.AddRateLimited(key)

    return true
}

// syncHandler is invoked for each work item which might need to be processed.
// If an error is returned from this function, the item will be requeued.
func (ec *Controller) syncHandler(ctx context.Context, key string) error {
    sep := strings.Index(key, ":")
    if sep < 0 {
        return fmt.Errorf("unexpected key: %s", key)
    }
    prefix, object := key[0:sep+1], key[sep+1:]
    namespace, name, err := cache.SplitMetaNamespaceKey(object)
    if err != nil {
        return err
    }
    // The prefix tells us whether this is a pod event or a ResourceClaim
    // event, and therefore which handler to call.
    switch prefix {
    case podKeyPrefix:
        return ec.syncPod(ctx, namespace, name)
    case claimKeyPrefix:
        return ec.syncClaim(ctx, namespace, name)
    default:
        return fmt.Errorf("unexpected key prefix: %s", prefix)
    }
}

func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error {
    logger := klog.LoggerWithValues(klog.FromContext(ctx), "pod", klog.KRef(namespace, name))
    ctx = klog.NewContext(ctx, logger)
    pod, err := ec.podLister.Pods(namespace).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            logger.V(5).Info("nothing to do for pod, it is gone")
            return nil
        }
        return err
    }

    // Ignore pods which are already getting deleted.
    if pod.DeletionTimestamp != nil {
        logger.V(5).Info("nothing to do for pod, it is marked for deletion")
        return nil
    }

    for _, podClaim := range pod.Spec.ResourceClaims {
        if err := ec.handleClaim(ctx, pod, podClaim); err != nil {
            if ec.recorder != nil {
                ec.recorder.Event(pod, v1.EventTypeWarning, "FailedResourceClaimCreation", fmt.Sprintf("PodResourceClaim %s: %v", podClaim.Name, err))
            }
            return fmt.Errorf("pod %s/%s, PodResourceClaim %s: %v", namespace, name, podClaim.Name, err)
        }
    }

    return nil
}
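
syncPod delegates the per-claim work to handleClaim, which is not included in this excerpt. Conceptually, for every pod claim that references a ResourceClaimTemplate it creates a ResourceClaim owned by the Pod, so that garbage collection removes the claim together with the Pod; pod claims that reference an existing ResourceClaim directly are left alone. The sketch below illustrates that idea only: the helper name, the naming of the generated claim, and the omitted handling of already-existing claims are simplifications, not the upstream implementation.

// handleClaimSketch is an illustrative stand-in for handleClaim: it creates a
// ResourceClaim from the referenced template and makes the Pod its owner.
func (ec *Controller) handleClaimSketch(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim) error {
    templateName := podClaim.Source.ResourceClaimTemplateName
    if templateName == nil {
        // The pod claim references an existing ResourceClaim directly;
        // there is nothing for this controller to create.
        return nil
    }

    template, err := ec.templateLister.ResourceClaimTemplates(pod.Namespace).Get(*templateName)
    if err != nil {
        return fmt.Errorf("resource claim template %q: %v", *templateName, err)
    }

    // Build the claim from the template spec. Setting the Pod as the
    // controlling owner means the claim is garbage collected once the Pod
    // is deleted.
    isTrue := true
    claim := &resourcev1alpha2.ResourceClaim{
        ObjectMeta: metav1.ObjectMeta{
            GenerateName: pod.Name + "-" + podClaim.Name + "-",
            Namespace:    pod.Namespace,
            OwnerReferences: []metav1.OwnerReference{{
                APIVersion: "v1",
                Kind:       "Pod",
                Name:       pod.Name,
                UID:        pod.UID,
                Controller: &isTrue,
            }},
            Annotations: template.Spec.ObjectMeta.Annotations,
            Labels:      template.Spec.ObjectMeta.Labels,
        },
        Spec: template.Spec.Spec,
    }
    _, err = ec.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Create(ctx, claim, metav1.CreateOptions{})
    return err
}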

func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) error {
    logger := klog.LoggerWithValues(klog.FromContext(ctx), "PVC", klog.KRef(namespace, name))
    ctx = klog.NewContext(ctx, logger)
    claim, err := ec.claimLister.ResourceClaims(namespace).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            logger.V(5).Info("nothing to do for claim, it is gone")
            return nil
        }
        return err
    }

    // Check if the ReservedFor entries are all still valid.
    valid := make([]resourcev1alpha2.ResourceClaimConsumerReference, 0, len(claim.Status.ReservedFor))
    for _, reservedFor := range claim.Status.ReservedFor {
        if reservedFor.APIGroup == "" &&
            reservedFor.Resource == "pods" {
            // A pod falls into one of three categories:
            // - we have it in our cache -> don't remove it until we are told that it got removed
            // - we don't have it in our cache anymore, but we have seen it before -> it was deleted, remove it
            // - not in our cache, not seen -> double-check with API server before removal

            keepEntry := true

            // Tracking deleted pods in the LRU cache is an
            // optimization. Without this cache, the code would
            // have to do the API call below for every deleted pod
            // to ensure that the pod really doesn't exist. With
            // the cache, most of the time the pod will be recorded
            // as deleted and the API call can be avoided.
            if ec.deletedObjects.Has(reservedFor.UID) {
                // We know that the pod was deleted. This is
                // easy to check and thus is done first.
                keepEntry = false
            } else {
                pod, err := ec.podLister.Pods(claim.Namespace).Get(reservedFor.Name)
                if err != nil && !errors.IsNotFound(err) {
                    return err
                }
                if pod == nil {
                    // We might not have it in our informer cache
                    // yet. Removing the pod while the scheduler is
                    // scheduling it would be bad. We have to be
                    // absolutely sure and thus have to check with
                    // the API server.
                    pod, err := ec.kubeClient.CoreV1().Pods(claim.Namespace).Get(ctx, reservedFor.Name, metav1.GetOptions{})
                    if err != nil && !errors.IsNotFound(err) {
                        return err
                    }
                    if pod == nil || pod.UID != reservedFor.UID {
                        keepEntry = false
                    }
                } else if pod.UID != reservedFor.UID {
                    // Pod exists, but is a different incarnation under the same name.
                    keepEntry = false
                }
            }

            if keepEntry {
                valid = append(valid, reservedFor)
            }
            continue
        }

        // TODO: support generic object lookup
        return fmt.Errorf("unsupported ReservedFor entry: %v", reservedFor)
    }

    if len(valid) < len(claim.Status.ReservedFor) {
        // TODO (#113700): patch
        claim := claim.DeepCopy()
        claim.Status.ReservedFor = valid
        _, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
        if err != nil {
            return err
        }
    }

    return nil
}
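
deletedObjects is the only field above whose type, uidCache, is defined outside this excerpt (in the same package). It is a small LRU cache of pod UIDs, capped at maxUIDCacheEntries, that lets syncClaim drop a stale ReservedFor entry without the direct API-server GET of the slow path. The stand-in below shows the two operations the controller relies on, Has (used by syncClaim) and Add (recording a deletion); the upstream type is a real LRU, while this sketch settles for FIFO eviction and is not the upstream code.

// uidCacheSketch is an illustrative stand-in for the package-private uidCache:
// a bounded set of pod UIDs with simple FIFO eviction.
type uidCacheSketch struct {
    mutex   sync.Mutex
    maxSize int
    order   []types.UID // insertion order, used for eviction
    entries map[types.UID]struct{}
}

func newUIDCacheSketch(maxSize int) *uidCacheSketch {
    return &uidCacheSketch{
        maxSize: maxSize,
        entries: make(map[types.UID]struct{}, maxSize),
    }
}

// Add records that a pod with this UID has been deleted.
func (c *uidCacheSketch) Add(uid types.UID) {
    c.mutex.Lock()
    defer c.mutex.Unlock()
    if _, ok := c.entries[uid]; ok {
        return
    }
    if len(c.order) >= c.maxSize {
        oldest := c.order[0]
        c.order = c.order[1:]
        delete(c.entries, oldest)
    }
    c.order = append(c.order, uid)
    c.entries[uid] = struct{}{}
}

// Has reports whether the UID was recorded as deleted.
func (c *uidCacheSketch) Has(uid types.UID) bool {
    c.mutex.Lock()
    defer c.mutex.Unlock()
    _, ok := c.entries[uid]
    return ok
}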

REF:
1. https://kubernetes.io/zh-cn/docs/concepts/scheduling-eviction/dynamic-resource-allocation/
2. pkg/controller/resourceclaim/controller.go