kube-controller: HPA

HPA

HPA is short for HorizontalPodAutoscaler, which provides horizontal Pod autoscaling. In Kubernetes, a HorizontalPodAutoscaler automatically updates a workload resource (such as a Deployment or StatefulSet), with the goal of automatically scaling the workload to match demand.

Horizontal scaling means that the response to increased load is to deploy more Pods. This differs from "vertical" scaling, which for Kubernetes means assigning more resources (for example, memory or CPU) to the Pods already running for the workload.

If the load decreases and the number of Pods is above the configured minimum, the HorizontalPodAutoscaler instructs the workload resource (a Deployment, StatefulSet, or other similar resource) to scale back down.

Horizontal Pod autoscaling does not apply to objects that cannot be scaled (for example, a DaemonSet).

The HorizontalPodAutoscaler is implemented as both a Kubernetes API resource and a controller.

The horizontal Pod autoscaling controller, running in the Kubernetes control plane, periodically adjusts the desired scale of its target (for example, a Deployment) to match observed metrics such as average CPU utilization, average memory utilization, or any other custom metric you specify.

The HorizontalPodAutoscaler obtains the relevant resource metrics from the Metrics Server and uses them to compute the scaling ratio.
The HorizontalPodAutoscaler controller accesses the corresponding scalable workload resource (for example, a Deployment, StatefulSet, or ReplicaSet). Each of these resources has a subresource named scale, an interface that lets you dynamically set the number of replicas.
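
As a rough illustration of how the scale subresource is used, the hedged sketch below reads and updates the replica count of a Deployment through an already-constructed scaleclient.ScalesGetter. The setReplicas helper, the namespace, and the resource names are illustrative and not part of the controller code; the controller performs an equivalent Get/Update on whatever resource the HPA's scaleTargetRef points to.

// A minimal sketch of working with the scale subresource.
package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	scaleclient "k8s.io/client-go/scale"
)

func setReplicas(ctx context.Context, scales scaleclient.ScalesGetter, namespace, name string, replicas int32) error {
	// Deployments expose their scale subresource under the "apps" group.
	gr := schema.GroupResource{Group: "apps", Resource: "deployments"}

	// Read the current scale subresource.
	scale, err := scales.Scales(namespace).Get(ctx, gr, name, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("get scale: %w", err)
	}
	fmt.Printf("current replicas: %d\n", scale.Spec.Replicas)

	// Write back the desired replica count, which is essentially what the
	// HPA controller does when it decides to rescale.
	scale.Spec.Replicas = replicas
	_, err = scales.Scales(namespace).Update(ctx, gr, scale, metav1.UpdateOptions{})
	return err
}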

Algorithm details

The horizontal Pod autoscaling controller computes the scaling ratio from the current metric value and the desired metric value.

desiredReplicas = ceil[currentReplicas * (currentMetricValue / desiredMetricValue)]

For example, if the current metric value is 200m and the desired value is 100m, the replica count is doubled, because 200.0 / 100.0 == 2.0. If the current value is 50m, the replica count is halved, because 50.0 / 100.0 == 0.5. If the ratio is sufficiently close to 1.0 (within a globally configurable tolerance, 0.1 by default), the control plane skips the scaling operation.
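
The calculation, including the tolerance check, can be sketched as follows. This is a standalone illustration with an assumed helper name; the controller's real logic lives in ReplicaCalculator.

// desiredReplicas sketches the formula above:
// desiredReplicas = ceil(currentReplicas * currentMetric / desiredMetric),
// with no change when the usage ratio is within the tolerance of 1.0.
package example

import "math"

// Default of the --horizontal-pod-autoscaler-tolerance flag.
const tolerance = 0.1

func desiredReplicas(currentReplicas int32, currentMetric, desiredMetric float64) int32 {
	usageRatio := currentMetric / desiredMetric
	if math.Abs(1.0-usageRatio) <= tolerance {
		// Close enough to the target: skip scaling.
		return currentReplicas
	}
	return int32(math.Ceil(usageRatio * float64(currentReplicas)))
}

// Example: desiredReplicas(3, 200, 100) returns 6, because the usage ratio
// is 2.0; desiredReplicas(3, 95, 100) returns 3, because the ratio 0.95 is
// within the default tolerance.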

See the documentation for more details.

Source code analysis
// pkg/controller/podautoscaler/horizontal.go
// HorizontalController is the HPA controller; note that it contains,
// among other fields, an hpaLister and a podLister.
type HorizontalController struct {
scaleNamespacer scaleclient.ScalesGetter
hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter
mapper apimeta.RESTMapper

replicaCalc *ReplicaCalculator
eventRecorder record.EventRecorder

downscaleStabilisationWindow time.Duration

monitor monitor.Monitor

// hpaLister is able to list/get HPAs from the shared cache from the informer passed in to
// NewHorizontalController.
hpaLister autoscalinglisters.HorizontalPodAutoscalerLister
hpaListerSynced cache.InformerSynced

// podLister is able to list/get Pods from the shared cache from the informer passed in to
// NewHorizontalController.
podLister corelisters.PodLister
podListerSynced cache.InformerSynced

// Controllers that need to be synced
queue workqueue.RateLimitingInterface

// Latest unstabilized recommendations for each autoscaler.
recommendations map[string][]timestampedRecommendation
recommendationsLock sync.Mutex

// Latest autoscaler events
scaleUpEvents map[string][]timestampedScaleEvent
scaleUpEventsLock sync.RWMutex
scaleDownEvents map[string][]timestampedScaleEvent
scaleDownEventsLock sync.RWMutex

// Storage of HPAs and their selectors.
hpaSelectors *selectors.BiMultimap
hpaSelectorsMux sync.Mutex

// feature gates
containerResourceMetricsEnabled bool
}

// NewHorizontalController creates a HorizontalController.
func NewHorizontalController(
evtNamespacer v1core.EventsGetter,
scaleNamespacer scaleclient.ScalesGetter,
hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
mapper apimeta.RESTMapper,
metricsClient metricsclient.MetricsClient,
hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
podInformer coreinformers.PodInformer,
resyncPeriod time.Duration,
downscaleStabilisationWindow time.Duration,
tolerance float64,
cpuInitializationPeriod,
delayOfInitialReadinessStatus time.Duration,
containerResourceMetricsEnabled bool,
) *HorizontalController {
broadcaster := record.NewBroadcaster()
broadcaster.StartStructuredLogging(0)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "horizontal-pod-autoscaler"})

hpaController := &HorizontalController{
eventRecorder: recorder,
scaleNamespacer: scaleNamespacer,
hpaNamespacer: hpaNamespacer,
downscaleStabilisationWindow: downscaleStabilisationWindow,
monitor: monitor.New(),
queue: workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
mapper: mapper,
recommendations: map[string][]timestampedRecommendation{},
recommendationsLock: sync.Mutex{},
scaleUpEvents: map[string][]timestampedScaleEvent{},
scaleUpEventsLock: sync.RWMutex{},
scaleDownEvents: map[string][]timestampedScaleEvent{},
scaleDownEventsLock: sync.RWMutex{},
hpaSelectors: selectors.NewBiMultimap(),
containerResourceMetricsEnabled: containerResourceMetricsEnabled,
}

// Watch for changes to HPA resources and enqueue them.
hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: hpaController.enqueueHPA,
UpdateFunc: hpaController.updateHPA,
DeleteFunc: hpaController.deleteHPA,
},
resyncPeriod,
)
hpaController.hpaLister = hpaInformer.Lister()
hpaController.hpaListerSynced = hpaInformer.Informer().HasSynced

hpaController.podLister = podInformer.Lister()
hpaController.podListerSynced = podInformer.Informer().HasSynced

// ReplicaCalculator is used to compute replica counts; it implements several methods for replica calculation.
replicaCalc := NewReplicaCalculator(
metricsClient,
hpaController.podLister,
tolerance,
cpuInitializationPeriod,
delayOfInitialReadinessStatus,
)
hpaController.replicaCalc = replicaCalc

monitor.Register()

return hpaController
}


func (a *HorizontalController) processNextWorkItem(ctx context.Context) bool {
key, quit := a.queue.Get()
if quit {
return false
}
defer a.queue.Done(key)

deleted, err := a.reconcileKey(ctx, key.(string))
if err != nil {
utilruntime.HandleError(err)
}
// Add request processing HPA to queue with resyncPeriod delay.
// Requests are always added to queue with resyncPeriod delay. If there's already request
// for the HPA in the queue then a new request is always dropped. Requests spend resyncPeriod
// in queue so HPAs are processed every resyncPeriod.
// Request is added here just in case last resync didn't insert request into the queue. This
// happens quite often because there is race condition between adding request after resyncPeriod
// and removing them from queue. Request can be added by resync before previous request is
// removed from queue. If we didn't add request here then in this case one request would be dropped
// and HPA would processed after 2 x resyncPeriod.
if !deleted {
a.queue.AddRateLimited(key)
}

return true
}

func (a *HorizontalController) reconcileKey(ctx context.Context, key string) (deleted bool, err error) {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return true, err
}

logger := klog.FromContext(ctx)

// Fetch the HPA from the lister.
hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
if k8serrors.IsNotFound(err) {
logger.Info("Horizontal Pod Autoscaler has been deleted", "HPA", klog.KRef(namespace, name))

a.recommendationsLock.Lock()
delete(a.recommendations, key)
a.recommendationsLock.Unlock()

a.scaleUpEventsLock.Lock()
delete(a.scaleUpEvents, key)
a.scaleUpEventsLock.Unlock()

a.scaleDownEventsLock.Lock()
delete(a.scaleDownEvents, key)
a.scaleDownEventsLock.Unlock()

return true, nil
}
if err != nil {
return false, err
}

return false, a.reconcileAutoscaler(ctx, hpa, key)
}
// reconcileAutoscaler is the function that actually performs reconciliation.
func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) (retErr error) {
// actionLabel is used to report which actions this reconciliation has taken.
actionLabel := monitor.ActionLabelNone
start := time.Now()
defer func() {
errorLabel := monitor.ErrorLabelNone
if retErr != nil {
// In case of error, set "internal" as default.
errorLabel = monitor.ErrorLabelInternal
}
if errors.Is(retErr, errSpec) {
errorLabel = monitor.ErrorLabelSpec
}

a.monitor.ObserveReconciliationResult(actionLabel, errorLabel, time.Since(start))
}()

// make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
hpa := hpaShared.DeepCopy()
hpaStatusOriginal := hpa.Status.DeepCopy()

reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)

targetGV, err := schema.ParseGroupVersion(hpa.Spec.ScaleTargetRef.APIVersion)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
return fmt.Errorf("invalid API version in scale target reference: %v%w", err, errSpec)
}

targetGK := schema.GroupKind{
Group: targetGV.Group,
Kind: hpa.Spec.ScaleTargetRef.Kind,
}
// RESTMappings returns all resource mappings for the provided group kind.
mappings, err := a.mapper.RESTMappings(targetGK)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
return fmt.Errorf("unable to determine resource for scale target reference: %v", err)
}

// Fetch the scale subresource of the resource referenced by the HPA.
scale, targetGR, err := a.scaleForResourceMappings(ctx, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
}
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")
// Current replica count.
currentReplicas := scale.Spec.Replicas
a.recordInitialRecommendation(currentReplicas, key)

var (
metricStatuses []autoscalingv2.MetricStatus
metricDesiredReplicas int32
metricName string
)

// Desired replica count.
desiredReplicas := int32(0)
rescaleReason := ""

var minReplicas int32

// Determine minReplicas.
if hpa.Spec.MinReplicas != nil {
minReplicas = *hpa.Spec.MinReplicas
} else {
// Default value
minReplicas = 1
}

rescale := true
logger := klog.FromContext(ctx)

// If scale.Spec.Replicas is 0 while minReplicas is not 0, autoscaling is disabled for this resource.
// The minReplicas != 0 check matters because, with the HPAScaleToZero feature gate, minReplicas may be 0,
// and in that case a replica count of 0 is a valid state to scale from rather than a signal that
// autoscaling has been turned off.
if scale.Spec.Replicas == 0 && minReplicas != 0 {
// Autoscaling is disabled for this resource
desiredReplicas = 0
// Do not rescale.
rescale = false
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
} else if currentReplicas > hpa.Spec.MaxReplicas {
// If the current replica count is greater than hpa.Spec.MaxReplicas, the desired replica count is hpa.Spec.MaxReplicas.
rescaleReason = "Current number of replicas above Spec.MaxReplicas"
desiredReplicas = hpa.Spec.MaxReplicas
} else if currentReplicas < minReplicas {
// If the current replica count is less than minReplicas, the desired replica count is minReplicas.
rescaleReason = "Current number of replicas below Spec.MinReplicas"
desiredReplicas = minReplicas
} else {
var metricTimestamp time.Time
// Compute metricDesiredReplicas from the configured metrics.
// The core HPA algorithm lives in computeReplicasForMetrics.
metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(ctx, hpa, scale, hpa.Spec.Metrics)
// computeReplicasForMetrics may return both non-zero metricDesiredReplicas and an error.
// That means some metrics still work and HPA should perform scaling based on them.
if err != nil && metricDesiredReplicas == -1 {
a.setCurrentReplicasInStatus(hpa, currentReplicas)
if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
}
if err != nil {
// We proceed to scaling, but return this error from reconcileAutoscaler() finally.
retErr = err
}

logger.V(4).Info("Proposing desired replicas",
"desiredReplicas", metricDesiredReplicas,
"metric", metricName,
"timestamp", metricTimestamp,
"scaleTarget", reference)

rescaleMetric := ""
if metricDesiredReplicas > desiredReplicas {
desiredReplicas = metricDesiredReplicas
rescaleMetric = metricName
}
if desiredReplicas > currentReplicas {
rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
}
if desiredReplicas < currentReplicas {
rescaleReason = "All metrics below target"
}
if hpa.Spec.Behavior == nil {
desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
} else {
desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
}
rescale = desiredReplicas != currentReplicas
}

if rescale {
// Set scale.Spec.Replicas to the desired replica count.
scale.Spec.Replicas = desiredReplicas
// Call the scale API to update the scale subresource.
_, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(ctx, targetGR, scale, metav1.UpdateOptions{})
if err != nil {
a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
a.setCurrentReplicasInStatus(hpa, currentReplicas)
if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
return fmt.Errorf("failed to rescale %s: %v", reference, err)
}
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
a.storeScaleEvent(hpa.Spec.Behavior, key, currentReplicas, desiredReplicas)
logger.Info("Successfully rescaled",
"HPA", klog.KObj(hpa),
"currentReplicas", currentReplicas,
"desiredReplicas", desiredReplicas,
"reason", rescaleReason)

if desiredReplicas > currentReplicas {
actionLabel = monitor.ActionLabelScaleUp
} else {
actionLabel = monitor.ActionLabelScaleDown
}
} else {
logger.V(4).Info("Decided not to scale",
"scaleTarget", reference,
"desiredReplicas", desiredReplicas,
"lastScaleTime", hpa.Status.LastScaleTime)
desiredReplicas = currentReplicas
}

a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)

err = a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
if err != nil {
// we can overwrite retErr in this case because it's an internal error.
return err
}

return retErr
}

// computeReplicasForMetrics computes the desired replica count based on the metric specs in the HPA.
func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {

selector, err := a.validateAndParseSelector(hpa, scale.Status.Selector)
if err != nil {
return -1, "", nil, time.Time{}, err
}

specReplicas := scale.Spec.Replicas
statusReplicas := scale.Status.Replicas
statuses = make([]autoscalingv2.MetricStatus, len(metricSpecs))

invalidMetricsCount := 0
var invalidMetricError error
var invalidMetricCondition autoscalingv2.HorizontalPodAutoscalerCondition

// Iterate over the metric specs, compute a proposed replica count for each, and keep the maximum.
for i, metricSpec := range metricSpecs {
replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(ctx, hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])

if err != nil {
if invalidMetricsCount <= 0 {
invalidMetricCondition = condition
invalidMetricError = err
}
invalidMetricsCount++
continue
}
if replicas == 0 || replicaCountProposal > replicas {
timestamp = timestampProposal
replicas = replicaCountProposal
metric = metricNameProposal
}
}

if invalidMetricError != nil {
invalidMetricError = fmt.Errorf("invalid metrics (%v invalid out of %v), first error is: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
}

// If all metrics are invalid or some are invalid and we would scale down,
// return an error and set the condition of the hpa based on the first invalid metric.
// Otherwise set the condition as scaling active as we're going to scale
if invalidMetricsCount >= len(metricSpecs) || (invalidMetricsCount > 0 && replicas < specReplicas) {
setCondition(hpa, invalidMetricCondition.Type, invalidMetricCondition.Status, invalidMetricCondition.Reason, invalidMetricCondition.Message)
return -1, "", statuses, time.Time{}, invalidMetricError
}
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionTrue, "ValidMetricFound", "the HPA was able to successfully calculate a replica count from %s", metric)

return replicas, metric, statuses, timestamp, invalidMetricError
}


func (a *HorizontalController) computeReplicasForMetric(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
// actionLabel is used to report which actions this reconciliation has taken.
start := time.Now()
defer func() {
actionLabel := monitor.ActionLabelNone
switch {
case replicaCountProposal > hpa.Status.CurrentReplicas:
actionLabel = monitor.ActionLabelScaleUp
case replicaCountProposal < hpa.Status.CurrentReplicas:
actionLabel = monitor.ActionLabelScaleDown
}

errorLabel := monitor.ErrorLabelNone
if err != nil {
// In case of error, set "internal" as default.
errorLabel = monitor.ErrorLabelInternal
actionLabel = monitor.ActionLabelNone
}
if errors.Is(err, errSpec) {
errorLabel = monitor.ErrorLabelSpec
}

a.monitor.ObserveMetricComputationResult(actionLabel, errorLabel, time.Since(start), spec.Type)
}()

// Compute the proposed replica count according to the metric source type.
switch spec.Type {
case autoscalingv2.ObjectMetricSourceType:
metricSelector, err := metav1.LabelSelectorAsSelector(spec.Object.Metric.Selector)
if err != nil {
condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
}
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForObjectMetric(specReplicas, statusReplicas, spec, hpa, selector, status, metricSelector)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
}
case autoscalingv2.PodsMetricSourceType:
metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
if err != nil {
condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
}
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
}
case autoscalingv2.ResourceMetricSourceType:
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s resource metric value: %v", spec.Resource.Name, err)
}
case autoscalingv2.ContainerResourceMetricSourceType:
if !a.containerResourceMetricsEnabled {
// If the container resource metrics feature is disabled but the object has the one,
// that means the user enabled the feature once,
// created some HPAs with the container resource metrics, and disabled it finally.
return 0, "", time.Time{}, condition, fmt.Errorf("ContainerResource metric type is not supported: disabled by the feature gate")
}
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForContainerResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s container metric value: %v", spec.ContainerResource.Container, err)
}
case autoscalingv2.ExternalMetricSourceType:
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForExternalMetric(specReplicas, statusReplicas, spec, hpa, selector, status)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s external metric value: %v", spec.External.Metric.Name, err)
}
default:
// It shouldn't reach here as invalid metric source type is filtered out in the api-server's validation.
err = fmt.Errorf("unknown metric source type %q%w", string(spec.Type), errSpec)
condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
return 0, "", time.Time{}, condition, err
}
return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}
// Metric source types
// staging/src/k8s.io/api/autoscaling/v2/types.go
const (
// ObjectMetricSourceType is a metric describing a kubernetes object
// (for example, hits-per-second on an Ingress object).
ObjectMetricSourceType MetricSourceType = "Object"
// PodsMetricSourceType is a metric describing each pod in the current scale
// target (for example, transactions-processed-per-second). The values
// will be averaged together before being compared to the target value.
PodsMetricSourceType MetricSourceType = "Pods"
// ResourceMetricSourceType is a resource metric known to Kubernetes, as
// specified in requests and limits, describing each pod in the current
// scale target (e.g. CPU or memory). Such metrics are built in to
// Kubernetes, and have special scaling options on top of those available
// to normal per-pod metrics (the "pods" source).
ResourceMetricSourceType MetricSourceType = "Resource"
// ContainerResourceMetricSourceType is a resource metric known to Kubernetes, as
// specified in requests and limits, describing a single container in each pod in the current
// scale target (e.g. CPU or memory). Such metrics are built in to
// Kubernetes, and have special scaling options on top of those available
// to normal per-pod metrics (the "pods" source).
ContainerResourceMetricSourceType MetricSourceType = "ContainerResource"
// ExternalMetricSourceType is a global metric that is not associated
// with any Kubernetes object. It allows autoscaling based on information
// coming from components running outside of cluster
// (for example length of queue in cloud messaging service, or
// QPS from loadbalancer running outside of cluster).
ExternalMetricSourceType MetricSourceType = "External"
)
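
To make these source types concrete, the hedged sketch below builds a Resource metric spec targeting average CPU utilization. The helper name and the 60% target are example values, not taken from the controller code.

// cpuUtilizationMetric builds an autoscaling/v2 MetricSpec of type
// Resource; the 60% average utilization target is an arbitrary example.
package example

import (
	autoscalingv2 "k8s.io/api/autoscaling/v2"
	v1 "k8s.io/api/core/v1"
)

func cpuUtilizationMetric() autoscalingv2.MetricSpec {
	target := int32(60) // target average CPU utilization, in percent
	return autoscalingv2.MetricSpec{
		Type: autoscalingv2.ResourceMetricSourceType,
		Resource: &autoscalingv2.ResourceMetricSource{
			Name: v1.ResourceCPU,
			Target: autoscalingv2.MetricTarget{
				Type:               autoscalingv2.UtilizationMetricType,
				AverageUtilization: &target,
			},
		},
	}
}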

REF:
1. horizontal-pod-autoscale/
2. Writing a custom metrics adapter
3. staging/src/k8s.io/api/autoscaling/v2/types.go
4. pkg/controller/podautoscaler/horizontal.go