k8s源码设计模式之Visitor

Visitor模式也叫访问者模式:它把“对数据结构的操作”从数据结构本身中分离出来,使得在不修改数据结构的前提下可以增加新的操作。

1
2
3
4
5
6
// vendor/k8s.io/cli-runtime/pkg/resource/interfaces.go
//
// Visitor is implemented by every type that can walk a set of resources and
// hand each one to a callback.
type Visitor interface {
// Visit invokes the supplied VisitorFunc for the resources this Visitor
// covers and returns an error if the walk fails.
Visit(VisitorFunc) error
}

// VisitorFunc is the callback accepted by Visitor.Visit. It receives the
// resource being visited together with any error produced while building
// that resource, and returns an error of its own. In StreamVisitor.Visit a
// non-nil return value stops the iteration and is returned to the caller.
type VisitorFunc func(*Info, error) error
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// vendor/k8s.io/cli-runtime/pkg/resource/visitor.go
//
// Info is the per-resource record handed to every VisitorFunc.
type Info struct {
// Client is handed to resource.NewHelper by RunCreate to issue the request.
Client RESTClient

// Mapping is handed to resource.NewHelper alongside Client. FlattenListVisitor
// also reads Mapping.GroupVersionKind as the preferred GVK for list items, and
// guards against Mapping being nil.
Mapping *meta.RESTMapping

// Namespace is passed to Helper.Create by RunCreate.
Namespace string
// Name is presumably the resource name; it is not read by the code shown
// here (TODO confirm against upstream).
Name string

// Source identifies where the resource came from. It is used in error
// messages and copied from a list onto each of its items by
// FlattenListVisitor.
Source string

// Object is the resource itself. FlattenListVisitor checks it for nil
// before using it.
Object runtime.Object
// ResourceVersion is copied from a list onto each of its items by
// FlattenListVisitor when non-empty.
ResourceVersion string

// Subresource is not read by the code shown here; presumably it names a
// subresource of the object (TODO confirm against upstream).
Subresource string
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// StreamVisitor reads resource data from an io.Reader.
type StreamVisitor struct {
// Reader is the embedded input stream that Visit decodes from.
io.Reader
// mapper is embedded; Visit calls its infoForData method to build an Info
// from each decoded document.
*mapper

// Source labels the stream in error messages and is passed to infoForData.
Source string
// Schema is passed to ValidateSchema for every decoded document.
Schema ContentValidator
}

// Different Visitors handle different kinds of input: for example,
// URLVisitor handles URLs and FileVisitor handles files.
//
// URLVisitor downloads the content found at a URL (per the original note;
// its Visit method is not part of this excerpt).
type URLVisitor struct {
URL *url.URL
// The embedded StreamVisitor presumably does the parsing of the
// downloaded body, as it does for FileVisitor (TODO confirm).
*StreamVisitor
// HttpAttemptCount is presumably the number of HTTP attempts to make
// (TODO confirm against upstream).
HttpAttemptCount int
}

// FileVisitor is wrapping around a StreamVisitor, to handle open/close files
type FileVisitor struct {
// Path is the file to open. When it equals constSTDINstr, Visit reads
// from os.Stdin instead of opening a file.
Path string
// The embedded StreamVisitor performs the decoding; Visit sets its Reader
// before delegating to it.
*StreamVisitor
}

// Visit implements Visitor over a stream. A single stream may contain
// several resources; each one is decoded in turn and handed to fn.
//
// It returns nil once the decoder reports io.EOF, a formatted error when a
// document cannot be parsed or fails schema validation, and otherwise the
// first non-nil error returned by fn.
func (v *StreamVisitor) Visit(fn VisitorFunc) error {
	decoder := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096)
	for {
		var doc runtime.RawExtension
		err := decoder.Decode(&doc)
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error parsing %s: %v", v.Source, err)
		}
		// TODO: This needs to be able to handle object in other encodings and schemas.
		doc.Raw = bytes.TrimSpace(doc.Raw)
		// Empty documents and literal "null" documents are skipped.
		if len(doc.Raw) == 0 || bytes.Equal(doc.Raw, []byte("null")) {
			continue
		}
		if err := ValidateSchema(doc.Raw, v.Schema); err != nil {
			return fmt.Errorf("error validating %q: %v", v.Source, err)
		}
		// fn receives the build error (nil on success) and decides whether
		// the walk continues: only a non-nil result from fn stops it.
		info, err := v.infoForData(doc.Raw, v.Source)
		if cbErr := fn(info, err); cbErr != nil {
			return cbErr
		}
	}
}




