In Kubernetes, the kubelet is a daemon that runs on every node. It manages the pods on that node, reports their status to the apiserver, and drives each pod toward its desired state. To do this, the kubelet has to react to changes in both pod specs and container states: it watches multiple sources for pod spec changes, and it periodically polls the container runtime for the latest container states.
As the number of pods/containers grows, this polling incurs non-negligible overhead, and the kubelet's parallelism (it spawns a goroutine per pod to query the runtime) amplifies the resource consumption even further. The periodic bursts of concurrent requests cause the container runtime to burn a lot of CPU even when no spec/state has changed, which leads to poor performance and reliability problems, and ultimately limits the kubelet's scalability.
The goal of the proposal is to improve the kubelet's scalability and performance by lowering the pod management overhead:
- Reduce unnecessary work (skip processing when spec/state has not changed)
- Lower the number of concurrent requests to the container runtime
Replace polling with an event-driven approach.
Obtain container state changes through relisting: to get pod lifecycle events, we can relist all containers (e.g. docker ps) and derive the container state changes that PLEG needs. Although this looks similar to the original polling, only a single thread now accesses the container runtime.
The content above comes from the k8s design proposals; see the original document for details.
PLEG
Its full name is Pod Lifecycle Event Generator.
PodLifecycleEvent
// pkg/kubelet/pleg/pleg.go
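For reference, here is a sketch of the core types defined in pkg/kubelet/pleg/pleg.go (comments paraphrased; details may vary slightly across Kubernetes versions):

```go
// PodLifeCycleEventType defines the event type of pod life cycle events.
type PodLifeCycleEventType string

const (
	// ContainerStarted: the container has entered the running state.
	ContainerStarted PodLifeCycleEventType = "ContainerStarted"
	// ContainerDied: the container has exited.
	ContainerDied PodLifeCycleEventType = "ContainerDied"
	// ContainerRemoved: an exited container has been removed.
	ContainerRemoved PodLifeCycleEventType = "ContainerRemoved"
	// PodSync: a pod change that cannot be captured by any single event above.
	PodSync PodLifeCycleEventType = "PodSync"
	// ContainerChanged: the container has entered an unknown state.
	ContainerChanged PodLifeCycleEventType = "ContainerChanged"
)

// PodLifecycleEvent is an event that reflects the change of the pod state.
type PodLifecycleEvent struct {
	// The pod ID.
	ID types.UID
	// The type of the event.
	Type PodLifeCycleEventType
	// The accompanying data, which varies by event type
	// (e.g. the container ID for container events).
	Data interface{}
}
```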
// pkg/kubelet/kubelet.go
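In pkg/kubelet/kubelet.go the kubelet wires a GenericPLEG up during construction. A simplified, illustrative sketch of that wiring, matching the NewGenericPLEG signature shown further below (the channel capacity constant and the 1s/3m durations are assumed defaults and may differ by version):

```go
// In NewMainKubelet (simplified): create the event channel and the generic PLEG.
eventChannel := make(chan *pleg.PodLifecycleEvent, plegChannelCapacity)

klet.pleg = pleg.NewGenericPLEG(
	klet.containerRuntime, // container runtime client
	eventChannel,          // consumed by syncLoop via kl.pleg.Watch()
	&pleg.RelistDuration{
		RelistPeriod:    time.Second,     // relist every second
		RelistThreshold: 3 * time.Minute, // PLEG considered unhealthy beyond this
	},
	klet.podCache, // kubecontainer.Cache shared with the pod workers
	clock.RealClock{},
)
```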
GenericPLEG
GenericPLEG discovers container changes through periodic listing. It assumes that a container will not be created, terminated, and garbage-collected within a single relist period; if such a change does happen during a relist, GenericPLEG misses it, and if a relist fails, the blind window becomes even longer.
Many kubelet internal components rely on terminated containers as tombstones for record keeping; the garbage collector is implemented to handle such cases. However, to ensure the kubelet can still cope with missed container events, it is recommended to keep the relist period short and to run an auxiliary, longer periodic sync in the kubelet as a safety net.
// pkg/kubelet/pleg/generic.go
type GenericPLEG struct {
// The container runtime.
runtime kubecontainer.Runtime
// The channel from which the subscriber listens events.
eventChannel chan *PodLifecycleEvent
// The internal cache for pod/container information.
podRecords podRecords
// Time of the last relisting.
relistTime atomic.Value
// Cache for storing the runtime states required for syncing pods.
cache kubecontainer.Cache
// For testability.
clock clock.Clock
// Pods that failed to have their status retrieved during a relist. These pods will be
// retried during the next relisting.
podsToReinspect map[types.UID]*kubecontainer.Pod
// Stop the Generic PLEG by closing the channel.
stopCh chan struct{}
// Locks the relisting of the Generic PLEG
relistLock sync.Mutex
// Indicates if the Generic PLEG is running or not
isRunning bool
// Locks the start/stop operation of Generic PLEG
runningMu sync.Mutex
// Indicates relisting related parameters
relistDuration *RelistDuration
// Mutex to serialize updateCache called by relist vs UpdateCache interface
podCacheMutex sync.Mutex
}
// NewGenericPLEG creates a new GenericPLEG.
func NewGenericPLEG(runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
relistDuration *RelistDuration, cache kubecontainer.Cache,
clock clock.Clock) PodLifecycleEventGenerator {
return &GenericPLEG{
relistDuration: relistDuration,
runtime: runtime,
eventChannel: eventChannel,
podRecords: make(podRecords),
cache: cache,
clock: clock,
}
}
// Start runs Relist periodically, every g.relistDuration.RelistPeriod.
func (g *GenericPLEG) Start() {
g.runningMu.Lock()
defer g.runningMu.Unlock()
if !g.isRunning {
g.isRunning = true
g.stopCh = make(chan struct{})
go wait.Until(g.Relist, g.relistDuration.RelistPeriod, g.stopCh)
}
}
// Relist is executed periodically.
// It queries the container runtime for the list of pods/containers, compares it
// against the pods/containers in the local cache, and generates events.
func (g *GenericPLEG) Relist() {
g.relistLock.Lock()
defer g.relistLock.Unlock()
ctx := context.Background()
klog.V(5).InfoS("GenericPLEG: Relisting")
if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
}
timestamp := g.clock.Now()
defer func() {
metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
}()
// Get all pods from the container runtime
podList, err := g.runtime.GetPods(ctx, true)
if err != nil {
klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
return
}
g.updateRelistTime(timestamp)
pods := kubecontainer.Pods(podList)
// update running pod and container count
updateRunningPodAndContainerMetrics(pods)
g.podRecords.setCurrent(pods)
// Compare the old and the current pods, and generate events.
eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
for pid := range g.podRecords {
oldPod := g.podRecords.getOld(pid)
pod := g.podRecords.getCurrent(pid)
// Collect the containers from both oldPod and pod
allContainers := getContainersFromPods(oldPod, pod)
for _, container := range allContainers {
// Generate events from each container's state change
events := computeEvents(oldPod, pod, &container.ID)
for _, e := range events {
updateEvents(eventsByPodID, e)
}
}
}
var needsReinspection map[types.UID]*kubecontainer.Pod
if g.cacheEnabled() {
needsReinspection = make(map[types.UID]*kubecontainer.Pod)
}
// If there are events associated with a pod, we should update the
// podCache.
for pid, events := range eventsByPodID {
pod := g.podRecords.getCurrent(pid)
if g.cacheEnabled() {
// updateCache() will inspect the pod and update the cache. If an
// error occurs during the inspection, we want PLEG to retry again
// in the next relist. To achieve this, we do not update the
// associated podRecord of the pod, so that the change will be
// detected again in the next relist.
// TODO: If many pods changed during the same relist period,
// inspecting the pod and getting the PodStatus to update the cache
// serially may take a while. We should be aware of this and
// parallelize if needed.
if err, updated := g.updateCache(ctx, pod, pid); err != nil {
// Rely on updateCache calling GetPodStatus to log the actual error.
klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
// make sure we try to reinspect the pod during the next relisting
needsReinspection[pid] = pod
continue
} else {
// this pod was in the list to reinspect and we did so because it had events, so remove it
// from the list (we don't want the reinspection code below to inspect it a second time in
// this relist execution)
delete(g.podsToReinspect, pid)
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
if !updated {
continue
}
}
}
}
// Update the internal storage and send out the events.
g.podRecords.update(pid)
// Map from containerId to exit code; used as a temporary cache for lookup
containerExitCode := make(map[string]int)
for i := range events {
// Filter out events that are not reliable and no other components use yet.
if events[i].Type == ContainerChanged {
continue
}
select {
// Send the event to eventChannel.
// kubelet's main loop (syncLoop) watches eventChannel and handles the events.
case g.eventChannel <- events[i]:
default:
metrics.PLEGDiscardEvents.Inc()
klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
}
// Log exit code of containers when they finished in a particular event
if events[i].Type == ContainerDied {
// Fill up containerExitCode map for ContainerDied event when first time appeared
if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
// Get updated podStatus
status, err := g.cache.Get(pod.ID)
if err == nil {
for _, containerStatus := range status.ContainerStatuses {
containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
}
}
}
if containerID, ok := events[i].Data.(string); ok {
if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
}
}
}
}
}
if g.cacheEnabled() {
// reinspect any pods that failed inspection during the previous relist
if len(g.podsToReinspect) > 0 {
klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
for pid, pod := range g.podsToReinspect {
if err, _ := g.updateCache(ctx, pod, pid); err != nil {
// Rely on updateCache calling GetPodStatus to log the actual error.
klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
needsReinspection[pid] = pod
}
}
}
// Update the cache timestamp. This needs to happen *after*
// all pods have been properly updated in the cache.
g.cache.UpdateTime(timestamp)
}
// make sure we retain the list of pods that need reinspecting the next time relist is called
g.podsToReinspect = needsReinspection
}
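computeEvents above ultimately delegates to a helper that maps each container's state transition into PLEG events. Roughly, that mapping looks like this (a simplified sketch paraphrased from generic.go; logging and the default branch are omitted):

```go
// generateEvents translates a single container's state transition
// (old state -> new state) into zero or more PodLifecycleEvents.
func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
	if newState == oldState {
		return nil // no change, no event
	}
	switch newState {
	case plegContainerRunning:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
	case plegContainerExited:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
	case plegContainerUnknown:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
	case plegContainerNonExistent:
		switch oldState {
		case plegContainerExited:
			// The container has already been reported as dead; only report removal.
			return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
		default:
			return []*PodLifecycleEvent{
				{ID: podID, Type: ContainerDied, Data: cid},
				{ID: podID, Type: ContainerRemoved, Data: cid},
			}
		}
	}
	return nil
}
```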
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
...
plegCh := kl.pleg.Watch()
for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
klog.ErrorS(err, "Skipping pod synchronization")
// exponential backoff
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
// reset backoff if we have a success
duration = base
kl.syncLoopMonitor.Store(kl.clock.Now())
// plegCh (i.e. the eventChannel above) is handled in syncLoopIteration
if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
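For completeness, syncLoopIteration's handling of plegCh looks roughly like the fragment below (simplified; isSyncPodWorthy filters out event types, such as ContainerRemoved, that do not require a pod sync):

```go
// Inside (kl *Kubelet) syncLoopIteration's select, simplified:
case e := <-plegCh:
	if isSyncPodWorthy(e) {
		// A PLEG event that warrants a sync of the corresponding pod.
		if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
			klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
			handler.HandlePodSyncs([]*v1.Pod{pod})
		} else {
			// The pod no longer exists; ignore the event.
			klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
		}
	}
	if e.Type == pleg.ContainerDied {
		if containerID, ok := e.Data.(string); ok {
			kl.cleanUpContainersInPod(e.ID, containerID)
		}
	}
```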
EventedPLEG
EventedPLEG improves performance by making the PLEG (pod lifecycle event generator) event-driven, which allows the relisting frequency to be lowered.
// pkg/kubelet/pleg/evented.go
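The key idea in pkg/kubelet/pleg/evented.go is to subscribe to container lifecycle events streamed by the CRI runtime instead of relying on frequent relists, while keeping the GenericPLEG around as a fallback with a much longer relist period. A simplified sketch of the event-watching loop under that assumption (field and method names follow the upstream code but may differ across versions):

```go
// watchEventsChannel keeps a CRI event stream open and falls back to the
// generic PLEG (with the default relist frequency) if the stream keeps failing.
func (e *EventedPLEG) watchEventsChannel() {
	containerEventsResponseCh := make(chan *runtimeapi.ContainerEventResponse, cap(e.eventChannel))
	defer close(containerEventsResponseCh)

	go func() {
		numAttempts := 0
		for {
			if numAttempts >= e.eventedPlegMaxStreamRetries {
				// Give up on the evented PLEG and fall back to relisting.
				e.Stop()
				e.genericPleg.Stop()       // Stop the generic PLEG running with the long relist period.
				e.Update(e.relistDuration) // Restore the default (short) relist period.
				e.genericPleg.Start()      // Restart the generic PLEG.
				break
			}
			// Stream container events from the CRI runtime.
			if err := e.runtimeService.GetContainerEvents(containerEventsResponseCh); err != nil {
				numAttempts++
				e.Relist() // Force a relist so events are not missed while the stream is down.
			}
		}
	}()

	// Convert CRI events into PodLifecycleEvents and push them onto the
	// event channel consumed by syncLoop (conversion omitted here).
	e.processCRIEvents(containerEventsResponseCh)
}
```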
REF:
1. https://github.com/kubernetes/design-proposals-archive/blob/main/node/pod-lifecycle-event-generator.md
2. pkg/kubelet/pleg/pleg.go
3. pkg/kubelet/kubelet.go
4. pkg/kubelet/pleg/evented.go
5. pkg/kubelet/pleg/generic.go