k8s subresource status

什么是subresource status

status是k8s中常见的一种子资源。

  1. /status 子资源用于更新和获取主资源的状态部分
  2. 通常由控制器或Operator更新,以反映系统中对象的当前状态
  3. 通过将状态更新与主资源的其他修改隔离开来,可以防止用户意外覆盖状态信息

subresource status的作用与限制

spec-and-status

  1. 在Kubernetes API中,使用PUT或POST动词对Kubernetes对象进行操作时,必须忽略对象的status字段。这是为了防止在读取-修改-写入(read-modify-write)的场景中意外地覆盖status字段。PUT方法不能更新status很好理解:就是为了防止意外覆盖status;而使用POST也不能写入status字段(如果status没有定义为subresource,POST和PUT方法是可以写入的),应该是和status的作用有关:status一般由控制器来更新,所以在POST方法中也不能设置status。
  2. 通过subresource这一概念可以将对象的spec与status分开,设置不同的访问权限。
  3. 必须提供单独的方法来对status进行修改

如何定义一个subresource

在结构体上加上//+kubebuilder:subresource:status就可以将status定义为subresource

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +kubebuilder:subresource:status
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Foo is a specification for a Foo resource.
// The +kubebuilder:subresource:status marker placed above this type is what
// declares status as a subresource of Foo.
type Foo struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

// Spec is serialized under the "spec" key.
Spec FooSpec `json:"spec"`
// Status is serialized under the "status" key.
Status FooStatus `json:"status"`
}

// FooSpec is the spec for a Foo resource.
type FooSpec struct {
// DeploymentName is serialized as "deploymentName".
DeploymentName string `json:"deploymentName"`
// Replicas is serialized as "replicas"; it is a pointer, so it can be nil.
Replicas *int32 `json:"replicas"`
}

// FooStatus is the status for a Foo resource.
type FooStatus struct {
// AvailableReplicas is serialized as "availableReplicas".
AvailableReplicas int32 `json:"availableReplicas"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// FooList is a list of Foo resources.
type FooList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`

// Items holds the Foo objects of the list, serialized as "items".
Items []Foo `json:"items"`
}

crd server

k8s中crd这类资源的创建走的接口和原生资源不一样,crd资源调用的接口是apis/<group>/<version>/<cr-name>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
   // vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
// Create the crdHandler.
crdHandler, err := NewCustomResourceDefinitionHandler(
versionDiscoveryHandler,
groupDiscoveryHandler,
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
delegateHandler,
c.ExtraConfig.CRDRESTOptionsGetter,
c.GenericConfig.AdmissionControl,
establishingController,
c.ExtraConfig.ServiceResolver,
c.ExtraConfig.AuthResolverWrapper,
c.ExtraConfig.MasterCount,
s.GenericAPIServer.Authorizer,
c.GenericConfig.RequestTimeout,
time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
apiGroupInfo.StaticOpenAPISpec,
c.GenericConfig.MaxRequestBodyBytes,
)
if err != nil {
return nil, err
}
// Requests whose URL is /apis, or is prefixed with /apis/, are all routed to crdHandler.
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
crdHandler实现了自己的ServeHTTP方法
// vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go

// ServeHTTP is entered by every request whose URL is /apis or is prefixed with /apis/.
// Non-resource requests go to the version/group discovery handlers or to the
// delegate. Resource requests are resolved to a CRD and then dispatched to the
// status, scale or main-resource handler depending on the requested subresource;
// anything that does not match a served CRD is passed to the delegate.
func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
responsewriters.ErrorNegotiated(
apierrors.NewInternalError(fmt.Errorf("no RequestInfo found in the context")),
Codecs, schema.GroupVersion{}, w, req,
)
return
}
if !requestInfo.IsResourceRequest {
pathParts := splitPath(requestInfo.Path)
// only match /apis/<group>/<version>
// only registered under /apis
if len(pathParts) == 3 {
r.versionDiscoveryHandler.ServeHTTP(w, req)
return
}
// only match /apis/<group>
if len(pathParts) == 2 {
r.groupDiscoveryHandler.ServeHTTP(w, req)
return
}

r.delegate.ServeHTTP(w, req)
return
}

