kube-controller: resource-quota

In Kubernetes, ResourceQuota is the mechanism for limiting and controlling resource consumption within a namespace. It lets administrators enforce resource-management policies across the cluster so that individual namespaces or users cannot monopolize resources or exceed agreed limits.

The ResourceQuota controller, part of kube-controller-manager, is responsible for tracking and reconciling the status of ResourceQuota objects. It periodically recalculates each namespace's resource usage and records it in the quota's Status.Used, alongside the limits declared in Spec.Hard. Note that the controller itself does not reject anything: actual enforcement happens in the API server's ResourceQuota admission plugin, which refuses requests that would push usage beyond the hard limits.

With ResourceQuota, an administrator can cap, per namespace, the amount of CPU, memory, storage, and other compute resources that may be consumed, as well as the number of objects such as Pods, Services, and ConfigMaps. By defining appropriate ResourceQuota objects, resources in the cluster can be allocated and used in a controlled way.
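
As a concrete illustration, here is a minimal sketch of such a ResourceQuota built with the Go API types; the namespace "dev", the object name, and the chosen limits are illustrative only:

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// computeQuota declares hard limits for one namespace; the controller discussed in
// this article is what later fills in Status.Used for exactly these resources.
var computeQuota = &v1.ResourceQuota{
	ObjectMeta: metav1.ObjectMeta{Name: "compute-quota", Namespace: "dev"},
	Spec: v1.ResourceQuotaSpec{
		Hard: v1.ResourceList{
			v1.ResourceRequestsCPU:    resource.MustParse("4"),
			v1.ResourceRequestsMemory: resource.MustParse("8Gi"),
			v1.ResourcePods:           resource.MustParse("20"),
		},
	},
}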

In the Kubernetes source tree, the ResourceQuota controller lives under pkg/controller/resourcequota; the files in that directory contain the controller's business and processing logic.


// pkg/controller/resourcequota/resource_quota_controller.go
// Controller is responsible for tracking quota usage status in the system
type Controller struct {
	// Must have authority to list all resources in the system, and update quota status
	rqClient corev1client.ResourceQuotasGetter
	// A lister/getter of resource quota objects
	rqLister corelisters.ResourceQuotaLister
	// A list of functions that return true when their caches have synced
	informerSyncedFuncs []cache.InformerSynced
	// ResourceQuota objects that need to be synchronized
	queue workqueue.RateLimitingInterface
	// missingUsageQueue holds objects that are missing the initial usage information
	missingUsageQueue workqueue.RateLimitingInterface
	// To allow injection of syncUsage for testing.
	syncHandler func(ctx context.Context, key string) error
	// function that controls full recalculation of quota usage
	resyncPeriod controller.ResyncPeriodFunc
	// knows how to calculate usage
	registry quota.Registry
	// knows how to monitor all the resources tracked by quota and trigger replenishment
	quotaMonitor *QuotaMonitor
	// controls the workers that process quotas
	// this lock is acquired to control write access to the monitors and ensures that all
	// monitors are synced before the controller can process quotas.
	workerLock sync.RWMutex
}
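
The two work queues are the interesting part: quotas whose status has not yet caught up with their spec are handled with priority. A simplified sketch of that enqueue decision (modeled on the upstream addQuota, with the registry-matching check and some error handling omitted) looks roughly like this:

// addQuota decides which queue a quota belongs in: quotas whose spec.Hard is not yet
// reflected in status.Hard, or which have hard constraints with no recorded usage,
// go to missingUsageQueue so their usage is calculated first.
func (rq *Controller) addQuota(logger klog.Logger, obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	resourceQuota := obj.(*v1.ResourceQuota)

	// spec declared limits that status has not captured yet: prioritize
	if !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard) {
		rq.missingUsageQueue.Add(key)
		return
	}

	// a hard constraint exists but no usage has been recorded for it yet: prioritize
	for constraint := range resourceQuota.Status.Hard {
		if _, ok := resourceQuota.Status.Used[constraint]; !ok {
			rq.missingUsageQueue.Add(key)
			return
		}
	}

	// otherwise this is an ordinary periodic recalculation
	rq.queue.Add(key)
}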

// NewController creates a quota controller with specified options
func NewController(ctx context.Context, options *ControllerOptions) (*Controller, error) {
	// build the resource quota controller
	rq := &Controller{
		rqClient:            options.QuotaClient,
		rqLister:            options.ResourceQuotaInformer.Lister(),
		informerSyncedFuncs: []cache.InformerSynced{options.ResourceQuotaInformer.Informer().HasSynced},
		queue:               workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
		missingUsageQueue:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
		resyncPeriod:        options.ResyncPeriod,
		registry:            options.Registry,
	}
	// set the synchronization handler
	rq.syncHandler = rq.syncResourceQuotaFromKey

	logger := klog.FromContext(ctx)

	options.ResourceQuotaInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				rq.addQuota(logger, obj)
			},
			UpdateFunc: func(old, cur interface{}) {
				// Only re-enqueue when spec.Hard actually changes; updates that touch only
				// status are driven by this controller itself and are ignored here.
				oldResourceQuota := old.(*v1.ResourceQuota)
				curResourceQuota := cur.(*v1.ResourceQuota)
				if quota.Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) {
					return
				}
				rq.addQuota(logger, curResourceQuota)
			},
			// This will enter the sync loop and no-op, because the quota has been deleted from the store.
			DeleteFunc: func(obj interface{}) {
				rq.enqueueResourceQuota(logger, obj)
			},
		},
		rq.resyncPeriod(),
	)

	if options.DiscoveryFunc != nil {
		qm := &QuotaMonitor{
			informersStarted:  options.InformersStarted,
			informerFactory:   options.InformerFactory,
			ignoredResources:  options.IgnoredResourcesFunc(),
			resourceChanges:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
			resyncPeriod:      options.ReplenishmentResyncPeriod,
			replenishmentFunc: rq.replenishQuota,
			registry:          rq.registry,
			updateFilter:      options.UpdateFilter,
		}

		rq.quotaMonitor = qm

		// do initial quota monitor setup. If we have a discovery failure here, it's ok. We'll discover more resources when a later sync happens.
		// GetQuotableResources returns the list of resources that quota can be applied to.
		resources, err := GetQuotableResources(options.DiscoveryFunc)
		if discovery.IsGroupDiscoveryFailedError(err) {
			utilruntime.HandleError(fmt.Errorf("initial discovery check failure, continuing and counting on future sync update: %v", err))
		} else if err != nil {
			return nil, err
		}

		if err = qm.SyncMonitors(ctx, resources); err != nil {
			utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
		}

		// only start quota once all informers synced
		rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, func() bool {
			return qm.IsSynced(ctx)
		})
	}

	return rq, nil
}

type ControllerOptions struct {
	// Must have authority to list all quotas, and update quota status
	QuotaClient corev1client.ResourceQuotasGetter
	// Shared informer for resource quotas
	ResourceQuotaInformer coreinformers.ResourceQuotaInformer
	// Controls full recalculation of quota usage
	ResyncPeriod controller.ResyncPeriodFunc
	// Maintains evaluators that know how to calculate usage for group resource
	Registry quota.Registry
	// Discover list of supported resources on the server.
	DiscoveryFunc NamespacedResourcesFunc
	// A function that returns the list of resources to ignore
	IgnoredResourcesFunc func() map[schema.GroupResource]struct{}
	// InformersStarted knows if informers were started.
	InformersStarted <-chan struct{}
	// InformerFactory interfaces with informers.
	InformerFactory informerfactory.InformerFactory
	// Controls full resync of objects monitored for replenishment.
	ReplenishmentResyncPeriod controller.ResyncPeriodFunc
	// Filters update events so we only enqueue the ones where we know quota will change
	UpdateFilter UpdateFilter
}
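
The sketch below shows roughly how these options might be assembled and the controller started. It is a simplified stand-in for kube-controller-manager's real startup path: every parameter (client, informer factories, registry, discovery function, started channel) as well as the function name itself is an illustrative assumption, not the actual startResourceQuotaController code.

// startResourceQuotaControllerSketch wires ControllerOptions together and starts workers.
func startResourceQuotaControllerSketch(
	ctx context.Context,
	clientset kubernetes.Interface,
	informerFactory informers.SharedInformerFactory,
	objectOrMetadataInformerFactory informerfactory.InformerFactory,
	quotaRegistry quota.Registry,
	discoveryFunc resourcequotacontroller.NamespacedResourcesFunc,
	ignoredResourcesFunc func() map[schema.GroupResource]struct{},
	informersStarted <-chan struct{},
) error {
	rqController, err := resourcequotacontroller.NewController(ctx, &resourcequotacontroller.ControllerOptions{
		QuotaClient:               clientset.CoreV1(),
		ResourceQuotaInformer:     informerFactory.Core().V1().ResourceQuotas(),
		ResyncPeriod:              controller.StaticResyncPeriodFunc(5 * time.Minute),
		Registry:                  quotaRegistry,
		DiscoveryFunc:             discoveryFunc,
		IgnoredResourcesFunc:      ignoredResourcesFunc,
		InformersStarted:          informersStarted,
		InformerFactory:           objectOrMetadataInformerFactory,
		ReplenishmentResyncPeriod: controller.StaticResyncPeriodFunc(12 * time.Hour),
	})
	if err != nil {
		return err
	}
	go rqController.Run(ctx, 5) // worker goroutines draining the two queues
	return nil
}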

// syncResourceQuotaFromKey is the syncHandler: it resolves a work-queue key into a
// ResourceQuota object from the lister and hands it to syncResourceQuota.
func (rq *Controller) syncResourceQuotaFromKey(ctx context.Context, key string) (err error) {
	startTime := time.Now()

	logger := klog.FromContext(ctx)
	logger = klog.LoggerWithValues(logger, "key", key)

	defer func() {
		logger.V(4).Info("Finished syncing resource quota", "key", key, "duration", time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	resourceQuota, err := rq.rqLister.ResourceQuotas(namespace).Get(name)
	if errors.IsNotFound(err) {
		logger.Info("Resource quota has been deleted", "key", key)
		return nil
	}
	if err != nil {
		logger.Error(err, "Unable to retrieve resource quota from store", "key", key)
		return err
	}
	return rq.syncResourceQuota(ctx, resourceQuota)
}
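
syncResourceQuotaFromKey is not called directly. Worker goroutines started by Run pop keys from the primary and missing-usage queues and pass them to rq.syncHandler. A minimal sketch of that loop, simplified relative to the real worker in the upstream file, looks like this:

// worker drains one queue: pop a key, run the sync handler, and requeue with
// rate limiting on failure so transient errors are retried later.
func (rq *Controller) worker(ctx context.Context, queue workqueue.RateLimitingInterface) {
	for {
		key, quit := queue.Get()
		if quit {
			return
		}
		func() {
			defer queue.Done(key)
			rq.workerLock.RLock() // wait out monitor resyncs that hold the write lock
			defer rq.workerLock.RUnlock()
			if err := rq.syncHandler(ctx, key.(string)); err != nil {
				utilruntime.HandleError(err)
				queue.AddRateLimited(key) // retry with backoff
				return
			}
			queue.Forget(key) // success: reset the rate limiter for this key
		}()
	}
}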


// syncResourceQuota runs a complete sync of resource quota status across all known kinds:
// it recalculates usage and, if anything changed, reports the new status to the api-server.
func (rq *Controller) syncResourceQuota(ctx context.Context, resourceQuota *v1.ResourceQuota) (err error) {
	// quota is dirty if any part of spec hard limits differs from the status hard limits
	statusLimitsDirty := !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard)

	// dirty tracks if the usage status differs from the previous sync,
	// if so, we send a new usage with latest status
	// if this is our first sync, it will be dirty by default, since we need to track usage
	dirty := statusLimitsDirty || resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil

	used := v1.ResourceList{}
	if resourceQuota.Status.Used != nil {
		used = quota.Add(v1.ResourceList{}, resourceQuota.Status.Used)
	}
	hardLimits := quota.Add(v1.ResourceList{}, resourceQuota.Spec.Hard)

	var errs []error

	newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry, resourceQuota.Spec.ScopeSelector)
	if err != nil {
		// if err is non-nil, remember it to return, but continue updating status with any resources in newUsage
		errs = append(errs, err)
	}
	for key, value := range newUsage {
		used[key] = value
	}

	// ensure set of used values match those that have hard constraints
	hardResources := quota.ResourceNames(hardLimits)
	used = quota.Mask(used, hardResources)

	// Create a usage object that is based on the quota resource version that will handle updates
	// by default, we preserve the past usage observation, and set hard to the current spec
	usage := resourceQuota.DeepCopy()
	usage.Status = v1.ResourceQuotaStatus{
		Hard: hardLimits,
		Used: used,
	}

	dirty = dirty || !quota.Equals(usage.Status.Used, resourceQuota.Status.Used)

	// there was a change observed by this controller that requires we update quota
	if dirty {
		_, err = rq.rqClient.ResourceQuotas(usage.Namespace).UpdateStatus(ctx, usage, metav1.UpdateOptions{})
		if err != nil {
			errs = append(errs, err)
		}
	}
	return utilerrors.NewAggregate(errs)
}
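
The quota helpers used above come from k8s.io/apiserver/pkg/quota/v1: Add merges two ResourceLists, ResourceNames lists the keys of a ResourceList, and Mask keeps only the entries whose names appear in the given set. Mask is what drops stale usage entries for resources that are no longer under a hard limit. A small illustrative example, with made-up values:

hard := v1.ResourceList{
	v1.ResourcePods:        resource.MustParse("10"),
	v1.ResourceRequestsCPU: resource.MustParse("4"),
}
// previously observed usage still contains "services", which is no longer limited
used := v1.ResourceList{
	v1.ResourcePods:     resource.MustParse("3"),
	v1.ResourceServices: resource.MustParse("2"),
}

// keep only usage entries that still have a hard constraint
used = quota.Mask(used, quota.ResourceNames(hard))
// used == {pods: 3}; requests.cpu will be filled in by CalculateUsage on the next sync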

REF:

1. resource-quotas
2. quota-api-object
3. admission_control_resource_quota
4. pkg/controller/resourcequota/resource_quota_controller.go