workqueue
提供了如下特性1
2
3
4- 公平: 队列中的元素以先进先出的方式进行处理
- Stingy(翻译为:吝啬的): 在高并发情况下和一元素在处理之前添加了多次的情况下,队列中的元素只会被处理一次
- 并发性:多消费者和生产者.支持元素被正在处理的情况下重新入队
- 通知机制:ShutDown方法通过信号量通知队列不再接收新的元素
Kubernetes 中使用 WorkQueue 的好处主要有以下几点:
降低并发冲突:WorkQueue 可以确保每个任务都被串行处理,这意味着每个任务将独立地执行,而不会受到其他任务的干扰,从而降低并发冲突的可能性。
控制任务执行速率:使用 WorkQueue 可以限制并控制任务的执行速率,这对于资源敏感的应用程序和需要限制负载的场景非常有用。
实现重试逻辑:WorkQueue 支持任务重试,当一个任务失败时,它可以将任务重新排队,以便在稍后的时间再次尝试执行该任务,从而实现重试逻辑。
消除重复工作:使用 WorkQueue 可以有效地避免重复处理相同的任务,以提高应用程序的性能和效率。
Queue
1 | // vendor/k8s.io/client-go/util/workqueue/queue.go |
1 | type empty struct{} |
1 | // 入队操作 |
通过一个例子学习workqueue
的工作原理1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62import (
"fmt"
"k8s.io/client-go/util/workqueue"
"sync"
)
func main() {
var wg sync.WaitGroup
down := make(chan struct{})
q := workqueue.New()
q.Add(1)
q.Add(2)
q.Add(3)
// 支持Stingy的特性的微妙之处就在于这,如果元素已经存在于队列中且不存在于processing set中,将不会入队.
q.Add(1)
// 执行完4次Add操作后,队列中的元素内容如下
// queue --> [1, 2, 3]
// dirty --> {1, 2, 3}
// processing --> {} // queue --> [2, 3]
// dirty --> {1, 2, 3}
// processing --> {1}
item,_ := q.Get()
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("processing",item)
<-down
// 执行q.Done(item)之后的状态
// queue --> [2, 3, 1]
// dirty --> {1, 2, 3}
// processing --> {}
// 此时元素1重新入队
q.Done(item)
fmt.Println("processed",item)
}()
// 在q.Add(1)执行之前此时状态为
// queue --> [2, 3]
// dirty --> {2, 3}
// processing --> {1}
q.Add(1)
// 在执行q.Add(1)之后此时状态为
// queue --> [2, 3]
// dirty --> {1, 2, 3}
// processing --> {1}
// queue --> [2, 3]
// dirty --> {1, 2, 3}
// processing --> {1}
down<- struct{}{}
wg.Wait()
fmt.Println()
}
小结:
Stingy这一特性在高并发情况下是通过互斥锁来保证元素只会被处理一次;而对于同一元素多次入队的情况,如果该元素还未被处理,则会直接丢弃。
DelayingQueue
顺便看k8s中的延迟队列,延迟队列基于FIFO队列实现,在原有的基础上添加了AddAfter方法.在延迟一段时间后将元素插入到队列中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187// vendor/k8s.io/client-go/util/workqueue/delaying_queue.go
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}
// delayingType 提供延迟入队操作
type delayingType struct {
// 嵌套普通队列queue
Interface
// clock tracks time for delayed firing
clock clock.Clock
// stopCh lets us signal a shutdown to the waiting loop
stopCh chan struct{}
// stopOnce guarantees we only signal shutdown a single time
stopOnce sync.Once
// heartbeat ensures we wait no more than maxWait before firing
heartbeat clock.Ticker
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan *waitFor
// metrics counts the number of retries
metrics retryMetrics
}
// newDelayingQueue 真正实例化延迟队列的代码
func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
ret := &delayingType{
Interface: q,
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
}
// 运行一个循环
go ret.waitingLoop()
return ret
}
// waitingLoop 不断的检查waitForPriorityQueue的元素
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
// 队里没有元素时等待
never := make(<-chan time.Time)
// Make a timer that expires when the item at the head of the waiting queue is ready
var nextReadyAtTimer clock.Timer
// 构造一个优先级队列
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
// 用于处理重复添加逻辑
waitingEntryByData := map[t]*waitFor{}
for {
if q.Interface.ShuttingDown() {
return
}
now := q.clock.Now()
// Add ready entries
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
// 如果元素中的readyAt 在now之后,则退出
if entry.readyAt.After(now) {
break
}
// 从优先级队列中取出顶部元素,添加到workqueue中
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
entry := waitingForQueue.Peek().(*waitFor)
// 返回一个定时器对象,时间间隔是:entry.readyAt-now
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
// 猜测这样做是不用一直占用cpu资源,在优先级最高的元素入队的时间未到之前可以挂起
nextReadyAt = nextReadyAtTimer.C()
}
select {
case <-q.stopCh:
return
case <-q.heartbeat.C():
// continue the loop, which will add ready items
case <-nextReadyAt:
// continue the loop, which will add ready items
case waitEntry := <-q.waitingForAddCh:
//
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
// 插入元素到优先级队列,如果元素已存在则更新其readyAt
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
// if the entry already exists, update the time only if it would cause the item to be queued sooner
existing, exists := knownEntries[entry.data]
if exists {
if existing.readyAt.After(entry.readyAt) {
existing.readyAt = entry.readyAt
heap.Fix(q, existing.index)
}做
return
}
// 插入优先级队列
heap.Push(q, entry)
knownEntries[entry.data] = entry
}
// 用于记录入队的元素以及需要被入队的时间
type waitFor struct {
data t
readyAt time.Time
// 优先级队列(堆)中的索引
index int
}
// waitForPriorityQueue 为一个优先级队列,实现了heap.Interface中的方法
// 值readyAt最小的元素是顶点(index 0).
// Peek返回索引0中的元素
// Pop 将元素从队列中移除
// Push将元素插入到队列中
type waitForPriorityQueue []*waitFor
// AddAfter
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// don't add if we're already shutting down
if q.ShuttingDown() {
return
}
q.metrics.retry()
// immediately add things with no delay
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// unblock if ShutDown() is called
// 将元素添加到waitingForAddCh 中,此channel中的数据将在waitingLoop方法中被处理
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
1 | // waitForPriorityQueue实现了heap.Interface接口,最小堆 |
RateLimitingQueue
1 | // vendor/k8s.io/client-go/util/workqueue/rate_limiting_queue.go |
REF:
1.https://pkg.go.dev/k8s.io/client-go/util/workqueue
2.Kubernetes源码剖析