// The CRD is looked up under the name "<resource>.<group>".
crdName := requestInfo.Resource + "." + requestInfo.APIGroup
crd, err := r.crdLister.Get(crdName)
if apierrors.IsNotFound(err) {
r.delegate.ServeHTTP(w, req)
return
}
if err != nil {
utilruntime.HandleError(err)
responsewriters.ErrorNegotiated(
apierrors.NewInternalError(fmt.Errorf("error resolving resource")),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
return
}

// if the scope in the CRD and the scope in request differ (with exception of the verbs in possiblyAcrossAllNamespacesVerbs
// for namespaced resources), pass request to the delegate, which is supposed to lead to a 404.
namespacedCRD, namespacedReq := crd.Spec.Scope == apiextensionsv1.NamespaceScoped, len(requestInfo.Namespace) > 0
if !namespacedCRD && namespacedReq {
r.delegate.ServeHTTP(w, req)
return
}
if namespacedCRD && !namespacedReq && !possiblyAcrossAllNamespacesVerbs.Has(requestInfo.Verb) {
r.delegate.ServeHTTP(w, req)
return
}

// The requested API version must be one the CRD serves.
if !apiextensionshelpers.HasServedCRDVersion(crd, requestInfo.APIVersion) {
r.delegate.ServeHTTP(w, req)
return
}

// There is a small chance that a CRD is being served because NamesAccepted condition is true,
// but it becomes "unserved" because another names update leads to a conflict
// and EstablishingController wasn't fast enough to put the CRD into the Established condition.
// We accept this as the problem is small and self-healing.
if !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.NamesAccepted) &&
!apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
r.delegate.ServeHTTP(w, req)
return
}

terminating := apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Terminating)

// Fetch the serving info for this CRD: it is returned directly when it already
// exists in the handler's storage map, otherwise it is built and stored first.
crdInfo, err := r.getOrCreateServingInfoFor(crd.UID, crd.Name)
if apierrors.IsNotFound(err) {
r.delegate.ServeHTTP(w, req)
return
}
if err != nil {
utilruntime.HandleError(err)
responsewriters.ErrorNegotiated(
apierrors.NewInternalError(fmt.Errorf("error resolving resource")),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
return
}
if !hasServedCRDVersion(crdInfo.spec, requestInfo.APIVersion) {
r.delegate.ServeHTTP(w, req)
return
}

deprecated := crdInfo.deprecated[requestInfo.APIVersion]
// Note: the loop variable w shadows the http.ResponseWriter parameter inside this loop only.
for _, w := range crdInfo.warnings[requestInfo.APIVersion] {
warning.AddWarning(req.Context(), "", w)
}

verb := strings.ToUpper(requestInfo.Verb)
resource := requestInfo.Resource
subresource := requestInfo.Subresource
scope := metrics.CleanScope(requestInfo)
supportedTypes := []string{
string(types.JSONPatchType),
string(types.MergePatchType),
string(types.ApplyPatchType),
}

var handlerFunc http.HandlerFunc
subresources, err := apiextensionshelpers.GetSubresourcesForVersion(crd, requestInfo.APIVersion)
if err != nil {
utilruntime.HandleError(err)
responsewriters.ErrorNegotiated(
apierrors.NewInternalError(fmt.Errorf("could not properly serve the subresource")),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
return
}
switch {
case subresource == "status" && subresources != nil && subresources.Status != nil:
// Operations on the status subresource go through serveStatus.
handlerFunc = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes)
case subresource == "scale" && subresources != nil && subresources.Scale != nil:
handlerFunc = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes)
case len(subresource) == 0:
// Requests on the main resource (e.g. creating a custom resource) go through serveResource.
handlerFunc = r.serveResource(w, req, requestInfo, crdInfo, crd, terminating, supportedTypes)
default:
responsewriters.ErrorNegotiated(
apierrors.NewNotFound(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Name),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
}

// handlerFunc is nil when no case above assigned it or when the serve* helper
// returned nil; in that case nothing more is served here.
if handlerFunc != nil {
handlerFunc = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, requestInfo.APIVersion, resource, subresource, scope, metrics.APIServerComponent, deprecated, "", handlerFunc)
handler := genericfilters.WithWaitGroup(handlerFunc, longRunningFilter, crdInfo.waitGroup)
handler.ServeHTTP(w, req)
return
}
}

