k8s源码设计模式之Chan of Responsibility

Chan of Responsibility我们常说的责任链模式
k8s中的实现

kube-apiserver中的admission控制器通过责任链模式实现。每个admission控制器都实现了一个AdmissionHandler接口,该接口包含一个名为Admit的方法,用于验证请求是否符合规则。多个admission控制器按照注册顺序形成一个责任链。当请求进来时,apiserver会遍历这个责任链,逐一调用每个admission控制器的Admit方法,只有当所有控制器都通过验证后,请求才会被通过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// vendor/k8s.io/apiserver/pkg/admission/chain.go
// 下面定义了两种性质的webhook,修改性质在验证性质的之前被调用
// chainAdmissionHandler is an instance of admission.NamedHandler that performs admission control using
// a chain of admission handlers
type chainAdmissionHandler []Interface

// NewChainHandler creates a new chain handler from an array of handlers. Used for testing.
func NewChainHandler(handlers ...Interface) chainAdmissionHandler {
return chainAdmissionHandler(handlers)
}

// Admit performs an admission control check using a chain of handlers, and returns immediately on first error
// 修改性质的webhook,此类handler实现了MutationInterface
func (admissionHandler chainAdmissionHandler) Admit(ctx context.Context, a Attributes, o ObjectInterfaces) error {
for _, handler := range admissionHandler {
if !handler.Handles(a.GetOperation()) {
continue
}
if mutator, ok := handler.(MutationInterface); ok {
err := mutator.Admit(ctx, a, o)
if err != nil {
return err
}
}
}
return nil
}

// Validate performs an admission control check using a chain of handlers, and returns immediately on first error
// 验证性质的webhook, 此类handler实现了ValidationInterface
func (admissionHandler chainAdmissionHandler) Validate(ctx context.Context, a Attributes, o ObjectInterfaces) error {
for _, handler := range admissionHandler {
if !handler.Handles(a.GetOperation()) {
continue
}
if validator, ok := handler.(ValidationInterface); ok {
err := validator.Validate(ctx, a, o)
if err != nil {
return err
}
}
}
return nil
}

// Handles will return true if any of the handlers handles the given operation
func (admissionHandler chainAdmissionHandler) Handles(operation Operation) bool {
for _, handler := range admissionHandler {
if handler.Handles(operation) {
return true
}
}
return false
}
admission Interface
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// vendor/k8s.io/apiserver/pkg/admission/interfaces.go
// Interface is an abstract, pluggable interface for Admission Control decisions.
type Interface interface {
// Handles returns true if this admission controller can handle the given operation
// where operation can be one of CREATE, UPDATE, DELETE, or CONNECT
Handles(operation Operation) bool
}

type MutationInterface interface {
Interface

// Admit makes an admission decision based on the request attributes.
// Context is used only for timeout/deadline/cancellation and tracing information.
Admit(ctx context.Context, a Attributes, o ObjectInterfaces) (err error)
}

// ValidationInterface is an abstract, pluggable interface for Admission Control decisions.
type ValidationInterface interface {
Interface

// Validate makes an admission decision based on the request attributes. It is NOT allowed to mutate
// Context is used only for timeout/deadline/cancellation and tracing information.
Validate(ctx context.Context, a Attributes, o ObjectInterfaces) (err error)
}
调用时机
Matation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go
// 把有Post请求都会经过CreateHandler方法,这里使用了装饰器模式返回http.HandlerFunc
// 代码过长删除了部分逻辑
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
...

result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
if scope.FieldManager != nil {
liveObj, err := scope.Creater.New(scope.Kind)
if err != nil {
return nil, fmt.Errorf("failed to create new object (Create for %v): %v", scope.Kind, err)
}
obj = scope.FieldManager.UpdateNoErrors(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
admit = fieldmanager.NewManagedFieldsValidatingAdmissionController(admit)
}
// 执行admission handler
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
return nil, err
}
}
// Dedup owner references again after mutating admission happens
dedupOwnerReferencesAndAddWarning(obj, req.Context(), true)
result, err := requestFunc()
// If the object wasn't committed to storage because it's serialized size was too large,
// it is safe to remove managedFields (which can be large) and try again.
if isTooLargeError(err) {
if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil {
accessor.SetManagedFields(nil)
result, err = requestFunc()
}
}
return result, err
})
...
// 调用Store中的Create方法,这里的Create方法是对etcd的一层封装
// 可以看到AdmissionToValidateObjectFunc
requestFunc := func() (runtime.Object, error) {
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
}
...
result, err := requestFunc()

}
}
Validate
1
2
3
4
5
6
7
8
9
10
11
// vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
...
// 这里将会执行validate
if createValidation != nil {
if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
return nil, err
}
}
...
}

REF:
1.staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go
2.staging/src/k8s.io/apiserver/pkg/admission/chain.go
3.staging/src/k8s.io/apiserver/pkg/admission/interfaces.go
4.staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go