今天通过一张图和一段代码来完全掌握k8s中的informer机制。
Reflector: 用于监听指定k8s资源的变化,并将事件推送到DeltaFIFO
中.监听的对象需要实现ListAndWatch
方法.监听的对象可以是k8s中的内置资源也可以自定义资源
DeltaFIFO: 一个存储资源对象的先进先出队列
Indexer: 自带索引功能的本地存储.可参考k8s-indexer
k8s代码版本为V1.21
Reflector结构定义
1 | // staging/src/k8s.io/client-go/tools/cache/reflector.go |
DeltaFIFO结构定义
1 | // staging/src/k8s.io/client-go/tools/cache/delta_fifo.go |
通过一段代码来搞清楚informer是怎么工作的
1 | func main() { |
1.获取informer工厂函数
1 | // client-go/informers/factory.go |
1 | // SharedIndexInformer provides add and get Indexers ability based on SharedInformer. |
2.informer
的创建流程1
2
3
4
5```golang
// client-go/informers/factory.go
func (f *sharedInformerFactory) Core() core.Interface {
return core.New(f, f.namespace, f.tweakListOptions)
}
1 | // client-go/informers/core/interface.go |
1 | // client-go/informers/core/v1/interface.go |
1 | // client-go/informers/core/v1/pod.go |
1 | // client-go/informers/factory.go |
3.informer是如何启动的1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51// client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 创建一个FIFO队列
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
// 设置ListerWatcher函数
ListerWatcher: s.listerWatcher,
// 对象类型v1.Pod
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
// 这里应该是设置事件回调函数
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 根据上面的配置文件,创建一个Controller对象
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
// 启动两个goroutine
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
// 运行控制器
s.controller.Run(stopCh)
}
1 | // 进入到s.controller.Run(stopCh),在这里会创建Reflector对象 |
1 | // client-go/tools/cache/delta_fifo.go |
让我们进入process(item)函数1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64// client-go/tools/cache/shared_informer.go
// 真正处理DeltaFIFO数据的函数
// 这里会将获取到的数据存入indexer中(如果数据存在的话进行更新操作,否则直接插入),完成后还会进行distribute操作
// 将事件分发给监听者
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
// 将事件分发给监听者
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
那存量的pod列表信息是何时放入到DeltaFIFO中的呢,也就是说何时调用ListFunc?1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23// client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
...
// 此处会调用client-go/informers/core/v1/pod.go中NewFilteredPodInformer中的ListFunc,获取pod资源对象列表
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
// 此处将pod资源对象列表插入到DeltaFIFO中
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
...
}
// 将list接口中的数据存放到cache.DeltaFIFO中
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
// 对store中已存在的元素进行替换操作
return r.store.Replace(found, resourceVersion)
}
1 | // 真正执行入队的操作 |
总结:
使用IDE对文中的示例代码进行调试,因为这段代码会启动多个不会退出的协程,所以为调试增加了难度。执行informer.Run(stopCh)
会运行多个goroutine
,可以在对应的协程代码里面打断点进行调试,更好的理解informer
机制。1
2
3
4
5
6wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
// 运行Reflector实例
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
REF:
1.https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md
2.https://cloudnative.to/blog/client-go-informer-source-code