k8s informer

今天通过一张图和一段代码来完全掌握k8s中的informer机制。

图片来源
image

Reflector: 用于监听(watch)指定k8s资源的变化,并将变化事件推送到DeltaFIFO中。被监听的资源需要提供一个实现了ListerWatcher接口(即List和Watch两个方法)的对象,ListAndWatch则是Reflector自身的方法。监听的资源既可以是k8s中的内置资源,也可以是自定义资源。
DeltaFIFO: 一个先进先出队列,按资源对象的key保存该对象的一系列变化事件(Deltas)
Indexer: 自带索引功能的本地存储.可参考k8s-indexer

k8s代码版本为V1.21

Reflector结构定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// staging/src/k8s.io/client-go/tools/cache/reflector.go
// Reflector watches a specified resource and pushes the resulting change
// events into the configured store.
type Reflector struct {
// Only a subset of the fields is listed here.
name string
// store is the destination of the change events.
// It must implement Add, Update, Delete, List, ListKeys, Get, GetByKey, Replace and Resync.
store Store
// listerWatcher provides the List and Watch calls for the resource.
listerWatcher ListerWatcher
...
}

// staging/src/k8s.io/client-go/tools/cache/listwatch.go
// Lister is the list half of ListerWatcher: it exposes a single List method.
type Lister interface {
// List takes the list options and returns the result as a runtime.Object, or an error.
List(options metav1.ListOptions) (runtime.Object, error)
}


// Watcher is the watch half of ListerWatcher: it exposes a single Watch method.
type Watcher interface {
// Watch takes the list options and returns a watch.Interface, or an error.
Watch(options metav1.ListOptions) (watch.Interface, error)
}

// ListerWatcher combines Lister and Watcher. It is the type of the
// Reflector's listerWatcher field.
type ListerWatcher interface {
Lister
Watcher
}

// ListFunc knows how to list resources.
// Its signature matches Lister.List, so ListWatch.List can delegate to it directly.
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

// WatchFunc knows how to watch resources.
// Its signature matches Watcher.Watch, so ListWatch.Watch can delegate to it directly.
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience type for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil: List and Watch call them unconditionally.
type ListWatch struct {
// ListFunc is the function invoked by List.
ListFunc ListFunc
// WatchFunc is the function invoked by Watch.
WatchFunc WatchFunc
// DisableChunking requests no chunking for this list watcher.
DisableChunking bool
}

// NewListWatchFromClient builds a ListWatch for the given client, resource and
// namespace whose list and watch requests are restricted by fieldSelector.
// It delegates to NewFilteredListWatchFromClient, passing a modifier that
// writes the selector's string form into ListOptions.FieldSelector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
	return NewFilteredListWatchFromClient(c, resource, namespace, func(opts *metav1.ListOptions) {
		opts.FieldSelector = fieldSelector.String()
	})
}

// NewFilteredListWatchFromClient builds a ListWatch for the given client,
// resource and namespace. optionsModifier is applied to the ListOptions of
// every list and watch request before the request is sent, so callers can set
// a field selector, a label selector, or any other option.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
	lw := &ListWatch{}

	// List: apply the modifier, then issue a GET and decode the result.
	lw.ListFunc = func(opts metav1.ListOptions) (runtime.Object, error) {
		optionsModifier(&opts)
		req := c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&opts, metav1.ParameterCodec)
		return req.Do(context.TODO()).Get()
	}

	// Watch: force Watch=true first, then let the modifier adjust the options.
	lw.WatchFunc = func(opts metav1.ListOptions) (watch.Interface, error) {
		opts.Watch = true
		optionsModifier(&opts)
		req := c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&opts, metav1.ParameterCodec)
		return req.Watch(context.TODO())
	}

	return lw
}

// List performs one unpaginated list call by handing options to ListFunc.
// Pagination is deliberately not done here: ListWatch is used by Reflector,
// which already supports pagination, and doing it twice would duplicate work.
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
	list := lw.ListFunc
	return list(options)
}

// Watch starts a watch on the resources by handing options to WatchFunc.
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
	watchFn := lw.WatchFunc
	return watchFn(options)
}

DeltaFIFO结构定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
// DeltaFIFO is shown here with only a subset of its fields.
type DeltaFIFO struct {
// lock guards the queue; Pop holds it for its whole duration.
lock sync.RWMutex
// cond is what Pop waits on while the queue is empty; queueActionLocked broadcasts on it.
cond sync.Cond

// items maps an object key to the Deltas accumulated for that key.
items map[string]Deltas
// queue holds the keys in consumption order (Pop takes queue[0]).
queue []string
...
}

