k8s hasSynced

k8s cache

To reduce the load on the apiserver, k8s keeps resources in a local cache, so that reads can be served directly from the cache (k8s relies on the watch/resync mechanism to keep the local cache in sync with the data in the apiserver).

When using the informer mechanism, we must wait until the resources have been synced into the local cache.
So how do we wait for that sync, and how does client-go decide that the data has actually made it into the local cache?

Source code analysis
factory := informers.NewSharedInformerFactory(clientset, time.Minute)
podInformer := factory.Core().V1().Pods().Informer()
...
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
// client-go waits for the cache to sync via WaitForCacheSync
if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
    return
}
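
As an aside, when all informers come from one SharedInformerFactory, the factory itself can start them and wait for all of their caches in one call; a minimal sketch reusing the factory and stopCh from above (the error message is illustrative):

factory.Start(stopCh)
// factory.WaitForCacheSync runs cache.WaitForCacheSync for every informer
// this factory has started and reports the result per resource type.
for typ, synced := range factory.WaitForCacheSync(stopCh) {
    if !synced {
        fmt.Printf("cache failed to sync for %v\n", typ)
        return
    }
}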
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
    err := wait.PollImmediateUntil(syncedPollPeriod,
        func() (bool, error) {
            for _, syncFunc := range cacheSyncs {
                // syncFunc here is podInformer.HasSynced
                if !syncFunc() {
                    return false, nil
                }
            }
            return true, nil
        },
        stopCh)
    if err != nil {
        klog.V(2).Infof("stop requested")
        return false
    }

    klog.V(4).Infof("caches populated")
    return true
}

func (s *sharedIndexInformer) HasSynced() bool {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()

    if s.controller == nil {
        return false
    }
    return s.controller.HasSynced()
}

// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) HasSynced() bool {
    return c.config.Queue.HasSynced()
}

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
// Ultimately the check lands in DeltaFIFO's HasSynced method
func (f *DeltaFIFO) HasSynced() bool {
    f.lock.Lock()
    defer f.lock.Unlock()
    // Why are these two conditions enough to tell that a full list
    // has happened at least once? Read on.
    return f.populated && f.initialPopulationCount == 0
}

// staging/src/k8s.io/apimachinery/pkg/util/wait/poll.go
// PollImmediateUntil runs the ConditionFunc until it returns true,
// returns an error, or stopCh is closed
func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
    done, err := condition()
    if err != nil {
        return err
    }
    if done {
        return nil
    }
    select {
    case <-stopCh:
        return ErrWaitTimeout
    default:
        return PollUntil(interval, condition, stopCh)
    }
}
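
For reference, controllers inside Kubernetes itself usually reach this loop through cache.WaitForNamedCacheSync, a thin wrapper that adds controller-name-tagged log lines around the same WaitForCacheSync; a minimal sketch of a call site (the controller name is hypothetical):

if !cache.WaitForNamedCacheSync("my-controller", stopCh, podInformer.HasSynced) {
    // false means stopCh was closed before every InformerSynced returned true
    return
}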
type DeltaFIFO struct {

    // populated is true if the first batch of items inserted by Replace() has been populated
    // or Delete/Add/Update/AddIfNotPresent was called first.
    // i.e. populated = true once Replace() or Delete/Add/Update/AddIfNotPresent has been called
    populated bool
    // initialPopulationCount is the number of items inserted by the first call of Replace()
    initialPopulationCount int
}


func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
        KnownObjects:          s.indexer,
        EmitDeltaTypeReplaced: true,
    })

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process:           s.HandleDeltas,
        WatchErrorHandler: s.watchErrorHandler,
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)
}

func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.WatchListPageSize = c.config.WatchListPageSize
    r.clock = c.clock
    if c.config.WatchErrorHandler != nil {
        r.watchErrorHandler = c.config.WatchErrorHandler
    }

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group

    wg.StartWithChannel(stopCh, r.Run)

    wait.Until(c.processLoop, time.Second, stopCh)
    wg.Wait()
}

func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    wait.BackoffUntil(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            r.watchErrorHandler(r, err)
        }
    }, r.backoffManager, true, stopCh)
    klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    ...
    // syncWith calls DeltaFIFO's Replace method
    if err := r.syncWith(items, resourceVersion); err != nil {
        return fmt.Errorf("unable to sync list result: %v", err)
    }
    ...
}

func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    return r.store.Replace(found, resourceVersion)
}

func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    keys := make(sets.String, len(list))

    // keep backwards compat for old clients
    action := Sync
    if f.emitDeltaTypeReplaced {
        action = Replaced
    }

    // Add Sync/Replaced action for each new item.
    for _, item := range list {
        key, err := f.KeyOf(item)
        if err != nil {
            return KeyError{item, err}
        }
        keys.Insert(key)
        if err := f.queueActionLocked(action, item); err != nil {
            return fmt.Errorf("couldn't enqueue object: %v", err)
        }
    }

    if f.knownObjects == nil {
        // Do deletion detection against our own list.
        queuedDeletions := 0
        for k, oldItem := range f.items {
            if keys.Has(k) {
                continue
            }
            // Delete pre-existing items not in the new list.
            // This could happen if watch deletion event was missed while
            // disconnected from apiserver.
            var deletedObj interface{}
            if n := oldItem.Newest(); n != nil {
                deletedObj = n.Object
            }
            queuedDeletions++
            if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
                return err
            }
        }

        if !f.populated {
            // If Add/Update/Delete/AddIfNotPresent was not called before this,
            // set populated = true
            f.populated = true
            // While there shouldn't be any queued deletions in the initial
            // population of the queue, it's better to be on the safe side.
            // Set initialPopulationCount; each Pop() decrements it by 1
            f.initialPopulationCount = keys.Len() + queuedDeletions
        }

        return nil
    }

    // Detect deletions not already in the queue.
    knownKeys := f.knownObjects.ListKeys()
    queuedDeletions := 0
    for _, k := range knownKeys {
        if keys.Has(k) {
            continue
        }

        deletedObj, exists, err := f.knownObjects.GetByKey(k)
        if err != nil {
            deletedObj = nil
            klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
        } else if !exists {
            deletedObj = nil
            klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
        }
        queuedDeletions++
        if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
            return err
        }
    }

    if !f.populated {
        f.populated = true
        f.initialPopulationCount = keys.Len() + queuedDeletions
    }

    return nil
}
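
A practical consequence of the deletion detection above: after a relist, a delete can be delivered as a DeletedFinalStateUnknown tombstone instead of the object itself, so delete handlers should unwrap it. A minimal sketch against the podInformer from the beginning (assuming the usual corev1 alias for k8s.io/api/core/v1):

podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    DeleteFunc: func(obj interface{}) {
        pod, ok := obj.(*corev1.Pod)
        if !ok {
            // The final delete was missed while disconnected; DeltaFIFO
            // queued a tombstone carrying the last known state.
            tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
            if !ok {
                return
            }
            pod, ok = tombstone.Obj.(*corev1.Pod)
            if !ok {
                return
            }
        }
        fmt.Printf("pod deleted: %s/%s\n", pod.Namespace, pod.Name)
    },
})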


func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            if f.closed {
                return nil, ErrFIFOClosed
            }

            f.cond.Wait()
        }
        id := f.queue[0]
        f.queue = f.queue[1:]
        // While the initial Replace() batch is still being drained, each Pop()
        // decrements initialPopulationCount; once it reaches 0, HasSynced
        // starts returning true.
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // This should never happen
            klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
            continue
        }
        delete(f.items, id)
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}
Summary

Putting it together: f.populated && f.initialPopulationCount == 0 means that the first full List has been pushed into the DeltaFIFO by Replace(), and that every item from that initial batch has since been Pop()ed, i.e. handed to HandleDeltas and stored in the local cache. In other words, HasSynced() reports that at least one full List has been completely processed.
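
To watch the two fields flip without an apiserver, one can drive a DeltaFIFO by hand. A minimal, self-contained sketch (assuming the older client-go quoted throughout this post, where PopProcessFunc takes a single argument):

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    f := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
        KeyFunction: cache.MetaNamespaceKeyFunc,
    })
    fmt.Println(f.HasSynced()) // false: populated is still false

    pods := []interface{}{
        &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "a"}},
        &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "b"}},
    }
    // The "full list": populated = true, initialPopulationCount = 2.
    _ = f.Replace(pods, "1")
    fmt.Println(f.HasSynced()) // false: 2 initial items not yet processed

    noop := func(obj interface{}) error { return nil }
    _, _ = f.Pop(noop)         // initialPopulationCount: 2 -> 1
    fmt.Println(f.HasSynced()) // false
    _, _ = f.Pop(noop)         // initialPopulationCount: 1 -> 0
    fmt.Println(f.HasSynced()) // true: populated && initialPopulationCount == 0
}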


REF:
1. staging/src/k8s.io/client-go/tools/cache/shared_informer.go
2. staging/src/k8s.io/client-go/tools/cache/controller.go
3. staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
4. staging/src/k8s.io/apimachinery/pkg/util/wait/poll.go