func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crdInfo, error) {
storageMap := r.customStorage.Load().(crdStorageMap)
if ret, ok := storageMap[uid]; ok {
return ret, nil
}

r.customStorageLock.Lock()
defer r.customStorageLock.Unlock()

// Get the up-to-date CRD when we have the lock, to avoid racing with updateCustomResourceDefinition.
// If updateCustomResourceDefinition sees an update and happens later, the storage will be deleted and
// we will re-create the updated storage on demand. If updateCustomResourceDefinition happens before,
// we make sure that we observe the same up-to-date CRD.
crd, err := r.crdLister.Get(name)
if err != nil {
return nil, err
}
storageMap = r.customStorage.Load().(crdStorageMap)
if ret, ok := storageMap[crd.UID]; ok {
return ret, nil
}

storageVersion, err := apiextensionshelpers.GetCRDStorageVersion(crd)
if err != nil {
return nil, err
}

...

var statusSpec *apiextensionsinternal.CustomResourceSubresourceStatus
var statusValidator apiservervalidation.SchemaValidator
// 获取subresources
subresources, err := apiextensionshelpers.GetSubresourcesForVersion(crd, v.Name)
if err != nil {
utilruntime.HandleError(err)
return nil, fmt.Errorf("the server could not properly serve the CR subresources")
}
if subresources != nil && subresources.Status != nil {
equivalentResourceRegistry.RegisterKindFor(resource, "status", kind)
// 如果status是subresource则设置statusSpec
statusSpec = &apiextensionsinternal.CustomResourceSubresourceStatus{}
if err := apiextensionsv1.Convert_v1_CustomResourceSubresourceStatus_To_apiextensions_CustomResourceSubresourceStatus(subresources.Status, statusSpec, nil); err != nil {
return nil, fmt.Errorf("failed converting CRD status subresource to internal version: %v", err)
}
// for the status subresource, validate only against the status schema
if internalValidationSchema != nil && internalValidationSchema.OpenAPIV3Schema != nil && internalValidationSchema.OpenAPIV3Schema.Properties != nil {
if statusSchema, ok := internalValidationSchema.OpenAPIV3Schema.Properties["status"]; ok {
statusValidator, _, err = apiservervalidation.NewSchemaValidator(&statusSchema)
if err != nil {
return nil, err
}
}
}


var scaleSpec *apiextensionsinternal.CustomResourceSubresourceScale
if subresources != nil && subresources.Scale != nil {
equivalentResourceRegistry.RegisterKindFor(resource, "scale", autoscalingv1.SchemeGroupVersion.WithKind("Scale"))
scaleSpec = &apiextensionsinternal.CustomResourceSubresourceScale{}
if err := apiextensionsv1.Convert_v1_CustomResourceSubresourceScale_To_apiextensions_CustomResourceSubresourceScale(subresources.Scale, scaleSpec, nil); err != nil {
return nil, fmt.Errorf("failed converting CRD status subresource to internal version: %v", err)
}
}

columns, err := getColumnsForVersion(crd, v.Name)
if err != nil {
utilruntime.HandleError(err)
return nil, fmt.Errorf("the server could not properly serve the CR columns")
}
table, err := tableconvertor.New(columns)
if err != nil {
klog.V(2).Infof("The CRD for %v has an invalid printer specification, falling back to default printing: %v", kind, err)
}

storages[v.Name] = customresource.NewStorage(
resource.GroupResource(),
singularResource.GroupResource(),
kind,
schema.GroupVersionKind{Group: crd.Spec.Group, Version: v.Name, Kind: crd.Status.AcceptedNames.ListKind},
// 创建strategy, 不同的crd对应不同的strategy
customresource.NewStrategy(
typer,
crd.Spec.Scope == apiextensionsv1.NamespaceScoped,
kind,
validator,
statusValidator,
structuralSchemas,
statusSpec,
scaleSpec,
),
crdConversionRESTOptionsGetter{
RESTOptionsGetter: r.restOptionsGetter,
converter: safeConverter,
decoderVersion: schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name},
encoderVersion: schema.GroupVersion{Group: crd.Spec.Group, Version: storageVersion},
structuralSchemas: structuralSchemas,
structuralSchemaGK: kind.GroupKind(),
preserveUnknownFields: crd.Spec.PreserveUnknownFields,
},
crd.Status.AcceptedNames.Categories,
table,
replicasPathInCustomResource,
)

clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped

// CRDs explicitly do not support protobuf, but some objects returned by the API server do
negotiatedSerializer := unstructuredNegotiatedSerializer{
typer: typer,
creator: creator,
converter: safeConverter,
structuralSchemas: structuralSchemas,
structuralSchemaGK: kind.GroupKind(),
preserveUnknownFields: crd.Spec.PreserveUnknownFields,
}
var standardSerializers []runtime.SerializerInfo
for _, s := range negotiatedSerializer.SupportedMediaTypes() {
if s.MediaType == runtime.ContentTypeProtobuf {
continue
}
standardSerializers = append(standardSerializers, s)
}

reqScope := handlers.RequestScope{
Namer: handlers.ContextBasedNaming{
Namer: meta.NewAccessor(),
ClusterScoped: clusterScoped,
},
Serializer: negotiatedSerializer,
ParameterCodec: parameterCodec,
StandardSerializers: standardSerializers,

Creater: creator,
Convertor: safeConverter,
Defaulter: unstructuredDefaulter{parameterScheme, structuralSchemas, kind.GroupKind()},
Typer: typer,
UnsafeConvertor: unsafeConverter,

EquivalentResourceMapper: equivalentResourceRegistry,

Resource: schema.GroupVersionResource{Group: crd.Spec.Group, Version: v.Name, Resource: crd.Status.AcceptedNames.Plural},
Kind: kind,

// a handler for a specific group-version of a custom resource uses that version as the in-memory representation
HubGroupVersion: kind.GroupVersion(),

MetaGroupVersion: metav1.SchemeGroupVersion,

TableConvertor: storages[v.Name].CustomResource,

Authorizer: r.authorizer,

MaxRequestBodyBytes: r.maxRequestBodyBytes,
}

resetFields := storages[v.Name].CustomResource.GetResetFields()
reqScope, err = scopeWithFieldManager(
typeConverter,
reqScope,
resetFields,
"",
)
if err != nil {
return nil, err
}
requestScopes[v.Name] = &reqScope

scaleColumns, err := getScaleColumnsForVersion(crd, v.Name)
if err != nil {
return nil, fmt.Errorf("the server could not properly serve the CR scale subresource columns %w", err)
}
scaleTable, _ := tableconvertor.New(scaleColumns)

// override scale subresource values
// shallow copy
scaleScope := *requestScopes[v.Name]
scaleConverter := scale.NewScaleConverter()
scaleScope.Subresource = "scale"
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme())
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
scaleScope.Namer = handlers.ContextBasedNaming{
Namer: meta.NewAccessor(),
ClusterScoped: clusterScoped,
}
scaleScope.TableConvertor = scaleTable

