k8s Source Code Design Patterns: Observer

The Observer pattern is also known as event subscription, listener, Listener, or Observer.
It is a behavioral design pattern: it lets you define a subscription mechanism so that when an event happens to an object, every other object "observing" it gets notified.
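As a reminder of the pattern's shape, here is a minimal, self-contained Go sketch (generic, not Kubernetes code): a subject keeps a list of registered observers and notifies each of them when an event occurs.

package main

import "fmt"

// Observer is anything that wants to be notified about events.
type Observer interface {
	OnEvent(event string)
}

// Subject keeps the list of subscribed observers and notifies them.
type Subject struct {
	observers []Observer
}

func (s *Subject) Subscribe(o Observer) {
	s.observers = append(s.observers, o)
}

func (s *Subject) Notify(event string) {
	for _, o := range s.observers {
		o.OnEvent(event)
	}
}

// logger is one concrete observer.
type logger struct{ name string }

func (l *logger) OnEvent(event string) {
	fmt.Printf("%s received: %s\n", l.name, event)
}

func main() {
	s := &Subject{}
	s.Subscribe(&logger{name: "observer-1"})
	s.Subscribe(&logger{name: "observer-2"})
	s.Notify("deployment updated") // both observers are notified
}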

The Observer pattern in Kubernetes

In Kubernetes, the Observer pattern is implemented mainly through the watch mechanism. A watch is a long-lived HTTP request served with chunked transfer encoding: whenever a resource's state changes, the server writes the resource's new state onto the open response stream. When the connection ends (for example because the server-side timeout expires), the client re-issues the watch request, so observation of the resource is continuous. Conceptually this is long polling: the server returns a result as soon as there is a new state, and otherwise keeps the request open for a while before responding.
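To make the mechanism concrete, here is a minimal sketch that opens such a watch request directly over HTTP. It assumes kubectl proxy is running on 127.0.0.1:8001 (so no authentication is needed); the apiserver streams one JSON event per line on the open response.

package main

import (
	"bufio"
	"fmt"
	"net/http"
)

func main() {
	// ?watch=true turns the list endpoint into a streaming watch.
	url := "http://127.0.0.1:8001/apis/apps/v1/namespaces/default/deployments?watch=true"

	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The response body stays open; each line is one watch event,
	// e.g. {"type":"MODIFIED","object":{...}}.
	reader := bufio.NewReader(resp.Body)
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			return // connection closed or timed out: re-issue the watch to continue
		}
		fmt.Print(line)
	}
}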

In Kubernetes, kube-apiserver handles clients' watch requests. When a client subscribes to a resource, kube-apiserver creates a watcher object internally and adds it to that resource's watcher list. When the resource's state changes, kube-apiserver walks the watcher list and sends the new state to every subscribed client. On receiving the new state, the client compares it with the state it previously held to work out what changed and handles it accordingly.

Running kubectl get deployment use -n default -w watches the Deployment named use for changes.
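The same watch can be issued programmatically with client-go. A minimal sketch, assuming a standard kubeconfig at ~/.kube/config:

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// Watch only the Deployment named "use" in the default namespace,
	// mirroring `kubectl get deployment use -n default -w`.
	w, err := clientset.AppsV1().Deployments("default").Watch(context.TODO(), metav1.ListOptions{
		FieldSelector: "metadata.name=use",
	})
	if err != nil {
		panic(err)
	}
	defer w.Stop()

	// ResultChan delivers one watch.Event per change until the watch ends.
	for event := range w.ResultChan() {
		fmt.Println(event.Type)
	}
}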

// Handling a client watch request ends up calling Cacher.Watch,
// which uses newCacheWatcher to create a watcher.
// staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
	...
	watcher := newCacheWatcher(
		chanSize,
		filterWithAttrsFunction(key, pred),
		emptyFunc,
		c.versioner,
		deadline,
		pred.AllowWatchBookmarks,
		c.groupResource,
		identifier,
	)
	...
	func() {
		c.Lock()
		defer c.Unlock()

		if generation, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
			// We went unready or are already on a different generation.
			// Avoid registering and starting the watch as it will have to be
			// terminated immediately anyway.
			return
		}

		// Update watcher.forget function once we can compute it.
		watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
		// Update the bookMarkAfterResourceVersion
		watcher.setBookmarkAfterResourceVersion(bookmarkAfterResourceVersionFn())
		// Add the newly created watcher to the Cacher's watchers list
		c.watchers.addWatcher(watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
		addedWatcher = true

		// Add it to the queue only when the client support watch bookmarks.
		if watcher.allowWatchBookmarks {
			c.bookmarkWatchers.addWatcher(watcher)
		}
		c.watcherIdx++
	}()
	...
	// Start a goroutine that streams events to this watcher
	go watcher.processInterval(ctx, cacheInterval, startWatchRV)
	return watcher, nil
}
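The addWatcher / forgetWatcher pair above is the subscribe/unsubscribe half of the Observer pattern: registration returns a forget closure that removes the watcher again when the watch terminates. A simplified sketch of that bookkeeping (the registry type and its fields are illustrative, not the real Cacher internals):

package main

import (
	"fmt"
	"sync"
)

// watcher stands in for cacheWatcher, registry for the Cacher's watcher map;
// all names here are illustrative.
type watcher struct{ events chan string }

type registry struct {
	mu       sync.Mutex
	watchers map[int]*watcher
	nextIdx  int
}

// add registers a watcher and returns a forget function that removes it,
// much like c.watchers.addWatcher plus forgetWatcher in Cacher.Watch.
func (r *registry) add(w *watcher) (forget func()) {
	r.mu.Lock()
	defer r.mu.Unlock()
	idx := r.nextIdx
	r.nextIdx++
	r.watchers[idx] = w
	return func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		delete(r.watchers, idx)
	}
}

// notify delivers an event to every registered watcher.
func (r *registry) notify(ev string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, w := range r.watchers {
		w.events <- ev
	}
}

func main() {
	r := &registry{watchers: map[int]*watcher{}}
	w := &watcher{events: make(chan string, 1)}
	forget := r.add(w)

	r.notify("MODIFIED")
	fmt.Println(<-w.events)

	forget()            // unsubscribe, as forgetWatcher does when the watch ends
	r.notify("DELETED") // nobody left to notify
}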
// staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go
type cacheWatcher struct {
	input     chan *watchCacheEvent
	result    chan watch.Event
	done      chan struct{}
	filter    filterWithAttrsFunc
	stopped   bool
	forget    func(bool)
	versioner storage.Versioner
	deadline  time.Time

	allowWatchBookmarks bool
	groupResource       schema.GroupResource

	identifier string

	drainInputBuffer bool

	bookmarkAfterResourceVersion uint64

	// stateMutex protects state
	stateMutex sync.Mutex
	state      int
}

func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
	defer utilruntime.HandleCrash()
	defer close(c.result)
	defer c.Stop()

	// Check how long we are processing initEvents.
	// As long as these are not processed, we are not processing
	// any incoming events, so if it takes long, we may actually
	// block all watchers for some time.
	// TODO: From the logs it seems that there happens processing
	// times even up to 1s which is very long. However, this doesn't
	// depend that much on the number of initEvents. E.g. from the
	// 2000-node Kubemark run we have logs like this, e.g.:
	// ... processing 13862 initEvents took 66.808689ms
	// ... processing 14040 initEvents took 993.532539ms
	// We should understand what is blocking us in those cases (e.g.
	// is it lack of CPU, network, or sth else) and potentially
	// consider increase size of result buffer in those cases.
	const initProcessThreshold = 500 * time.Millisecond
	startTime := time.Now()

	initEventCount := 0
	for {
		event, err := cacheInterval.Next()
		if err != nil {
			// An error indicates that the cache interval
			// has been invalidated and can no longer serve
			// events.
			//
			// Initially we considered sending an "out-of-history"
			// Error event in this case, but because historically
			// such events weren't sent out of the watchCache, we
			// decided not to. This is still ok, because on watch
			// closure, the watcher will try to re-instantiate the
			// watch and then will get an explicit "out-of-history"
			// window. There is potential for optimization, but for
			// now, in order to be on the safe side and not break
			// custom clients, the cost of it is something that we
			// are fully accepting.
			klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
			return
		}
		if event == nil {
			break
		}
		// Convert the event to a watch.Event and send it to c.result
		c.sendWatchCacheEvent(event)

		if event.ResourceVersion > resourceVersion {
			resourceVersion = event.ResourceVersion
		}
		initEventCount++
	}

	if initEventCount > 0 {
		metrics.InitCounter.WithLabelValues(c.groupResource.String()).Add(float64(initEventCount))
	}
	processingTime := time.Since(startTime)
	if processingTime > initProcessThreshold {
		klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
	}

	c.process(ctx, resourceVersion)
}
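processInterval replays the initial events from the cache interval and then hands off to c.process, which keeps draining c.input and forwarding qualifying events to c.result, the channel behind the watcher's ResultChan. A simplified sketch of that input-to-result pipeline, with the resource-version check reduced to its essence (illustrative types, not the real ones):

package main

import "fmt"

// event is a stand-in for watchCacheEvent; only the fields needed here.
type event struct {
	typ string
	rv  uint64
}

// process forwards events newer than the resource version the client has
// already seen from input to result, roughly what cacheWatcher.process does.
func process(input <-chan event, result chan<- event, seenRV uint64) {
	defer close(result)
	for ev := range input {
		if ev.rv <= seenRV {
			continue // the client already observed this state
		}
		result <- ev
	}
}

func main() {
	input := make(chan event, 4)
	result := make(chan event, 4)
	go process(input, result, 10)

	input <- event{"ADDED", 9}     // filtered out: not newer than rv 10
	input <- event{"MODIFIED", 11} // forwarded to the client
	close(input)

	for ev := range result {
		fmt.Println(ev.typ, ev.rv)
	}
}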
Event notification
// dispatchEvents --> dispatchEvent --> watcher.add
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
	c.startDispatching(event)
	defer c.finishDispatching()
	// Watchers stopped after startDispatching will be delayed to finishDispatching,

	// Since add() can block, we explicitly add when cacher is unlocked.
	// Dispatching event in nonblocking way first, which make faster watchers
	// not be blocked by slower ones.
	if event.Type == watch.Bookmark {
		for _, watcher := range c.watchersBuffer {
			watcher.nonblockingAdd(event)
		}
	} else {
		wcEvent := *event
		setCachingObjects(&wcEvent, c.versioner)
		event = &wcEvent

		c.blockedWatchers = c.blockedWatchers[:0]
		for _, watcher := range c.watchersBuffer {
			if !watcher.nonblockingAdd(event) {
				c.blockedWatchers = append(c.blockedWatchers, watcher)
			}
		}

		if len(c.blockedWatchers) > 0 {
			// dispatchEvent is called very often, so arrange
			// to reuse timers instead of constantly allocating.
			startTime := time.Now()
			timeout := c.dispatchTimeoutBudget.takeAvailable()
			c.timer.Reset(timeout)

			// Send event to all blocked watchers. As long as timer is running,
			// `add` will wait for the watcher to unblock. After timeout,
			// `add` will not wait, but immediately close a still blocked watcher.
			// Hence, every watcher gets the chance to unblock itself while timer
			// is running, not only the first ones in the list.
			timer := c.timer
			for _, watcher := range c.blockedWatchers {
				if !watcher.add(event, timer) {
					// fired, clean the timer by set it to nil.
					timer = nil
				}
			}

			// Stop the timer if it is not fired
			if timer != nil && !timer.Stop() {
				// Consume triggered (but not yet received) timer event
				// so that future reuse does not get a spurious timeout.
				<-timer.C
			}

			c.dispatchTimeoutBudget.returnUnused(timeout - time.Since(startTime))
		}
	}
}
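The fast path above relies on the standard Go idiom for non-blocking channel sends, a select with a default branch, combined with a shared timer for watchers that are still blocked. A standalone sketch of just that idiom (illustrative, not the real nonblockingAdd/add):

package main

import (
	"fmt"
	"time"
)

// nonblockingAdd tries to enqueue without waiting; a full channel means
// the watcher is "slow" and must be retried with a deadline.
func nonblockingAdd(ch chan<- string, ev string) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false
	}
}

// addWithTimer waits for the slow watcher until the shared timer fires;
// the real code then closes the watcher instead of blocking forever.
func addWithTimer(ch chan<- string, ev string, timer *time.Timer) bool {
	select {
	case ch <- ev:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	fast := make(chan string, 1)
	slow := make(chan string) // unbuffered with no reader: always blocked

	fmt.Println(nonblockingAdd(fast, "MODIFIED")) // true
	fmt.Println(nonblockingAdd(slow, "MODIFIED")) // false -> would join blockedWatchers

	timer := time.NewTimer(50 * time.Millisecond)
	fmt.Println(addWithTimer(slow, "MODIFIED", timer)) // false once the timer fires
}

Closing a watcher that is still blocked after the timeout, instead of waiting on it forever, is what keeps one slow consumer from stalling event delivery to all the others.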

REF:
1. staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
2. staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go
3. staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go