kube-controller: the endpoint controller

In Kubernetes, an Endpoints object represents a set of network addresses (IP address and port pairs) that back an actual service; it is used to route a Service's client requests to the IP addresses and ports of the backend Pods.
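To make this concrete, here is a hedged sketch of what a populated Endpoints object looks like; the Pod IP and Pod name below are made up:

```yaml
# Illustrative only: the IP and Pod name are assumptions, not captured output.
apiVersion: v1
kind: Endpoints
metadata:
  name: ep
  namespace: default
subsets:
- addresses:
  - ip: 10.244.1.23              # the backing Pod's IP
    targetRef:
      kind: Pod
      name: ep-6d4b75cb6d-abcde  # hypothetical Pod name
      namespace: default
  ports:
  - name: http
    port: 8080
    protocol: TCP
```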

An Endpoints object can be created by hand, or created and updated automatically by Kubernetes. When you create a Service with a selector, the endpoints controller creates an associated Endpoints object and automatically fills in the matching Pod IP addresses and ports.
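Manual creation matters for Services without a selector, which the controller deliberately skips (see the `service.Spec.Selector == nil` branch in syncService below): you create an Endpoints object with the same name as the Service and point it at arbitrary backends. A minimal sketch, with made-up names and a made-up external address:

```yaml
# Sketch: the names and backend IP here are assumptions for illustration.
apiVersion: v1
kind: Service
metadata:
  name: external-db
spec:
  ports:
  - port: 5432
---
apiVersion: v1
kind: Endpoints
metadata:
  name: external-db   # must match the Service name
subsets:
- addresses:
  - ip: 192.0.2.10    # a backend outside the cluster
  ports:
  - port: 5432
```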

When a client reaches backend Pods through a Service, Kubernetes routes the request to a backend Pod's IP address and port. In the default iptables mode this routing is implemented by kube-proxy: for each port of a Service, it programs iptables rules that forward traffic on that port to the corresponding Endpoints addresses and ports.
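Those rules roughly chain KUBE-SERVICES to a per-Service KUBE-SVC-* chain and then to per-endpoint KUBE-SEP-* chains that DNAT to a Pod. The snippet below is an illustrative sketch only; the chain hashes, IPs, and exact match flags are made up and vary by version:

```
# Illustrative sketch, not real iptables-save output.
-A KUBE-SERVICES -d 10.96.12.34/32 -p tcp --dport 8080 -j KUBE-SVC-EXAMPLE
-A KUBE-SVC-EXAMPLE -m statistic --mode random --probability 0.50 -j KUBE-SEP-POD1
-A KUBE-SVC-EXAMPLE -j KUBE-SEP-POD2
-A KUBE-SEP-POD1 -p tcp -j DNAT --to-destination 10.244.1.23:8080
-A KUBE-SEP-POD2 -p tcp -j DNAT --to-destination 10.244.2.17:8080
```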

Upstream now recommends EndpointSlice: creating a Service automatically produces both an Endpoints object and an EndpointSlice. Notably, if you delete the Endpoints object the Service remains reachable, since kube-proxy consumes EndpointSlices rather than Endpoints on current versions (and the controller recreates the Endpoints object on its next sync anyway).
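With the demo Service named ep created below, you can watch both objects; EndpointSlices carry the kubernetes.io/service-name label, while the Endpoints object simply shares the Service's name:

```shell
kubectl get endpoints ep
kubectl get endpointslices -l kubernetes.io/service-name=ep
kubectl delete endpoints ep   # the Service stays reachable; the controller soon recreates the object
```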

You can use the following YAML to create a Service and observe the corresponding Endpoints object.

```shell
cat > ep.yaml <<EOF
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ep
spec:
  replicas: 1
  selector:
    matchLabels:
      app: ep
  template:
    metadata:
      labels:
        app: ep
    spec:
      containers:
      - name: ep
        image: hysyeah/my-curl:v1
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
  name: ep
spec:
  selector:
    app: ep
  ports:
  - name: http
    port: 8080
    targetPort: 8080
  type: NodePort
EOF
```
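Creating the objects with a high verbosity level exposes the underlying API calls; the exact flag below is an assumption about how the log lines that follow were captured:

```shell
kubectl create -f ep.yaml -v=6
```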

```
I0503 15:23:44.009583  131717 round_trippers.go:463] POST apis/apps/v1/namespaces/default/deployments?fieldManager=kubectl-create&fieldValidation=Strict
deployment.apps/ep created
I0503 15:23:44.016610  131717 round_trippers.go:463] POST api/v1/namespaces/default/services?fieldManager=kubectl-create&fieldValidation=Strict
service/ep created
```

These calls created the Deployment and the Service respectively. If you shut kube-controller-manager down, `kubectl get ep` shows no corresponding Endpoints object: Endpoints are created by the endpoints controller inside kube-controller-manager, not by the API server.
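One way to see this for yourself, assuming you can stop kube-controller-manager in your environment (e.g. its static Pod or systemd unit):

```shell
# With kube-controller-manager stopped:
kubectl delete -f ep.yaml
kubectl create -f ep.yaml
kubectl get endpoints ep   # NotFound until the controller comes back up
```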


```go
// pkg/apis/discovery/types.go
// Note: this Endpoint type belongs to the discovery API group that backs
// EndpointSlice; the core v1 Endpoints object managed by the controller
// below is an older, separate type.
// Endpoint represents a single logical "backend" implementing a service.
type Endpoint struct {
    // addresses of this endpoint. The contents of this field are interpreted
    // according to the corresponding EndpointSlice addressType field. Consumers
    // must handle different types of addresses in the context of their own
    // capabilities. This must contain at least one address but no more than
    // 100.
    // +listType=set
    Addresses []string
    // conditions contains information about the current status of the endpoint.
    Conditions EndpointConditions
    // hostname of this endpoint. This field may be used by consumers of
    // endpoints to distinguish endpoints from each other (e.g. in DNS names).
    // Multiple endpoints which use the same hostname should be considered
    // fungible (e.g. multiple A values in DNS). Must pass DNS Label (RFC 1123)
    // validation.
    // +optional
    Hostname *string
    // targetRef is a reference to a Kubernetes object that represents this
    // endpoint.
    // +optional
    TargetRef *api.ObjectReference
    // deprecatedTopology is deprecated and only retained for round-trip
    // compatibility with v1beta1 Topology field. When v1beta1 is removed, this
    // should be removed, too.
    // +optional
    DeprecatedTopology map[string]string
    // nodeName represents the name of the Node hosting this endpoint. This can
    // be used to determine endpoints local to a Node.
    // +optional
    NodeName *string
    // zone is the name of the Zone this endpoint exists in.
    // +optional
    Zone *string
    // hints contains information associated with how an endpoint should be
    // consumed.
    // +featureGate=TopologyAwareHints
    // +optional
    Hints *EndpointHints
}
```
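These fields surface directly in EndpointSlice objects. A hedged sketch of the slice the demo Service might get; the generated name, IP, Pod name, and node name are made up:

```yaml
apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
  name: ep-abc12                     # controller-generated name (made up)
  labels:
    kubernetes.io/service-name: ep   # ties the slice back to its Service
addressType: IPv4
endpoints:
- addresses:
  - 10.244.1.23
  conditions:
    ready: true
  nodeName: node-1
  targetRef:
    kind: Pod
    name: ep-6d4b75cb6d-abcde
ports:
- name: http
  port: 8080
  protocol: TCP
```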


```go
// pkg/controller/endpoint/endpoints_controller.go
// As the code shows, the controller watches three resources through informers:
//   - Service
//   - Pod
//   - Endpoints
// NewEndpointController returns a new *Controller.
func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
    endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller {
    broadcaster := record.NewBroadcaster()
    recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"})

    e := &Controller{
        client:           client,
        queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
        workerLoopPeriod: time.Second,
    }

    // onServiceUpdate and onServiceDelete both just enqueue the service key
    serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: e.onServiceUpdate,
        UpdateFunc: func(old, cur interface{}) {
            e.onServiceUpdate(cur)
        },
        DeleteFunc: e.onServiceDelete,
    })
    e.serviceLister = serviceInformer.Lister()
    e.servicesSynced = serviceInformer.Informer().HasSynced

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    e.addPod,
        UpdateFunc: e.updatePod,
        DeleteFunc: e.deletePod,
    })
    e.podLister = podInformer.Lister()
    e.podsSynced = podInformer.Informer().HasSynced

    // e.onEndpointsDelete enqueues the key (e.g. "default/ep"), so a deleted
    // Endpoints object gets rebuilt on the next sync
    endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: e.onEndpointsDelete,
    })
    e.endpointsLister = endpointsInformer.Lister()
    e.endpointsSynced = endpointsInformer.Informer().HasSynced

    e.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()
    e.eventBroadcaster = broadcaster
    e.eventRecorder = recorder

    e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod

    return e
}
```
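The enqueue handlers wired up above are thin wrappers that derive a namespace/name key and push it onto the workqueue. The sketch below is simplified from the real source (error handling and selector caching trimmed) to show the pattern:

```go
// Simplified sketch of the enqueue pattern behind onServiceUpdate and
// onServiceDelete; the real functions do a little more bookkeeping.
func (e *Controller) onServiceUpdate(obj interface{}) {
    // Yields "namespace/name", e.g. "default/ep" for the demo Service.
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
        return
    }
    e.queue.Add(key)
}
```

Because Service, Pod, and Endpoints events all funnel into the same queue of service keys, syncService below only ever has to reconcile one Service at a time.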

```go
// pkg/controller/endpoint/endpoints_controller.go
func (e *Controller) processNextWorkItem(ctx context.Context) bool {
    eKey, quit := e.queue.Get()
    if quit {
        return false
    }
    defer e.queue.Done(eKey)

    // All of the reconciliation logic lives in syncService
    err := e.syncService(ctx, eKey.(string))
    e.handleErr(err, eKey)

    return true
}

func (e *Controller) syncService(ctx context.Context, key string) error {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Since(startTime))
    }()

    // Split the queue key into namespace and name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    // Look up the Service with the same name; an Endpoints object and its
    // Service map one-to-one by namespace/name
    service, err := e.serviceLister.Services(namespace).Get(name)
    if err != nil {
        if !errors.IsNotFound(err) {
            return err
        }

        // Delete the corresponding endpoint, as the service has been deleted.
        // TODO: Please note that this will delete an endpoint when a
        // service is deleted. However, if we're down at the time when
        // the service is deleted, we will miss that deletion, so this
        // doesn't completely solve the problem. See #6877.
        // If kube-controller-manager is down when a Service is deleted, the
        // orphaned Endpoints object may linger indefinitely
        err = e.client.CoreV1().Endpoints(namespace).Delete(ctx, name, metav1.DeleteOptions{})
        if err != nil && !errors.IsNotFound(err) {
            return err
        }
        e.triggerTimeTracker.DeleteService(namespace, name)
        return nil
    }

    if service.Spec.Type == v1.ServiceTypeExternalName {
        // services with Type ExternalName receive no endpoints from this controller;
        // Ref: https://issues.k8s.io/105986
        return nil
    }

    if service.Spec.Selector == nil {
        // services without a selector receive no endpoints from this controller;
        // these services will receive the endpoints that are created out-of-band via the REST API.
        return nil
    }

    klog.V(5).Infof("About to update endpoints for service %q", key)
    // Find the Pods matching the Service's selector
    pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
    if err != nil {
        // Since we're getting stuff from a local cache, it is
        // basically impossible to get this error.
        return err
    }

    // We call ComputeEndpointLastChangeTriggerTime here to make sure that the
    // state of the trigger time tracker gets updated even if the sync turns out
    // to be no-op and we don't update the endpoints object.
    endpointsLastChangeTriggerTime := e.triggerTimeTracker.
        ComputeEndpointLastChangeTriggerTime(namespace, service, pods)

    subsets := []v1.EndpointSubset{}
    var totalReadyEps int
    var totalNotReadyEps int

    for _, pod := range pods {
        if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
            klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name)
            continue
        }

        ep, err := podToEndpointAddressForService(service, pod)
        if err != nil {
            // this will happen, if the cluster runs with some nodes configured as dual stack and some as not
            // such as the case of an upgrade..
            klog.V(2).Infof("Failed to find endpoint for service:%s with ClusterIP:%s on pod:%s with error:%v", service.Name, service.Spec.ClusterIP, klog.KObj(pod), err)
            continue
        }

        epa := *ep
        if endpointutil.ShouldSetHostname(pod, service) {
            epa.Hostname = pod.Spec.Hostname
        }

        // Allow headless service not to have ports.
        if len(service.Spec.Ports) == 0 {
            if service.Spec.ClusterIP == api.ClusterIPNone {
                subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
                // No need to repack subsets for headless service without ports.
            }
        } else {
            for i := range service.Spec.Ports {
                servicePort := &service.Spec.Ports[i]
                portNum, err := podutil.FindPort(pod, servicePort)
                if err != nil {
                    klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
                    continue
                }
                epp := endpointPortFromServicePort(servicePort, portNum)

                var readyEps, notReadyEps int
                subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
                totalReadyEps = totalReadyEps + readyEps
                totalNotReadyEps = totalNotReadyEps + notReadyEps
            }
        }
    }
    subsets = endpoints.RepackSubsets(subsets)

    // See if there's actually an update here.
    // Look up the current Endpoints object by the Service's namespace/name;
    // if it already exists this sync is an update rather than a create
    currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
    if err != nil {
        if !errors.IsNotFound(err) {
            return err
        }
        currentEndpoints = &v1.Endpoints{
            ObjectMeta: metav1.ObjectMeta{
                Name:   service.Name,
                Labels: service.Labels,
            },
        }
    }

    // An empty ResourceVersion means the Endpoints object does not exist yet
    // and must be created: this holds right after an Endpoints object was
    // deleted, and right after a Service was created (before its Endpoints exist)
    createEndpoints := len(currentEndpoints.ResourceVersion) == 0

    // Compare the sorted subsets and labels
    // Remove the HeadlessService label from the endpoints if it exists,
    // as this won't be set on the service itself
    // and will cause a false negative in this diff check.
    // But first check if it has that label to avoid expensive copies.
    compareLabels := currentEndpoints.Labels
    if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok {
        compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService)
    }
    // When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints
    // updates caused by Pod updates that we don't care, e.g. annotation update.
    // Nothing changed, so no update is needed
    if !createEndpoints &&
        endpointutil.EndpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
        apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
        capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
        klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
        return nil
    }
    // Something changed: build the new Endpoints object
    newEndpoints := currentEndpoints.DeepCopy()
    newEndpoints.Subsets = subsets
    newEndpoints.Labels = service.Labels
    if newEndpoints.Annotations == nil {
        newEndpoints.Annotations = make(map[string]string)
    }

    if !endpointsLastChangeTriggerTime.IsZero() {
        newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] =
            endpointsLastChangeTriggerTime.UTC().Format(time.RFC3339Nano)
    } else { // No new trigger time, clear the annotation.
        delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
    }
    // An Endpoints object holds at most 1000 addresses, so in practice a
    // single Service is limited to 1000 backing Pods; anything beyond that
    // is truncated and the over-capacity annotation is set
    if truncateEndpoints(newEndpoints) {
        newEndpoints.Annotations[v1.EndpointsOverCapacity] = truncated
    } else {
        delete(newEndpoints.Annotations, v1.EndpointsOverCapacity)
    }

    if newEndpoints.Labels == nil {
        newEndpoints.Labels = make(map[string]string)
    }

    if !helper.IsServiceIPSet(service) {
        newEndpoints.Labels = utillabels.CloneAndAddLabel(newEndpoints.Labels, v1.IsHeadlessService, "")
    } else {
        newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService)
    }

    klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
    if createEndpoints {
        // No previous endpoints, create them
        _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
    } else {
        // Pre-existing
        _, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
    }
    if err != nil {
        if createEndpoints && errors.IsForbidden(err) {
            // A request is forbidden primarily for two reasons:
            // 1. namespace is terminating, endpoint creation is not allowed by default.
            // 2. policy is misconfigured, in which case no service would function anywhere.
            // Given the frequency of 1, we log at a lower level.
            klog.V(5).Infof("Forbidden from creating endpoints: %v", err)

            // If the namespace is terminating, creates will continue to fail. Simply drop the item.
            if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                return nil
            }
        }

        if createEndpoints {
            e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToCreateEndpoint", "Failed to create endpoint for service %v/%v: %v", service.Namespace, service.Name, err)
        } else {
            e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToUpdateEndpoint", "Failed to update endpoint %v/%v: %v", service.Namespace, service.Name, err)
        }

        return err
    }
    return nil
}
```
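One detail worth calling out: podutil.FindPort resolves the Service's targetPort against each Pod. A numeric targetPort is used as-is, which is why the demo above works even though the Deployment declares containerPort: 80 while the Service targets 8080; only a named targetPort is looked up among the Pod's container ports. A hedged sketch of that logic (the real helper lives in pkg/api/v1/pod; this is simplified, not the verbatim source):

```go
// Sketch of podutil.FindPort's behavior, simplified for illustration.
func findPort(pod *v1.Pod, svcPort *v1.ServicePort) (int, error) {
    switch svcPort.TargetPort.Type {
    case intstr.String:
        // Named port: search the Pod's containers for a matching port name.
        name := svcPort.TargetPort.StrVal
        for _, container := range pod.Spec.Containers {
            for _, port := range container.Ports {
                if port.Name == name && port.Protocol == svcPort.Protocol {
                    return int(port.ContainerPort), nil
                }
            }
        }
    case intstr.Int:
        // Numeric port: trust it as-is, no containerPort match required.
        return svcPort.TargetPort.IntValue(), nil
    }
    return 0, fmt.Errorf("no suitable port for pod %s", pod.Name)
}
```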

REF:
1. pkg/controller/endpoint/endpoints_controller.go