if subresources != nil && subresources.Scale != nil {
scaleScope, err = scopeWithFieldManager(
typeConverter,
scaleScope,
nil,
"scale",
)
if err != nil {
return nil, err
}
}

scaleScopes[v.Name] = &scaleScope

// override status subresource values
// shallow copy
statusScope := *requestScopes[v.Name]
statusScope.Subresource = "status"
statusScope.Namer = handlers.ContextBasedNaming{
Namer: meta.NewAccessor(),
ClusterScoped: clusterScoped,
}

if subresources != nil && subresources.Status != nil {
resetFields := storages[v.Name].Status.GetResetFields()
statusScope, err = scopeWithFieldManager(
typeConverter,
statusScope,
resetFields,
"status",
)
if err != nil {
return nil, err
}
}

statusScopes[v.Name] = &statusScope

if v.Deprecated {
deprecated[v.Name] = true
if v.DeprecationWarning != nil {
warnings[v.Name] = append(warnings[v.Name], *v.DeprecationWarning)
} else {
warnings[v.Name] = append(warnings[v.Name], defaultDeprecationWarning(v.Name, crd.Spec))
}
}
}

ret := &crdInfo{
spec: &crd.Spec,
acceptedNames: &crd.Status.AcceptedNames,
storages: storages,
requestScopes: requestScopes,
scaleRequestScopes: scaleScopes,
statusRequestScopes: statusScopes,
deprecated: deprecated,
warnings: warnings,
storageVersion: storageVersion,
waitGroup: &utilwaitgroup.SafeWaitGroup{},
}

// Copy because we cannot write to storageMap without a race
// as it is used without locking elsewhere.
storageMap2 := storageMap.clone()

storageMap2[crd.UID] = ret
r.customStorage.Store(storageMap2)

