kube-controller: replicaset

A ReplicaSet maintains a set of Pods and drives their number and state toward the desired state. A Deployment is a layer of encapsulation on top of ReplicaSet; the official documentation recommends using Deployments rather than managing ReplicaSets directly.

Create the corresponding ReplicaSet (RS) with the YAML below:

apiVersion: apps/v1
kind: ReplicaSet
metadata:
  name: frontend
  labels:
    app: guestbook
    tier: frontend
spec:
  replicas: 3
  selector:
    matchLabels:
      tier: frontend
  template:
    metadata:
      labels:
        tier: frontend
    spec:
      containers:
      - name: php-redis
        image: hysyeah/my-curl:v1
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 80
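
Applying this manifest (for example, kubectl apply -f frontend.yaml, where frontend.yaml is just an illustrative filename) should produce a ReplicaSet named frontend and three pods labeled tier=frontend, each carrying an ownerReference pointing back to the ReplicaSet; kubectl get rs frontend shows the desired, current, and ready replica counts that the controller reconciles. The rest of this post walks through how the replicaset controller does that reconciliation, starting with the ReplicaSetController struct from pkg/controller/replicaset/replica_set.go.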

type ReplicaSetController struct {
	// GroupVersionKind indicates the controller type.
	// Different instances of this struct may handle different GVKs.
	// For example, this struct can be used (with adapters) to handle ReplicationController.
	schema.GroupVersionKind

	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	eventBroadcaster record.EventBroadcaster

	// A ReplicaSet is temporarily suspended after creating/deleting these many replicas.
	// It resumes normal action after observing the watch events for them.
	burstReplicas int
	// To allow injection of syncReplicaSet for testing.
	syncHandler func(ctx context.Context, rsKey string) error

	// A TTLCache of pod creates/deletes each rc expects to see.
	expectations *controller.UIDTrackingControllerExpectations

	// A store of ReplicaSets, populated by the shared informer passed to NewReplicaSetController
	rsLister appslisters.ReplicaSetLister
	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsListerSynced cache.InformerSynced
	rsIndexer      cache.Indexer

	// A store of pods, populated by the shared informer passed to NewReplicaSetController
	podLister corelisters.PodLister
	// podListerSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podListerSynced cache.InformerSynced

	// Controllers that need to be synced
	queue workqueue.RateLimitingInterface
}
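
The expectations field is how the controller copes with its informer cache lagging behind its own writes. Below is a minimal, hypothetical sketch of the idea only (the real UIDTrackingControllerExpectations in pkg/controller also tracks deletions by pod key and expires entries after a TTL): the controller records how many creates it still expects to observe from the informer, and manageReplicas is skipped until that count drains back to zero.

package main

import (
	"fmt"
	"sync"
)

// expectationsSketch is a hypothetical, stripped-down stand-in for
// controller.UIDTrackingControllerExpectations: it only counts pending
// creations per controller key (no deletion tracking, no TTL).
type expectationsSketch struct {
	mu      sync.Mutex
	pending map[string]int // key -> creations not yet observed via the informer
}

func newExpectationsSketch() *expectationsSketch {
	return &expectationsSketch{pending: map[string]int{}}
}

// ExpectCreations records that the controller just asked the API server to
// create adds pods for this key.
func (e *expectationsSketch) ExpectCreations(key string, adds int) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.pending[key] += adds
}

// CreationObserved is called from the pod informer's Add handler.
func (e *expectationsSketch) CreationObserved(key string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.pending[key] > 0 {
		e.pending[key]--
	}
}

// SatisfiedExpectations reports whether it is safe to run manageReplicas again.
func (e *expectationsSketch) SatisfiedExpectations(key string) bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.pending[key] == 0
}

func main() {
	exp := newExpectationsSketch()
	exp.ExpectCreations("default/frontend", 3)
	fmt.Println(exp.SatisfiedExpectations("default/frontend")) // false: 3 pod Add events still pending
	for i := 0; i < 3; i++ {
		exp.CreationObserved("default/frontend")
	}
	fmt.Println(exp.SatisfiedExpectations("default/frontend")) // true: the cache has caught up
}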

func NewReplicaSetController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
	eventBroadcaster := record.NewBroadcaster()
	if err := metrics.Register(legacyregistry.Register); err != nil {
		logger.Error(err, "unable to register metrics")
	}
	return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
		apps.SchemeGroupVersion.WithKind("ReplicaSet"),
		"replicaset_controller",
		"replicaset",
		controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
		},
		eventBroadcaster,
	)
}

// NewBaseController creates a ReplicaSetController and registers
// event handlers that watch for ReplicaSet and Pod changes.
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster) *ReplicaSetController {

	rsc := &ReplicaSetController{
		GroupVersionKind: gvk,
		kubeClient:       kubeClient,
		podControl:       podControl,
		eventBroadcaster: eventBroadcaster,
		burstReplicas:    burstReplicas,
		expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
	}

	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    rsc.addRS,
		UpdateFunc: rsc.updateRS,
		DeleteFunc: rsc.deleteRS,
	})
	rsInformer.Informer().AddIndexers(cache.Indexers{
		controllerUIDIndex: func(obj interface{}) ([]string, error) {
			rs, ok := obj.(*apps.ReplicaSet)
			if !ok {
				return []string{}, nil
			}
			controllerRef := metav1.GetControllerOf(rs)
			if controllerRef == nil {
				return []string{}, nil
			}
			return []string{string(controllerRef.UID)}, nil
		},
	})
	rsc.rsIndexer = rsInformer.Informer().GetIndexer()
	rsc.rsLister = rsInformer.Lister()
	rsc.rsListerSynced = rsInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: rsc.addPod,
		// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
		// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
		// local storage, so it should be ok.
		UpdateFunc: rsc.updatePod,
		DeleteFunc: rsc.deletePod,
	})
	rsc.podLister = podInformer.Lister()
	rsc.podListerSynced = podInformer.Informer().HasSynced

	rsc.syncHandler = rsc.syncReplicaSet

	return rsc
}

// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash()

	// Start events processing pipeline.
	rsc.eventBroadcaster.StartStructuredLogging(0)
	rsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rsc.kubeClient.CoreV1().Events("")})
	defer rsc.eventBroadcaster.Shutdown()

	defer rsc.queue.ShutDown()

	controllerName := strings.ToLower(rsc.Kind)
	klog.FromContext(ctx).Info("Starting controller", "name", controllerName)
	defer klog.FromContext(ctx).Info("Shutting down controller", "name", controllerName)

	if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, rsc.worker, time.Second)
	}

	<-ctx.Done()
}

func (rsc *ReplicaSetController) worker(ctx context.Context) {
	for rsc.processNextWorkItem(ctx) {
	}
}

func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
	key, quit := rsc.queue.Get()
	if quit {
		return false
	}
	defer rsc.queue.Done(key)

	err := rsc.syncHandler(ctx, key.(string))
	if err == nil {
		// No error occurred, so stop retrying this key.
		rsc.queue.Forget(key)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
	// On error, requeue the key with rate limiting.
	rsc.queue.AddRateLimited(key)

	return true
}
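
processNextWorkItem follows the standard client-go workqueue pattern: Get a key, run the sync handler, then either Forget (success) or AddRateLimited (failure). A small standalone sketch of that pattern, with the queue name and key chosen to match the example RS:

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// A named, rate-limited queue like the one NewBaseController creates.
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset")

	// Event handlers enqueue "namespace/name" keys rather than whole objects.
	queue.Add("default/frontend")

	key, shutdown := queue.Get()
	if shutdown {
		return
	}
	defer queue.Done(key)

	// syncReplicaSet splits the key back into namespace and name before the
	// lister lookup.
	namespace, name, _ := cache.SplitMetaNamespaceKey(key.(string))
	fmt.Println(namespace, name) // default frontend

	// On success the controller calls Forget so the rate limiter resets;
	// on failure it calls AddRateLimited(key) to retry with backoff.
	queue.Forget(key)
}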

// syncReplicaSet is the syncHandler.
func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
	startTime := time.Now()
	// Record how long this syncReplicaSet call took.
	defer func() {
		klog.FromContext(ctx).V(4).Info("Finished syncing", "kind", rsc.Kind, "key", key, "duration", time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	// Fetch the corresponding RS from the lister cache.
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	if apierrors.IsNotFound(err) {
		klog.FromContext(ctx).V(4).Info("deleted", "kind", rsc.Kind, "key", key)
		rsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}

	// Decide whether this RS needs a sync.
	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector for rs %v/%v: %v", namespace, name, err))
		return nil
	}

	// List all pods in the RS's namespace.
	// (Combining this List with the FilterActivePods step below into a single pass is a possible optimization.)
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}

	// Keep only the active pods.
	filteredPods := controller.FilterActivePods(allPods)

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy it first.
	// On the first reconcile of a newly created RS, filteredPods is nil.
	filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
	if err != nil {
		return err
	}

	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		// Create or delete pods as needed.
		manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
	}
	rs = rs.DeepCopy()
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

	// Always updates status as pods come up or die.
	updatedRS, err := updateReplicaSetStatus(klog.FromContext(ctx), rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
	if err != nil {
		// Multiple things could lead to this update failing. Requeuing the replica set ensures
		// Returning an error causes a requeue without forcing a hotloop
		return err
	}
	// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
	}
	return manageReplicasErr
}


func (rsc *ReplicaSetController) claimPods(ctx context.Context, rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
	// If any adoptions are attempted, we should first recheck for deletion with
	// an uncached quorum read sometime after listing Pods (see #42639).
	canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
		// Fetch the latest RS from the API server.
		// If fresh.UID differs from rs.UID, the cached RS is stale, so return an error.
		fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(ctx, rs.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != rs.UID {
			return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, fresh.UID, rs.UID)
		}
		return fresh, nil
	})
	// NewPodControllerRefManager returns a PodControllerRefManager,
	// which implements the adopt/release operations used to manage pods.
	cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
	// Claim the pods owned by this RS.
	return cm.ClaimPods(ctx, filteredPods)
}
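
Adoption and release work through ownerReferences on the pod. The standalone sketch below (an illustration only, not controller code; the pod name and UID are made up) builds the kind of controller OwnerReference that adoption attaches to a pod, and shows GetControllerOf recovering the owning ReplicaSet from it:

package main

import (
	"fmt"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	rs := &apps.ReplicaSet{ObjectMeta: metav1.ObjectMeta{
		Name:      "frontend",
		Namespace: "default",
		UID:       "11111111-2222-3333-4444-555555555555", // example UID
	}}

	// A controller-style owner reference for the RS, analogous to what the
	// adoption patch adds to a pod's metadata.ownerReferences.
	ref := metav1.NewControllerRef(rs, apps.SchemeGroupVersion.WithKind("ReplicaSet"))

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:            "frontend-abcde", // hypothetical pod name
		Namespace:       "default",
		OwnerReferences: []metav1.OwnerReference{*ref},
	}}

	// GetControllerOf is what the RS indexer and the pod event handlers use
	// to map an object back to its controlling owner.
	if owner := metav1.GetControllerOf(pod); owner != nil {
		fmt.Println(owner.Kind, owner.Name, *owner.Controller) // ReplicaSet frontend true
	}
}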

// ClaimPods claims the pods that belong to the controller:
// orphaned pods whose labels match the selector are adopted,
// pods that no longer match the selector are released,
// and a nil error means the reconciliation succeeded or was not needed.
func (m *PodControllerRefManager) ClaimPods(ctx context.Context, pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
	var claimed []*v1.Pod
	var errlist []error

	// match reports whether pod.Labels matches the RS selector (and any extra filters).
	match := func(obj metav1.Object) bool {
		pod := obj.(*v1.Pod)
		// Check selector first so filters only run on potentially matching Pods.
		if !m.Selector.Matches(labels.Set(pod.Labels)) {
			return false
		}
		for _, filter := range filters {
			if !filter(pod) {
				return false
			}
		}
		return true
	}
	// Adopt a pod: if a Pod has no OwnerReference, or its OwnerReference is not a controller,
	// and its labels match this ReplicaSet's selector, the Pod is immediately acquired by this ReplicaSet.
	adopt := func(ctx context.Context, obj metav1.Object) error {
		return m.AdoptPod(ctx, obj.(*v1.Pod))
	}
	// Release a pod that no longer matches the selector.
	release := func(ctx context.Context, obj metav1.Object) error {
		return m.ReleasePod(ctx, obj.(*v1.Pod))
	}

	for _, pod := range pods {
		ok, err := m.ClaimObject(ctx, pod, match, adopt, release)
		if err != nil {
			errlist = append(errlist, err)
			continue
		}
		if ok {
			claimed = append(claimed, pod)
		}
	}
	// On the first reconcile of a newly created RS, claimed is nil.
	return claimed, utilerrors.NewAggregate(errlist)
}
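
The match closure above boils down to a label-selector check. A small standalone example of the same check, using the selector and pod-template labels from the manifest at the top of this post:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// rs.Spec.Selector from the example manifest.
	ls := &metav1.LabelSelector{MatchLabels: map[string]string{"tier": "frontend"}}
	selector, err := metav1.LabelSelectorAsSelector(ls)
	if err != nil {
		panic(err)
	}

	// Labels as they would appear on a pod created from the template.
	podLabels := labels.Set{"tier": "frontend"}

	// The same check ClaimPods performs via m.Selector.Matches(labels.Set(pod.Labels)).
	fmt.Println(selector.Matches(podLabels))                     // true
	fmt.Println(selector.Matches(labels.Set{"tier": "backend"})) // false
}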

// manageReplicas creates or deletes replicas as needed.
// It must not modify <filteredPods> directly, because the operation
// can fail, and on failure the RS is requeued and retried.
func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	// For a newly created RS, len(filteredPods) == 0,
	// so with replicas=3 the diff is -3.
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	// e.g. rsKey = default/frontend
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
		return nil
	}
	// diff < 0 means more pods need to be created.
	if diff < 0 {
		diff *= -1
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		// TODO: Track UIDs of creates just like deletes. The problem currently
		// is we'd need to wait on the result of a create to record the pod's
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		// Register the expected creations with the controller's expectations tracker.
		rsc.expectations.ExpectCreations(rsKey, diff)
		klog.FromContext(ctx).V(2).Info("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)
		// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
		// and double with each successful iteration in a kind of "slow start".
		// This handles attempts to start large numbers of pods that would
		// likely all fail with the same error. For example a project with a
		// low quota that attempts to create a large number of pods will be
		// prevented from spamming the API service with the pod create requests
		// after one of its pods fails. Conveniently, this also prevents the
		// event spam that those failures would generate.
		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			// Call the API to create a pod.
			err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
			if err != nil {
				if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
					// if the namespace is being terminated, we don't have to do
					// anything because any creation will fail
					return nil
				}
			}
			return err
		})

		// Any skipped pods that we never attempted to start shouldn't be expected.
		// The skipped pods will be retried later. The next controller resync will
		// retry the slow start process.
		if skippedPods := diff - successfulCreations; skippedPods > 0 {
			klog.FromContext(ctx).V(2).Info("Slow-start failure. Skipping creation of pods, decrementing expectations", "podsSkipped", skippedPods, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
			for i := 0; i < skippedPods; i++ {
				// Decrement the expected number of creates because the informer won't observe this pod
				rsc.expectations.CreationObserved(rsKey)
			}
		}
		return err
	} else if diff > 0 {
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		klog.FromContext(ctx).V(2).Info("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)

		relatedPods, err := rsc.getIndirectlyRelatedPods(klog.FromContext(ctx), rs)
		utilruntime.HandleError(err)

		// Choose which Pods to delete, preferring those in earlier phases of startup.
		podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)

		// Snapshot the UIDs (ns/name) of the pods we're expecting to see
		// deleted, so we know to record their expectations exactly once either
		// when we see it as an update of the deletion timestamp, or as a delete.
		// Note that if the labels on a pod/rs change in a way that the pod gets
		// orphaned, the rs will only wake up after the expectations have
		// expired even if other pods are deleted.
		rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

		errCh := make(chan error, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		for _, pod := range podsToDelete {
			go func(targetPod *v1.Pod) {
				defer wg.Done()
				// Delete a surplus pod.
				if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
					// Decrement the expected number of deletes because the informer won't observe this deletion
					podKey := controller.PodKey(targetPod)
					rsc.expectations.DeletionObserved(rsKey, podKey)
					if !apierrors.IsNotFound(err) {
						klog.FromContext(ctx).V(2).Info("Failed to delete pod, decremented expectations", "pod", podKey, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
						errCh <- err
					}
				}
			}(pod)
		}
		wg.Wait()

		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}
	}

	return nil
}
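
slowStartBatch itself is not shown above. The sketch below is a simplified reimplementation of the idea for illustration (names and details are mine, not the upstream helper): run the create calls in parallel batches whose size starts at the initial batch size and doubles after each fully successful batch, so a systematic failure such as an exhausted quota stops the process after only a handful of attempts.

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// slowStartBatchSketch runs fn up to count times in parallel batches. The
// first batch has initialBatchSize goroutines and each fully successful batch
// doubles the size of the next one; the first batch with any failure stops
// the process. It returns how many calls succeeded and the first error seen.
func slowStartBatchSketch(count, initialBatchSize int, fn func() error) (int, error) {
	remaining := count
	successes := 0
	for batchSize := minInt(remaining, initialBatchSize); batchSize > 0; batchSize = minInt(2*batchSize, remaining) {
		errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		successes += batchSize - len(errCh)
		if len(errCh) > 0 {
			return successes, <-errCh
		}
		remaining -= batchSize
	}
	return successes, nil
}

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	// Simulate creating 7 pods where every call after the 3rd fails,
	// e.g. because a namespace quota has been exhausted.
	var calls atomic.Int64
	created, err := slowStartBatchSketch(7, 1, func() error {
		if calls.Add(1) > 3 {
			return errors.New("exceeded quota")
		}
		return nil
	})
	fmt.Println(created, err) // 3 exceeded quota (batches of 1, 2, then a failing batch of 4)
}

In the failure case the controller then decrements its creation expectations for the pods it never attempted, as the skippedPods loop in manageReplicas above shows.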

REF:
1. pkg/controller/replicaset/replica_set.go
2. pkg/controller/controller_ref_manager.go