kube-controller之daemonset

Kubernetes中的DaemonSet Controller是负责管理DaemonSet资源的控制器,确保在集群中符合条件的每个节点上都运行一个Pod副本。
DaemonSet Controller是一种类型的控制器,用于在每个节点上运行一个Pod副本,以便在整个集群中覆盖所有节点。
DaemonSet通常用于运行守护进程、日志收集器、监控代理等任务。

通过DaemonSet Controller,Kubernetes能够实现在整个集群中自动管理和部署Pod副本的能力,确保每个节点都运行所需的应用程序或服务。


// pkg/controller/daemon/daemon_controller.go

const (
// BurstReplicas is a rate limiter for booting pods on a lot of pods.
// The value of 250 is chosen b/c values that are too high can cause registry DoS issues.
BurstReplicas = 250

// StatusUpdateRetries limits the number of retries if sending a status update to API server fails.
StatusUpdateRetries = 1

// BackoffGCInterval is the time that has to pass before the next iteration of backoff GC is run.
BackoffGCInterval = 1 * time.Minute
)

// Reasons for DaemonSet events emitted by this controller.
const (
// SelectingAllReason means the DaemonSet has an empty (select-all) selector.
SelectingAllReason = "SelectingAll"
// FailedPlacementReason means a daemon pod could not be placed onto its target node.
FailedPlacementReason = "FailedPlacement"
// FailedDaemonPodReason means a daemon pod is in the "Failed" phase.
FailedDaemonPodReason = "FailedDaemonPod"
// SucceededDaemonPodReason means a daemon pod was brought up successfully.
SucceededDaemonPodReason = "SucceededDaemonPod"
)

// DaemonSetsController is responsible for synchronizing DaemonSet objects
// stored in the system with the daemon pods actually running on nodes.
type DaemonSetsController struct {
kubeClient clientset.Interface

eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder

podControl controller.PodControlInterface
crControl controller.ControllerRevisionControlInterface

// A dsc is temporarily suspended after creating/deleting these many replicas.
// It resumes normal action after observing the watch events for them.
// burstReplicas defaults to BurstReplicas (250): at most 250 pods are created
// or deleted in a single syncLoop.
burstReplicas int

// To allow injection of syncDaemonSet for testing.
syncHandler func(ctx context.Context, dsKey string) error
// used for unit testing
enqueueDaemonSet func(ds *apps.DaemonSet)
// A TTLCache of pod creates/deletes each ds expects to see
expectations controller.ControllerExpectationsInterface
// dsLister can list/get daemonsets from the shared informer's store
dsLister appslisters.DaemonSetLister
// dsStoreSynced returns true if the daemonset store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
dsStoreSynced cache.InformerSynced
// historyLister get list/get history from the shared informers's store
historyLister appslisters.ControllerRevisionLister
// historyStoreSynced returns true if the history store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
historyStoreSynced cache.InformerSynced
// podLister get list/get pods from the shared informers's store
podLister corelisters.PodLister
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced cache.InformerSynced
// nodeLister can list/get nodes from the shared informer's store
nodeLister corelisters.NodeLister
// nodeStoreSynced returns true if the node store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
nodeStoreSynced cache.InformerSynced

// DaemonSet keys that need to be synced.
queue workqueue.RateLimitingInterface

// failedPodsBackoff rate-limits the replacement of failed daemon pods.
failedPodsBackoff *flowcontrol.Backoff
}

// NewDaemonSetsController creates a new DaemonSetsController wired to the
// given shared informers and kube client. Event handlers are registered on
// the DaemonSet, ControllerRevision, Pod and Node informers so that relevant
// changes enqueue the owning DaemonSet for reconciliation.
func NewDaemonSetsController(
ctx context.Context,
daemonSetInformer appsinformers.DaemonSetInformer,
historyInformer appsinformers.ControllerRevisionInformer,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
kubeClient clientset.Interface,
failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {
eventBroadcaster := record.NewBroadcaster()
logger := klog.FromContext(ctx)
dsc := &DaemonSetsController{
kubeClient: kubeClient,
eventBroadcaster: eventBroadcaster,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
podControl: controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
},
crControl: controller.RealControllerRevisionControl{
KubeClient: kubeClient,
},
burstReplicas: BurstReplicas,
expectations: controller.NewControllerExpectations(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
}

// Watch for DaemonSet add/update/delete events.
daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dsc.addDaemonset(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dsc.updateDaemonset(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
dsc.deleteDaemonset(logger, obj)
},
})
dsc.dsLister = daemonSetInformer.Lister()
dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced

// Watch for ControllerRevision (history) events.
historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dsc.addHistory(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dsc.updateHistory(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
dsc.deleteHistory(logger, obj)
},
})
dsc.historyLister = historyInformer.Lister()
dsc.historyStoreSynced = historyInformer.Informer().HasSynced

