k8s controller-expectation

Controller Expectation

In Kubernetes, the controller expectation mechanism lets a controller record how many controllee creations and deletions it expects to observe through its informer before its next sync is worth running. Controllers work against a locally cached, eventually consistent view of the cluster; without expectations, a controller that had just created or deleted Pods could re-read the stale cache and issue the same operations again.

The mechanism is implemented around an expectations object: ControllerExpectations is a TTL-backed cache that maps a controller key (the namespace/name of the owning object, e.g. a ReplicaSet) to a ControlleeExpectations record holding two atomic counters, add and del. Before issuing creates or deletes, the controller registers how many of each it expects to see.

Before running a sync, the controller checks whether the expectations for its key are satisfied: either both counters have dropped to zero or below (fulfilled), or the record is older than ExpectationsTimeout (expired). Only then does the sync proceed; otherwise it is skipped. As the matching watch events arrive, the informer event handlers decrement the counters one by one.

In this way a controller never acts on a cache that does not yet reflect its own previous writes, which avoids duplicate creations and deletions and keeps reconciliation consistent and correct without any extra coordination.
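Boiled down, the pattern looks like this (a minimal sketch; fooController, syncFoo, createPod, and ownerKey are hypothetical stand-ins for a real controller, its sync handler, its create call, and an owner-resolving helper; expectations is a controller.ControllerExpectationsInterface):

// Sync handler: do nothing while writes from an earlier sync are still in flight.
func (c *fooController) syncFoo(fooKey string) error {
    if !c.expectations.SatisfiedExpectations(fooKey) {
        return nil // skip; informer events will requeue fooKey and wake us up
    }
    need := 3 // replicas still missing, computed from spec vs. the local cache
    // Record the intent *before* issuing the API calls.
    if err := c.expectations.ExpectCreations(fooKey, need); err != nil {
        return err
    }
    for i := 0; i < need; i++ {
        if err := c.createPod(fooKey); err != nil {
            // This pod will never appear in the informer: give the count back.
            c.expectations.CreationObserved(fooKey)
            return err
        }
    }
    return nil
}

// Informer add handler: each observed creation pays down one expectation.
func (c *fooController) onPodAdd(obj interface{}) {
    key := c.ownerKey(obj)
    c.expectations.CreationObserved(key)
    c.queue.Add(key)
}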

Controller Expectation implementation
// pkg/controller/controller_utils.go
// ControllerExpectations: {
//     controller1: expects 2 adds in 2 minutes
//     controller2: expects 2 dels in 2 minutes
//     controller3: expects -1 adds in 2 minutes => already at the desired state
// }
//
// Implementation:
//   ControlleeExpectations: tracks controllee creations/deletions via atomic counters
//   ControllerExpectations: a TTLStore holding one ControlleeExpectations record per controller
//
// A controller will not run another sync until its expectations are fulfilled or expire.
// A controller with no expectations set is woken up for every matching controllee event.

// The methods every expectations implementation must provide.
type ControllerExpectationsInterface interface {
    GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
    SatisfiedExpectations(controllerKey string) bool
    DeleteExpectations(controllerKey string)
    SetExpectations(controllerKey string, add, del int) error
    ExpectCreations(controllerKey string, adds int) error
    ExpectDeletions(controllerKey string, dels int) error
    CreationObserved(controllerKey string)
    DeletionObserved(controllerKey string)
    RaiseExpectations(controllerKey string, add, del int)
    LowerExpectations(controllerKey string, add, del int)
}

// ControllerExpectations is a cache mapping controllers to what they expect to see before being woken up for a sync.
type ControllerExpectations struct {
    cache.Store
}

func (r *ControllerExpectations) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) {
    exp, exists, err := r.GetByKey(controllerKey)
    if err == nil && exists {
        return exp.(*ControlleeExpectations), true, nil
    }
    return nil, false, err
}

// DeleteExpectations deletes the expectations of the given controller from the TTLStore.
func (r *ControllerExpectations) DeleteExpectations(controllerKey string) {
    if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
        if err := r.Delete(exp); err != nil {
            klog.V(2).Infof("Error deleting expectations for controller %v: %v", controllerKey, err)
        }
    }
}

func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
    if exp, exists, err := r.GetExpectations(controllerKey); exists {
        if exp.Fulfilled() {
            klog.V(4).Infof("Controller expectations fulfilled %#v", exp)
            return true
        } else if exp.isExpired() {
            klog.V(4).Infof("Controller expectations expired %#v", exp)
            return true
        } else {
            klog.V(4).Infof("Controller still waiting on expectations %#v", exp)
            return false
        }
    } else if err != nil {
        klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
    } else {
        // When a new controller is created, it doesn't have expectations.
        // When it doesn't see expected watch events for > TTL, the expectations expire.
        // - In this case it wakes up, creates/deletes controllees, and sets expectations again.
        // When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
        // - In this case it continues without setting expectations till it needs to create/delete controllees.
        klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
    }
    // Trigger a sync if we either encountered an error (which shouldn't happen since we're
    // getting from local store) or this controller hasn't established expectations.
    return true
}

// isExpired returns true if the expectations record is older than ExpectationsTimeout.
func (exp *ControlleeExpectations) isExpired() bool {
    return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
}
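ExpectationsTimeout, referenced by isExpired above, is a package-level constant in the same file; in upstream Kubernetes it is five minutes:

const ExpectationsTimeout = 5 * time.Minute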

// SetExpectations registers new expectations for the given controller. Forgets existing expectations.
func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
    exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()}
    klog.V(4).Infof("Setting expectations %#v", exp)
    return r.Add(exp)
}

func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error {
    return r.SetExpectations(controllerKey, adds, 0)
}

func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int) error {
    return r.SetExpectations(controllerKey, 0, dels)
}

// LowerExpectations decrements the expectation counts of the given controller.
func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
    if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
        exp.Add(int64(-add), int64(-del))
        // The expectations might've been modified since the update on the previous line.
        klog.V(4).Infof("Lowered expectations %#v", exp)
    }
}

// RaiseExpectations increments the expectation counts of the given controller.
func (r *ControllerExpectations) RaiseExpectations(controllerKey string, add, del int) {
    if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
        exp.Add(int64(add), int64(del))
        // The expectations might've been modified since the update on the previous line.
        klog.V(4).Infof("Raised expectations %#v", exp)
    }
}

// CreationObserved atomically decrements the `add` expectation count of the given controller.
func (r *ControllerExpectations) CreationObserved(controllerKey string) {
    r.LowerExpectations(controllerKey, 1, 0)
}

// DeletionObserved atomically decrements the `del` expectation count of the given controller.
func (r *ControllerExpectations) DeletionObserved(controllerKey string) {
    r.LowerExpectations(controllerKey, 0, 1)
}


// ControlleeExpectations track controllee creates/deletes.
type ControlleeExpectations struct {
    // See: https://golang.org/pkg/sync/atomic/ for more information
    add       int64
    del       int64
    key       string
    timestamp time.Time
}

// Add increments the add and del counters.
func (e *ControlleeExpectations) Add(add, del int64) {
    atomic.AddInt64(&e.add, add)
    atomic.AddInt64(&e.del, del)
}

// Fulfilled returns true if this expectation has been fulfilled.
func (e *ControlleeExpectations) Fulfilled() bool {
    // TODO: think about why this line being atomic doesn't matter
    return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}

// GetExpectations returns the add and del expectations of the controllee.
func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
    return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del)
}
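The counter semantics are easy to exercise in isolation. A minimal sketch, assuming a module that vendors k8s.io/kubernetes so that pkg/controller is importable, matching the logger-less method signatures quoted above:

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/controller"
)

func main() {
    exp := controller.NewControllerExpectations()
    key := "default/my-rs" // controller keys are namespace/name

    // Expect 2 creations; the controller would now skip syncs for this key.
    _ = exp.ExpectCreations(key, 2)
    fmt.Println(exp.SatisfiedExpectations(key)) // false: still waiting

    // The informer observes both pods being created.
    exp.CreationObserved(key)
    exp.CreationObserved(key)
    fmt.Println(exp.SatisfiedExpectations(key)) // true: fulfilled, sync may proceed
}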
Using expectations in ReplicaSetController
// pkg/controller/replicaset/replica_set.go
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster) *ReplicaSetController {

    rsc := &ReplicaSetController{
        GroupVersionKind: gvk,
        kubeClient:       kubeClient,
        podControl:       podControl,
        eventBroadcaster: eventBroadcaster,
        burstReplicas:    burstReplicas,
        // Set up expectations, wrapped with UID tracking for deletions.
        expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
        queue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
    }
    ...
}

// pkg/controller/controller_utils.go
func NewUIDTrackingControllerExpectations(ce ControllerExpectationsInterface) *UIDTrackingControllerExpectations {
    return &UIDTrackingControllerExpectations{ControllerExpectationsInterface: ce, uidStore: cache.NewStore(UIDSetKeyFunc)}
}

// UIDTrackingControllerExpectations implements ControllerExpectationsInterface
// and additionally remembers *which* UIDs it expects to see deleted.
type UIDTrackingControllerExpectations struct {
    ControllerExpectationsInterface
    uidStoreLock sync.Mutex
    // Store used for the UIDs associated with any expectation tracked via the
    // ControllerExpectationsInterface.
    uidStore cache.Store
}
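The point of the wrapper is that each deletion is counted exactly once per expected pod, even when the same pod produces both an update (deletion timestamp set) and a delete event. A simplified sketch of the two overriding methods, lightly trimmed from the upstream code (UIDSet and GetUIDs are small helpers defined alongside in pkg/controller/controller_utils.go):

// ExpectDeletions records *which* pods (by ns/name key) we expect to see
// deleted, then sets the numeric del expectation to match.
func (u *UIDTrackingControllerExpectations) ExpectDeletions(rcKey string, deletedKeys []string) error {
    expectedUIDs := sets.NewString(deletedKeys...)
    u.uidStoreLock.Lock()
    defer u.uidStoreLock.Unlock()
    if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil {
        return err
    }
    return u.ControllerExpectationsInterface.ExpectDeletions(rcKey, expectedUIDs.Len())
}

// DeletionObserved lowers the del counter only if this particular pod was
// one we were waiting on, so duplicate or unrelated events are harmless.
func (u *UIDTrackingControllerExpectations) DeletionObserved(rcKey, deleteKey string) {
    u.uidStoreLock.Lock()
    defer u.uidStoreLock.Unlock()
    if uids := u.GetUIDs(rcKey); uids != nil && uids.Has(deleteKey) {
        u.ControllerExpectationsInterface.LowerExpectations(rcKey, 0, 1)
        uids.Delete(deleteKey)
    }
}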
// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
    ...
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    if apierrors.IsNotFound(err) {
        klog.FromContext(ctx).V(4).Info("deleted", "kind", rsc.Kind, "key", key)
        // The ReplicaSet is gone: drop its expectations entry.
        rsc.expectations.DeleteExpectations(key)
        return nil
    }
    ...
    rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
    ...
    var manageReplicasErr error
    // If rsNeedsSync is false, creates/deletes issued by a previous sync of this
    // ReplicaSet have not all been observed yet, so manageReplicas (the real
    // reconciliation) is skipped for this round.
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
    }
    ...
}

func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    ...
    if diff < 0 {
        // diff < 0: scale up.
        diff *= -1
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        // TODO: Track UIDs of creates just like deletes. The problem currently
        // is we'd need to wait on the result of a create to record the pod's
        // UID, which would require locking *across* the create, which will turn
        // into a performance bottleneck. We should generate a UID for the pod
        // beforehand and store it via ExpectCreations.
        // Register diff expected creations before issuing them.
        rsc.expectations.ExpectCreations(rsKey, diff)
        ...
        if skippedPods := diff - successfulCreations; skippedPods > 0 {
            klog.FromContext(ctx).V(2).Info("Slow-start failure. Skipping creation of pods, decrementing expectations", "podsSkipped", skippedPods, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
            for i := 0; i < skippedPods; i++ {
                // Decrement the expected number of creates because the informer won't observe this pod:
                // the skipped pods were never created, so nothing will ever pay down their counts.
                rsc.expectations.CreationObserved(rsKey)
            }
        }
    } else if diff > 0 {
        // diff > 0: scale down.
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        klog.FromContext(ctx).V(2).Info("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)

        relatedPods, err := rsc.getIndirectlyRelatedPods(klog.FromContext(ctx), rs)
        utilruntime.HandleError(err)

        // Choose which Pods to delete, preferring those in earlier phases of startup.
        podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)

        // Snapshot the UIDs (ns/name) of the pods we're expecting to see
        // deleted, so we know to record their expectations exactly once either
        // when we see it as an update of the deletion timestamp, or as a delete.
        // Note that if the labels on a pod/rs change in a way that the pod gets
        // orphaned, the rs will only wake up after the expectations have
        // expired even if other pods are deleted.
        rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                defer wg.Done()
                if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    podKey := controller.PodKey(targetPod)
                    // The delete call failed, so this deletion will never be observed:
                    // record the observation manually to keep the counter honest.
                    rsc.expectations.DeletionObserved(rsKey, podKey)
                    if !apierrors.IsNotFound(err) {
                        klog.FromContext(ctx).V(2).Info("Failed to delete pod, decremented expectations", "pod", podKey, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
                        errCh <- err
                    }
                }
            }(pod)
        }
        wg.Wait()

        select {
        case err := <-errCh:
            // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
            if err != nil {
                return err
            }
        default:
        }
    }
}


func (rsc *ReplicaSetController) deleteRS(obj interface{}) {
    rs, ok := obj.(*apps.ReplicaSet)
    ...

    // Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean
    rsc.expectations.DeleteExpectations(key)

    rsc.queue.Add(key)
}


// When a pod is created, enqueue the replica set that manages it and update its expectations.
func (rsc *ReplicaSetController) addPod(obj interface{}) {
    pod := obj.(*v1.Pod)

    if pod.DeletionTimestamp != nil {
        // on a restart of the controller manager, it's possible a new pod shows up in a state that
        // is already pending deletion. Prevent the pod from being a creation observation.
        rsc.deletePod(pod)
        return
    }

    // If it has a ControllerRef, that's all that matters.
    if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
        rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
        if rs == nil {
            return
        }
        rsKey, err := controller.KeyFunc(rs)
        if err != nil {
            return
        }
        klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
        // A creation was observed: pay down one `add` expectation for the owner.
        rsc.expectations.CreationObserved(rsKey)
        rsc.queue.Add(rsKey)
        return
    }
    ...
}


// When a pod is deleted, enqueue the replica set that manages the pod and update its expectations.
// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (rsc *ReplicaSetController) deletePod(obj interface{}) {
    pod, ok := obj.(*v1.Pod)
    ...

    controllerRef := metav1.GetControllerOf(pod)
    if controllerRef == nil {
        // No controller should care about orphans being deleted.
        return
    }
    rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
    if rs == nil {
        return
    }
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
        return
    }
    klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)

    // Likewise, an observed deletion pays down the `del` expectation for this specific pod.
    rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
    rsc.queue.Add(rsKey)
}
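Putting the pieces together, a scale-up of a ReplicaSet from 1 to 3 replicas traces through the expectations roughly like this (a sketch of the event order, not actual code):

1. syncReplicaSet: SatisfiedExpectations("ns/rs") returns true (no record yet), so manageReplicas runs.
2. manageReplicas: diff = -2, so ExpectCreations("ns/rs", 2) is recorded and two Pods are created.
3. If the key is popped again before the watch events arrive, SatisfiedExpectations returns false (add == 2) and the sync is skipped.
4. addPod fires twice: each CreationObserved lowers add by one and requeues "ns/rs".
5. On the next sync, add == 0, the expectations are fulfilled, and manageReplicas runs again, now seeing all three Pods in the cache.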

REF:
1. pkg/controller/controller_utils.go
2. pkg/controller/replicaset/replica_set.go