k8s源码设计模式之State

State也就是状态模式是一种行为设计模式, 让你能在一个对象的内部状态变化时改变其行为, 使其看上去就像改变了自身所属的类一样。

Kubernetes 中,DeploymentController 作为控制器之一,主要负责维护 Deployment 对象的状态,即根据用户定义的 Deployment 规范,创建、更新、删除 ReplicaSetPod 等资源,确保应用的正确运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// pkg/apis/apps/types.go
// DeploymentConditionType有三种类型,Available,Progressing,ReplicaFailure.
const (
// Available means the deployment is available, ie. at least the minimum available
// replicas required are up and running for at least minReadySeconds.
DeploymentAvailable DeploymentConditionType = "Available"
// Progressing means the deployment is progressing. Progress for a deployment is
// considered when a new replica set is created or adopted, and when new pods scale
// up or old pods scale down. Progress is not estimated for paused deployments or
// when progressDeadlineSeconds is not specified.
DeploymentProgressing DeploymentConditionType = "Progressing"
// ReplicaFailure is added in a deployment when one of its pods fails to be created
// or deleted.
DeploymentReplicaFailure DeploymentConditionType = "ReplicaFailure"
)
// DeploymentCondition 记录了Deployment的状态
// DeploymentCondition describes the state of a deployment at a certain point.
type DeploymentCondition struct {
// Type of deployment condition.
Type DeploymentConditionType
// Status of the condition, one of True, False, Unknown.
Status api.ConditionStatus
// The last time this condition was updated.
LastUpdateTime metav1.Time
// Last time the condition transitioned from one status to another.
LastTransitionTime metav1.Time
// The reason for the condition's last transition.
Reason string
// A human readable message indicating details about the transition.
Message string
}

// DeploymentStatus holds information about the observed status of a deployment.
type DeploymentStatus struct {
// The generation observed by the deployment controller.
// +optional
ObservedGeneration int64

// Total number of non-terminated pods targeted by this deployment (their labels match the selector).
// +optional
Replicas int32

// Total number of non-terminated pods targeted by this deployment that have the desired template spec.
// +optional
UpdatedReplicas int32

// Total number of ready pods targeted by this deployment.
// +optional
ReadyReplicas int32

// Total number of available pods (ready for at least minReadySeconds) targeted by this deployment.
// +optional
AvailableReplicas int32

// Total number of unavailable pods targeted by this deployment. This is the total number of
// pods that are still required for the deployment to have 100% available capacity. They may
// either be pods that are running but not yet available or pods that still have not been created.
// +optional
UnavailableReplicas int32

// Represents the latest available observations of a deployment's current state.
Conditions []DeploymentCondition

// Count of hash collisions for the Deployment. The Deployment controller uses this
// field as a collision avoidance mechanism when it needs to create the name for the
// newest ReplicaSet.
// +optional
CollisionCount *int32
}

下面这段代码是 DeploymentController 在检测 Deployment 的部署状态时,使用状态模式进行状态转换的一个例子。该控制器使用 DeploymentCondition 对象来跟踪 Deployment 的状态。在这里,根据 Deployment 是否已经完成(DeploymentComplete)、是否在进行中(DeploymentProgressing)以及是否已经超时(DeploymentTimedOut)等三种状态,来执行不同的操作。

首先,如果 Deployment 已经完成,则会创建一个新的 DeploymentCondition 对象,将其状态设置为 True,原因设置为 NewReplicaSetAvailable,具体是哪个取决于新创建的 ReplicaSet 是否存在。然后,使用 SetDeploymentCondition 方法将其设置为Deployment 的新状态。

其次,如果 Deployment 正在进行中,则会创建一个的 DeploymentCondition 对象。如果currentCond不为nil且该对象的状态为 True,则将其上次转换时间 condition.LastTransitionTime设置为currentCond.LastTransitionTime。然后使用 SetDeploymentCondition 方法更新。