return ret, nil
}


// serveResource picks the handler for a request on the main custom resource
// (no subresource), based on requestInfo.Verb. It uses the request scope and the
// CustomResource storage registered for the requested API version.
// It returns nil, after writing an error response itself, when the verb is not
// supported or when a create arrives while the CRD is terminating.
func (r *crdHandler) serveResource(w http.ResponseWriter, req *http.Request, requestInfo *apirequest.RequestInfo, crdInfo *crdInfo, crd *apiextensionsv1.CustomResourceDefinition, terminating bool, supportedTypes []string) http.HandlerFunc {
requestScope := crdInfo.requestScopes[requestInfo.APIVersion]
storage := crdInfo.storages[requestInfo.APIVersion].CustomResource

switch requestInfo.Verb {
case "get":
return handlers.GetResource(storage, requestScope)
case "list":
forceWatch := false
return handlers.ListResource(storage, storage, requestScope, forceWatch, r.minRequestTimeout)
case "watch":
// Same handler as "list", but with forceWatch set.
forceWatch := true
return handlers.ListResource(storage, storage, requestScope, forceWatch, r.minRequestTimeout)
case "create":
// we want to track recently created CRDs so that in HA environments we don't have server A allow a create and server B
// not have observed the established, so a followup get,update,delete results in a 404. We've observed about 800ms
// delay in some CI environments. Two seconds looks long enough and reasonably short for hot retriers.
justCreated := time.Since(apiextensionshelpers.FindCRDCondition(crd, apiextensionsv1.Established).LastTransitionTime.Time) < 2*time.Second
if justCreated {
time.Sleep(2 * time.Second)
}
// Creates are rejected with MethodNotSupported while the CRD is terminating.
if terminating {
err := apierrors.NewMethodNotSupported(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb)
err.ErrStatus.Message = fmt.Sprintf("%v not allowed while custom resource definition is terminating", requestInfo.Verb)
responsewriters.ErrorNegotiated(err, Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
return nil
}
return handlers.CreateResource(storage, requestScope, r.admission)
case "update":
return handlers.UpdateResource(storage, requestScope, r.admission)
case "patch":
return handlers.PatchResource(storage, requestScope, r.admission, supportedTypes)
case "delete":
allowsOptions := true
return handlers.DeleteResource(storage, allowsOptions, requestScope, r.admission)
case "deletecollection":
checkBody := true
return handlers.DeleteCollection(storage, checkBody, requestScope, r.admission)
default:
responsewriters.ErrorNegotiated(
apierrors.NewMethodNotSupported(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
return nil
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// vendor/k8s.io/apiserver/pkg/endpoints/handlers/create.go
// CreateResource returns a function that will handle a resource creation.
// It wraps r in a namedCreaterAdapter and delegates to createHandler.
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}

// vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
// In the end the request still goes through the Create method implemented by
// Store; this is the method that writes the data to etcd (via e.Storage.Create).
// Before persisting, it fills in system metadata, runs rest.BeforeCreate with
// e.CreateStrategy, and then runs createValidation if one was supplied.
func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
var finishCreate FinishFunc = finishNothing

// Init metadata as early as possible.
if objectMeta, err := meta.Accessor(obj); err != nil {
return nil, err
} else {
rest.FillObjectMetaSystemFields(objectMeta)
// A name is generated only when generateName is set and no name was given.
if len(objectMeta.GetGenerateName()) > 0 && len(objectMeta.GetName()) == 0 {
objectMeta.SetName(e.CreateStrategy.GenerateName(objectMeta.GetGenerateName()))
}
}

if e.BeginCreate != nil {
fn, err := e.BeginCreate(ctx, obj, options)
if err != nil {
return nil, err
}
finishCreate = fn
// On any early return the finish function is called with false.
defer func() {
finishCreate(ctx, false)
}()
}

// BeforeCreate runs the create strategy on obj before it is stored.
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, err
}
// at this point we have a fully formed object. It is time to call the validators that the apiserver
// handling chain wants to enforce.
if createValidation != nil {
if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
return nil, err
}
}

name, err := e.ObjectNameFunc(obj)
if err != nil {
return nil, err
}
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, err
}
qualifiedResource := e.qualifiedResourceFromContext(ctx)
ttl, err := e.calculateTTL(obj, 0, false)
if err != nil {
return nil, err
}
out := e.NewFunc()
if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
err = storeerr.InterpretCreateError(err, qualifiedResource, name)
err = rest.CheckGeneratedNameError(ctx, e.CreateStrategy, err, obj)
if !apierrors.IsAlreadyExists(err) {
return nil, err
}
// AlreadyExists: read the stored object so that, if it carries a deletion
// timestamp, the error message can say it is being deleted. Failures of
// this follow-up lookup are ignored and the original error is returned.
if errGet := e.Storage.Get(ctx, key, storage.GetOptions{}, out); errGet != nil {
return nil, err
}
accessor, errGetAcc := meta.Accessor(out)
if errGetAcc != nil {
return nil, err
}
if accessor.GetDeletionTimestamp() != nil {
msg := &err.(*apierrors.StatusError).ErrStatus.Message
*msg = fmt.Sprintf("object is being deleted: %s", *msg)
}
return nil, err
}
// The operation has succeeded. Call the finish function if there is one,
// and then make sure the defer doesn't call it again.
fn := finishCreate
finishCreate = finishNothing
fn(ctx, true)