// Visit in a FileVisitor only takes care of opening and closing the input;
// the actual decoding is delegated to the embedded StreamVisitor.
//
// When Path equals constSTDINstr the input is os.Stdin, which is not closed
// here. Otherwise the file at Path is opened and closed when Visit returns.
// An os.Open failure is returned unchanged.
func (v *FileVisitor) Visit(fn VisitorFunc) error {
	input := os.Stdin
	if v.Path != constSTDINstr {
		file, err := os.Open(v.Path)
		if err != nil {
			return err
		}
		defer file.Close()
		input = file
	}

	// TODO: Consider adding a flag to force to UTF16, apparently some
	// Windows tools don't write the BOM
	bomAware := unicode.BOMOverride(unicode.UTF8.NewDecoder())
	v.StreamVisitor.Reader = transform.NewReader(input, bomAware)

	return v.StreamVisitor.Visit(fn)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Do builds the Result and its visitor chain.
// Two wrapper layers are applied around the base visitor here:
// ContinueOnErrorVisitor (optional) and the DecoratedVisitor created by
// NewDecoratedVisitor. A FlattenListVisitor is added first when flattening
// is enabled.
//
// If visitorResult already carries an error, the Result is returned as is
// (with its mapper set) and no wrapping takes place.
func (b *Builder) Do() *Result {
	result := b.visitorResult()
	result.mapper = b.Mapper()
	if result.err != nil {
		return result
	}

	if b.flatten {
		result.visitor = NewFlattenListVisitor(result.visitor, b.objectTyper, b.mapper)
	}

	// Decorators run in the order they are appended.
	decorators := make([]VisitorFunc, 0, 4)
	if b.defaultNamespace {
		decorators = append(decorators, SetNamespace(b.namespace))
	}
	if b.requireNamespace {
		decorators = append(decorators, RequireNamespace(b.namespace))
	}
	decorators = append(decorators, FilterNamespace)
	if b.requireObject {
		decorators = append(decorators, RetrieveLazy)
	}

	if b.continueOnError {
		result.visitor = ContinueOnErrorVisitor{Visitor: result.visitor}
	}
	result.visitor = NewDecoratedVisitor(result.visitor, decorators...)
	return result
}
调用过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// RunCreate builds a visitor chain from the command's inputs and creates
// every visited object, printing each one afterwards.
//
// NOTE(review): this is an abridged excerpt. schema, cmdNamespace,
// enforceNamespace and err are used without a visible declaration, so they
// are presumably set up in code elided from the article.
//
// It returns the builder error, the error produced by the visit, or
// "no objects passed to create" when no object was processed.
func (o *CreateOptions) RunCreate(f cmdutil.Factory, cmd *cobra.Command) error {

// Build a nested visitor through the Builder's fluent API.
r := f.NewBuilder().
Unstructured().
Schema(schema).
ContinueOnError().
NamespaceParam(cmdNamespace).DefaultNamespace().
FilenameParam(enforceNamespace, &o.FilenameOptions).
LabelSelectorParam(o.Selector).
Flatten().
Do()
err = r.Err()
if err != nil {
return err
}

count := 0
// Call the Result's Visit method.
// It invokes the different Visitors in turn, which fill in the Info struct
// that is handed to this callback.
err = r.Visit(func(info *resource.Info, err error) error {
if err != nil {
return err
}
if err := util.CreateOrUpdateAnnotation(cmdutil.GetFlagBool(cmd, cmdutil.ApplyAnnotationsFlag), info.Object, scheme.DefaultJSONEncoder()); err != nil {
return cmdutil.AddSourceToErr("creating", info.Source, err)
}

// A recording failure is only logged at verbosity 4; it does not abort the create.
if err := o.Recorder.Record(info.Object); err != nil {
klog.V(4).Infof("error recording current command: %v", err)
}

if o.DryRunStrategy != cmdutil.DryRunClient {
// Build the request and send a POST to the apiserver through the RESTClient.
obj, err := resource.
NewHelper(info.Client, info.Mapping).
DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
WithFieldManager(o.fieldManager).
WithFieldValidation(o.ValidationDirective).
Create(info.Namespace, true, info.Object)
if err != nil {
return cmdutil.AddSourceToErr("creating", info.Source, err)
}
info.Refresh(obj, true)
}

// Counted only after the annotation and (non client-dry-run) create steps succeeded.
count++

return o.PrintObj(info.Object)
})
if err != nil {
return err
}
if count == 0 {
return fmt.Errorf("no objects passed to create")
}
return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
// Visit walks the resources held by the Result, calling fn for each one.
// The Visit methods below are invoked layer by layer, outermost first:
// DecoratedVisitor.Visit
// ContinueOnErrorVisitor.Visit
// FlattenListVisitor.Visit
// FlattenListVisitor.Visit
// EagerVisitorList.Visit
// FileVisitor.Visit
// StreamVisitor.Visit
//
// An error already stored on the Result is returned without visiting.
// Otherwise the visitor's error is passed through utilerrors.FilterOut
// together with r.ignoreErrors before being returned.
func (r *Result) Visit(fn VisitorFunc) error {
	if stored := r.err; stored != nil {
		return stored
	}
	visitErr := r.visitor.Visit(fn)
	return utilerrors.FilterOut(visitErr, r.ignoreErrors...)
}

// Visit on a DecoratedVisitor applies the decorator pattern.
// Go has no decorator syntax sugar like Python, so the wrapping is written
// out by hand: every decorator runs on the Info before fn sees it.
//
// An incoming error is passed straight back without calling any decorator
// or fn. The first decorator that fails stops processing of that Info and
// its error is returned; fn is not called in that case.
func (v DecoratedVisitor) Visit(fn VisitorFunc) error {
	return v.visitor.Visit(func(info *Info, err error) error {
		if err != nil {
			return err
		}
		for _, decorate := range v.decorators {
			if decorateErr := decorate(info, nil); decorateErr != nil {
				return decorateErr
			}
		}
		return fn(info, nil)
	})
}

// Visit walks the wrapped Visitor without letting a per-item failure stop
// the walk: errors arriving from the wrapped visitor and errors returned by
// fn are collected, and the inner callback always reports success.
//
// The error returned by the wrapped Visit call itself is collected as well.
// Exactly one collected error is returned as is; any other number is
// returned through utilerrors.NewAggregate.
func (v ContinueOnErrorVisitor) Visit(fn VisitorFunc) error {
	var collected []error
	walkErr := v.Visitor.Visit(func(info *Info, err error) error {
		// Only call fn when the item arrived without an error.
		if err == nil {
			err = fn(info, nil)
		}
		if err != nil {
			collected = append(collected, err)
		}
		return nil
	})
	if walkErr != nil {
		collected = append(collected, walkErr)
	}

	switch len(collected) {
	case 1:
		return collected[0]
	default:
		return utilerrors.NewAggregate(collected)
	}
}

// Visit walks the wrapped visitor and expands list objects into their
// individual items before calling fn.
//
// An incoming error is returned unchanged. An Info whose Object is nil or is
// not a list type is passed to fn as is. For a list, nested lists are
// expanded iteratively, each leaf item is turned into its own Info via
// v.mapper.infoForObject, and fn is called once per item. Errors from
// building an item's Info or from fn are collected and returned through
// utilerrors.NewAggregate; errors from extracting or decoding a list abort
// the expansion immediately.
func (v FlattenListVisitor) Visit(fn VisitorFunc) error {
	return v.visitor.Visit(func(info *Info, err error) error {
		if err != nil {
			return err
		}
		if info.Object == nil {
			return fn(info, nil)
		}
		if !meta.IsListType(info.Object) {
			return fn(info, nil)
		}

		// Work-list expansion: itemsToProcess grows while it is being
		// scanned, so lists nested inside lists are flattened as well.
		items := []runtime.Object{}
		itemsToProcess := []runtime.Object{info.Object}

		for i := 0; i < len(itemsToProcess); i++ {
			currObj := itemsToProcess[i]
			if !meta.IsListType(currObj) {
				items = append(items, currObj)
				continue
			}

			currItems, err := meta.ExtractList(currObj)
			if err != nil {
				return err
			}
			if errs := runtime.DecodeList(currItems, v.mapper.decoder); len(errs) > 0 {
				return utilerrors.NewAggregate(errs)
			}
			itemsToProcess = append(itemsToProcess, currItems...)
		}

		// If we have a GroupVersionKind on the list, prioritize that when asking for info on the objects contained in the list
		var preferredGVKs []schema.GroupVersionKind
		if info.Mapping != nil && !info.Mapping.GroupVersionKind.Empty() {
			preferredGVKs = append(preferredGVKs, info.Mapping.GroupVersionKind)
		}
		var errs []error
		for i := range items {
			item, err := v.mapper.infoForObject(items[i], v.typer, preferredGVKs)
			if err != nil {
				errs = append(errs, err)
				continue
			}
			// propagate list resource version to items
			if len(info.ResourceVersion) != 0 {
				item.ResourceVersion = info.ResourceVersion
			}
			// propagate list source to items source
			if len(info.Source) != 0 {
				item.Source = info.Source
			}
			if err := fn(item, nil); err != nil {
				errs = append(errs, err)
			}
		}
		return utilerrors.NewAggregate(errs)
	})
}

// Visit runs every Visitor in the list, in order, and never stops early:
// errors passed in by a visitor, errors returned by fn, and errors returned
// by each Visit call are all collected. The inner callback always reports
// success so the current visitor keeps going.
//
// The collected errors are returned through utilerrors.NewAggregate.
func (l EagerVisitorList) Visit(fn VisitorFunc) error {
	var collected []error
	record := func(e error) {
		if e != nil {
			collected = append(collected, e)
		}
	}

	for _, visitor := range l {
		record(visitor.Visit(func(info *Info, err error) error {
			// Only call fn when the item arrived without an error.
			if err == nil {
				err = fn(info, nil)
			}
			record(err)
			return nil
		}))
	}
	return utilerrors.NewAggregate(collected)
}


// Visit selects the input (os.Stdin when Path equals constSTDINstr,
// otherwise the file at Path), installs it as the embedded StreamVisitor's
// Reader, and delegates the walk to StreamVisitor.Visit.
//
// A file opened here is closed when Visit returns; os.Stdin is left open.
// An os.Open failure is returned unchanged.
func (v *FileVisitor) Visit(fn VisitorFunc) error {
	source := os.Stdin
	if v.Path != constSTDINstr {
		opened, err := os.Open(v.Path)
		if err != nil {
			return err
		}
		defer opened.Close()
		source = opened
	}

	bomAware := unicode.BOMOverride(unicode.UTF8.NewDecoder())
	v.StreamVisitor.Reader = transform.NewReader(source, bomAware)

	return v.StreamVisitor.Visit(fn)
}

// Visit decodes the stream one document at a time and hands each resulting
// Info to fn. This is the innermost layer of the visitor call chain.
//
// It returns nil when the decoder reports io.EOF, a formatted error when a
// document cannot be parsed or fails schema validation, and otherwise the
// first non-nil error returned by fn.
func (v *StreamVisitor) Visit(fn VisitorFunc) error {
	dec := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096)
	for {
		var raw runtime.RawExtension
		decodeErr := dec.Decode(&raw)
		switch {
		case decodeErr == io.EOF:
			return nil
		case decodeErr != nil:
			return fmt.Errorf("error parsing %s: %v", v.Source, decodeErr)
		}
		// TODO: This needs to be able to handle object in other encodings and schemas.
		raw.Raw = bytes.TrimSpace(raw.Raw)
		// Empty documents and literal "null" documents are skipped.
		if len(raw.Raw) == 0 || bytes.Equal(raw.Raw, []byte("null")) {
			continue
		}
		if validateErr := ValidateSchema(raw.Raw, v.Schema); validateErr != nil {
			return fmt.Errorf("error validating %q: %v", v.Source, validateErr)
		}
		// fn receives the build error (nil on success); only a non-nil
		// result from fn stops the walk.
		info, buildErr := v.infoForData(raw.Raw, v.Source)
		if cbErr := fn(info, buildErr); cbErr != nil {
			return cbErr
		}
	}
}

REF:
1.https://coolshell.cn/articles/21263.html
2.staging/src/k8s.io/cli-runtime/pkg/resource/builder.go
3.staging/src/k8s.io/cli-runtime/pkg/resource/visitor.go
4.staging/src/k8s.io/kubectl/pkg/cmd/create/create.go