最后,如果 Deployment 已经超时,则会创建一个新的 DeploymentCondition 对象,将其状态设置为 False,原因设置为 TimedOutReason,并使用 SetDeploymentCondition 方法将其设置为 Deployment 的新状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// pkg/controller/deployment/progress.go
func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
newStatus := calculateStatus(allRSs, newRS, d)

// If there is no progressDeadlineSeconds set, remove any Progressing condition.
if !util.HasProgressDeadline(d) {
util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
}

// If there is only one replica set that is active then that means we are not running
// a new rollout and this is a resync where we don't need to estimate any progress.
// In such a case, we should simply not estimate any progress for this deployment.
currentCond := util.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
isCompleteDeployment := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason
// Check for progress only if there is a progress deadline set and the latest rollout
// hasn't completed yet.
if util.HasProgressDeadline(d) && !isCompleteDeployment {
switch {
case util.DeploymentComplete(d, &newStatus):
// Update the deployment conditions with a message for the new replica set that
// was successfully deployed. If the condition already exists, we ignore this update.
msg := fmt.Sprintf("Deployment %q has successfully progressed.", d.Name)
if newRS != nil {
msg = fmt.Sprintf("ReplicaSet %q has successfully progressed.", newRS.Name)
}
condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.NewRSAvailableReason, msg)
util.SetDeploymentCondition(&newStatus, *condition)

case util.DeploymentProgressing(d, &newStatus):
// If there is any progress made, continue by not checking if the deployment failed. This
// behavior emulates the rolling updater progressDeadline check.
msg := fmt.Sprintf("Deployment %q is progressing.", d.Name)
if newRS != nil {
msg = fmt.Sprintf("ReplicaSet %q is progressing.", newRS.Name)
}
condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.ReplicaSetUpdatedReason, msg)
// Update the current Progressing condition or add a new one if it doesn't exist.
// If a Progressing condition with status=true already exists, we should update
// everything but lastTransitionTime. SetDeploymentCondition already does that but
// it also is not updating conditions when the reason of the new condition is the
// same as the old. The Progressing condition is a special case because we want to
// update with the same reason and change just lastUpdateTime iff we notice any
// progress. That's why we handle it here.
if currentCond != nil {
if currentCond.Status == v1.ConditionTrue {
condition.LastTransitionTime = currentCond.LastTransitionTime
}
util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
}
util.SetDeploymentCondition(&newStatus, *condition)

case util.DeploymentTimedOut(ctx, d, &newStatus):
// Update the deployment with a timeout condition. If the condition already exists,
// we ignore this update.
msg := fmt.Sprintf("Deployment %q has timed out progressing.", d.Name)
if newRS != nil {
msg = fmt.Sprintf("ReplicaSet %q has timed out progressing.", newRS.Name)
}
condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionFalse, util.TimedOutReason, msg)
util.SetDeploymentCondition(&newStatus, *condition)
}
}

// Move failure conditions of all replica sets in deployment conditions. For now,
// only one failure condition is returned from getReplicaFailures.
if replicaFailureCond := dc.getReplicaFailures(allRSs, newRS); len(replicaFailureCond) > 0 {
// There will be only one ReplicaFailure condition on the replica set.
util.SetDeploymentCondition(&newStatus, replicaFailureCond[0])
} else {
util.RemoveDeploymentCondition(&newStatus, apps.DeploymentReplicaFailure)
}

// Do not update if there is nothing new to add.
if reflect.DeepEqual(d.Status, newStatus) {
// Requeue the deployment if required.
dc.requeueStuckDeployment(ctx, d, newStatus)
return nil
}

newDeployment := d
newDeployment.Status = newStatus
_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
return err
}

这种方式可以将状态转换的逻辑从主控制器中分离出来,这样做可以使代码更加清晰、易于维护。


REF:
1.pkg/controller/deployment/progress.go
2.hpkg/apis/apps/types.go