k8s设计模式之Command

命令模式(Command)也称为动作(Action)或事务(Transaction)。

命令模式是一种行为设计模式,它可将请求转换为一个包含与请求相关的所有信息的独立对象。该转换让你能根据不同的请求将方法参数化、延迟请求执行或将其放入队列中,且能实现可撤销操作。

下面是 kubelet 中 PodConfig 与 podStorage 两个结构体的定义:
// pkg/kubelet/config/config.go

// PodConfig handles operations related to pod configuration.
type PodConfig struct {
// pods stores the information of all pods currently managed by the kubelet.
pods *podStorage
// mux is used to merge updates coming from multiple pod configuration sources.
mux *config.Mux

// the channel of denormalized changes passed to listeners
// It is typically used to merge updates from different sources and send them
// to subscribers, which can read the latest pod configuration from it.
updates chan kubetypes.PodUpdate

// contains the list of all configured sources
sourcesLock sync.Mutex
// sources is the set of pod configuration sources, e.g. (file, api).
sources sets.String
}

// podStorage manages the current pod state and ensures that update messages
// are delivered on the updates channel in strict order.
type podStorage struct {
podLock sync.RWMutex
// map of source name to pod uid to pod reference
pods map[string]map[types.UID]*v1.Pod
mode PodConfigNotificationMode

// ensures that updates are delivered in strict order
// on the updates channel
updateLock sync.Mutex
// updates is the send-only channel carrying pod state updates.
updates chan<- kubetypes.PodUpdate

// contains the set of all sources that have sent at least one SET
sourcesSeenLock sync.RWMutex
sourcesSeen sets.String

// the EventRecorder to use
recorder record.EventRecorder

startupSLIObserver podStartupSLIObserver
}

PodConfig 结构体使用了命令模式的思想:来自多个来源(如 file、api)的 Pod 配置变更被抽象为统一的 kubetypes.PodUpdate 对象(即“命令”),由 mux 对象统一管理和合并,再通过 updates 通道传递给其他组件(订阅者)去执行。这样做的好处是可以很方便地扩展 Pod 配置来源的种类,也可以保证不同来源的更新按严格顺序传递,避免了竞态问题。

下面是 kubelet 启动时消费 updates 通道的相关代码:
// pkg/kubelet/kubelet.go
// NOTE(review): the package-qualified kubelet.Bootstrap / kubelet.Dependencies
// types suggest this function is declared outside pkg/kubelet — confirm the
// path above applies to startKubelet and not only to Kubelet.Run.

// startKubelet launches the kubelet's Run loop and its optional servers, each
// in its own goroutine, and returns without waiting for any of them.
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// Hand the pod update channel to the kubelet's Run loop.
	podUpdates := podCfg.Updates()
	go k.Run(podUpdates)

	// The main kubelet server is started only when the caller asks for it.
	if enableServer {
		go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
	}

	// The read-only server is started only when a positive port is configured.
	if port := kubeCfg.ReadOnlyPort; port > 0 {
		addr := netutils.ParseIPSloppy(kubeCfg.Address)
		go k.ListenAndServeReadOnly(addr, uint(port))
	}

	// The pod resources endpoint sits behind the KubeletPodResources feature gate.
	if gateOn := utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources); gateOn {
		go k.ListenAndServePodResources()
	}
}

// Run starts the kubelet reacting to config updates.
//
// Before entering the sync loop it performs, in order: installs the "/logs/"
// handler when kl.logServer is nil; starts the cloud resource sync manager if
// one is set; initializes internal modules (on failure it records an event,
// logs the error and exits the process with status 1); starts the volume
// manager; starts the node status and node lease goroutines when a kube client
// is present; starts the periodic updateRuntimeUp loop; optionally sets up
// iptables util rules; and starts the status manager, the runtime class
// manager (if set), the PLEG and, behind its feature gate, the evented PLEG.
// Finally it calls kl.syncLoop with the updates channel.
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
ctx := context.Background()
// Only build a log handler when none has been set already.
if kl.logServer == nil {
file := http.FileServer(http.Dir(nodeLogDir))
// With the NodeLogQuery gate and EnableSystemLogQuery both on, wrap the file
// server so that query-style requests are validated and routed first.
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLogQuery) && kl.kubeletConfiguration.EnableSystemLogQuery {
kl.logServer = http.StripPrefix("/logs/", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if nlq, errs := newNodeLogQuery(req.URL.Query()); len(errs) > 0 {
http.Error(w, errs.ToAggregate().Error(), http.StatusBadRequest)
return
} else if nlq != nil {
// In query mode the request path must be empty or "/".
if req.URL.Path != "/" && req.URL.Path != "" {
http.Error(w, "path not allowed in query mode", http.StatusNotAcceptable)
return
}
if errs := nlq.validate(); len(errs) > 0 {
http.Error(w, errs.ToAggregate().Error(), http.StatusNotAcceptable)
return
}
// Validation ensures that the request does not query services and files at the same time
if len(nlq.Services) > 0 {
journal.ServeHTTP(w, req)
return
}
// Validation ensures that the request does not explicitly query multiple files at the same time
if len(nlq.Files) == 1 {
// Account for the \ being used on Windows clients
req.URL.Path = filepath.ToSlash(nlq.Files[0])
}
}
// Fall back in case the caller is directly trying to query a file
// Example: kubectl get --raw /api/v1/nodes/$name/proxy/logs/foo.log
file.ServeHTTP(w, req)
}))
} else {
// Otherwise serve files from the log directory directly.
kl.logServer = http.StripPrefix("/logs/", file)
}
}
if kl.kubeClient == nil {
klog.InfoS("No API server defined - no node status update will be sent")
}

// Start the cloud provider sync manager
if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}

// A module initialization failure is fatal: record an event, log, and exit.
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.ErrorS(err, "Failed to initialize internal modules")
os.Exit(1)
}

// Start volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

if kl.kubeClient != nil {
// Start two go-routines to update the status.
//
// The first will report to the apiserver every nodeStatusUpdateFrequency and is aimed to provide regular status intervals,
// while the second is used to provide a more timely status update during initialization and runs an one-shot update to the apiserver
// once the node becomes ready, then exits afterwards.
//
// Introduce some small jittering to ensure that over time the requests won't start
// accumulating at approximately the same time from the set of nodes due to priority and
// fairness effect.
go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
go kl.fastStatusUpdateOnce()

// start syncing lease
go kl.nodeLeaseController.Run(context.Background())
}
// Call updateRuntimeUp periodically with a 5-second period.
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

// Set up iptables util rules
if kl.makeIPTablesUtilChains {
kl.initNetworkUtil()
}

// Start component sync loops.
kl.statusManager.Start()

// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
kl.runtimeClassManager.Start(wait.NeverStop)
}

// Start the pod lifecycle event generator.
kl.pleg.Start()

// Start eventedPLEG only if EventedPLEG feature gate is enabled.
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
kl.eventedPleg.Start()
}
// syncLoop is the main loop that processes changes; it listens on three
// channels (file, apiserver, http).
// NOTE(review): only the single updates channel is passed here; presumably it
// already carries the merged changes from those sources — confirm.
kl.syncLoop(ctx, updates, kl)
}

REF:
1. https://refactoringguru.cn/design-patterns/command
2. pkg/kubelet/config/config.go
3. pkg/kubelet/kubelet.go