// Watch pod creation/deletion events. We watch pods because we do not want
// to create or delete more pods until the current expectations are satisfied.
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dsc.addPod(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dsc.updatePod(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
dsc.deletePod(logger, obj)
},
})
dsc.podLister = podInformer.Lister()
dsc.podStoreSynced = podInformer.Informer().HasSynced

// Watch node add/update events (node deletion needs no handler here).
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dsc.addNode(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dsc.updateNode(logger, oldObj, newObj)
},
},
)
dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
dsc.nodeLister = nodeInformer.Lister()

// Set the reconcile handler (indirected so tests can inject their own).
dsc.syncHandler = dsc.syncDaemonSet
dsc.enqueueDaemonSet = dsc.enqueue

dsc.failedPodsBackoff = failedPodsBackoff

return dsc, nil
}

// Run starts the event broadcaster, waits for all informer caches to sync,
// then launches `workers` reconcile goroutines plus the failed-pods backoff
// GC loop, and blocks until ctx is cancelled.
func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()

dsc.eventBroadcaster.StartStructuredLogging(0)
dsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dsc.kubeClient.CoreV1().Events("")})
defer dsc.eventBroadcaster.Shutdown()

defer dsc.queue.ShutDown()

logger := klog.FromContext(ctx)
logger.Info("Starting daemon sets controller")
defer logger.Info("Shutting down daemon sets controller")

// Do not process any items until every informer cache has synced once.
if !cache.WaitForNamedCacheSync("daemon sets", ctx.Done(), dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
return
}

// Each worker loops over the shared work queue until it shuts down.
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, dsc.runWorker, time.Second)
}

// Periodically garbage-collect stale failed-pod backoff entries.
go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, ctx.Done())

<-ctx.Done()
}

// runWorker drains the work queue, one item at a time, until
// processNextWorkItem reports that the queue has been shut down.
func (dsc *DaemonSetsController) runWorker(ctx context.Context) {
	for {
		if !dsc.processNextWorkItem(ctx) {
			return
		}
	}
}

// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (dsc *DaemonSetsController) processNextWorkItem(ctx context.Context) bool {
	key, shutdown := dsc.queue.Get()
	if shutdown {
		return false
	}
	defer dsc.queue.Done(key)

	if err := dsc.syncHandler(ctx, key.(string)); err != nil {
		// Sync failed: report the error and requeue with rate limiting so
		// transient failures are retried with backoff.
		utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
		dsc.queue.AddRateLimited(key)
		return true
	}

	// Sync succeeded: clear any accumulated rate-limit history for this key.
	dsc.queue.Forget(key)
	return true
}

// syncDaemonSet is the reconcile function: it brings the daemon pods for the
// DaemonSet identified by key in line with the spec and updates its status.
func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) error {
logger := klog.FromContext(ctx)
startTime := dsc.failedPodsBackoff.Clock.Now()

defer func() {
logger.V(4).Info("Finished syncing daemon set", "daemonset", key, "time", dsc.failedPodsBackoff.Clock.Now().Sub(startTime))
}()

// Split the workqueue key into namespace and resource name.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// Fetch the DaemonSet from the lister cache.
ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
if apierrors.IsNotFound(err) {
logger.V(3).Info("Daemon set has been deleted", "daemonset", key)
dsc.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
}
// List all nodes; each one is evaluated for daemon pod placement.
nodeList, err := dsc.nodeLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
}

everything := metav1.LabelSelector{}
// If the DaemonSet has an empty (select-all) selector, emit a warning event and bail out.
if reflect.DeepEqual(ds.Spec.Selector, &everything) {
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
return nil
}

// Don't process a daemon set until all its creations and deletions have been processed.
// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
// then we do not want to call manage on foo until the daemon pods have been created.
dsKey, err := controller.KeyFunc(ds)
if err != nil {
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
}