if e.AfterCreate != nil {
e.AfterCreate(out, options)
}
if e.Decorator != nil {
e.Decorator(out)
}
return out, nil
}

subresource status是怎么忽略的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
在上面的`Create`方法中会调用`rest.BeforeCreate`, 这个方法会执行`customResource`的策略, 最终会调用`func (a customResourceStrategy) PrepareForCreate`

// staging/src/k8s.io/apiextensions-apiserver/pkg/registry/customresource/strategy.go

// PrepareForCreate clears the status of a CustomResource before creation.
// It also sets the object's generation to 1.
func (a customResourceStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {
// When the status subresource is set (a.status != nil), the "status" key is
// deleted, so any status supplied in a create request is ignored.
if a.status != nil {
customResourceObject := obj.(*unstructured.Unstructured)
customResource := customResourceObject.UnstructuredContent()

// create cannot set status
delete(customResource, "status")
}

accessor, _ := meta.Accessor(obj)
accessor.SetGeneration(1)
}

status更新

如果是subresource,code-gen生成代码会为status生成UpdateStatus方法,可以使用这个方法对status进行更新操作。
对status的更新也会增加对象的ResourceVersion,并产生更新事件。

kubectl edit修改status

如果你使用kubectl edit命令对status进行修改,会发现根本改不了。使用kubectl edit修改crd调用的接口是apis/<group>/<version>/<cr-name>,http方法是PATCH。假如把status中的availableReplicas更新为2,则
patchType ="application/merge-patch+json"
patchBytes = {"status":{"availableReplicas":2}}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
// Update (excerpt; elided parts are marked with ...) calls rest.BeforeUpdate
// with e.UpdateStrategy on the new and existing objects and returns its error, if any.
func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
...
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
return nil, nil, err
}
...
}

// vendor/k8s.io/apiserver/pkg/registry/rest/update.go
// BeforeUpdate (excerpt; elided parts are marked with ...) carries the old
// object's generation over to the new object and then calls
// strategy.PrepareForUpdate(ctx, obj, old).
func BeforeUpdate(strategy RESTUpdateStrategy, ctx context.Context, obj, old runtime.Object) error {
objectMeta, kind, kerr := objectMetaAndKind(strategy, obj)
if kerr != nil {
return kerr
}

// ensure namespace on the object is correct, or error if a conflicting namespace was set in the object
requestNamespace, ok := genericapirequest.NamespaceFrom(ctx)
if !ok {
return errors.NewInternalError(fmt.Errorf("no namespace information found in request context"))
}
// NOTE(review): oldMeta is not declared in the visible lines; presumably it comes
// from a part of the function that this excerpt omits.
objectMeta.SetGeneration(oldMeta.GetGeneration())

...
strategy.PrepareForUpdate(ctx, obj, old)
...
}

