Kubernetes EndpointSlice
EndpointSlice is a resource introduced in Kubernetes 1.16 that provides a more efficient way to represent the network endpoints of a Kubernetes Service.

EndpointSlice is an alternative to the existing Endpoints resource, which can become a performance bottleneck for very large Services with many network endpoints. EndpointSlice splits a Service's network endpoints into smaller, more manageable chunks, each represented by its own EndpointSlice resource.

Each EndpointSlice holds information about a subset of a Service's endpoints, including their IP addresses, ports, and additional metadata such as topology information. This makes it easier for Kubernetes to manage and update a Service's network endpoints efficiently.

EndpointSlice also enables capabilities that the original Endpoints resource cannot express well, such as per-Service dual-stack addressing (multiple address types) and per-endpoint topology hints.
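For illustration, an EndpointSlice object owned by a Service looks roughly like this (the names, addresses, and zone below are made-up values, not output from a real cluster):

apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
  name: example-abc
  labels:
    # set by the EndpointSlice controller to link the slice to its Service
    kubernetes.io/service-name: example
addressType: IPv4
ports:
  - name: http
    protocol: TCP
    port: 80
endpoints:
  - addresses:
      - "10.1.2.3"
    conditions:
      ready: true
    hostname: pod-1
    nodeName: node-1
    zone: us-west2-a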
Comparison with Endpoints
The original Endpoints API provided a simple and straightforward way of tracking network endpoints in Kubernetes. As Kubernetes clusters and Services grew to handle traffic for more and more backend Pods, the limitations of that original API became increasingly visible, most notably the challenges involved in handling large numbers of network endpoints.

Because all network endpoints for a Service were stored in a single Endpoints object, those objects could get quite large. For Services that stayed stable (the same set of endpoints over a long period of time) the impact was less noticeable; even then, some use cases of Kubernetes were not well served.

When a Service had many backend endpoints and the workload was scaling frequently or rolling out new changes frequently, each update to that Service's single Endpoints object meant a lot of traffic between Kubernetes cluster components (within the control plane, and also between nodes and the API server). This extra traffic also had a cost in terms of CPU use.

With EndpointSlices, adding or removing a single Pod triggers the same number of updates to clients that are watching for changes, but the size of those update messages is much smaller at large scale.

EndpointSlices also enabled innovation around new features such as dual-stack networking and topology-aware routing.
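Dual-stack works by giving each address family its own slices: every EndpointSlice carries exactly one addressType (IPv4, IPv6, or FQDN), so a Service that requests both IP families gets separate IPv4 and IPv6 EndpointSlices. A hypothetical dual-stack Service (assuming the cluster itself is configured for dual-stack) looks like this:

apiVersion: v1
kind: Service
metadata:
  name: example-dual
spec:
  ipFamilyPolicy: PreferDualStack
  ipFamilies:
    - IPv4
    - IPv6
  selector:
    app: example
  ports:
    - name: http
      protocol: TCP
      port: 80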
You can create a Service with the YAML file below and then observe the EndpointSlice objects that the controller creates for it.
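A minimal example: an nginx Deployment plus a Service that selects it (the names, labels, image, and ports here are illustrative choices, not a prescribed manifest).

cat > ep.yaml <<EOF
# Illustrative Deployment + Service; names and labels are assumptions.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx
spec:
  replicas: 3
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.25
        ports:
        - containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
  name: nginx
spec:
  selector:
    app: nginx
  ports:
  - name: http
    protocol: TCP
    port: 80
    targetPort: 80
EOF
kubectl apply -f ep.yaml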
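Once applied, you can look at the Service and at the EndpointSlices the controller generated for it; every slice managed by the controller carries the kubernetes.io/service-name label (the Service name nginx below is the one assumed in the sketch above):

kubectl get svc --show-labels
kubectl get endpointslices -l kubernetes.io/service-name=nginx
kubectl describe endpointslice <slice-name>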
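The API type behind these objects is defined in staging/src/k8s.io/api/discovery/v1/types.go. An abridged view of the two central types (JSON tags, documentation, and some fields omitted):

// staging/src/k8s.io/api/discovery/v1/types.go (abridged)
type EndpointSlice struct {
	metav1.TypeMeta
	metav1.ObjectMeta

	// AddressType is the type of address carried by this slice: IPv4, IPv6,
	// or FQDN. All addresses in a slice must be of this type.
	AddressType AddressType

	// Endpoints is the list of unique endpoints in this slice.
	Endpoints []Endpoint

	// Ports is the list of ports exposed by each endpoint in this slice.
	Ports []EndpointPort
}

type Endpoint struct {
	// Addresses of this endpoint, interpreted according to AddressType.
	Addresses []string

	// Conditions carries ready / serving / terminating information.
	Conditions EndpointConditions

	Hostname *string

	// TargetRef points at the object (typically a Pod) backing this endpoint.
	TargetRef *v1.ObjectReference

	NodeName *string
	Zone     *string

	// Hints, when set, tell consumers (e.g. kube-proxy) which zones should
	// use this endpoint; used for topology-aware routing.
	Hints *EndpointHints
}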
syncService is the per-key handler in the controller's processing loop: it takes a key from the workqueue and processes the corresponding Service.
// pkg/controller/endpointslice/endpointslice_controller.go
func (c *Controller) syncService(key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing service %q endpoint slices. (%v)", key, time.Since(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
service, err := c.serviceLister.Services(namespace).Get(name)
if err != nil {
if !apierrors.IsNotFound(err) {
return err
}
c.triggerTimeTracker.DeleteService(namespace, name)
c.reconciler.deleteService(namespace, name)
c.endpointSliceTracker.DeleteService(namespace, name)
// The service has been deleted, return nil so that it won't be retried.
return nil
}
if service.Spec.Type == v1.ServiceTypeExternalName {
// services with Type ExternalName receive no endpoints from this controller;
// Ref: https://issues.k8s.io/105986
return nil
}
if service.Spec.Selector == nil {
// services without a selector receive no endpoint slices from this controller;
// these services will receive endpoint slices that are created out-of-band via the REST API.
return nil
}
klog.V(5).Infof("About to update endpoint slices for service %q", key)
podLabelSelector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
pods, err := c.podLister.Pods(service.Namespace).List(podLabelSelector)
if err != nil {
// Since we're getting stuff from a local cache, it is basically
// impossible to get this error.
c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToListPods",
"Error listing Pods for Service %s/%s: %v", service.Namespace, service.Name, err)
return err
}
esLabelSelector := labels.Set(map[string]string{
discovery.LabelServiceName: service.Name,
discovery.LabelManagedBy: controllerName,
}).AsSelectorPreValidated()
endpointSlices, err := c.endpointSliceLister.EndpointSlices(service.Namespace).List(esLabelSelector)
if err != nil {
// Since we're getting stuff from a local cache, it is basically
// impossible to get this error.
c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToListEndpointSlices",
"Error listing Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
return err
}
// Drop EndpointSlices that have been marked for deletion to prevent the controller from getting stuck.
endpointSlices = dropEndpointSlicesPendingDeletion(endpointSlices)
if c.endpointSliceTracker.StaleSlices(service, endpointSlices) {
return endpointsliceutil.NewStaleInformerCache("EndpointSlice informer cache is out of date")
}
// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
// state of the trigger time tracker gets updated even if the sync turns out
// to be no-op and we don't update the EndpointSlice objects.
lastChangeTriggerTime := c.triggerTimeTracker.
ComputeEndpointLastChangeTriggerTime(namespace, service, pods)
err = c.reconciler.reconcile(service, pods, endpointSlices, lastChangeTriggerTime)
if err != nil {
c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToUpdateEndpointSlices",
"Error updating Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
return err
}
return nil
}
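syncService is invoked from the controller's worker goroutines via the usual client-go workqueue pattern. Schematically, the loop that drives it looks roughly like this (a simplified sketch, not the exact controller source, which also routes errors through handleErr):

// Simplified sketch of the worker loop driving syncService.
func (c *Controller) worker() {
	for c.processNextWorkItem() {
	}
}

func (c *Controller) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	if err := c.syncService(key.(string)); err != nil {
		// Requeue with backoff so transient failures are retried.
		c.queue.AddRateLimited(key)
		return true
	}
	// Success: reset the rate-limiting history for this key.
	c.queue.Forget(key)
	return true
}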
// reconcile takes a set of pods currently matching a service selector and
// compares them with the endpoints already present in any existing endpoint
// slices for the given service. It creates, updates, or deletes endpoint slices
// to ensure the desired set of pods are represented by endpoint slices.
// pkg/controller/endpointslice/reconciler.go
// Parameters: the Service, the Pods currently matching its selector, and the EndpointSlices that already exist for it.
// Purpose: create, update, or delete EndpointSlices so that they cover exactly the expected set of Pods.
func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error {
slicesToDelete := []*discovery.EndpointSlice{} // slices that are no longer matching any address the service has
errs := []error{} // all errors generated in the process of reconciling
slicesByAddressType := make(map[discovery.AddressType][]*discovery.EndpointSlice) // slices by address type
// addresses that this service supports [o(1) find]
// Get the AddressTypes this Service supports; serviceSupportedAddressesTypes is a set implemented as a map.
// * IPv4: Represents an IPv4 Address.
// * IPv6: Represents an IPv6 Address.
// * FQDN: Represents a Fully Qualified Domain Name.
serviceSupportedAddressesTypes := getAddressTypesForService(service)
// loop through slices identifying their address type.
// slices that no longer match address type supported by services
// go to delete, other slices goes to the reconciler machinery
// for further adjustment
for _, existingSlice := range existingSlices {
// service no longer supports that address type, add it to deleted slices
if _, ok := serviceSupportedAddressesTypes[existingSlice.AddressType]; !ok {
if r.topologyCache != nil {
svcKey, err := serviceControllerKey(existingSlice)
if err != nil {
klog.Warningf("Couldn't get key to remove EndpointSlice from topology cache %+v: %v", existingSlice, err)
} else {
r.topologyCache.RemoveHints(svcKey, existingSlice.AddressType)
}
}
// The slice's AddressType is no longer supported by this Service, so add it to the deletion list.
slicesToDelete = append(slicesToDelete, existingSlice)
continue
}
// add list if it is not on our map
if _, ok := slicesByAddressType[existingSlice.AddressType]; !ok {
slicesByAddressType[existingSlice.AddressType] = make([]*discovery.EndpointSlice, 0, 1)
}
slicesByAddressType[existingSlice.AddressType] = append(slicesByAddressType[existingSlice.AddressType], existingSlice)
}
// reconcile for existing.
for addressType := range serviceSupportedAddressesTypes {
existingSlices := slicesByAddressType[addressType]
// Reconcile each supported AddressType separately.
err := r.reconcileByAddressType(service, pods, existingSlices, triggerTime, addressType)
if err != nil {
errs = append(errs, err)
}
}
// delete those which are of addressType that is no longer supported
// by the service
for _, sliceToDelete := range slicesToDelete {
err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Delete(context.TODO(), sliceToDelete.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("error deleting %s EndpointSlice for Service %s/%s: %w", sliceToDelete.Name, service.Namespace, service.Name, err))
} else {
r.endpointSliceTracker.ExpectDeletion(sliceToDelete)
metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
}
}
return utilerrors.NewAggregate(errs)
}
// reconcileByAddressType takes a set of pods currently matching a service selector and
// compares them with the endpoints already present in any existing endpoint
// slices (by address type) for the given service. It creates, updates, or deletes endpoint slices
// to ensure the desired set of pods are represented by endpoint slices.
func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error {
errs := []error{}
slicesToCreate := []*discovery.EndpointSlice{}
slicesToUpdate := []*discovery.EndpointSlice{}
slicesToDelete := []*discovery.EndpointSlice{}
events := []*topologycache.EventBuilder{}
// Build data structures for existing state.
existingSlicesByPortMap := map[endpointutil.PortMapKey][]*discovery.EndpointSlice{}
for _, existingSlice := range existingSlices {
if ownedBy(existingSlice, service) {
epHash := endpointutil.NewPortMapKey(existingSlice.Ports)
existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice)
} else {
slicesToDelete = append(slicesToDelete, existingSlice)
}
}
// Build data structures for desired state.
desiredMetaByPortMap := map[endpointutil.PortMapKey]*endpointMeta{}
desiredEndpointsByPortMap := map[endpointutil.PortMapKey]endpointsliceutil.EndpointSet{}
for _, pod := range pods {
if !endpointutil.ShouldPodBeInEndpoints(pod, true) {
continue
}
endpointPorts := getEndpointPorts(service, pod)
epHash := endpointutil.NewPortMapKey(endpointPorts)
if _, ok := desiredEndpointsByPortMap[epHash]; !ok {
desiredEndpointsByPortMap[epHash] = endpointsliceutil.EndpointSet{}
}
if _, ok := desiredMetaByPortMap[epHash]; !ok {
desiredMetaByPortMap[epHash] = &endpointMeta{
addressType: addressType,
ports: endpointPorts,
}
}
node, err := r.nodeLister.Get(pod.Spec.NodeName)
if err != nil {
// we are getting the information from the local informer,
// an error different than IsNotFound should not happen
if !errors.IsNotFound(err) {
return err
}
// If the Node specified by the Pod doesn't exist we want to requeue the Service so we
// retry later, but also update the EndpointSlice without the problematic Pod.
// Theoretically, the pod Garbage Collector will remove the Pod, but we want to avoid
// situations where a reference from a Pod to a missing node can leave the EndpointSlice
// stuck forever.
// On the other side, if the service.Spec.PublishNotReadyAddresses is set we just add the
// Pod, since the user is explicitly indicating that the Pod address should be published.
if !service.Spec.PublishNotReadyAddresses {
klog.Warningf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName)
errs = append(errs, fmt.Errorf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName))
continue
}
}
endpoint := podToEndpoint(pod, node, service, addressType)
if len(endpoint.Addresses) > 0 {
desiredEndpointsByPortMap[epHash].Insert(&endpoint)
}
}
spMetrics := metrics.NewServicePortCache()
totalAdded := 0
totalRemoved := 0
// Determine changes necessary for each group of slices by port map.
for portMap, desiredEndpoints := range desiredEndpointsByPortMap {
numEndpoints := len(desiredEndpoints)
pmSlicesToCreate, pmSlicesToUpdate, pmSlicesToDelete, added, removed := r.reconcileByPortMapping(
service, existingSlicesByPortMap[portMap], desiredEndpoints, desiredMetaByPortMap[portMap])
totalAdded += added
totalRemoved += removed
spMetrics.Set(portMap, metrics.EfficiencyInfo{
Endpoints: numEndpoints,
Slices: len(existingSlicesByPortMap[portMap]) + len(pmSlicesToCreate) - len(pmSlicesToDelete),
})
slicesToCreate = append(slicesToCreate, pmSlicesToCreate...)
slicesToUpdate = append(slicesToUpdate, pmSlicesToUpdate...)
slicesToDelete = append(slicesToDelete, pmSlicesToDelete...)
}
// If there are unique sets of ports that are no longer desired, mark
// the corresponding endpoint slices for deletion.
for portMap, existingSlices := range existingSlicesByPortMap {
if _, ok := desiredEndpointsByPortMap[portMap]; !ok {
slicesToDelete = append(slicesToDelete, existingSlices...)
}
}
// When no endpoint slices would usually exist, we need to add a placeholder.
if len(existingSlices) == len(slicesToDelete) && len(slicesToCreate) < 1 {
// Check for existing placeholder slice outside of the core control flow
placeholderSlice := newEndpointSlice(service, &endpointMeta{ports: []discovery.EndpointPort{}, addressType: addressType})
if len(slicesToDelete) == 1 && placeholderSliceCompare.DeepEqual(slicesToDelete[0], placeholderSlice) {
// We are about to unnecessarily delete/recreate the placeholder, remove it now.
slicesToDelete = slicesToDelete[:0]
} else {
slicesToCreate = append(slicesToCreate, placeholderSlice)
}
spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
Endpoints: 0,
Slices: 1,
})
}
metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totalAdded))
metrics.EndpointsRemovedPerSync.WithLabelValues().Observe(float64(totalRemoved))
serviceNN := types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
r.metricsCache.UpdateServicePortCache(serviceNN, spMetrics)
// Topology hints are assigned per address type. This means it is
// theoretically possible for endpoints of one address type to be assigned
// hints while another endpoints of another address type are not.
si := &topologycache.SliceInfo{
ServiceKey: fmt.Sprintf("%s/%s", service.Namespace, service.Name),
AddressType: addressType,
ToCreate: slicesToCreate,
ToUpdate: slicesToUpdate,
Unchanged: unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete),
}
if r.topologyCache != nil && hintsEnabled(service.Annotations) {
slicesToCreate, slicesToUpdate, events = r.topologyCache.AddHints(si)
} else {
if r.topologyCache != nil {
if r.topologyCache.HasPopulatedHints(si.ServiceKey) {
klog.InfoS("TopologyAwareHints annotation has changed, removing hints", "serviceKey", si.ServiceKey, "addressType", si.AddressType)
events = append(events, &topologycache.EventBuilder{
EventType: corev1.EventTypeWarning,
Reason: "TopologyAwareHintsDisabled",
Message: topologycache.FormatWithAddressType(topologycache.TopologyAwareHintsDisabled, si.AddressType),
})
}
r.topologyCache.RemoveHints(si.ServiceKey, addressType)
}
slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
}
err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
if err != nil {
errs = append(errs, err)
}
for _, event := range events {
r.eventRecorder.Event(service, event.EventType, event.Reason, event.Message)
}
return utilerrors.NewAggregate(errs)
}
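reconcileByPortMapping (not shown above) then decides, for each port-map group, which slices to create, update, or delete so that the desired endpoints fit into slices of at most maxEndpointsPerSlice entries (the controller's --max-endpoints-per-slice flag, 100 by default, with an API-level maximum of 1000 per slice). As a rough illustration of the packing idea only (the real function also diffs against and reuses existing slices rather than rebuilding them), the chunking resembles this hypothetical helper:

// Hypothetical helper, for illustration only: chunk a desired endpoint list
// into EndpointSlices with at most maxEndpointsPerSlice endpoints each.
// The real reconcileByPortMapping additionally reuses partially filled
// existing slices and leaves unchanged slices alone.
func packEndpoints(desired []discovery.Endpoint, meta *endpointMeta, maxEndpointsPerSlice int) []*discovery.EndpointSlice {
	var slices []*discovery.EndpointSlice
	for start := 0; start < len(desired); start += maxEndpointsPerSlice {
		end := start + maxEndpointsPerSlice
		if end > len(desired) {
			end = len(desired)
		}
		slices = append(slices, &discovery.EndpointSlice{
			AddressType: meta.addressType,
			Ports:       meta.ports,
			Endpoints:   desired[start:end],
		})
	}
	return slices
}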
To manage the topology-aware distribution of a Service's EndpointSlices, the EndpointSlice controller maintains a cache called TopologyCache. The TopologyCache tracks the hint allocations of all the EndpointSlices it manages, which lets Kubernetes route traffic to nearby endpoints quickly and efficiently.

The TopologyCache is built from the topology information of a Service's endpoints. During reconciliation the controller determines each endpoint's zone (from the Node its Pod runs on), calls AddHints to attach topology hints to the slices it is about to create or update, and records the result in the TopologyCache; when hints are disabled for a Service, RemoveHints drops its entries again, as seen in reconcileByAddressType above.
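Topology-aware hints are opt-in per Service. With the controller version shown above, they are switched on through the service.kubernetes.io/topology-aware-hints annotation, which is what hintsEnabled checks; a hypothetical Service enabling them:

apiVersion: v1
kind: Service
metadata:
  name: example
  annotations:
    service.kubernetes.io/topology-aware-hints: auto
spec:
  selector:
    app: example
  ports:
    - name: http
      protocol: TCP
      port: 80

When the annotation is set to auto (and the TopologyAwareHints feature gate is enabled), AddHints populates endpoints[].hints.forZones in the generated EndpointSlices, and kube-proxy can then prefer endpoints in the client's own zone.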
References:
1. https://kubernetes.io/zh-cn/docs/concepts/services-networking/endpoint-slices/