// If the DaemonSet is being deleted (either by foreground deletion or
// orphan deletion), we cannot be sure if the DaemonSet history objects
// it owned still exist -- those history objects can either be deleted
// or orphaned. Garbage collector doesn't guarantee that it will delete
// DaemonSet pods before deleting DaemonSet history objects, because
// DaemonSet history doesn't own DaemonSet pods. We cannot reliably
// calculate the status of a DaemonSet being deleted. Therefore, return
// here without updating status for the DaemonSet being deleted.
if ds.DeletionTimestamp != nil {
return nil
}

// Construct histories of the DaemonSet, and get the hash of current history
cur, old, err := dsc.constructHistory(ctx, ds)
if err != nil {
return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
}
hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]

// Use the expectations mechanism to decide whether the previous round of
// creates/deletes for this DaemonSet has been fully observed.
if !dsc.expectations.SatisfiedExpectations(dsKey) {
// Not yet satisfied: refresh status only, without touching pods.
// Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false)
}

err = dsc.updateDaemonSet(ctx, ds, nodeList, hash, dsKey, old)
statusErr := dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
switch {
case err != nil && statusErr != nil:
// If there was an error, and we failed to update status,
// log it and return the original error.
logger.Error(statusErr, "Failed to update status", "daemonSet", klog.KObj(ds))
return err
case err != nil:
return err
case statusErr != nil:
return statusErr
}

return nil
}


// updateDaemonSet reconciles the daemon pods for ds across nodeList, applies
// the configured update strategy once expectations are satisfied, and prunes
// old ControllerRevision history.
func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash, key string, old []*apps.ControllerRevision) error {
	if err := dsc.manage(ctx, ds, nodeList, hash); err != nil {
		return err
	}

	// Process rolling updates if we're ready, i.e. all expected pod
	// creations/deletions from the manage step have been observed.
	if dsc.expectations.SatisfiedExpectations(key) {
		// OnDelete needs no controller action; only RollingUpdate does work here.
		if ds.Spec.UpdateStrategy.Type == apps.RollingUpdateDaemonSetStrategyType {
			if err := dsc.rollingUpdate(ctx, ds, nodeList, hash); err != nil {
				return err
			}
		}
	}

	if err := dsc.cleanupHistory(ctx, ds, old); err != nil {
		return fmt.Errorf("failed to clean up revisions of DaemonSet: %w", err)
	}

	return nil
}
// pkg/controller/daemon/update.go
// rollingUpdate performs one step of a RollingUpdate strategy: it deletes
// out-of-date pods (maxSurge == 0) or creates surge pods on nodes with
// out-of-date pods (maxSurge > 0), within the configured limits.
func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
logger := klog.FromContext(ctx)
// Map from node name to the daemon pods associated with that node
// (see getNodesToDaemonPods).
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
// maxSurge: maximum number of extra pods allowed beyond the desired count.
// maxUnavailable: maximum number of unavailable pods allowed.
maxSurge, maxUnavailable, err := dsc.updatedDesiredNodeCounts(ctx, ds, nodeList, nodeToDaemonPods)
if err != nil {
return fmt.Errorf("couldn't get unavailable numbers: %v", err)
}

now := dsc.failedPodsBackoff.Clock.Now()

// When not surging, we delete just enough pods to stay under the maxUnavailable limit, if any
// are necessary, and let the core loop create new instances on those nodes.
//
// Assumptions:
// * Expect manage loop to allow no more than one pod per node
// * Expect manage loop will create new pods
// * Expect manage loop will handle failed pods
// * Deleted pods do not count as unavailable so that updates make progress when nodes are down
// Invariants:
// * The number of new pods that are unavailable must be less than maxUnavailable
// * A node with an available old pod is a candidate for deletion if it does not violate other invariants