// Delta is a single change recorded for an object.
type Delta struct {
Type DeltaType // event type (a string)
Object interface{} // the resource object: a k8s object
}
// Deltas is a list of Delta values; HandleDeltas walks it from oldest to newest.
type Deltas []Delta

通过一段代码来搞清楚informer是怎么工作的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// main builds a clientset from the local kubeconfig, creates a Pod informer
// through the shared informer factory, registers callbacks that print one line
// per add/update/delete event, and then runs the informer.
func main() {
	// Build the REST config from the kubeconfig file.
	restCfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		panic(err)
	}

	stop := make(chan struct{})
	defer close(stop)

	// Event callbacks to register on the informer.
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			fmt.Println("add a pod:", pod.Name)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			fmt.Println("update a pod")
		},
		DeleteFunc: func(obj interface{}) {
			fmt.Println("delete a pod")
		},
	}

	// NewSharedInformerFactory returns a sharedInformerFactory; ask it for
	// the informer of the Pod resource.
	sharedFactory := informers.NewSharedInformerFactory(client, time.Minute)
	podInformer := sharedFactory.Core().V1().Pods().Informer()

	podInformer.AddEventHandler(handlers)
	podInformer.Run(stop)
}

1.获取informer工厂函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// client-go/informers/factory.go
//
// sharedInformerFactory keeps one shared informer per object type and hands
// out the cached instance on repeated requests (see InformerFor).
type sharedInformerFactory struct {
client kubernetes.Interface
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
// lock is held by InformerFor while it reads and writes the maps below.
lock sync.Mutex
// defaultResync is the resync period InformerFor uses when customResync has no entry for the type.
defaultResync time.Duration
// customResync holds per-type resync periods that take precedence over defaultResync.
customResync map[reflect.Type]time.Duration

// informers holds the informers created so far, keyed by the reflect.Type of the object.
informers map[reflect.Type]cache.SharedIndexInformer
// startedInformers is used for tracking which informers have been started.
// This allows Start() to be called multiple times safely.
startedInformers map[reflect.Type]bool
}

// NewSharedInformerFactoryWithOptions creates a sharedInformerFactory for the
// given client. The factory starts out covering all namespaces with
// defaultResync as its resync period and empty informer maps; each option is
// then applied in order and may replace the factory it receives.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	f := &sharedInformerFactory{
		client:           client,
		namespace:        v1.NamespaceAll,
		defaultResync:    defaultResync,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync:     make(map[reflect.Type]time.Duration),
	}

	// Apply every option; each one returns the factory to keep using.
	for i := range options {
		f = options[i](f)
	}
	return f
}
1
2
3
4
5
6
7
8
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
// In other words, it extends SharedInformer with Indexer support.
type SharedIndexInformer interface {
SharedInformer
// AddIndexers add indexers to the informer before it starts.
AddIndexers(indexers Indexers) error
// GetIndexer returns the informer's Indexer.
GetIndexer() Indexer
}

2.informer的创建流程
image

