The Normal Termination Flow of a Pod

When you delete a pod with the command kubectl delete pod pod_name, have you ever wondered what actually happens end to end? How does the pod get deleted, what does the kubelet do, and what does the container runtime do? This post walks through the normal termination flow of a pod.

Creating a pod

You can create a pod by running kubectl apply -f mycurl.yaml:

apiVersion: v1
kind: Pod
metadata:
  name: my-curl
  labels:
    app: my-curl
spec:
  containers:
  - name: my-curl
    image: hysyeah/my-curl:v1
    imagePullPolicy: IfNotPresent
    ports:
    - containerPort: 8080

You can see the pod is up and running:

➜ k get pod
NAME      READY   STATUS    RESTARTS   AGE
my-curl   1/1     Running   0          3s

Deleting the pod

Run kubectl delete pod my-curl --v=6. The log shows kubectl sending a DELETE request to the apiserver at the path /api/v1/namespaces/default/pods/my-curl, and then reporting that the pod has been deleted.
It looks as if a single request is enough to delete the pod, but the deletion actually requires several components working together.

I0418 round_trippers.go:553] DELETE https://192.168.2.123:6443/api/v1/namespaces/default/pods/my-curl 200 OK in 12 milliseconds
pod "my-curl" deleted
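
The same DELETE can also be issued programmatically. Below is a minimal client-go sketch of the request kubectl sends; the kubeconfig path and the 10-second grace period are illustrative assumptions:

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/utils/pointer"
)

func main() {
    // Build a clientset from a local kubeconfig (path is an assumption;
    // adjust it for your environment).
    config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // Issue the same DELETE request kubectl sends. GracePeriodSeconds is
    // optional; if omitted, the pod spec's terminationGracePeriodSeconds
    // (default 30s) is used.
    err = clientset.CoreV1().Pods("default").Delete(context.TODO(), "my-curl", metav1.DeleteOptions{
        GracePeriodSeconds: pointer.Int64(10),
    })
    if err != nil {
        panic(err)
    }
    fmt.Println(`pod "my-curl" deleted`)
}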

The apiserver deletes the pod

kubectl sends the DELETE request to the apiserver, which calls into etcd to update or delete the corresponding record. As the code below shows, a graceful deletion first updates the object (setting its DeletionTimestamp); the record is only removed from etcd once the object can be deleted immediately.

// staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
// A wrapper around the etcd API; it runs deletion strategies
// (e.g. graceful-deletion checks) before the record is actually removed.
func (e *Store) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
    key, err := e.KeyFunc(ctx, name)
    if err != nil {
        return nil, false, err
    }
    obj := e.NewFunc()
    qualifiedResource := e.qualifiedResourceFromContext(ctx)
    if err = e.Storage.Get(ctx, key, storage.GetOptions{}, obj); err != nil {
        return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
    }

    // support older consumers of delete by treating "nil" as delete immediately
    if options == nil {
        options = metav1.NewDeleteOptions(0)
    }
    var preconditions storage.Preconditions
    if options.Preconditions != nil {
        preconditions.UID = options.Preconditions.UID
        preconditions.ResourceVersion = options.Preconditions.ResourceVersion
    }

    // BeforeDelete tests whether the object can be gracefully deleted.
    // graceful=true means the object should be deleted gracefully;
    // pendingGraceful=true means a graceful deletion is already in progress.
    // BeforeDelete also sets the object's DeletionTimestamp.
    graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, obj, options)
    if err != nil {
        return nil, false, err
    }
    // this means finalizers cannot be updated via DeleteOptions if a deletion is already pending
    if pendingGraceful {
        out, err := e.finalizeDelete(ctx, obj, false, options)
        return out, false, err
    }
    // check if obj has pending finalizers
    accessor, err := meta.Accessor(obj)
    if err != nil {
        return nil, false, apierrors.NewInternalError(err)
    }
    pendingFinalizers := len(accessor.GetFinalizers()) != 0
    var ignoreNotFound bool
    var deleteImmediately bool = true
    var lastExisting, out runtime.Object

    // Handle combinations of graceful deletion and finalization by issuing
    // the correct updates.
    shouldUpdateFinalizers, _ := deletionFinalizersForGarbageCollection(ctx, e, accessor, options)
    // TODO: remove the check, because we support no-op updates now.
    if graceful || pendingFinalizers || shouldUpdateFinalizers {
        err, ignoreNotFound, deleteImmediately, out, lastExisting = e.updateForGracefulDeletionAndFinalizers(ctx, name, key, options, preconditions, deleteValidation, obj)
        // Update the preconditions.ResourceVersion if set since we updated the object.
        if err == nil && deleteImmediately && preconditions.ResourceVersion != nil {
            accessor, err = meta.Accessor(out)
            if err != nil {
                return out, false, apierrors.NewInternalError(err)
            }
            resourceVersion := accessor.GetResourceVersion()
            preconditions.ResourceVersion = &resourceVersion
        }
    }

    // !deleteImmediately covers all cases where err != nil. We keep both to be future-proof.
    if !deleteImmediately || err != nil {
        return out, false, err
    }

    // Going further in this function is not useful when we are
    // performing a dry-run request. Worse, it will actually
    // override "out" with the version of the object in database
    // that doesn't have the finalizer and deletiontimestamp set
    // (because the update above was dry-run too). If we already
    // have that version available, let's just return it now,
    // otherwise, we can call dry-run delete that will get us the
    // latest version of the object.
    if dryrun.IsDryRun(options.DryRun) && out != nil {
        return out, true, nil
    }

    // delete immediately, or no graceful deletion supported
    klog.V(6).InfoS("Going to delete object from registry", "object", klog.KRef(genericapirequest.NamespaceValue(ctx), name))
    out = e.NewFunc()
    // Only when the object can be deleted immediately is the record actually removed from etcd.
    if err := e.Storage.Delete(ctx, key, out, &preconditions, storage.ValidateObjectFunc(deleteValidation), dryrun.IsDryRun(options.DryRun), nil); err != nil {
        // Please refer to the place where we set ignoreNotFound for the reason
        // why we ignore the NotFound error.
        if storage.IsNotFound(err) && ignoreNotFound && lastExisting != nil {
            // The lastExisting object may not be the last state of the object
            // before its deletion, but it's the best approximation.
            out, err := e.finalizeDelete(ctx, lastExisting, true, options)
            return out, true, err
        }
        return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
    }
    out, err = e.finalizeDelete(ctx, out, true, options)
    return out, true, err
}

// staging/src/k8s.io/apiserver/pkg/registry/rest/delete.go
func BeforeDelete(strategy RESTDeleteStrategy, ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) (graceful, gracefulPending bool, err error) {
    objectMeta, gvk, kerr := objectMetaAndKind(strategy, obj)
    if kerr != nil {
        return false, false, kerr
    }
    if errs := validation.ValidateDeleteOptions(options); len(errs) > 0 {
        return false, false, errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "DeleteOptions"}, "", errs)
    }
    // Checking the Preconditions here to fail early. They'll be enforced later on when we actually do the deletion, too.
    if options.Preconditions != nil {
        if options.Preconditions.UID != nil && *options.Preconditions.UID != objectMeta.GetUID() {
            return false, false, errors.NewConflict(schema.GroupResource{Group: gvk.Group, Resource: gvk.Kind}, objectMeta.GetName(), fmt.Errorf("the UID in the precondition (%s) does not match the UID in record (%s). The object might have been deleted and then recreated", *options.Preconditions.UID, objectMeta.GetUID()))
        }
        if options.Preconditions.ResourceVersion != nil && *options.Preconditions.ResourceVersion != objectMeta.GetResourceVersion() {
            return false, false, errors.NewConflict(schema.GroupResource{Group: gvk.Group, Resource: gvk.Kind}, objectMeta.GetName(), fmt.Errorf("the ResourceVersion in the precondition (%s) does not match the ResourceVersion in record (%s). The object might have been modified", *options.Preconditions.ResourceVersion, objectMeta.GetResourceVersion()))
        }
    }

    // Negative values will be treated as the value `1s` on the delete path.
    if gracePeriodSeconds := options.GracePeriodSeconds; gracePeriodSeconds != nil && *gracePeriodSeconds < 0 {
        options.GracePeriodSeconds = utilpointer.Int64(1)
    }
    if deletionGracePeriodSeconds := objectMeta.GetDeletionGracePeriodSeconds(); deletionGracePeriodSeconds != nil && *deletionGracePeriodSeconds < 0 {
        objectMeta.SetDeletionGracePeriodSeconds(utilpointer.Int64(1))
    }

    gracefulStrategy, ok := strategy.(RESTGracefulDeleteStrategy)
    if !ok {
        // If we're not deleting gracefully there's no point in updating Generation, as we won't update
        // the object before deleting it.
        return false, false, nil
    }
    // if the object is already being deleted, no need to update generation.
    if objectMeta.GetDeletionTimestamp() != nil {
        // if we are already being deleted, we may only shorten the deletion grace period
        // this means the object was gracefully deleted previously but deletionGracePeriodSeconds was not set,
        // so we force deletion immediately
        // IMPORTANT:
        // The deletion operation happens in two phases.
        // 1. Update to set DeletionGracePeriodSeconds and DeletionTimestamp
        // 2. Delete the object from storage.
        // If the update succeeds, but the delete fails (network error, internal storage error, etc.),
        // a resource was previously left in a state that was non-recoverable. We
        // check if the existing stored resource has a grace period as 0 and if so
        // attempt to delete immediately in order to recover from this scenario.
        if objectMeta.GetDeletionGracePeriodSeconds() == nil || *objectMeta.GetDeletionGracePeriodSeconds() == 0 {
            return false, false, nil
        }
        // only a shorter grace period may be provided by a user
        if options.GracePeriodSeconds != nil {
            period := int64(*options.GracePeriodSeconds)
            if period >= *objectMeta.GetDeletionGracePeriodSeconds() {
                return false, true, nil
            }
            newDeletionTimestamp := metav1.NewTime(
                objectMeta.GetDeletionTimestamp().Add(-time.Second * time.Duration(*objectMeta.GetDeletionGracePeriodSeconds())).
                    Add(time.Second * time.Duration(*options.GracePeriodSeconds)))
            objectMeta.SetDeletionTimestamp(&newDeletionTimestamp)
            objectMeta.SetDeletionGracePeriodSeconds(&period)
            return true, false, nil
        }
        // graceful deletion is pending, do nothing
        options.GracePeriodSeconds = objectMeta.GetDeletionGracePeriodSeconds()
        return false, true, nil
    }

    // `CheckGracefulDelete` is implemented by the resource-specific strategy
    if !gracefulStrategy.CheckGracefulDelete(ctx, obj, options) {
        return false, false, nil
    }

    if options.GracePeriodSeconds == nil {
        return false, false, errors.NewInternalError(fmt.Errorf("options.GracePeriodSeconds should not be nil"))
    }

    now := metav1.NewTime(metav1.Now().Add(time.Second * time.Duration(*options.GracePeriodSeconds)))
    objectMeta.SetDeletionTimestamp(&now)
    objectMeta.SetDeletionGracePeriodSeconds(options.GracePeriodSeconds)
    // If it's the first graceful deletion we are going to set the DeletionTimestamp to non-nil.
    // Controllers of the object that's being deleted shouldn't take any nontrivial actions, hence its behavior changes.
    // Thus we need to bump object's Generation (if set). This handles generation bump during graceful deletion.
    // The bump for objects that don't support graceful deletion is handled in pkg/registry/generic/registry/store.go.
    if objectMeta.GetGeneration() > 0 {
        objectMeta.SetGeneration(objectMeta.GetGeneration() + 1)
    }

    return true, false, nil
}
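
The timestamp arithmetic in BeforeDelete is worth spelling out: on the first graceful delete, DeletionTimestamp = now + GracePeriodSeconds; a repeat delete may only shorten the grace period, and the new timestamp stays anchored at the original request time. A toy recomputation of that math (times and values are illustrative):

package main

import (
    "fmt"
    "time"
)

func main() {
    // First graceful delete at 10:00:00 with gracePeriodSeconds=30:
    // DeletionTimestamp = now + grace.
    requestTime := time.Date(2023, 4, 18, 10, 0, 0, 0, time.UTC)
    oldGrace := int64(30)
    deletionTimestamp := requestTime.Add(time.Duration(oldGrace) * time.Second)
    fmt.Println(deletionTimestamp) // 2023-04-18 10:00:30 +0000 UTC

    // A second delete with gracePeriodSeconds=10 may only shorten the period:
    // new = old - oldGrace + newGrace, i.e. still anchored at 10:00:00.
    newGrace := int64(10)
    newDeletionTimestamp := deletionTimestamp.
        Add(-time.Duration(oldGrace) * time.Second).
        Add(time.Duration(newGrace) * time.Second)
    fmt.Println(newDeletionTimestamp) // 2023-04-18 10:00:10 +0000 UTC
}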

The kubelet gets to work

Once the apiserver has marked the pod for deletion (setting its DeletionTimestamp), the kubelet, which watches pod resources, receives the corresponding deletion event.

// A pod deletion arrives on configCh and takes the kubetypes.DELETE branch below.
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
            return false
        }

        switch u.Op {
        case kubetypes.ADD:
            klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.SET:
            // TODO: Do we want to support this?
            klog.ErrorS(nil, "Kubelet does not support snapshot update")
        default:
            klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
        }
        // ... (remainder of syncLoopIteration omitted)
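
DELETE being handled as an update is visible from outside the kubelet too: watching the pod shows a MODIFIED event with DeletionTimestamp set, followed by a DELETED event once the containers are gone. A minimal client-go watch sketch (the kubeconfig path is an assumption):

package main

import (
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Kubeconfig path is an assumption; adjust it for your environment.
    config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // A graceful delete first shows up as MODIFIED (DeletionTimestamp and
    // DeletionGracePeriodSeconds set), then as DELETED once the kubelet has
    // finished killing the containers.
    w, err := clientset.CoreV1().Pods("default").Watch(context.TODO(), metav1.ListOptions{
        FieldSelector: "metadata.name=my-curl",
    })
    if err != nil {
        panic(err)
    }
    for ev := range w.ResultChan() {
        if pod, ok := ev.Object.(*corev1.Pod); ok {
            fmt.Println(ev.Type, "deletionTimestamp:", pod.DeletionTimestamp)
        }
    }
}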

The kubelet then enters the container-kill flow (this runs in a separate goroutine). The call chain is as follows:

// killPod instructs the container runtime to kill the pod. This method requires that
// the pod status contains the result of the last syncPod, otherwise it may fail to
// terminate newly created containers and sandboxes.
// It asks the container runtime to stop the pod's containers.
func (kl *Kubelet) killPod(ctx context.Context, pod *v1.Pod, p kubecontainer.Pod, gracePeriodOverride *int64) error {
    // Call the container runtime KillPod method which stops all known running containers of the pod
    if err := kl.containerRuntime.KillPod(ctx, pod, p, gracePeriodOverride); err != nil {
        return err
    }
    if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
        klog.V(2).InfoS("Failed to update QoS cgroups while killing pod", "err", err)
    }
    return nil
}

// KillPod kills all the containers of the pod.
func (m *kubeGenericRuntimeManager) KillPod(ctx context.Context, pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error {
    err := m.killPodWithSyncResult(ctx, pod, runningPod, gracePeriodOverride)
    return err.Error()
}

// killPodWithSyncResult kills a pod in the Running state.
func (m *kubeGenericRuntimeManager) killPodWithSyncResult(ctx context.Context, pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
    killContainerResults := m.killContainersWithSyncResult(ctx, pod, runningPod, gracePeriodOverride)
    for _, containerResult := range killContainerResults {
        result.AddSyncResult(containerResult)
    }

    // stop sandbox, the sandbox will be removed in GarbageCollect
    killSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, runningPod.ID)
    result.AddSyncResult(killSandboxResult)
    // Stop all sandboxes belongs to same pod
    for _, podSandbox := range runningPod.Sandboxes {
        if err := m.runtimeService.StopPodSandbox(ctx, podSandbox.ID.ID); err != nil && !crierror.IsNotFound(err) {
            killSandboxResult.Fail(kubecontainer.ErrKillPodSandbox, err.Error())
            klog.ErrorS(nil, "Failed to stop sandbox", "podSandboxID", podSandbox.ID)
        }
    }

    return
}

// killContainersWithSyncResult kills all pod's containers with sync results.
func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(ctx context.Context, pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (syncResults []*kubecontainer.SyncResult) {
    containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
    wg := sync.WaitGroup{}

    wg.Add(len(runningPod.Containers))
    for _, container := range runningPod.Containers {
        go func(container *kubecontainer.Container) {
            defer utilruntime.HandleCrash()
            defer wg.Done()

            killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
            if err := m.killContainer(ctx, pod, container.ID, container.Name, "", reasonUnknown, gracePeriodOverride); err != nil {
                killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
                // Use runningPod for logging as the pod passed in could be *nil*.
                klog.ErrorS(err, "Kill container failed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID,
                    "containerName", container.Name, "containerID", container.ID)
            }
            containerResults <- killContainerResult
        }(container)
    }
    wg.Wait()
    close(containerResults)

    for containerResult := range containerResults {
        syncResults = append(syncResults, containerResult)
    }
    return
}

// killContainer kills one container:
// it runs the pre-stop hook if one is configured,
// then calls the CRI to stop the container.
func (m *kubeGenericRuntimeManager) killContainer(ctx context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, reason containerKillReason, gracePeriodOverride *int64) error {
    var containerSpec *v1.Container
    if pod != nil {
        if containerSpec = kubecontainer.GetContainerSpec(pod, containerName); containerSpec == nil {
            return fmt.Errorf("failed to get containerSpec %q (id=%q) in pod %q when killing container for reason %q",
                containerName, containerID.String(), format.Pod(pod), message)
        }
    } else {
        // Restore necessary information if one of the specs is nil.
        restoredPod, restoredContainer, err := m.restoreSpecsFromContainerLabels(ctx, containerID)
        if err != nil {
            return err
        }
        pod, containerSpec = restoredPod, restoredContainer
    }

    // From this point, pod and container must be non-nil.
    // gracePeriod defaults to 30s.
    gracePeriod := setTerminationGracePeriod(pod, containerSpec, containerName, containerID, reason)

    if len(message) == 0 {
        message = fmt.Sprintf("Stopping container %s", containerSpec.Name)
    }
    m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeNormal, events.KillingContainer, message)

    // Run the pre-stop lifecycle hooks if applicable and if there is enough time to run it
    // The time spent in the pre-stop hook is subtracted from gracePeriod;
    // the remainder is passed to the CRI as the stop timeout.
    if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 {
        gracePeriod = gracePeriod - m.executePreStopHook(ctx, pod, containerID, containerSpec, gracePeriod)
    }
    // always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
    if gracePeriod < minimumGracePeriodInSeconds {
        gracePeriod = minimumGracePeriodInSeconds
    }
    if gracePeriodOverride != nil {
        gracePeriod = *gracePeriodOverride
        klog.V(3).InfoS("Killing container with a grace period override", "pod", klog.KObj(pod), "podUID", pod.UID,
            "containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)
    }

    klog.V(2).InfoS("Killing container with a grace period", "pod", klog.KObj(pod), "podUID", pod.UID,
        "containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)
    // This is where the actual CRI call happens. To see what StopContainer does,
    // look at the runtime's source; since we are running containerd, the
    // implementation lives in the containerd project.
    // Without a pre-stop hook, gracePeriod defaults to 30s, which becomes
    // StopContainer's timeout. If your application does not handle SIGTERM,
    // the process usually exits right away (and the container is removed).
    err := m.runtimeService.StopContainer(ctx, containerID.ID, gracePeriod)
    if err != nil && !crierror.IsNotFound(err) {
        klog.ErrorS(err, "Container termination failed with gracePeriod", "pod", klog.KObj(pod), "podUID", pod.UID,
            "containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)
        return err
    }
    klog.V(3).InfoS("Container exited normally", "pod", klog.KObj(pod), "podUID", pod.UID,
        "containerName", containerName, "containerID", containerID.String())

    return nil
}
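
The timeout the CRI ends up with is therefore: the spec's grace period, minus the time spent in the pre-stop hook, floored at minimumGracePeriodInSeconds (2 in current kubelet code), and replaced outright by any override. A toy recomputation of that arithmetic (values are illustrative):

package main

import "fmt"

// effectiveGracePeriod mimics the arithmetic in killContainer above.
// The 2-second floor mirrors the kubelet's minimumGracePeriodInSeconds.
func effectiveGracePeriod(specGrace, preStopSeconds int64, override *int64) int64 {
    grace := specGrace - preStopSeconds // time left after the pre-stop hook
    if grace < 2 {
        grace = 2 // always leave a minimal shutdown window
    }
    if override != nil {
        grace = *override // e.g. an eviction or force deletion
    }
    return grace
}

func main() {
    // terminationGracePeriodSeconds=30, pre-stop hook ran for 25s:
    fmt.Println(effectiveGracePeriod(30, 25, nil)) // 5
    // pre-stop hook used the whole budget; the floor applies:
    fmt.Println(effectiveGracePeriod(30, 30, nil)) // 2
}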

Calling the containerd API

// pkg/cri/sbserver/container_stop.go
func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (*runtime.StopContainerResponse, error) {
    start := time.Now()
    // Get container config from container store.
    container, err := c.containerStore.Get(r.GetContainerId())
    if err != nil {
        return nil, fmt.Errorf("an error occurred when try to find container %q: %w", r.GetContainerId(), err)
    }
    // stopContainer does the real work of stopping the container; see below.
    if err := c.stopContainer(ctx, container, time.Duration(r.GetTimeout())*time.Second); err != nil {
        return nil, err
    }

    // Notify NRI plugins that the container has been stopped.
    sandbox, err := c.sandboxStore.Get(container.SandboxID)
    if err != nil {
        err = c.nri.StopContainer(ctx, nil, &container)
    } else {
        err = c.nri.StopContainer(ctx, &sandbox, &container)
    }
    if err != nil {
        log.G(ctx).WithError(err).Error("NRI failed to stop container")
    }

    i, err := container.Container.Info(ctx)
    if err != nil {
        return nil, fmt.Errorf("get container info: %w", err)
    }

    containerStopTimer.WithValues(i.Runtime.Name).UpdateSince(start)

    return &runtime.StopContainerResponse{}, nil
}

func (c *criService) stopContainer(ctx context.Context, container containerstore.Container, timeout time.Duration) error {
    id := container.ID
    sandboxID := container.SandboxID

    state := container.Status.Get().State()
    if state != runtime.ContainerState_CONTAINER_RUNNING &&
        state != runtime.ContainerState_CONTAINER_UNKNOWN {
        log.G(ctx).Infof("Container to stop %q must be in running or unknown state, current state %q",
            id, criContainerStateToString(state))
        return nil
    }

    task, err := container.Container.Task(ctx, nil)
    if err != nil {
        if !errdefs.IsNotFound(err) {
            return fmt.Errorf("failed to get task for container %q: %w", id, err)
        }
        // Don't return for unknown state, some cleanup needs to be done.
        if state == runtime.ContainerState_CONTAINER_UNKNOWN {
            return cleanupUnknownContainer(ctx, id, container, sandboxID, c)
        }
        return nil
    }

    // Handle unknown state.
    if state == runtime.ContainerState_CONTAINER_UNKNOWN {
        // Start an exit handler for containers in unknown state.
        waitCtx, waitCancel := context.WithCancel(ctrdutil.NamespacedContext())
        defer waitCancel()
        exitCh, err := task.Wait(waitCtx)
        if err != nil {
            if !errdefs.IsNotFound(err) {
                return fmt.Errorf("failed to wait for task for %q: %w", id, err)
            }
            return cleanupUnknownContainer(ctx, id, container, sandboxID, c)
        }

        exitCtx, exitCancel := context.WithCancel(context.Background())
        stopCh := c.eventMonitor.startContainerExitMonitor(exitCtx, id, task.Pid(), exitCh)
        defer func() {
            exitCancel()
            // This ensures that exit monitor is stopped before
            // `Wait` is cancelled, so no exit event is generated
            // because of the `Wait` cancellation.
            <-stopCh
        }()
    }

    // We only need to kill the task. The event handler will Delete the
    // task from containerd after it handles the Exited event.
    if timeout > 0 {
        // timeout > 0 means we first attempt a graceful stop, SIGTERM by default.
        stopSignal := "SIGTERM"
        if container.StopSignal != "" {
            stopSignal = container.StopSignal
        } else {
            // The image may have been deleted, and the `StopSignal` field is
            // just introduced to handle that.
            // However, for containers created before the `StopSignal` field is
            // introduced, still try to get the stop signal from the image config.
            // If the image has been deleted, logging an error and using the
            // default SIGTERM is still better than returning error and leaving
            // the container unstoppable. (See issue #990)
            // TODO(random-liu): Remove this logic when containerd 1.2 is deprecated.
            image, err := c.GetImage(container.ImageRef)
            if err != nil {
                if !errdefs.IsNotFound(err) {
                    return fmt.Errorf("failed to get image %q: %w", container.ImageRef, err)
                }
                log.G(ctx).Warningf("Image %q not found, stop container with signal %q", container.ImageRef, stopSignal)
            } else {
                if image.ImageSpec.Config.StopSignal != "" {
                    stopSignal = image.ImageSpec.Config.StopSignal
                }
            }
        }
        sig, err := signal.ParseSignal(stopSignal)
        if err != nil {
            return fmt.Errorf("failed to parse stop signal %q: %w", stopSignal, err)
        }

        var sswt bool
        if container.IsStopSignaledWithTimeout == nil {
            log.G(ctx).Infof("unable to ensure stop signal %v was not sent twice to container %v", sig, id)
            sswt = true
        } else {
            sswt = atomic.CompareAndSwapUint32(container.IsStopSignaledWithTimeout, 0, 1)
        }

        if sswt {
            log.G(ctx).Infof("Stop container %q with signal %v", id, sig)
            // The actual kill: send the stop signal to the container's task (process).
            if err = task.Kill(ctx, sig); err != nil && !errdefs.IsNotFound(err) {
                return fmt.Errorf("failed to stop container %q: %w", id, err)
            }
        } else {
            log.G(ctx).Infof("Skipping the sending of signal %v to container %q because a prior stop with timeout>0 request already sent the signal", sig, id)
        }

        sigTermCtx, sigTermCtxCancel := context.WithTimeout(ctx, timeout)
        defer sigTermCtxCancel()
        // Wait for the process to exit until the timeout; if it times out,
        // fall through to the SIGKILL below.
        err = c.waitContainerStop(sigTermCtx, container)
        if err == nil {
            // Container stopped on first signal no need for SIGKILL
            return nil
        }
        // If the parent context was cancelled or exceeded return immediately
        if ctx.Err() != nil {
            return ctx.Err()
        }
        // sigTermCtx was exceeded. Send SIGKILL
        log.G(ctx).Debugf("Stop container %q with signal %v timed out", id, sig)
    }

    log.G(ctx).Infof("Kill container %q", id)
    // timeout <= 0, or the graceful stop timed out: send SIGKILL to force termination.
    if err = task.Kill(ctx, syscall.SIGKILL); err != nil && !errdefs.IsNotFound(err) {
        return fmt.Errorf("failed to kill container %q: %w", id, err)
    }

    // Wait for a fixed timeout until container stop is observed by event monitor.
    err = c.waitContainerStop(ctx, container)
    if err != nil {
        return fmt.Errorf("an error occurs during waiting for container %q to be killed: %w", id, err)
    }
    return nil
}
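
Stripped of the CRI bookkeeping, stopContainer is the classic SIGTERM, wait, SIGKILL sequence. Here is a self-contained sketch of the same pattern applied to an ordinary OS process (not containerd's API):

package main

import (
    "fmt"
    "os/exec"
    "syscall"
    "time"
)

// stopGracefully sends SIGTERM, waits up to timeout, then falls back to
// SIGKILL, mirroring containerd's stopContainer logic for a plain process.
func stopGracefully(cmd *exec.Cmd, timeout time.Duration) error {
    if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
        return fmt.Errorf("failed to send SIGTERM: %w", err)
    }

    done := make(chan error, 1)
    go func() { done <- cmd.Wait() }()

    select {
    case err := <-done:
        // Process exited on the first signal; no need for SIGKILL.
        fmt.Println("exited after SIGTERM:", err)
        return nil
    case <-time.After(timeout):
        // Grace period exceeded: force-kill, then reap the process.
        if err := cmd.Process.Kill(); err != nil {
            return fmt.Errorf("failed to send SIGKILL: %w", err)
        }
        fmt.Println("killed after timeout:", <-done)
        return nil
    }
}

func main() {
    cmd := exec.Command("sleep", "300") // stand-in for a long-running workload
    if err := cmd.Start(); err != nil {
        panic(err)
    }
    if err := stopGracefully(cmd, 5*time.Second); err != nil {
        panic(err)
    }
}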

Once the containers have been terminated, the kubelet removes the pod's information from its cache, and the whole deletion flow is complete.


Summary:

  • kubectl sends a DELETE request to the apiserver
  • the apiserver marks the pod for graceful deletion by setting its DeletionTimestamp, and removes the record from etcd once the pod can be deleted immediately
  • the kubelet runs the pre-stop hook and calls the CRI to stop the containers
  • the kubelet removes the pod's information from its cache

From the walkthrough above, we can see how to make a pod exit gracefully:

  • set a pre-stop hook
  • handle SIGTERM in your application

Either approach gives you a graceful exit, but neither may exceed the termination grace period (30s by default); beyond that, the process is forcibly killed.
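
For the second approach, here is a minimal sketch of a Go HTTP server that catches SIGTERM and drains in-flight requests within the grace period (the listen address and the 10-second drain budget are illustrative):

package main

import (
    "context"
    "log"
    "net/http"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    srv := &http.Server{Addr: ":8080"}

    // NotifyContext cancels ctx when SIGTERM arrives, which is the signal
    // the kubelet (via the container runtime) sends first.
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM)
    defer stop()

    go func() {
        if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()

    <-ctx.Done() // SIGTERM received

    // Drain in-flight requests, staying well under the pod's
    // terminationGracePeriodSeconds so the process never sees SIGKILL.
    shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    if err := srv.Shutdown(shutdownCtx); err != nil {
        log.Printf("graceful shutdown failed: %v", err)
    }
    log.Println("server stopped cleanly")
}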


REF:
1. containerd: pkg/cri/sbserver/container_stop.go
2. kubernetes: pkg/kubelet/kubelet.go
3. kubernetes: pkg/kubelet/kubelet_pods.go