if maxSurge == 0 {
var numUnavailable int
var allowedReplacementPods []string
var candidatePodsToDelete []string
for nodeName, pods := range nodeToDaemonPods {
// findUpdatedPodsOnNode inspects the non-deleted pods on the node: it returns
// true when there is at most one old and one new pod, false when there are more.
// In the latter case we skip this node and let the manage loop clean up the
// surplus pods on its next pass.
newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
if !ok {
// let the manage loop clean up this node, and treat it as an unavailable node
logger.V(3).Info("DaemonSet has excess pods on node, skipping to allow the core loop to process", "daemonset", klog.KObj(ds), "node", klog.KRef("", nodeName))
numUnavailable++
continue
}
switch {
case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil:
// the manage loop will handle creating or deleting the appropriate pod, consider this unavailable
numUnavailable++
case newPod != nil:
// this pod is up to date, check its availability
if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
// an unavailable new pod is counted against maxUnavailable
numUnavailable++
}
default:
// this pod is old, it is an update candidate
switch {
// The old pod is not available: record it in allowedReplacementPods.
case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
// the old pod isn't available, so it needs to be replaced
logger.V(5).Info("DaemonSet pod on node is out of date and not available, allowing replacement", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
// record the replacement
if allowedReplacementPods == nil {
allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods))
}
allowedReplacementPods = append(allowedReplacementPods, oldPod.Name)
case numUnavailable >= maxUnavailable:
// no point considering any other candidates
continue
default:
logger.V(5).Info("DaemonSet pod on node is out of date, this is a candidate to replace", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
// record the candidate
if candidatePodsToDelete == nil {
candidatePodsToDelete = make([]string, 0, maxUnavailable)
}
// Record this pod as a deletion candidate.
candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name)
}
}
}

// use any of the candidates we can, including the allowedReplacementPods
logger.V(5).Info("DaemonSet allowing replacements", "daemonset", klog.KObj(ds), "replacements", len(allowedReplacementPods), "maxUnavailable", maxUnavailable, "numUnavailable", numUnavailable, "candidates", len(candidatePodsToDelete))
remainingUnavailable := maxUnavailable - numUnavailable
if remainingUnavailable < 0 {
remainingUnavailable = 0
}
if max := len(candidatePodsToDelete); remainingUnavailable > max {
remainingUnavailable = max
}
oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...)

return dsc.syncNodes(ctx, ds, oldPodsToDelete, nil, hash)
}

// When surging, we create new pods whenever an old pod is unavailable, and we can create up
// to maxSurge extra pods
//
// Assumptions:
// * Expect manage loop to allow no more than two pods per node, one old, one new
// * Expect manage loop will create new pods if there are no pods on node
// * Expect manage loop will handle failed pods
// * Deleted pods do not count as unavailable so that updates make progress when nodes are down
// Invariants:
// * A node with an unavailable old pod is a candidate for immediate new pod creation
// * An old available pod is deleted if a new pod is available
// * No more than maxSurge new pods are created for old available pods at any one time
//
var oldPodsToDelete []string
var candidateNewNodes []string
var allowedNewNodes []string
var numSurge int

for nodeName, pods := range nodeToDaemonPods {
newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
if !ok {
// let the manage loop clean up this node, and treat it as a surge node
logger.V(3).Info("DaemonSet has excess pods on node, skipping to allow the core loop to process", "daemonset", klog.KObj(ds), "node", klog.KRef("", nodeName))
numSurge++
continue
}
switch {
case oldPod == nil:
// we don't need to do anything to this node, the manage loop will handle it
case newPod == nil:
// this is a surge candidate
switch {
case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
// the old pod isn't available, allow it to become a replacement
logger.V(5).Info("Pod on node is out of date and not available, allowing replacement", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
// record the replacement
if allowedNewNodes == nil {
allowedNewNodes = make([]string, 0, len(nodeToDaemonPods))
}
allowedNewNodes = append(allowedNewNodes, nodeName)
case numSurge >= maxSurge:
// no point considering any other candidates
continue
default:
logger.V(5).Info("DaemonSet pod on node is out of date, this is a surge candidate", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
// record the candidate
if candidateNewNodes == nil {
candidateNewNodes = make([]string, 0, maxSurge)
}
candidateNewNodes = append(candidateNewNodes, nodeName)
}
default:
// we have already surged onto this node, determine our state
if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
// we're waiting to go available here
numSurge++
continue
}
// we're available, delete the old pod
logger.V(5).Info("DaemonSet pod on node is available, remove old pod", "daemonset", klog.KObj(ds), "newPod", klog.KObj(newPod), "node", nodeName, "oldPod", klog.KObj(oldPod))
oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
}
}

// use any of the candidates we can, including the allowedNewNodes
logger.V(5).Info("DaemonSet allowing replacements", "daemonset", klog.KObj(ds), "replacements", len(allowedNewNodes), "maxSurge", maxSurge, "numSurge", numSurge, "candidates", len(candidateNewNodes))
remainingSurge := maxSurge - numSurge
if remainingSurge < 0 {
remainingSurge = 0
}
if max := len(candidateNewNodes); remainingSurge > max {
remainingSurge = max
}
newNodesToCreate := append(allowedNewNodes, candidateNewNodes[:remainingSurge]...)

return dsc.syncNodes(ctx, ds, oldPodsToDelete, newNodesToCreate, hash)
}

