k8s expiring cache

如何实现一个带ttl的缓存。今天来学习下k8s源码中是怎么实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205

package cache

import (
"container/heap"
"sync"
"time"

"k8s.io/utils/clock"
)

// NewExpiring 返回初始化后的Expiring
func NewExpiring() *Expiring {
return NewExpiringWithClock(clock.RealClock{})
}

// NewExpiringWithClock 与NewExpiring的不同之处在于可以传入一个clock参数,方便在测试中使用
func NewExpiringWithClock(clock clock.Clock) *Expiring {
// clock使用的是k8s封装过后的clock
return &Expiring{
clock: clock,
cache: make(map[interface{}]entry),
}
}

// Expiring is a map whose entries expire after a per-entry timeout.
// Expiring 实现一个可以设置过期时间的的缓存
type Expiring struct {
// AllowExpiredGet causes the expiration check to be skipped on Get.
// It should only be used when a key always corresponds to the exact same value.
// Thus when this field is true, expired keys are considered valid
// until the next call to Set (which causes the GC to run).
// It may not be changed concurrently with calls to Get.
// 如果AllowExpiredGet为true, Get操作跳过过期检查
AllowExpiredGet bool

clock clock.Clock

// mu protects the below fields
mu sync.RWMutex
// cache is the internal map that backs the cache.
cache map[interface{}]entry
// generation is used as a cheap resource version for cache entries. Cleanups
// are scheduled with a key and generation. When the cleanup runs, it first
// compares its generation with the current generation of the entry. It
// deletes the entry iff the generation matches. This prevents cleanups
// scheduled for earlier versions of an entry from deleting later versions of
// an entry when Set() is called multiple times with the same key.
//
// The integer value of the generation of an entry is meaningless.
// 当进行多次Set操作时,上次Set 操作中的gc可能还没有完成
// 如果generation 与entry中的generation不一致,则说明值是新值
// 则会跳过删除操作
generation uint64

heap expiringHeap
}

type entry struct {
val interface{}
expiry time.Time
generation uint64
}

// Get looks up an entry in the cache.
// Get 根据key从map中获取对应的值
func (c *Expiring) Get(key interface{}) (val interface{}, ok bool) {
c.mu.RLock()
defer c.mu.RUnlock()
e, ok := c.cache[key]
if !ok {
return nil, false
}
// 如果不是跳过过期检查且key已经过期,返回nil, false
if !c.AllowExpiredGet && !c.clock.Now().Before(e.expiry) {
return nil, false
}
return e.val, true
}

// Set sets a key/value/expiry entry in the map, overwriting any previous entry
// with the same key. The entry expires at the given expiry time, but its TTL
// may be lengthened or shortened by additional calls to Set(). Garbage
// collection of expired entries occurs during calls to Set(), however calls to
// Get() will not return expired entries that have not yet been garbage
// collected.
// Set 设置值
func (c *Expiring) Set(key interface{}, val interface{}, ttl time.Duration) {
now := c.clock.Now()
expiry := now.Add(ttl)

c.mu.Lock()
defer c.mu.Unlock()

// 增加generation
c.generation++

c.cache[key] = entry{
val: val,
expiry: expiry,
generation: c.generation,
}

// Run GC inline before pushing the new entry.
// 进行gc操作
// 会一直进行gc清除操作直到队列长度等于0或者不存在过期的数据
c.gc(now)

// 插入到优先队列中
heap.Push(&c.heap, &expiringHeapEntry{
key: key,
expiry: expiry,
generation: c.generation,
})
}

// Delete deletes an entry in the map.
func (c *Expiring) Delete(key interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
c.del(key, 0)
}

// del deletes the entry for the given key. The generation argument is the
// generation of the entry that should be deleted. If the generation has been
// changed (e.g. if a set has occurred on an existing element but the old
// cleanup still runs), this is a noop. If the generation argument is 0, the
// entry's generation is ignored and the entry is deleted.
//
// del must be called under the write lock.
func (c *Expiring) del(key interface{}, generation uint64) {
e, ok := c.cache[key]
if !ok {
return
}
// 这里会对generation进行比较,如果不相等则直接返回
if generation != 0 && generation != e.generation {
return
}
delete(c.cache, key)
}

// Len returns the number of items in the cache.
func (c *Expiring) Len() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.cache)
}

func (c *Expiring) gc(now time.Time) {
for {
// Return from gc if the heap is empty or the next element is not yet
// expired.
//
// heap[0] is a peek at the next element in the heap, which is not obvious
// from looking at the (*expiringHeap).Pop() implementation below.
// heap.Pop() swaps the first entry with the last entry of the heap, then
// calls (*expiringHeap).Pop() which returns the last element.
// 如果heap长度为0 且第一个key没有过期,则直接返回
if len(c.heap) == 0 || now.Before(c.heap[0].expiry) {
return
}
// 从堆中pop, 然后删除map中的数据
cleanup := heap.Pop(&c.heap).(*expiringHeapEntry)
c.del(cleanup.key, cleanup.generation)
}
}

type expiringHeapEntry struct {
key interface{}
expiry time.Time
generation uint64
}

// expiringHeap is a min-heap ordered by expiration time of its entries. The
// expiring cache uses this as a priority queue to efficiently organize entries
// which will be garbage collected once they expire.
// expiringHeap 使用的是最小堆,根据过期时间进行排序
// expiring cache使用最小堆作为一个优先队列,实现快速的过期删除
type expiringHeap []*expiringHeapEntry

var _ heap.Interface = &expiringHeap{}

func (cq expiringHeap) Len() int {
return len(cq)
}

// Less 较小的时间排在前面
func (cq expiringHeap) Less(i, j int) bool {
return cq[i].expiry.Before(cq[j].expiry)
}

func (cq expiringHeap) Swap(i, j int) {
cq[i], cq[j] = cq[j], cq[i]
}

func (cq *expiringHeap) Push(c interface{}) {
*cq = append(*cq, c.(*expiringHeapEntry))
}

func (cq *expiringHeap) Pop() interface{} {
c := (*cq)[cq.Len()-1]
*cq = (*cq)[:cq.Len()-1]
return c
}

REF:

  1. expiring.go