:
1
2
3
4
5
```golang
// client-go/informers/factory.go
// Core returns the informer interface of the core API group, constructed from
// this factory together with its namespace and list-option tweak.
func (f *sharedInformerFactory) Core() core.Interface {
	ns, tweak := f.namespace, f.tweakListOptions
	return core.New(f, ns, tweak)
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// client-go/informers/core/interface.go
// group carries the factory, namespace and list-option tweak that New received,
// and passes them on unchanged when a versioned interface is requested (see V1).
type group struct {
factory internalinterfaces.SharedInformerFactory
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
}

// New (called as core.New) returns a new Interface backed by a group that
// stores the given factory, namespace and list-option tweak.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
	g := new(group)
	g.factory = f
	g.namespace = namespace
	g.tweakListOptions = tweakListOptions
	return g
}

// V1 returns the v1 informer interface of this group, constructed from the
// group's factory, namespace and list-option tweak.
func (g *group) V1() v1.Interface {
	factory, ns, tweak := g.factory, g.namespace, g.tweakListOptions
	return v1.New(factory, ns, tweak)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// client-go/informers/core/v1/interface.go
// version carries the factory, namespace and list-option tweak that New received,
// and passes them on unchanged to the per-resource informers (see Pods).
type version struct {
factory internalinterfaces.SharedInformerFactory
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
}

// New (called as v1.New) returns a new Interface backed by a version that
// stores the given factory, namespace and list-option tweak.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
	ver := new(version)
	ver.factory = f
	ver.namespace = namespace
	ver.tweakListOptions = tweakListOptions
	return ver
}

// Pods returns a PodInformer that shares this version's factory, namespace
// and list-option tweak.
func (v *version) Pods() PodInformer {
	inf := new(podInformer)
	inf.factory = v.factory
	inf.namespace = v.namespace
	inf.tweakListOptions = v.tweakListOptions
	return inf
}
1
2
3
4
5
// client-go/informers/core/v1/pod.go

// Informer returns the shared index informer for Pods. It asks the factory
// for the informer registered under the *corev1.Pod type, supplying
// f.defaultInformer as the constructor to use if none exists yet.
func (f *podInformer) Informer() cache.SharedIndexInformer {
	podType := &corev1.Pod{}
	return f.factory.InformerFor(podType, f.defaultInformer)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// client-go/informers/factory.go
// InformerFor returns the shared informer registered for obj's type, creating
// and caching it with newFunc on first use. The whole lookup-or-create
// sequence runs under f.lock. The resync period passed to newFunc is the
// type's entry in customResync if present, otherwise defaultResync.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	key := reflect.TypeOf(obj)
	if cached, ok := f.informers[key]; ok {
		return cached
	}

	// Start from the default and override it only when a custom value exists.
	period := f.defaultResync
	if custom, ok := f.customResync[key]; ok {
		period = custom
	}

	created := newFunc(f.client, period)
	f.informers[key] = created
	return created
}

3.informer是如何启动的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// client-go/tools/cache/shared_informer.go
// Run builds the DeltaFIFO and the controller config, creates the controller,
// starts the cache mutation detector and the processor, and then runs the
// controller with stopCh. It returns only after s.controller.Run returns and
// the deferred shutdown steps below have completed.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// Create the DeltaFIFO queue, using the informer's indexer as its KnownObjects.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})

cfg := &Config{
Queue: fifo,
// The ListerWatcher used to list and watch the resource.
ListerWatcher: s.listerWatcher,
// The object type (v1.Pod in this article's example).
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,

// Process is the function applied to every item popped from the queue
// (see processLoop and DeltaFIFO.Pop). Here it is HandleDeltas, which
// updates the indexer and distributes notifications to the listeners.
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}

// The anonymous function limits how long startedLock is held.
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()

// Create a controller from the config built above.
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()

// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
// Deferred calls run in reverse order of registration: first stopped is set
// (the defer further below), then processorStopCh is closed, then wg.Wait runs.
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
// Start two goroutines: the cache mutation detector and the processor.
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)

defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
// Run the controller.
s.controller.Run(stopCh)
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Stepping into s.controller.Run(stopCh): this is where the Reflector object is created.
// client-go/tools/cache/controller.go
// s.controller.Run(stopCh)
//
// Run creates a Reflector from the controller's config, starts it, and then
// drives processLoop. A helper goroutine closes the queue once stopCh is closed.
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// Close the queue when stopCh is closed.
go func() {
<-stopCh
c.config.Queue.Close()
}()
// Create the Reflector from the ListerWatcher, object type, queue and resync period in the config.

r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
// Override the Reflector's watch error handler only when the config supplies one.
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}

// Publish the Reflector on the controller under reflectorMutex.
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()

var wg wait.Group
// Start the Reflector as a goroutine.
wg.StartWithChannel(stopCh, r.Run)
// wait.Until is called directly (there is no `go` statement), so c.processLoop
// is driven from the goroutine that called Run rather than from a new one.
// NOTE(review): wait.Until presumably re-invokes processLoop every second until stopCh closes - confirm against the wait package docs.
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}

// processLoop repeatedly pops an item from the work queue and processes it
// with c.config.Process. It returns only when Pop reports ErrFIFOClosed. On
// any other error the item is re-enqueued when RetryOnError is set, and
// otherwise dropped.
func (c *controller) processLoop() {
	for {
		item, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		switch {
		case err == nil:
			continue
		case err == ErrFIFOClosed:
			return
		case c.config.RetryOnError:
			// This is the safe way to re-enqueue.
			c.config.Queue.AddIfNotPresent(item)
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// client-go/tools/cache/delta_fifo.go
// Pop removes the first key from the queue and runs process on the Deltas
// stored for it. When called from processLoop, process is c.config.Process,
// i.e. the informer's HandleDeltas.
//
// Pop blocks while the queue is empty and returns ErrFIFOClosed once the FIFO
// is closed. It returns the popped item together with the error from process.
// If that error is an ErrRequeue, the item is put back with addIfNotPresent
// and the wrapped error (e.Err) is returned instead.
// f.lock is held for the whole call, including while process runs.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil, ErrFIFOClosed
}

f.cond.Wait()
}
// Take the key at the head of the queue.
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
// Remove the entry before processing; it is re-added below only on ErrRequeue.
delete(f.items, id)
// Hand the item to the caller-supplied processing function.
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}

让我们进入process(item)函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// client-go/tools/cache/shared_informer.go
// HandleDeltas is the function that actually processes data coming out of the DeltaFIFO.
// It stores each object in the indexer (updating it if it already exists,
// adding it otherwise, or deleting it for Deleted deltas) and then calls
// distribute to send the matching notification to the listeners.
// obj must be a Deltas value. The first indexer error aborts processing and is returned.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
// Already in the indexer: update it and send an update notification.
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}

// isSync selects which listeners distribute notifies (see distribute).
isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// Not in the indexer (or the lookup failed): add it and send an add notification.
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
// Remove the object from the indexer and send a delete notification.
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}

// distribute hands the notification obj to the listeners. When sync is true
// only the syncing listeners receive it; otherwise all listeners do. The
// listener slices are read under the read lock.
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	targets := p.listeners
	if sync {
		targets = p.syncingListeners
	}
	for i := range targets {
		targets[i].add(obj)
	}
}

那存量的pod列表信息是何时放入到DeltaFIFO中的呢,也就是说何时调用ListFunc?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// client-go/tools/cache/reflector.go
// ListAndWatch is shown as an excerpt: the `...` lines stand for code that the
// article leaves out (including where items and resourceVersion are obtained).
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
...
// The pager's page function calls r.listerWatcher.List. For the Pod informer this
// ends up in the ListFunc set up by NewFilteredPodInformer in
// client-go/informers/core/v1/pod.go, which fetches the list of Pod objects.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
// Push the listed Pod objects into the DeltaFIFO (r.store) through syncWith.
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
...
}

// syncWith stores the objects returned by the list call in the Reflector's
// store (the cache.DeltaFIFO in the informer setup). The items are converted
// to a []interface{} and passed to the store's Replace together with
// resourceVersion, replacing what the store already holds.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	objs := make([]interface{}, len(items))
	for i := range items {
		objs[i] = items[i]
	}
	return r.store.Replace(objs, resourceVersion)
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// queueActionLocked performs the actual enqueue. It appends a Delta of the
// given type for obj to the Deltas stored under obj's key, de-duplicates the
// result, adds the key to the queue if it had no entry yet, and broadcasts on
// f.cond. It returns a KeyError when the key cannot be computed.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	previous := f.items[id]
	// Enqueue the new delta, then de-duplicate.
	merged := dedupDeltas(append(previous, Delta{actionType, obj}))

	if len(merged) == 0 {
		// This never happens, because dedupDeltas never returns an empty list
		// when given a non-empty list (as it is here).
		// If somehow it happens anyway, deal with it but complain.
		if previous == nil {
			klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, previous, obj)
			return nil
		}
		klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, previous, obj)
		f.items[id] = merged
		return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, previous, obj)
	}

	// Only keys without an existing entry are appended to the queue.
	if _, queued := f.items[id]; !queued {
		f.queue = append(f.queue, id)
	}
	f.items[id] = merged
	f.cond.Broadcast()
	return nil
}

总结:
可以使用IDE对文中的示例代码进行调试。由于执行informer.Run(stopCh)会启动多个不会退出的goroutine,调试的难度会有所增加;可以在下面这些goroutine对应的代码里打断点进行调试,以便更好地理解informer机制。

1
2
3
4
5
6
// Started in sharedIndexInformer.Run; both are tied to processorStopCh.
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)

// Run the Reflector instance (started in controller.Run).
wg.StartWithChannel(stopCh, r.Run)
// Called directly in controller.Run to drive c.processLoop.
wait.Until(c.processLoop, time.Second, stopCh)


REF:
1.https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md
2.https://cloudnative.to/blog/client-go-informer-source-code