// pkg/controller/daemon/daemon_controller.go
// syncNodes deletes the given pods and creates new daemon pods on the given
// nodes, honoring the burstReplicas limit and recording expectations.
func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
// We need to set expectations before creating/deleting pods to avoid race conditions.
// Otherwise watch events for those pods could arrive before the expectations are recorded.
logger := klog.FromContext(ctx)
dsKey, err := controller.KeyFunc(ds)
if err != nil {
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
}

createDiff := len(nodesNeedingDaemonPods)
deleteDiff := len(podsToDelete)

// Cap the number of creations per sync at burstReplicas.
if createDiff > dsc.burstReplicas {
createDiff = dsc.burstReplicas
}
// Cap the number of deletions per sync at burstReplicas.
if deleteDiff > dsc.burstReplicas {
deleteDiff = dsc.burstReplicas
}

// Record how many create/delete watch events we expect to observe.
dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)

// error channel to communicate back failures. make the buffer big enough to avoid any blocking
errCh := make(chan error, createDiff+deleteDiff)

logger.V(4).Info("Nodes needing daemon pods for daemon set, creating", "daemonset", klog.KObj(ds), "needCount", nodesNeedingDaemonPods, "createCount", createDiff)
createWait := sync.WaitGroup{}
// If the returned error is not nil we have a parse error.
// The controller handles this via the hash.
generation, err := util.GetTemplateGeneration(ds)
if err != nil {
generation = nil
}
template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would
// likely all fail with the same error. For example a project with a
// low quota that attempts to create a large number of pods will be
// prevented from spamming the API service with the pod create requests
// after one of its pods fails. Conveniently, this also prevents the
// event spam that those failures would generate.
batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
errorCount := len(errCh)
createWait.Add(batchSize)
for i := pos; i < pos+batchSize; i++ {
go func(ix int) {
defer createWait.Done()

podTemplate := template.DeepCopy()
// The pod's NodeAffinity will be updated to make sure the Pod is bound
// to the target node by default scheduler. It is safe to do so because there
// should be no conflicting node affinity with the target node.
podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

err := dsc.podControl.CreatePods(ctx, ds.Namespace, podTemplate,
ds, metav1.NewControllerRef(ds, controllerKind))

if err != nil {
if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// If the namespace is being torn down, we can safely ignore
// this error since all subsequent creations will fail.
return
}
}
if err != nil {
logger.V(2).Info("Failed creation, decrementing expectations for daemon set", "daemonset", klog.KObj(ds))
dsc.expectations.CreationObserved(dsKey)
errCh <- err
utilruntime.HandleError(err)
}
}(i)
}
createWait.Wait()
// any skipped pods that we never attempted to start shouldn't be expected.
skippedPods := createDiff - (batchSize + pos)
if errorCount < len(errCh) && skippedPods > 0 {
logger.V(2).Info("Slow-start failure. Skipping creation pods, decrementing expectations for daemon set", "skippedPods", skippedPods, "daemonset", klog.KObj(ds))
dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
break
}
}

logger.V(4).Info("Pods to delete for daemon set, deleting", "daemonset", klog.KObj(ds), "toDeleteCount", podsToDelete, "deleteCount", deleteDiff)
deleteWait := sync.WaitGroup{}
deleteWait.Add(deleteDiff)
for i := 0; i < deleteDiff; i++ {
go func(ix int) {
defer deleteWait.Done()
if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
dsc.expectations.DeletionObserved(dsKey)
if !apierrors.IsNotFound(err) {
logger.V(2).Info("Failed deletion, decremented expectations for daemon set", "daemonset", klog.KObj(ds))
errCh <- err
utilruntime.HandleError(err)
}
}
}(i)
}
deleteWait.Wait()

// collect errors if any for proper reporting/retry logic in the controller
errors := []error{}
close(errCh)
for err := range errCh {
errors = append(errors, err)
}
return utilerrors.NewAggregate(errors)
}

REF:
1.pkg/controller/daemon/daemon_controller.go
2.pkg/controller/daemon/update.go