k8s-workqueue

The workqueue provides the following features:

- Fairness: items are processed in first-in-first-out order.
- Stingy: even under high concurrency, and even if an item is added multiple times before it has been processed, it is only processed once.
- Concurrency: multiple consumers and producers are supported; in particular, an item may be re-added while it is still being processed.
- Shutdown notification: the ShutDown method notifies the queue that it should stop accepting new items.

The main benefits of using WorkQueue in Kubernetes are:

  • Fewer concurrency conflicts: WorkQueue guarantees that each item is handled serially, i.e. a given item is never processed by two workers at the same time, which reduces the chance of concurrent conflicts.

  • Rate control: WorkQueue can limit and control the rate at which tasks are executed, which is useful for resource-sensitive applications and for scenarios where load must be bounded.

  • Retry logic: WorkQueue supports retries; when a task fails it can be re-queued and attempted again later.

  • De-duplication: WorkQueue avoids processing the same task repeatedly, improving the application's performance and efficiency.

Queue
// vendor/k8s.io/client-go/util/workqueue/queue.go
// Interface defines the common queue operations.
type Interface interface {
    Add(item interface{})                   // add an item to the queue
    Len() int                               // number of items in the queue
    Get() (item interface{}, shutdown bool) // fetch one item; shutdown reports whether the queue is shutting down
    Done(item interface{})                  // mark an item as finished processing
    ShutDown()                              // shut the queue down
    ShutDownWithDrain()                     // shut the queue down, but wait for in-flight items to finish
    ShuttingDown() bool                     // whether the queue is shutting down
}
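
To see how these methods fit together, here is a minimal producer/consumer sketch. It assumes the standard k8s.io/client-go/util/workqueue package; the item values and the single worker are arbitrary illustrative choices.

package main

import (
    "fmt"
    "sync"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    q := workqueue.New() // the FIFO implementation of Interface
    var wg sync.WaitGroup

    // A single consumer: Get blocks until an item arrives or the queue shuts down.
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            item, shutdown := q.Get()
            if shutdown {
                return
            }
            fmt.Println("processing", item)
            q.Done(item) // always pair Get with Done
        }
    }()

    // A producer adds a few items.
    q.Add("a")
    q.Add("b")

    q.ShutDown() // remaining items are still handed out; afterwards Get returns shutdown=true
    wg.Wait()
}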
type empty struct{}
type t interface{}
type set map[t]empty

// Type is the queue implementation.
type Type struct {
    // queue is where items are actually stored, in processing order. Every item
    // in queue should be in the dirty set but not in the processing set.
    queue []t

    // dirty contains all items that still need to be processed.
    dirty set

    // processing marks items that are currently being processed. Under concurrency
    // these items may also be in the dirty set (if they were re-added while being
    // processed). When we finish processing an item we remove it from processing,
    // check whether it is in dirty, and if so put it back into the queue.
    processing set

    cond *sync.Cond

    shuttingDown bool

    metrics queueMetrics

    unfinishedWorkUpdatePeriod time.Duration
    clock clock.Clock
}
// Add enqueues an item.
func (q *Type) Add(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    // If the queue is shutting down, return immediately without enqueueing.
    if q.shuttingDown {
        return
    }
    // If the item is already in the dirty set it is not enqueued again, which
    // guarantees that an item is only queued once before it is processed.
    if q.dirty.has(item) {
        return
    }

    q.metrics.add(item)

    // Insert into the dirty set.
    q.dirty.insert(item)
    // If the item is currently being processed, do not append it to the queue.
    if q.processing.has(item) {
        return
    }

    // Append the item to the queue.
    q.queue = append(q.queue, item)
    q.cond.Signal() // notify a waiting getter that a new item has arrived
}

// Len returns the queue length.
func (q *Type) Len() int {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    return len(q.queue)
}

// Get pops the item at the head of the queue.
func (q *Type) Get() (item interface{}, shutdown bool) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    // Block while the queue is empty and not shutting down.
    for len(q.queue) == 0 && !q.shuttingDown {
        // Woken up by q.cond.Signal() / q.cond.Broadcast().
        q.cond.Wait()
    }
    if len(q.queue) == 0 {
        // We must be shutting down.
        return nil, true
    }
    // Pop the head of the queue.
    item, q.queue = q.queue[0], q.queue[1:]

    q.metrics.get(item)

    // Move the item into the processing set and remove it from the dirty set.
    q.processing.insert(item)
    q.dirty.delete(item)

    return item, false
}

// Done marks an item as finished processing.
func (q *Type) Done(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    q.metrics.done(item)
    // Remove the item from the processing set.
    q.processing.delete(item)

    // If the same item was added again while it was being processed, re-enqueue it.
    if q.dirty.has(item) {
        q.queue = append(q.queue, item)
        q.cond.Signal()
    }
}

// ShutDown puts the queue into the shuttingDown state, in which no new items
// are accepted. Worker goroutines exit once the remaining items have been drained.
func (q *Type) ShutDown() {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    q.shuttingDown = true
    q.cond.Broadcast()
}

func (q *Type) ShuttingDown() bool {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    return q.shuttingDown
}

Let's walk through a small example to see how the workqueue behaves.

package main

import (
    "fmt"
    "sync"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    var wg sync.WaitGroup
    down := make(chan struct{})
    q := workqueue.New()
    q.Add(1)
    q.Add(2)
    q.Add(3)

    // This is where the Stingy property shows up: the item is already in the
    // dirty set and not in the processing set, so it is not enqueued again.
    q.Add(1)

    // After the four Add calls the state is:
    // queue      --> [1, 2, 3]
    // dirty      --> {1, 2, 3}
    // processing --> {}

    item, _ := q.Get()

    // After Get() the state is:
    // queue      --> [2, 3]
    // dirty      --> {2, 3}
    // processing --> {1}

    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println("processing", item)
        <-down
        // State after q.Done(item):
        // queue      --> [2, 3, 1]
        // dirty      --> {1, 2, 3}
        // processing --> {}
        // Item 1 is re-enqueued because it was added again while being processed.
        q.Done(item)

        fmt.Println("processed", item)
    }()

    // Before the q.Add(1) below the state is:
    // queue      --> [2, 3]
    // dirty      --> {2, 3}
    // processing --> {1}
    q.Add(1)
    // After q.Add(1) the state is:
    // queue      --> [2, 3]
    // dirty      --> {1, 2, 3}
    // processing --> {1}

    down <- struct{}{}
    wg.Wait()
}

Summary:
Under high concurrency, the Stingy property relies on the mutex (together with the processing set) to ensure an item is only processed once at a time; and when the same item is added multiple times before it has been processed, the duplicate Add calls are simply dropped.


DelayingQueue

Next, let's look at the delaying queue in k8s. It is built on top of the FIFO queue above and adds an AddAfter method, which inserts an item into the queue only after the given delay has elapsed.
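
Before looking at the implementation, here is a minimal usage sketch; the item value and the one-second delay are arbitrary illustrative choices.

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    q := workqueue.NewDelayingQueue()

    start := time.Now()
    // The item only becomes visible to Get after roughly one second.
    q.AddAfter("delayed-item", 1*time.Second)

    item, _ := q.Get() // blocks until the delayed item is moved into the queue
    fmt.Printf("got %v after %v\n", item, time.Since(start))
    q.Done(item)
    q.ShutDown()
}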

// vendor/k8s.io/client-go/util/workqueue/delaying_queue.go
type DelayingInterface interface {
    Interface
    // AddAfter adds an item to the workqueue after the indicated duration has passed
    AddAfter(item interface{}, duration time.Duration)
}

// delayingType wraps a plain queue and provides delayed enqueueing.
type delayingType struct {
    // the embedded plain queue
    Interface

    // clock tracks time for delayed firing
    clock clock.Clock

    // stopCh lets us signal a shutdown to the waiting loop
    stopCh chan struct{}
    // stopOnce guarantees we only signal shutdown a single time
    stopOnce sync.Once

    // heartbeat ensures we wait no more than maxWait before firing
    heartbeat clock.Ticker

    // waitingForAddCh is a buffered channel that feeds waitingForAdd
    waitingForAddCh chan *waitFor

    // metrics counts the number of retries
    metrics retryMetrics
}

// newDelayingQueue is where the delaying queue is actually instantiated.
func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
    ret := &delayingType{
        Interface:       q,
        clock:           clock,
        heartbeat:       clock.NewTicker(maxWait),
        stopCh:          make(chan struct{}),
        waitingForAddCh: make(chan *waitFor, 1000),
        metrics:         newRetryMetrics(name),
    }
    // Run the background loop.
    go ret.waitingLoop()
    return ret
}

// waitingLoop keeps checking the head of the waitForPriorityQueue and moves
// entries whose readyAt time has passed into the underlying queue.
func (q *delayingType) waitingLoop() {
    defer utilruntime.HandleCrash()

    // Used to block when there are no items in the waiting queue.
    never := make(<-chan time.Time)

    // Make a timer that expires when the item at the head of the waiting queue is ready
    var nextReadyAtTimer clock.Timer

    // Build a priority queue ordered by readyAt.
    waitingForQueue := &waitForPriorityQueue{}
    heap.Init(waitingForQueue)

    // Tracks known entries so duplicate adds can be de-duplicated.
    waitingEntryByData := map[t]*waitFor{}

    for {
        if q.Interface.ShuttingDown() {
            return
        }

        now := q.clock.Now()

        // Add ready entries
        for waitingForQueue.Len() > 0 {
            entry := waitingForQueue.Peek().(*waitFor)
            // If the head entry's readyAt is still in the future, stop.
            if entry.readyAt.After(now) {
                break
            }

            // Pop the head of the priority queue and add it to the workqueue.
            entry = heap.Pop(waitingForQueue).(*waitFor)
            q.Add(entry.data)
            delete(waitingEntryByData, entry.data)
        }

        // Set up a wait for the first item's readyAt (if one exists)
        nextReadyAt := never
        if waitingForQueue.Len() > 0 {
            if nextReadyAtTimer != nil {
                nextReadyAtTimer.Stop()
            }
            entry := waitingForQueue.Peek().(*waitFor)
            // Create a timer that fires after entry.readyAt - now.
            nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
            // This avoids burning CPU: the loop can sleep until the earliest
            // entry is due instead of polling.
            nextReadyAt = nextReadyAtTimer.C()
        }

        select {
        case <-q.stopCh:
            return

        case <-q.heartbeat.C():
            // continue the loop, which will add ready items

        case <-nextReadyAt:
            // continue the loop, which will add ready items

        case waitEntry := <-q.waitingForAddCh:
            // If the entry is not yet due, put it into the waiting priority queue;
            // otherwise add it to the workqueue right away.
            if waitEntry.readyAt.After(q.clock.Now()) {
                insert(waitingForQueue, waitingEntryByData, waitEntry)
            } else {
                q.Add(waitEntry.data)
            }

            // Drain anything else already sitting in the channel.
            drained := false
            for !drained {
                select {
                case waitEntry := <-q.waitingForAddCh:
                    if waitEntry.readyAt.After(q.clock.Now()) {
                        insert(waitingForQueue, waitingEntryByData, waitEntry)
                    } else {
                        q.Add(waitEntry.data)
                    }
                default:
                    drained = true
                }
            }
        }
    }
}

// insert adds an entry to the priority queue; if the entry already exists its
// readyAt is only updated when the new time would make it ready sooner.
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
    // if the entry already exists, update the time only if it would cause the item to be queued sooner
    existing, exists := knownEntries[entry.data]
    if exists {
        if existing.readyAt.After(entry.readyAt) {
            existing.readyAt = entry.readyAt
            heap.Fix(q, existing.index)
        }

        return
    }
    // Push into the priority queue and remember the entry.
    heap.Push(q, entry)
    knownEntries[entry.data] = entry
}

// waitFor records the item to enqueue and the time at which it becomes ready.
type waitFor struct {
    data    t
    readyAt time.Time

    // index in the priority queue (heap)
    index int
}

// waitForPriorityQueue is a priority queue (min-heap) implementing heap.Interface.
// The entry with the smallest readyAt is at the top (index 0).
// Peek returns the element at index 0.
// Pop removes an element from the queue.
// Push inserts an element into the queue.
type waitForPriorityQueue []*waitFor


// AddAfter adds an item to the queue after the given delay.
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
    // don't add if we're already shutting down
    if q.ShuttingDown() {
        return
    }

    q.metrics.retry()

    // immediately add things with no delay
    if duration <= 0 {
        q.Add(item)
        return
    }

    select {
    case <-q.stopCh:
        // unblock if ShutDown() is called
    case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
        // The entry is sent to waitingForAddCh and handled by waitingLoop.
    }
}

// waitForPriorityQueue implements heap.Interface as a min-heap.
// Push adds an item to the queue. Push should not be called directly; instead,
// use `heap.Push`.
func (pq *waitForPriorityQueue) Push(x interface{}) {
    n := len(*pq)
    item := x.(*waitFor)
    item.index = n
    *pq = append(*pq, item)
}

// Pop removes an item from the queue. Pop should not be called directly;
// instead, use `heap.Pop`.
func (pq *waitForPriorityQueue) Pop() interface{} {
    n := len(*pq)
    item := (*pq)[n-1]
    item.index = -1
    *pq = (*pq)[0:(n - 1)]
    return item
}

// Peek returns the item at the beginning of the queue, without removing the
// item or otherwise mutating the queue. It is safe to call directly.
func (pq waitForPriorityQueue) Peek() interface{} {
    return pq[0]
}
RateLimitingQueue
// vendor/k8s.io/client-go/util/workqueue/rate_limiting_queue.go
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
    DelayingInterface

    // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
    AddRateLimited(item interface{})

    // Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
    // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
    // still have to call `Done` on the queue.
    // In other words, Forget ends the retries for an item.
    Forget(item interface{})

    // NumRequeues returns back how many times the item was requeued
    NumRequeues(item interface{}) int
}

// rateLimitingType implements RateLimitingInterface.
type rateLimitingType struct {
    DelayingInterface

    rateLimiter RateLimiter
}

// AddRateLimited enqueues an item after the delay decided by the rate limiter.
func (q *rateLimitingType) AddRateLimited(item interface{}) {
    q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

// The core abstraction is the RateLimiter interface.
// vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go
type RateLimiter interface {
    // When gets an item and gets to decide how long that item should wait
    When(item interface{}) time.Duration
    // Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
    // or for success, we'll stop tracking it
    Forget(item interface{})
    // NumRequeues returns back how many failures the item has had
    NumRequeues(item interface{}) int
}

// The interface has five implementations.

// BucketRateLimiter
// A token-bucket limiter backed by golang.org/x/time/rate.Limiter.
type BucketRateLimiter struct {
    *rate.Limiter
}

var _ RateLimiter = &BucketRateLimiter{}

func (r *BucketRateLimiter) When(item interface{}) time.Duration {
    return r.Limiter.Reserve().Delay()
}

func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
    return 0
}

func (r *BucketRateLimiter) Forget(item interface{}) {
}


// ItemExponentialFailureRateLimiter
// The more failures an item has had, the longer the delay.
type ItemExponentialFailureRateLimiter struct {
    failuresLock sync.Mutex
    failures     map[interface{}]int

    baseDelay time.Duration
    maxDelay  time.Duration
}

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    exp := r.failures[item]
    r.failures[item] = r.failures[item] + 1

    // The backoff is capped such that 'calculated' value never overflows.
    backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
    if backoff > math.MaxInt64 {
        return r.maxDelay
    }

    calculated := time.Duration(backoff)
    if calculated > r.maxDelay {
        return r.maxDelay
    }

    return calculated
}
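
// A hypothetical worked example of the backoff above: with baseDelay = 5ms and
// maxDelay = 1000s (the per-item defaults used by DefaultControllerRateLimiter
// in client-go), successive failures of the same item yield delays of
// 5ms, 10ms, 20ms, 40ms, ... i.e. baseDelay * 2^failures, until the 1000s cap
// is reached. Forget(item) resets the failure count for that item.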
// ItemFastSlowRateLimiter
// "Fast/slow" refers to a threshold: retries are fast until the attempt
// threshold is reached, after which retries become slow.
type ItemFastSlowRateLimiter struct {
    failuresLock sync.Mutex
    failures     map[interface{}]int

    maxFastAttempts int
    fastDelay       time.Duration
    slowDelay       time.Duration
}

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    r.failures[item] = r.failures[item] + 1

    if r.failures[item] <= r.maxFastAttempts {
        return r.fastDelay
    }

    return r.slowDelay
}
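
// A hypothetical example: with maxFastAttempts = 3, fastDelay = 5ms and
// slowDelay = 10s, the first three failures of an item each wait 5ms, and
// every failure after that waits 10s.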

// MaxOfRateLimiter
// Maintains a list of rate limiters and returns the most restrictive delay among them.
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
    limiters []RateLimiter
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
    ret := time.Duration(0)
    for _, limiter := range r.limiters {
        curr := limiter.When(item)
        if curr > ret {
            ret = curr
        }
    }

    return ret
}

// WithMaxWaitRateLimiter
// Wraps another limiter with a maximum delay: if the inner limiter asks for a
// longer delay, the maximum is returned instead.
type WithMaxWaitRateLimiter struct {
    limiter  RateLimiter
    maxDelay time.Duration
}

func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
    delay := w.limiter.When(item)
    if delay > w.maxDelay {
        return w.maxDelay
    }

    return delay
}
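
To tie the pieces together, below is a sketch of the worker loop that controllers typically build on RateLimitingInterface. The process function, the key "default/my-pod" and the retry limit of 5 are hypothetical illustrative choices.

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

// process stands in for real reconciliation logic (hypothetical).
func process(item interface{}) error {
    fmt.Println("reconciling", item)
    return nil
}

func main() {
    // DefaultControllerRateLimiter combines a per-item exponential backoff
    // limiter with an overall token-bucket limiter via MaxOfRateLimiter.
    q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    q.Add("default/my-pod") // items are typically namespace/name keys
    q.ShutDown()            // shut down once the demo item is queued

    for {
        item, shutdown := q.Get()
        if shutdown {
            return
        }
        if err := process(item); err == nil {
            q.Forget(item) // success: stop rate-limiter tracking for this item
        } else if q.NumRequeues(item) < 5 {
            q.AddRateLimited(item) // failure: retry after the limiter-decided delay
        } else {
            q.Forget(item) // give up after too many retries
        }
        q.Done(item) // always pair Get with Done
    }
}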

REF:
1. https://pkg.go.dev/k8s.io/client-go/util/workqueue
2. Kubernetes源码剖析 (Kubernetes Source Code Analysis)