How a Deployment Is Created

Going from a Deployment to the resources it produces (ReplicaSet, Pods) involves these main components: kube-apiserver, kube-controller-manager (deployment-controller, replicaset-controller), kube-scheduler, and kubelet.

In Kubernetes, kube-apiserver receives requests, kube-controller-manager watches for resource changes and reacts to them, kube-scheduler handles scheduling, and kubelet brings the containers up.

When you create a Deployment with kubectl create, the following requests are issued:
➜ ✗ k create -f tt.yaml --v=6
I0401 14:30:30.064130 1625050 loader.go:374] Config loaded from file: /home/xxxx/.kube/config
I0401 14:30:30.094255 1625050 round_trippers.go:553] GET https://192.168.3.112:6443/openapi/v2?timeout=32s 200 OK in 29 milliseconds
I0401 14:30:30.203951 1625050 round_trippers.go:553] POST https://192.168.3.112:6443/apis/apps/v1/namespaces/default/deployments?fieldManager=kubectl-create&fieldValidation=Strict 201 Created in 5 milliseconds
deployment.apps/use created

The log shows that kubectl called the endpoint /apis/apps/v1/namespaces/default/deployments.
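For comparison, the same POST can be issued directly with client-go. This is a minimal sketch, not kubectl's internal code; the contents of tt.yaml are not shown in the log, so everything in the manifest below except the Deployment name use and the default namespace is an assumption:

package main

import (
    "context"
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func int32Ptr(i int32) *int32 { return &i }

func main() {
    // Load ~/.kube/config, the same config kubectl loaded in the log above.
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // Hypothetical equivalent of tt.yaml: only the name "use" is known.
    deploy := &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{Name: "use"},
        Spec: appsv1.DeploymentSpec{
            Replicas: int32Ptr(1),
            Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "use"}},
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "use"}},
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{Name: "app", Image: "nginx"}},
                },
            },
        },
    }

    // POST /apis/apps/v1/namespaces/default/deployments
    created, err := clientset.AppsV1().Deployments("default").Create(context.TODO(), deploy, metav1.CreateOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Printf("deployment.apps/%s created\n", created.Name)
}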

On the apiserver side, the request lands in the generic registry's Store.Create, which is where the new object is created:

// staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
    if objectMeta, err := meta.Accessor(obj); err != nil {
        return nil, err
    } else {
        rest.FillObjectMetaSystemFields(objectMeta)
        // Derive the object name from GenerateName if no name was set.
        if len(objectMeta.GetGenerateName()) > 0 && len(objectMeta.GetName()) == 0 {
            objectMeta.SetName(e.CreateStrategy.GenerateName(objectMeta.GetGenerateName()))
        }
    }
    ...
    // Run the resource's create strategy, e.g. validation.
    if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
        return nil, err
    }
    ...
    // Get the object's name; here it is the Deployment name.
    name, err := e.ObjectNameFunc(obj)
    ...
    // Compute the storage key for the object. Since I created it in the
    // default namespace, key = "/deployments/default/use".
    key, err := e.KeyFunc(ctx, name)
    ...
    // Create an empty Deployment object to receive the result.
    out := e.NewFunc()
    // Create the object; ultimately this writes the data into etcd. At this
    // point the corresponding ReplicaSet and Pods have not been created yet.
    if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
        ...
    }

Once the Deployment has been created (you can check with kubectl get deploy), the deployment controller has work to do. But how does the deployment controller know there is work?

The deployment controller observes events through the informer mechanism. When a Deployment is created, kube-apiserver streams the change to its watchers over HTTP chunked transfer; here the deployment controller is effectively a client of the apiserver.
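A minimal sketch of what such a watcher looks like with client-go's informer machinery; the handler body and resync period here are illustrative, while the real controller registers handlers that enqueue namespace/name keys onto its work queue:

package main

import (
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    // The factory maintains a long-running watch (the HTTP chunked stream
    // mentioned above) against kube-apiserver and keeps a local cache in sync.
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    dInformer := factory.Apps().V1().Deployments()

    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            d := obj.(*appsv1.Deployment)
            // The real controller enqueues namespace/name here instead.
            fmt.Printf("deployment added: %s/%s\n", d.Namespace, d.Name)
        },
    })

    stopCh := make(chan struct{})
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)
    <-stopCh
}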

Once the Deployment exists, the deployment controller starts working to create the corresponding ReplicaSet.
// pkg/controller/deployment/deployment_controller.go
func (dc *DeploymentController) Run(ctx context.Context, workers int) {
    defer utilruntime.HandleCrash()

    // Start events processing pipeline.
    dc.eventBroadcaster.StartStructuredLogging(0)
    dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("")})
    defer dc.eventBroadcaster.Shutdown()

    defer dc.queue.ShutDown()

    logger := klog.FromContext(ctx)
    logger.Info("Starting controller", "controller", "deployment")
    defer logger.Info("Shutting down controller", "controller", "deployment")

    if !cache.WaitForNamedCacheSync("deployment", ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, dc.worker, time.Second)
    }

    <-ctx.Done()
}

func (dc *DeploymentController) worker(ctx context.Context) {
    for dc.processNextWorkItem(ctx) {
    }
}

func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
    key, quit := dc.queue.Get()
    if quit {
        return false
    }
    defer dc.queue.Done(key)

    err := dc.syncHandler(ctx, key.(string))
    dc.handleErr(ctx, err, key)

    return true
}
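The key taken off the queue is just the object's namespace/name string, produced and parsed by client-go helpers. A small example using the Deployment from the log above:

package main

import (
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    d := &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "use"},
    }

    // This is what the informer event handlers enqueue...
    key, _ := cache.MetaNamespaceKeyFunc(d)
    fmt.Println(key) // default/use

    // ...and what syncDeployment splits back apart.
    namespace, name, _ := cache.SplitMetaNamespaceKey(key)
    fmt.Println(namespace, name) // default use
}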
The function that does the real work is syncDeployment:
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
    logger := klog.FromContext(ctx)
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
        return err
    }

    startTime := time.Now()
    logger.V(4).Info("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
    defer func() {
        logger.V(4).Info("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
    }()

    deployment, err := dc.dLister.Deployments(namespace).Get(name)
    if errors.IsNotFound(err) {
        logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
        return nil
    }
    if err != nil {
        return err
    }

    // Deep-copy otherwise we are mutating our cache.
    // TODO: Deep-copy only when needed.
    d := deployment.DeepCopy()

    everything := metav1.LabelSelector{}
    if reflect.DeepEqual(d.Spec.Selector, &everything) {
        dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
        if d.Status.ObservedGeneration < d.Generation {
            d.Status.ObservedGeneration = d.Generation
            dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
        }
        return nil
    }

    // List ReplicaSets owned by this Deployment, while reconciling ControllerRef
    // through adoption/orphaning.
    rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
    if err != nil {
        return err
    }
    // List all Pods owned by this Deployment, grouped by their ReplicaSet.
    // Current uses of the podMap are:
    //
    // * check if a Pod is labeled correctly with the pod-template-hash label.
    // * check that no old Pods are running in the middle of Recreate Deployments.
    podMap, err := dc.getPodMapForDeployment(d, rsList)
    if err != nil {
        return err
    }

    if d.DeletionTimestamp != nil {
        return dc.syncStatusOnly(ctx, d, rsList)
    }

    // Update deployment conditions with an Unknown condition when pausing/resuming
    // a deployment. In this way, we can be sure that we won't timeout when a user
    // resumes a Deployment with a set progressDeadlineSeconds.
    if err = dc.checkPausedConditions(ctx, d); err != nil {
        return err
    }

    if d.Spec.Paused {
        return dc.sync(ctx, d, rsList)
    }

    // rollback is not re-entrant in case the underlying replica sets are updated with a new
    // revision so we should ensure that we won't proceed to update replica sets until we
    // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
    if getRollbackTo(d) != nil {
        return dc.rollback(ctx, d, rsList)
    }

    scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
    if err != nil {
        return err
    }
    if scalingEvent {
        return dc.sync(ctx, d, rsList)
    }

    switch d.Spec.Strategy.Type {
    case apps.RecreateDeploymentStrategyType:
        return dc.rolloutRecreate(ctx, d, rsList, podMap)
    // A newly created Deployment takes this branch, which creates the ReplicaSet.
    case apps.RollingUpdateDeploymentStrategyType:
        return dc.rolloutRolling(ctx, d, rsList)
    }
    return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
// rolloutRolling creates the corresponding ReplicaSet.
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
    newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
    if err != nil {
        return err
    }
    allRSs := append(oldRSs, newRS)

    // Scale up, if we can.
    scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
    if err != nil {
        return err
    }
    if scaledUp {
        // Update DeploymentStatus
        return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
    }

    // Scale down, if we can.
    scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
    if err != nil {
        return err
    }
    if scaledDown {
        // Update DeploymentStatus
        return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
    }

    if deploymentutil.DeploymentComplete(d, &d.Status) {
        if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
            return err
        }
    }

    // Sync deployment status
    return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
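Where does the new ReplicaSet's name come from? getAllReplicaSetsAndSyncRevision calls getNewReplicaSet (pkg/controller/deployment/sync.go), which names it deploymentName-podTemplateHash and stamps the same hash onto the Pods as the pod-template-hash label. A rough sketch of the naming; the real hashing is controller.ComputeHash, which hashes the full pod template object (plus the collision count) rather than a string:

package main

import (
    "fmt"
    "hash/fnv"

    "k8s.io/apimachinery/pkg/util/rand"
)

// Illustrative only: reduced form of controller.ComputeHash
// (pkg/controller/controller_utils.go). FNV-1a over the pod template,
// encoded into characters that are safe in a DNS label.
func newRSName(deploymentName, podTemplate string) string {
    hasher := fnv.New32a()
    fmt.Fprint(hasher, podTemplate)
    podTemplateHash := rand.SafeEncodeString(fmt.Sprint(hasher.Sum32()))
    return deploymentName + "-" + podTemplateHash
}

func main() {
    // Prints something like "use-<hash>"; the Pods carry the same hash
    // in their pod-template-hash label.
    fmt.Println(newRSName("use", `{"containers":[{"name":"app","image":"nginx"}]}`))
}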
After the ReplicaSet is created, the replicaset controller likewise observes the corresponding object-change events. Now on to the replicaset controller.

The function to focus on is syncReplicaSet:

// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
    startTime := time.Now()
    defer func() {
        klog.FromContext(ctx).V(4).Info("Finished syncing", "kind", rsc.Kind, "key", key, "duration", time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    if apierrors.IsNotFound(err) {
        klog.FromContext(ctx).V(4).Info("deleted", "kind", rsc.Kind, "key", key)
        rsc.expectations.DeleteExpectations(key)
        return nil
    }
    if err != nil {
        return err
    }

    rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector for rs %v/%v: %v", namespace, name, err))
        return nil
    }

    // list all pods to include the pods that don't match the rs`s selector
    // anymore but has the stale controller ref.
    // TODO: Do the List and Filter in a single pass, or use an index.
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    if err != nil {
        return err
    }
    // Ignore inactive pods.
    filteredPods := controller.FilterActivePods(allPods)

    // NOTE: filteredPods are pointing to objects from cache - if you need to
    // modify them, you need to copy it first.
    filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
    if err != nil {
        return err
    }

    var manageReplicasErr error
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
    }
    rs = rs.DeepCopy()
    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

    // Always updates status as pods come up or die.
    updatedRS, err := updateReplicaSetStatus(klog.FromContext(ctx), rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    if err != nil {
        // Multiple things could lead to this update failing. Requeuing the replica set ensures
        // Returning an error causes a requeue without forcing a hotloop
        return err
    }
    // Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
    if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
        updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
        updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
        rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    }
    return manageReplicasErr
}
In manageReplicas, rsc.podControl.CreatePods(...) is called to ask the apiserver to create the corresponding Pods; after that kube-scheduler schedules them onto nodes and kubelet brings the containers up.
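manageReplicas does not fire all the creates at once. It batches them in doubling sizes (like TCP slow start), so a create that always fails, e.g. an invalid pod template, aborts after a handful of requests. A simplified version of that helper (the real one is slowStartBatch in pkg/controller/replicaset/replica_set.go; error aggregation is trimmed here):

package controllersketch

import "sync"

func minInt(a, b int) int {
    if a < b {
        return a
    }
    return b
}

// Issue `count` calls to fn in batches of 1, 2, 4, 8, ... so that a hard
// failure stops the controller after a handful of requests instead of
// after `replicas` of them.
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
    remaining := count
    successes := 0
    for batchSize := minInt(remaining, initialBatchSize); batchSize > 0; batchSize = minInt(2*batchSize, remaining) {
        errCh := make(chan error, batchSize)
        var wg sync.WaitGroup
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        successes += batchSize - len(errCh)
        if len(errCh) > 0 {
            // Abort on the first failing batch.
            return successes, <-errCh
        }
        remaining -= batchSize
    }
    return successes, nil
}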

The kube-scheduler's scheduling process is skipped here; we jump straight to how kubelet brings up a container.
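Before kubelet's sync loop can react, it has to hear about the pod at all: kubelet's apiserver config source watches only pods bound to its own node, using a field selector on spec.nodeName (see pkg/kubelet/config/apiserver.go). A minimal sketch of such a watch; the node name and handler body are illustrative:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    nodeName := "node-1" // illustrative; kubelet uses its own node name

    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    // Watch only pods scheduled onto this node, the same spec.nodeName
    // field selector kubelet's apiserver source uses.
    factory := informers.NewSharedInformerFactoryWithOptions(clientset, 0,
        informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
            opts.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String()
        }))

    factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*corev1.Pod)
            // kubelet turns this into a kubetypes.ADD PodUpdate on configCh.
            fmt.Printf("pod bound to this node: %s/%s\n", pod.Namespace, pod.Name)
        },
    })

    stopCh := make(chan struct{})
    factory.Start(stopCh)
    <-stopCh
}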
// pkg/kubelet/kubelet.go
// Reads updates from several channels and dispatches them to the matching
// handler. Since this is a newly created pod, handler.HandlePodAdditions(u.Pods) runs.
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
            return false
        }

        switch u.Op {
        case kubetypes.ADD:
            klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
            handler.HandlePodAdditions(u.Pods)
        ...
}
Into HandlePodAdditions:
// pkg/kubelet/kubelet.go
// Some unimportant logic has been trimmed.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    // Sort the pods by creation time.
    sort.Sort(sliceutils.PodsByCreationTime(pods))

    for _, pod := range pods {
        existingPods := kl.podManager.GetPods()
        // Register the pod with the podManager.
        kl.podManager.AddPod(pod)

        // Mirror pods take the mirror-pod handling path.
        if kubetypes.IsMirrorPod(pod) {
            kl.handleMirrorPod(pod, start)
            continue
        }

        // Only go through the admission process if the pod is not requested
        // for termination by another part of the kubelet. If the pod is already
        // using resources (previously admitted), the pod worker is going to be
        // shutting it down. If the pod hasn't started yet, we know that when
        // the pod worker is invoked it will also avoid setting up the pod, so
        // we simply avoid doing any work.
        if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
            // We failed pods that we rejected, so activePods include all admitted
            // pods that are alive.
            activePods := kl.filterOutInactivePods(existingPods)

            if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
                // To handle kubelet restarts, test pod admissibility using AllocatedResources values
                // (for cpu & memory) from checkpoint store. If found, that is the source of truth.
                podCopy := pod.DeepCopy()
                for _, c := range podCopy.Spec.Containers {
                    allocatedResources, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), c.Name)
                    if c.Resources.Requests != nil && found {
                        c.Resources.Requests[v1.ResourceCPU] = allocatedResources[v1.ResourceCPU]
                        c.Resources.Requests[v1.ResourceMemory] = allocatedResources[v1.ResourceMemory]
                    }
                }
                // Check if we can admit the pod; if not, reject it.
                if ok, reason, message := kl.canAdmitPod(activePods, podCopy); !ok {
                    kl.rejectPod(pod, reason, message)
                    continue
                }
                // For new pod, checkpoint the resource values at which the Pod has been admitted
                if err := kl.statusManager.SetPodAllocation(podCopy); err != nil {
                    //TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
                    klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
                }
            } else {
                // Check if we can admit the pod; if not, reject it.
                if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
                    kl.rejectPod(pod, reason, message)
                    continue
                }
            }
        }
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
    }
}
Into dispatchWork:
// pkg/kubelet/kubelet.go
// Kicks off an asynchronous update in a pod worker.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    // Run the sync in an async worker.
    kl.podWorkers.UpdatePod(UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        StartTime:  start,
    })
    // Note the number of containers for new pods.
    if syncType == kubetypes.SyncPodCreate {
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }
}
Into UpdatePod:
// pkg/kubelet/pod_workers.go
// Updates some pod bookkeeping state and starts a goroutine that runs
// podWorkerLoop. The "asynchronous update" mentioned above refers to
// this newly spawned goroutine.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
    // spawn a pod worker
    go func() {
        defer runtime.HandleCrash()
        defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)
        p.podWorkerLoop(uid, outCh)
    }()
}
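The elided bookkeeping maintains one update channel and one goroutine per pod UID, so syncs for different pods run in parallel while updates to the same pod are serialized. A minimal sketch of that pattern, not kubelet's actual implementation (which also tracks a per-pod state machine):

package kubeletsketch

import (
    "sync"

    "k8s.io/apimachinery/pkg/types"
)

type workers struct {
    mu      sync.Mutex
    updates map[types.UID]chan struct{}
}

func newWorkers() *workers {
    return &workers{updates: map[types.UID]chan struct{}{}}
}

func (w *workers) UpdatePod(uid types.UID) {
    w.mu.Lock()
    ch, ok := w.updates[uid]
    if !ok {
        // First update for this pod: create its channel and spawn its worker.
        ch = make(chan struct{}, 1)
        w.updates[uid] = ch
        go w.podWorkerLoop(uid, ch)
    }
    w.mu.Unlock()

    // Coalesce: at most one pending signal; the loop re-reads latest state.
    select {
    case ch <- struct{}{}:
    default:
    }
}

func (w *workers) podWorkerLoop(uid types.UID, updates <-chan struct{}) {
    for range updates {
        // sync the pod toward its desired state (SyncPod and friends)
    }
}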
Into podWorkerLoop:
// pkg/kubelet/pod_workers.go
// Eventually ends up in SyncPod.
func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
    var lastSyncTime time.Time
    for range podUpdates {
        ...
        var isTerminal bool
        err := func() error {
            // The worker is responsible for ensuring the sync method sees the appropriate
            // status updates on resyncs (the result of the last sync), transitions to
            // terminating (no wait), or on terminated (whatever the most recent state is).
            // Only syncing and terminating can generate pod status changes, while terminated
            // pods ensure the most recent status makes it to the api server.
            var status *kubecontainer.PodStatus
            var err error
            switch {
            case update.Options.RunningPod != nil:
                // when we receive a running pod, we don't need status at all because we are
                // guaranteed to be terminating and we skip updates to the pod
            default:
                // wait until we see the next refresh from the PLEG via the cache (max 2s)
                // TODO: this adds ~1s of latency on all transitions from sync to terminating
                // to terminated, and on all termination retries (including evictions). We should
                // improve latency by making the pleg continuous and by allowing pod status
                // changes to be refreshed when key events happen (killPod, sync->terminating).
                // Improving this latency also reduces the possibility that a terminated
                // container's status is garbage collected before we have a chance to update the
                // API server (thus losing the exit code).
                status, err = p.podCache.GetNewerThan(update.Options.Pod.UID, lastSyncTime)

                if err != nil {
                    // This is the legacy event thrown by manage pod loop all other events are now dispatched
                    // from syncPodFn
                    p.recorder.Eventf(update.Options.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
                    return err
                }
            }

            // Take the appropriate action (illegal phases are prevented by UpdatePod)
            switch {
            case update.WorkType == TerminatedPod:
                err = p.podSyncer.SyncTerminatedPod(ctx, update.Options.Pod, status)

            case update.WorkType == TerminatingPod:
                var gracePeriod *int64
                if opt := update.Options.KillPodOptions; opt != nil {
                    gracePeriod = opt.PodTerminationGracePeriodSecondsOverride
                }
                podStatusFn := p.acknowledgeTerminating(podUID)

                // if we only have a running pod, terminate it directly
                if update.Options.RunningPod != nil {
                    err = p.podSyncer.SyncTerminatingRuntimePod(ctx, update.Options.RunningPod)
                } else {
                    err = p.podSyncer.SyncTerminatingPod(ctx, update.Options.Pod, status, gracePeriod, podStatusFn)
                }

            default:
                isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
            }

            lastSyncTime = p.clock.Now()
            return err
        }()
    }
}
Next comes Kubelet.SyncPod:
// pkg/kubelet/kubelet.go
func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
    ...
    result := kl.containerRuntime.SyncPod(ctx, pod, podStatus, pullSecrets, kl.backOff)
    ...
}

This hands off to kl.containerRuntime.SyncPod:

// pkg/kubelet/kuberuntime/kuberuntime_manager.go
// This is where the CRI is called to create the containers; the CNI is
// invoked by the CRI implementation.
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(ctx, pod, podStatus)
    klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
    if podContainerChanges.CreateSandbox {
        ref, err := ref.GetReference(legacyscheme.Scheme, pod)
        if err != nil {
            klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
        }
        if podContainerChanges.SandboxID != "" {
            m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
        } else {
            klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
        }
    }

    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.KillPod {
        if podContainerChanges.CreateSandbox {
            klog.V(4).InfoS("Stopping PodSandbox for pod, will start new one", "pod", klog.KObj(pod))
        } else {
            klog.V(4).InfoS("Stopping PodSandbox for pod, because all other containers are dead", "pod", klog.KObj(pod))
        }

        killResult := m.killPodWithSyncResult(ctx, pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
        result.AddPodSyncResult(killResult)
        if killResult.Error() != nil {
            klog.ErrorS(killResult.Error(), "killPodWithSyncResult failed")
            return
        }

        if podContainerChanges.CreateSandbox {
            m.purgeInitContainers(ctx, pod, podStatus)
        }
    } else {
        // Step 3: kill any running containers in this pod which are not to keep.
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
            killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
            result.AddSyncResult(killContainerResult)
            if err := m.killContainer(ctx, pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
                killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
                klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
                return
            }
        }
    }

    // Keep terminated init containers fairly aggressively controlled
    // This is an optimization because container removals are typically handled
    // by container garbage collector.
    m.pruneInitContainersBeforeStart(ctx, pod, podStatus)

    // We pass the value of the PRIMARY podIP and list of podIPs down to
    // generatePodSandboxConfig and generateContainerConfig, which in turn
    // passes it to various other functions, in order to facilitate functionality
    // that requires this value (hosts file and downward API) and avoid races determining
    // the pod IP in cases where a container requires restart but the
    // podIP isn't in the status manager yet. The list of podIPs is used to
    // generate the hosts file.
    //
    // We default to the IPs in the passed-in pod status, and overwrite them if the
    // sandbox needs to be (re)started.
    var podIPs []string
    if podStatus != nil {
        podIPs = podStatus.IPs
    }

    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        var msg string
        var err error

        klog.V(4).InfoS("Creating PodSandbox for pod", "pod", klog.KObj(pod))
        metrics.StartedPodsTotal.Inc()
        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
        result.AddSyncResult(createSandboxResult)

        // ConvertPodSysctlsVariableToDotsSeparator converts sysctl variable
        // in the Pod.Spec.SecurityContext.Sysctls slice into a dot as a separator.
        // runc uses the dot as the separator to verify whether the sysctl variable
        // is correct in a separate namespace, so when using the slash as the sysctl
        // variable separator, runc returns an error: "sysctl is not in a separate kernel namespace"
        // and the podSandBox cannot be successfully created. Therefore, before calling runc,
        // we need to convert the sysctl variable, the dot is used as a separator to separate the kernel namespace.
        // When runc supports slash as sysctl separator, this function can no longer be used.
        sysctl.ConvertPodSysctlsVariableToDotsSeparator(pod.Spec.SecurityContext)

        // Prepare resources allocated by the Dynamic Resource Allocation feature for the pod
        if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
            if m.runtimeHelper.PrepareDynamicResources(pod) != nil {
                return
            }
        }

        podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
        if err != nil {
            // createPodSandbox can return an error from CNI, CSI,
            // or CRI if the Pod has been deleted while the POD is
            // being created. If the pod has been deleted then it's
            // not a real error.
            //
            // SyncPod can still be running when we get here, which
            // means the PodWorker has not acked the deletion.
            if m.podStateProvider.IsPodTerminationRequested(pod.UID) {
                klog.V(4).InfoS("Pod was deleted and sandbox failed to be created", "pod", klog.KObj(pod), "podUID", pod.UID)
                return
            }
            metrics.StartedPodsErrorsTotal.Inc()
            createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
            klog.ErrorS(err, "CreatePodSandbox for pod failed", "pod", klog.KObj(pod))
            ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
            if referr != nil {
                klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
            }
            m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed to create pod sandbox: %v", err)
            return
        }
        klog.V(4).InfoS("Created PodSandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod))

        resp, err := m.runtimeService.PodSandboxStatus(ctx, podSandboxID, false)
        if err != nil {
            ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
            if referr != nil {
                klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
            }
            m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
            klog.ErrorS(err, "Failed to get pod sandbox status; Skipping pod", "pod", klog.KObj(pod))
            result.Fail(err)
            return
        }
        if resp.GetStatus() == nil {
            result.Fail(errors.New("pod sandbox status is nil"))
            return
        }

        // If we ever allow updating a pod from non-host-network to
        // host-network, we may use a stale IP.
        if !kubecontainer.IsHostNetworkPod(pod) {
            // Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.
            podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, resp.GetStatus())
            klog.V(4).InfoS("Determined the ip for pod after sandbox changed", "IPs", podIPs, "pod", klog.KObj(pod))
        }
    }

    // the start containers routines depend on pod ip(as in primary pod ip)
    // instead of trying to figure out if we have 0 < len(podIPs)
    // everytime, we short circuit it here
    podIP := ""
    if len(podIPs) != 0 {
        podIP = podIPs[0]
    }

    // Get podSandboxConfig for containers to start.
    configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
    result.AddSyncResult(configPodSandboxResult)
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
    if err != nil {
        message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
        klog.ErrorS(err, "GeneratePodSandboxConfig for pod failed", "pod", klog.KObj(pod))
        configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
        return
    }

    // Helper containing boilerplate common to starting all types of containers.
    // typeName is a description used to describe this type of container in log messages,
    // currently: "container", "init container" or "ephemeral container"
    // metricLabel is the label used to describe this type of container in monitoring metrics.
    // currently: "container", "init_container" or "ephemeral_container"
    start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
        result.AddSyncResult(startContainerResult)

        isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
            return err
        }

        metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()
        if sc.HasWindowsHostProcessRequest(pod, spec.container) {
            metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc()
        }
        klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
        // NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
        if msg, err := m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
            // startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
            // useful to cluster administrators to distinguish "server errors" from "user errors".
            metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
            if sc.HasWindowsHostProcessRequest(pod, spec.container) {
                metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
            }
            startContainerResult.Fail(err, msg)
            // known errors that are logged in other places are logged at higher levels here to avoid
            // repetitive log spam
            switch {
            case err == images.ErrImagePullBackOff:
                klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
            default:
                utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
            }
            return err
        }

        return nil
    }

    // Step 5: start ephemeral containers
    // These are started "prior" to init containers to allow running ephemeral containers even when there
    // are errors starting an init container. In practice init containers will start first since ephemeral
    // containers cannot be specified on pod creation.
    for _, idx := range podContainerChanges.EphemeralContainersToStart {
        start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
    }

    // Step 6: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
            return
        }

        // Successfully started the container; clear the entry in the failure
        klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
    }

    // Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources
    if isInPlacePodVerticalScalingAllowed(pod) {
        if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
            m.doPodResizeAction(pod, podStatus, podContainerChanges, result)
        }
    }

    // Step 8: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
    }

    return
}
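From here, each start call runs through m.startContainer (pkg/kubelet/kuberuntime/kuberuntime_container.go), which pulls the image, creates the container via the CRI, starts it, and runs the post-start hook. Kubelet aside, the same CRI calls can be made by any gRPC client; the sketch below is not kubelet code, and the socket path and all metadata values are illustrative:

package main

import (
    "context"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

// A bare-bones client making the same CRI calls, in the same order, that
// SyncPod/startContainer drive: RunPodSandbox -> PullImage ->
// CreateContainer -> StartContainer.
func main() {
    conn, err := grpc.Dial("unix:///run/containerd/containerd.sock",
        grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        panic(err)
    }
    defer conn.Close()
    rt := runtimeapi.NewRuntimeServiceClient(conn)
    img := runtimeapi.NewImageServiceClient(conn)
    ctx := context.Background()

    // SyncPod step 4: create the sandbox ("pause" container plus network
    // namespace; the runtime invokes the CNI here).
    sandboxConfig := &runtimeapi.PodSandboxConfig{
        Metadata: &runtimeapi.PodSandboxMetadata{
            Name: "use-xxx", Namespace: "default", Uid: "uid-1", Attempt: 0,
        },
    }
    sb, err := rt.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{Config: sandboxConfig})
    if err != nil {
        panic(err)
    }

    // startContainer step 1: pull the image.
    if _, err := img.PullImage(ctx, &runtimeapi.PullImageRequest{
        Image: &runtimeapi.ImageSpec{Image: "nginx"},
    }); err != nil {
        panic(err)
    }

    // startContainer step 2: create the container inside the sandbox.
    created, err := rt.CreateContainer(ctx, &runtimeapi.CreateContainerRequest{
        PodSandboxId: sb.PodSandboxId,
        Config: &runtimeapi.ContainerConfig{
            Metadata: &runtimeapi.ContainerMetadata{Name: "app", Attempt: 0},
            Image:    &runtimeapi.ImageSpec{Image: "nginx"},
        },
        SandboxConfig: sandboxConfig,
    })
    if err != nil {
        panic(err)
    }

    // startContainer step 3: start it.
    if _, err := rt.StartContainer(ctx, &runtimeapi.StartContainerRequest{
        ContainerId: created.ContainerId,
    }); err != nil {
        panic(err)
    }
}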


deployment controller entry point
pkg/controller/deployment/deployment_controller.go:157
func (dc *DeploymentController) Run(ctx context.Context, workers int)

replicaset controller
pkg/controller/replicaset/replica_set.go:190
func (rsc *ReplicaSetController) Run(ctx context.Context, workers int)

REF:
1. https://github.com/kubernetes/kubernetes