PLEG in the kubelet

The kubelet is a daemon that runs on every Kubernetes node. It manages the pods on the node, reports their status to the apiserver, and drives each pod toward its desired state. To do this, the kubelet has to react to changes in both pod specs and container states: it watches multiple sources for pod spec changes, and it periodically polls the container runtime for the latest container states.

As the number of pods and containers grows, polling introduces non-negligible overhead, made worse by the kubelet's parallelism (it spawns a goroutine per pod to query the runtime). Periodic bursts of many concurrent requests drive up the container runtime's CPU usage even when no spec or state has changed, which leads to poor performance and reliability problems. Ultimately, this limits the kubelet's scalability.

The goal of the proposal is to improve the kubelet's scalability and performance by lowering the per-pod management overhead:

  • Reduce unnecessary work (skip processing when the spec/state has not changed)
  • Reduce the number of concurrent requests to the container runtime

Replace polling with an event-driven approach.

Detect container state changes by relisting. To generate pod lifecycle events, we can relist all containers (e.g. docker ps) and derive the container state changes PLEG needs from the result. This is still somewhat similar to the original polling, but only a single thread now accesses the container runtime.


The summary above is taken from the k8s design-proposals; see the original document (linked in the references) for the full details.
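
To make the relisting idea concrete, here is a minimal, self-contained sketch (not kubelet code; the types and function names are made up for illustration) of how diffing two consecutive listings yields state-change events:

// Illustrative sketch only: diff two consecutive container listings.
package main

import "fmt"

type containerState string

const (
	stateRunning containerState = "running"
	stateExited  containerState = "exited"
)

// diffListings compares the previous and the current listing (container ID -> state)
// and returns a description of every observed change.
func diffListings(old, cur map[string]containerState) []string {
	var events []string
	for id, newState := range cur {
		if oldState, ok := old[id]; !ok || oldState != newState {
			events = append(events, fmt.Sprintf("container %s is now %s", id, newState))
		}
	}
	for id := range old {
		if _, ok := cur[id]; !ok {
			events = append(events, fmt.Sprintf("container %s was removed", id))
		}
	}
	return events
}

func main() {
	previous := map[string]containerState{"a": stateRunning, "b": stateRunning}
	current := map[string]containerState{"a": stateExited, "c": stateRunning}
	for _, e := range diffListings(previous, current) {
		fmt.Println(e)
	}
}

This is essentially what the generic PLEG does below, except that it keeps the previous listing in podRecords and emits typed PodLifecycleEvents instead of strings.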


PLEG stands for Pod Lifecycle Event Generator.

PodLifecycleEvent
// pkg/kubelet/pleg/pleg.go
// PodLifecycleEvent is an event that reflects the change of the pod state.
// Describes a change of pod state.
type PodLifecycleEvent struct {
// The pod ID.
ID types.UID
// The type of the event.
Type PodLifeCycleEventType
// The accompanied data which varies based on the event type.
// - ContainerStarted/ContainerStopped: the container name (string).
// - All other event types: unused.
Data interface{}
}

type PodLifecycleEventGenerator interface {
Start()
Stop()
Update(relistDuration *RelistDuration)
Watch() chan *PodLifecycleEvent
Healthy() (bool, error)
Relist()
UpdateCache(*kubecontainer.Pod, types.UID) (error, bool)
}


type PodLifeCycleEventType string

type RelistDuration struct {
// The period for relisting.
// Effectively the polling period.
RelistPeriod time.Duration
// The relisting threshold needs to be greater than the relisting period +
// the relisting time, which can vary significantly. Set a conservative
// threshold to avoid flipping between healthy and unhealthy.
// This must be greater than RelistPeriod plus the time the relist itself takes.
RelistThreshold time.Duration
}
// Event types
const (
// ContainerStarted - event type when the new state of container is running.
ContainerStarted PodLifeCycleEventType = "ContainerStarted"
// ContainerDied - event type when the new state of container is exited.
ContainerDied PodLifeCycleEventType = "ContainerDied"
// ContainerRemoved - event type when the old state of container is exited.
ContainerRemoved PodLifeCycleEventType = "ContainerRemoved"
// PodSync is used to trigger syncing of a pod when the observed change of
// the state of the pod cannot be captured by any single event above.
// PodSync is triggered when a pod state change is not captured by any single event above.
PodSync PodLifeCycleEventType = "PodSync"
// ContainerChanged - event type when the new state of container is unknown.
ContainerChanged PodLifeCycleEventType = "ContainerChanged"
)
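
Components interested in these events subscribe via Watch(). A hypothetical consumer (not the kubelet's real handler, which is shown later in syncLoopIteration) could look like this, reusing the types above and assuming fmt is imported:

// Hypothetical subscriber of PLEG events; for illustration only.
func handlePLEGEvents(plegCh <-chan *PodLifecycleEvent) {
	for ev := range plegCh {
		switch ev.Type {
		case ContainerStarted, ContainerDied, ContainerRemoved:
			// For container events, Data identifies the affected container.
			fmt.Printf("pod %s: %s (%v)\n", ev.ID, ev.Type, ev.Data)
		case PodSync:
			fmt.Printf("pod %s: full resync requested\n", ev.ID)
		default:
			fmt.Printf("pod %s: %s\n", ev.ID, ev.Type)
		}
	}
}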

// pkg/kubelet/kubelet.go
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,...) {
...
// The generic PLEG and the evented PLEG share the same event channel.
eventChannel := make(chan *pleg.PodLifecycleEvent, plegChannelCapacity)

if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
// adjust Generic PLEG relisting period and threshold to higher value when Evented PLEG is turned on
genericRelistDuration := &pleg.RelistDuration{
RelistPeriod: eventedPlegRelistPeriod,
RelistThreshold: eventedPlegRelistThreshold,
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
// In case Evented PLEG has to fall back on Generic PLEG due to an error,
// Evented PLEG should be able to reset the Generic PLEG relisting duration
// to the default value.
eventedRelistDuration := &pleg.RelistDuration{
RelistPeriod: genericPlegRelistPeriod,
RelistThreshold: genericPlegRelistThreshold,
}
klet.eventedPleg = pleg.NewEventedPLEG(klet.containerRuntime, klet.runtimeService, eventChannel,
klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{})
} else {
genericRelistDuration := &pleg.RelistDuration{
RelistPeriod: genericPlegRelistPeriod,
RelistThreshold: genericPlegRelistThreshold,
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
}
...
}
// pkg/kubelet/kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
...
// Start the pod lifecycle event generator.
kl.pleg.Start()
// The evented PLEG runs only when the EventedPLEG feature gate is enabled.
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
kl.eventedPleg.Start()
}
...
}
GenericPLEG

GenericPLEG discovers container changes through periodic listing. It assumes that no container is created, terminated, and garbage-collected within a single relist period; if a container goes through such changes during a relist window, GenericPLEG misses them, and the window gets even longer if a relist fails. Many internal kubelet components rely on terminated containers as tombstones for bookkeeping; the garbage collector is implemented to handle this case. Still, to make sure the kubelet copes with missed container events, it is recommended to keep the relist period short and to have an auxiliary, longer periodic sync in the kubelet as a safety net.
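
This is why the generic PLEG keeps a short relist period by default and only stretches it when the evented PLEG takes over. For reference, the relisting constants in pkg/kubelet/kubelet.go look roughly like this at the time of writing (exact values may differ between releases):

// Approximate defaults; check pkg/kubelet/kubelet.go of your release for the exact values.
const (
	// Generic PLEG on its own: relist every second, report unhealthy after 3 minutes.
	genericPlegRelistPeriod    = time.Second * 1
	genericPlegRelistThreshold = time.Minute * 3
	// Generic PLEG while Evented PLEG is active: relist far less often.
	eventedPlegRelistPeriod    = time.Second * 300
	eventedPlegRelistThreshold = time.Minute * 10
	// Maximum retries of the CRI container events stream before falling back.
	eventedPlegMaxStreamRetries = 5
)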

// pkg/kubelet/pleg/generic.go

type GenericPLEG struct {
// The container runtime.
runtime kubecontainer.Runtime
// The channel from which the subscriber listens events.
eventChannel chan *PodLifecycleEvent
// The internal cache for pod/container information.
podRecords podRecords
// Time of the last relisting.
relistTime atomic.Value
// Cache for storing the runtime states required for syncing pods.
cache kubecontainer.Cache
// For testability.
clock clock.Clock
// Pods that failed to have their status retrieved during a relist. These pods will be
// retried during the next relisting.
podsToReinspect map[types.UID]*kubecontainer.Pod
// Stop the Generic PLEG by closing the channel.
stopCh chan struct{}
// Locks the relisting of the Generic PLEG
relistLock sync.Mutex
// Indicates if the Generic PLEG is running or not
isRunning bool
// Locks the start/stop operation of Generic PLEG
runningMu sync.Mutex
// Indicates relisting related parameters
relistDuration *RelistDuration
// Mutex to serialize updateCache called by relist vs UpdateCache interface
podCacheMutex sync.Mutex
}

// NewGenericPLEG creates a new GenericPLEG.
func NewGenericPLEG(runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
relistDuration *RelistDuration, cache kubecontainer.Cache,
clock clock.Clock) PodLifecycleEventGenerator {
return &GenericPLEG{
relistDuration: relistDuration,
runtime: runtime,
eventChannel: eventChannel,
podRecords: make(podRecords),
cache: cache,
clock: clock,
}
}

// Start kicks off Relist, running it every g.relistDuration.RelistPeriod.
func (g *GenericPLEG) Start() {
g.runningMu.Lock()
defer g.runningMu.Unlock()
if !g.isRunning {
g.isRunning = true
g.stopCh = make(chan struct{})
go wait.Until(g.Relist, g.relistDuration.RelistPeriod, g.stopCh)
}
}

// Relist is the periodically executed function.
// It queries the container runtime for the current pods/containers, compares them with the cached podRecords, and generates events.
func (g *GenericPLEG) Relist() {
g.relistLock.Lock()
defer g.relistLock.Unlock()

ctx := context.Background()
klog.V(5).InfoS("GenericPLEG: Relisting")

if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
}

timestamp := g.clock.Now()
defer func() {
metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
}()

// Get all pods from the container runtime.
podList, err := g.runtime.GetPods(ctx, true)
if err != nil {
klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
return
}

g.updateRelistTime(timestamp)

pods := kubecontainer.Pods(podList)
// update running pod and container count
updateRunningPodAndContainerMetrics(pods)
g.podRecords.setCurrent(pods)

// Compare the old and the current pods, and generate events.
eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
for pid := range g.podRecords {
oldPod := g.podRecords.getOld(pid)
pod := g.podRecords.getCurrent(pid)
// Collect the containers from both oldPod and pod.
allContainers := getContainersFromPods(oldPod, pod)
for _, container := range allContainers {
// Generate events by comparing the old and new container states.
events := computeEvents(oldPod, pod, &container.ID)
for _, e := range events {
updateEvents(eventsByPodID, e)
}
}
}

var needsReinspection map[types.UID]*kubecontainer.Pod
if g.cacheEnabled() {
needsReinspection = make(map[types.UID]*kubecontainer.Pod)
}

// If there are events associated with a pod, we should update the
// podCache.
for pid, events := range eventsByPodID {
pod := g.podRecords.getCurrent(pid)
if g.cacheEnabled() {
// updateCache() will inspect the pod and update the cache. If an
// error occurs during the inspection, we want PLEG to retry again
// in the next relist. To achieve this, we do not update the
// associated podRecord of the pod, so that the change will be
// detect again in the next relist.
// TODO: If many pods changed during the same relist period,
// inspecting the pod and getting the PodStatus to update the cache
// serially may take a while. We should be aware of this and
// parallelize if needed.
if err, updated := g.updateCache(ctx, pod, pid); err != nil {
// Rely on updateCache calling GetPodStatus to log the actual error.
klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))

// make sure we try to reinspect the pod during the next relisting
needsReinspection[pid] = pod

continue
} else {
// this pod was in the list to reinspect and we did so because it had events, so remove it
// from the list (we don't want the reinspection code below to inspect it a second time in
// this relist execution)
delete(g.podsToReinspect, pid)
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
if !updated {
continue
}
}
}
}
// Update the internal storage and send out the events.
g.podRecords.update(pid)

// Map from containerId to exit code; used as a temporary cache for lookup
containerExitCode := make(map[string]int)

for i := range events {
// Filter out events that are not reliable and no other components use yet.
if events[i].Type == ContainerChanged {
continue
}
select {
// Send the event to eventChannel.
// The kubelet's main loop (syncLoop) watches this channel.
case g.eventChannel <- events[i]:
default:
metrics.PLEGDiscardEvents.Inc()
klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
}
// Log exit code of containers when they finished in a particular event
if events[i].Type == ContainerDied {
// Fill up containerExitCode map for ContainerDied event when first time appeared
if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
// Get updated podStatus
status, err := g.cache.Get(pod.ID)
if err == nil {
for _, containerStatus := range status.ContainerStatuses {
containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
}
}
}
if containerID, ok := events[i].Data.(string); ok {
if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
}
}
}
}
}

if g.cacheEnabled() {
// reinspect any pods that failed inspection during the previous relist
if len(g.podsToReinspect) > 0 {
klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
for pid, pod := range g.podsToReinspect {
if err, _ := g.updateCache(ctx, pod, pid); err != nil {
// Rely on updateCache calling GetPodStatus to log the actual error.
klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
needsReinspection[pid] = pod
}
}
}

// Update the cache timestamp. This needs to happen *after*
// all pods have been properly updated in the cache.
g.cache.UpdateTime(timestamp)
}

// make sure we retain the list of pods that need reinspecting the next time relist is called
g.podsToReinspect = needsReinspection
}
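
The RelistThreshold mentioned earlier is what the Healthy() method of the interface checks: if too much time has passed since the last successful relist, the PLEG is reported unhealthy (this is the source of the well-known "PLEG is not healthy" message). A simplified sketch of the check, based on the fields above rather than a verbatim copy of pkg/kubelet/pleg/generic.go:

// Simplified sketch of the health check (assumes fmt is imported).
func (g *GenericPLEG) Healthy() (bool, error) {
	relistTime := g.getRelistTime()
	if relistTime.IsZero() {
		return false, fmt.Errorf("pleg has yet to be successful")
	}
	elapsed := g.clock.Since(relistTime)
	if elapsed > g.relistDuration.RelistThreshold {
		return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v",
			elapsed, g.relistDuration.RelistThreshold)
	}
	return true, nil
}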

// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
...

plegCh := kl.pleg.Watch()

for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
klog.ErrorS(err, "Skipping pod synchronization")
// exponential backoff
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
// reset backoff if we have a success
duration = base

kl.syncLoopMonitor.Store(kl.clock.Now())
// syncLoopIteration consumes plegCh, i.e. the eventChannel created above.
if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
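
Inside syncLoopIteration, the PLEG channel is just one select case. Roughly (simplified from pkg/kubelet/kubelet.go; details vary between releases), it looks like this:

// Simplified excerpt of the plegCh case in syncLoopIteration.
case e := <-plegCh:
	if isSyncPodWorthy(e) {
		// A sync-worthy event: look the pod up and trigger a sync.
		if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
			handler.HandlePodSyncs([]*v1.Pod{pod})
		}
		// If the pod no longer exists, the event is simply ignored.
	}
	if e.Type == pleg.ContainerDied {
		if containerID, ok := e.Data.(string); ok {
			// Remove dead containers so they do not accumulate.
			kl.cleanUpContainersInPod(e.ID, containerID)
		}
	}

isSyncPodWorthy filters out events that do not require a pod sync (ContainerRemoved, for example).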

EventedPLEG

The evented PLEG (pod lifecycle event generator) improves performance by consuming container runtime events, which allows the relisting frequency to be reduced.

// pkg/kubelet/pleg/evented.go
type EventedPLEG struct {
// The container runtime.
runtime kubecontainer.Runtime
// The runtime service.
runtimeService internalapi.RuntimeService
// The channel from which the subscriber listens events.
eventChannel chan *PodLifecycleEvent
// Cache for storing the runtime states required for syncing pods.
cache kubecontainer.Cache
// For testability.
clock clock.Clock
// GenericPLEG is used to force relist when required.
genericPleg PodLifecycleEventGenerator
// The maximum number of retries when getting container events from the runtime.
eventedPlegMaxStreamRetries int
// Indicates relisting related parameters
relistDuration *RelistDuration
// Stop the Evented PLEG by closing the channel.
stopCh chan struct{}
// Stops the periodic update of the cache global timestamp.
stopCacheUpdateCh chan struct{}
// Locks the start/stop operation of the Evented PLEG.
runningMu sync.Mutex
}

// NewEventedPLEG instantiates a new EventedPLEG object and return it.
func NewEventedPLEG(runtime kubecontainer.Runtime, runtimeService internalapi.RuntimeService, eventChannel chan *PodLifecycleEvent,
cache kubecontainer.Cache, genericPleg PodLifecycleEventGenerator, eventedPlegMaxStreamRetries int,
relistDuration *RelistDuration, clock clock.Clock) PodLifecycleEventGenerator {
return &EventedPLEG{
runtime: runtime,
runtimeService: runtimeService,
eventChannel: eventChannel,
cache: cache,
genericPleg: genericPleg,
eventedPlegMaxStreamRetries: eventedPlegMaxStreamRetries,
relistDuration: relistDuration,
clock: clock,
}
}

func (e *EventedPLEG) Start() {
e.runningMu.Lock()
defer e.runningMu.Unlock()
if isEventedPLEGInUse() {
return
}
setEventedPLEGUsage(true)
e.stopCh = make(chan struct{})
e.stopCacheUpdateCh = make(chan struct{})
go wait.Until(e.watchEventsChannel, 0, e.stopCh)
go wait.Until(e.updateGlobalCache, globalCacheUpdatePeriod, e.stopCacheUpdateCh)
}

func (e *EventedPLEG) watchEventsChannel() {
containerEventsResponseCh := make(chan *runtimeapi.ContainerEventResponse, cap(e.eventChannel))
defer close(containerEventsResponseCh)

// Get the container events from the runtime.
go func() {
numAttempts := 0
for {
// If the retry count exceeds the limit, the evented PLEG is considered unusable.
if numAttempts >= e.eventedPlegMaxStreamRetries {
// Check whether the evented PLEG is currently in use.
// Even with the EventedPLEG feature gate enabled, the evented PLEG may be unavailable,
// for example because the container runtime does not implement the container events stream.
if isEventedPLEGInUse() {
// Fall back to Generic PLEG relisting since Evented PLEG is not working.
klog.V(4).InfoS("Fall back to Generic PLEG relisting since Evented PLEG is not working")
e.Stop()
e.genericPleg.Stop() // Stop the existing Generic PLEG which runs with longer relisting period when Evented PLEG is in use.
e.Update(e.relistDuration) // Update the relisting period to the default value for the Generic PLEG.
// Switch back to the generic PLEG.
e.genericPleg.Start()
break
}
}
// Get container events from the container runtime.
err := e.runtimeService.GetContainerEvents(containerEventsResponseCh)
if err != nil {
metrics.EventedPLEGConnErr.Inc()
numAttempts++
// If fetching events fails, force a relist to get the latest container states.
e.Relist() // Force a relist to get the latest container and pods running metric.
klog.V(4).InfoS("Evented PLEG: Failed to get container events, retrying: ", "err", err)
}
}
}()

if isEventedPLEGInUse() {
// Process the CRI container events.
e.processCRIEvents(containerEventsResponseCh)
}
}

func (e *EventedPLEG) Watch() chan *PodLifecycleEvent {
return e.eventChannel
}

// Relist relists all containers using GenericPLEG
// This delegates to the GenericPLEG's Relist method.
func (e *EventedPLEG) Relist() {
e.genericPleg.Relist()
}

// In case the Evented PLEG experiences undetectable issues in the underlying
// GRPC connection there is a remote chance the pod might get stuck in a
// given state while it has progressed in its life cycle. This function will be
// called periodically to update the global timestamp of the cache so that those
// pods stuck at GetNewerThan in pod workers will get unstuck.
func (e *EventedPLEG) updateGlobalCache() {
e.cache.UpdateTime(time.Now())
}

func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeapi.ContainerEventResponse) {
for event := range containerEventsResponseCh {
// Ignore the event if PodSandboxStatus is nil.
// This might happen under some race condition where the podSandbox has
// been deleted, and therefore container runtime couldn't find the
// podSandbox for the container when generating the event.
// It is safe to ignore because
// a) a event would have been received for the sandbox deletion,
// b) in worst case, a relist will eventually sync the pod status.
// TODO(#114371): Figure out a way to handle this case instead of ignoring.
if event.PodSandboxStatus == nil || event.PodSandboxStatus.Metadata == nil {
klog.ErrorS(nil, "Evented PLEG: received ContainerEventResponse with nil PodSandboxStatus or PodSandboxStatus.Metadata", "containerEventResponse", event)
continue
}

podID := types.UID(event.PodSandboxStatus.Metadata.Uid)
shouldSendPLEGEvent := false

status, err := e.runtime.GeneratePodStatus(event)
if err != nil {
// nolint:logcheck // Not using the result of klog.V inside the
// if branch is okay, we just use it to determine whether the
// additional "podStatus" key and its value should be added.
if klog.V(6).Enabled() {
klog.ErrorS(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID, "podStatus", status)
} else {
klog.ErrorS(err, "Evented PLEG: error generating pod status from the received event", "podUID", podID)
}
} else {
if klogV := klog.V(6); klogV.Enabled() {
klogV.InfoS("Evented PLEG: Generated pod status from the received event", "podUID", podID, "podStatus", status)
} else {
klog.V(4).InfoS("Evented PLEG: Generated pod status from the received event", "podUID", podID)
}
// Preserve the pod IP across cache updates if the new IP is empty.
// When a pod is torn down, kubelet may race with PLEG and retrieve
// a pod status after network teardown, but the kubernetes API expects
// the completed pod's IP to be available after the pod is dead.
status.IPs = e.getPodIPs(podID, status)
}

e.updateRunningPodMetric(status)
e.updateRunningContainerMetric(status)
e.updateLatencyMetric(event)

if event.ContainerEventType == runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT {
for _, sandbox := range status.SandboxStatuses {
if sandbox.Id == event.ContainerId {
// When the CONTAINER_DELETED_EVENT is received by the kubelet,
// the runtime has indicated that the container has been removed
// by the runtime and hence, it must be removed from the cache
// of kubelet too.
e.cache.Delete(podID)
}
}
shouldSendPLEGEvent = true
} else {
if e.cache.Set(podID, status, err, time.Unix(event.GetCreatedAt(), 0)) {
shouldSendPLEGEvent = true
}
}

if shouldSendPLEGEvent {
// Convert the CRI event into a PLEG event and send it to e.eventChannel.
e.processCRIEvent(event)
}
}
}
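
processCRIEvent then maps the CRI container event type onto the PLEG event types defined earlier and pushes the result into the shared channel. Simplified (see pkg/kubelet/pleg/evented.go for the full version, including logging):

// Simplified sketch of processCRIEvent; details may differ between releases.
func (e *EventedPLEG) processCRIEvent(event *runtimeapi.ContainerEventResponse) {
	podID := types.UID(event.PodSandboxStatus.Metadata.Uid)
	switch event.ContainerEventType {
	case runtimeapi.ContainerEventType_CONTAINER_STARTED_EVENT:
		e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: event.ContainerId})
	case runtimeapi.ContainerEventType_CONTAINER_STOPPED_EVENT:
		e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: event.ContainerId})
	case runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT:
		// A deleted container yields both ContainerDied and ContainerRemoved,
		// mirroring what the generic PLEG emits.
		e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: event.ContainerId})
		e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: podID, Type: ContainerRemoved, Data: event.ContainerId})
	case runtimeapi.ContainerEventType_CONTAINER_CREATED_EVENT:
		// Container creation only updates the cache; no PLEG event is emitted,
		// which matches the generic PLEG's behavior.
	}
}

sendPodLifecycleEvent writes to e.eventChannel and, like the generic PLEG, drops the event and bumps a discard metric when the channel is full.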

REF:
1. https://github.com/kubernetes/design-proposals-archive/blob/main/node/pod-lifecycle-event-generator.md
2. pkg/kubelet/pleg/pleg.go
3. pkg/kubelet/kubelet.go
4. pkg/kubelet/pleg/evented.go
5. pkg/kubelet/pleg/generic.go