// staging/src/k8s.io/apiextensions-apiserver/pkg/registry/customresource/strategy.go
// PrepareForUpdate clears fields that are not allowed to be set by end users on update.
// It also increments the generation when anything other than metadata differs
// between the new and the old object.
func (a customResourceStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
newCustomResourceObject := obj.(*unstructured.Unstructured)
oldCustomResourceObject := old.(*unstructured.Unstructured)

newCustomResource := newCustomResourceObject.UnstructuredContent()
oldCustomResource := oldCustomResourceObject.UnstructuredContent()

// If the /status subresource endpoint is installed, update is not allowed to set status.
// a.status != nil means status is a subresource: the new object's status is
// overwritten with the old object's status, or removed if the old object had none.
if a.status != nil {
_, ok1 := newCustomResource["status"]
_, ok2 := oldCustomResource["status"]
switch {
case ok2:
newCustomResource["status"] = oldCustomResource["status"]
case ok1:
delete(newCustomResource, "status")
}
}

// except for the changes to `metadata`, any other changes
// cause the generation to increment.
newCopyContent := copyNonMetadata(newCustomResource)
oldCopyContent := copyNonMetadata(oldCustomResource)
if !apiequality.Semantic.DeepEqual(newCopyContent, oldCopyContent) {
oldAccessor, _ := meta.Accessor(oldCustomResourceObject)
newAccessor, _ := meta.Accessor(newCustomResourceObject)
newAccessor.SetGeneration(oldAccessor.GetGeneration() + 1)
}
}

updateStatus

PUT /apis/<group>/<version>/<resources>/<name>/status, verb=update

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go
// ServeHTTP (excerpt; elided parts are marked with ...): the switch below selects
// the handler according to the requested subresource.
func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
...
switch {
case subresource == "status" && subresources != nil && subresources.Status != nil:
// A request on the status subresource enters r.serveStatus.
handlerFunc = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes)
case subresource == "scale" && subresources != nil && subresources.Scale != nil:
handlerFunc = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes)
case len(subresource) == 0:
handlerFunc = r.serveResource(w, req, requestInfo, crdInfo, crd, terminating, supportedTypes)
default:
responsewriters.ErrorNegotiated(
apierrors.NewNotFound(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Name),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
}
...
}

// serveStatus picks the handler for a request on the status subresource, based
// on requestInfo.Verb. It uses the status request scope and the Status storage
// registered for the requested API version. Only get, update and patch are
// handled; for any other verb it writes a MethodNotSupported error and returns nil.
func (r *crdHandler) serveStatus(w http.ResponseWriter, req *http.Request, requestInfo *apirequest.RequestInfo, crdInfo *crdInfo, terminating bool, supportedTypes []string) http.HandlerFunc {
requestScope := crdInfo.statusRequestScopes[requestInfo.APIVersion]
storage := crdInfo.storages[requestInfo.APIVersion].Status

switch requestInfo.Verb {
case "get":
return handlers.GetResource(storage, requestScope)
case "update":
return handlers.UpdateResource(storage, requestScope, r.admission)
case "patch":
return handlers.PatchResource(storage, requestScope, r.admission, supportedTypes)
default:
responsewriters.ErrorNegotiated(
apierrors.NewMethodNotSupported(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
return nil
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
// Update (excerpt; elided parts are marked with ...), as reached through the
// status subresource.
func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
...
// The strategy applied here is the status strategy (statusStrategy).
// It does not discard status, so the status update goes through normally.
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
return nil, nil, err
}
...
}


// PrepareForUpdate replaces the new object with a deep copy of the old one and
// then restores only the incoming status and managedFields, so an update through
// this strategy cannot change anything else. If the incoming object has no
// "status" key, the key is removed from the result.
func (a statusStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
// update is only allowed to set status
newCustomResourceObject := obj.(*unstructured.Unstructured)
newCustomResource := newCustomResourceObject.UnstructuredContent()
status, ok := newCustomResource["status"]

// managedFields must be preserved since it's been modified to
// track changed fields in the status update.
managedFields := newCustomResourceObject.GetManagedFields()

// copy old object into new object
oldCustomResourceObject := old.(*unstructured.Unstructured)
// overriding the resourceVersion in metadata is safe here, we have already checked that
// new object and old object have the same resourceVersion.
*newCustomResourceObject = *oldCustomResourceObject.DeepCopy()

// set status
newCustomResourceObject.SetManagedFields(managedFields)
// Re-read the content map: the object was just overwritten by the copy of old.
newCustomResource = newCustomResourceObject.UnstructuredContent()
if ok {
newCustomResource["status"] = status
} else {
delete(newCustomResource, "status")
}
}

  1. k8s-spec-and-status
  2. https://github.com/kubernetes/sample-controller/blob/master/pkg/apis/samplecontroller/v1alpha1/types.go