kube-controller: endpointslice

EndpointSlice is a resource introduced in Kubernetes 1.16 that provides a more efficient way to represent the network endpoints of a Kubernetes Service.

EndpointSlice is an alternative to the existing Endpoints resource, which can become a performance bottleneck for very large Services with many network endpoints. EndpointSlice splits a Service's network endpoints into smaller, more manageable chunks, each represented by a separate EndpointSlice object.

Each EndpointSlice carries information about a subset of the Service's endpoints, including their IP addresses, ports, and additional metadata such as topology information. This makes it easier for Kubernetes to manage and update a Service's network endpoints efficiently.

EndpointSlice also enables capabilities that the original Endpoints resource could not offer, such as better support for multi-protocol Services and, potentially, weighted load balancing.
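
As a concrete illustration of those fields, here is a hand-written example (not taken from a cluster) that constructs an EndpointSlice object in Go for a hypothetical Service named "example", showing the address type, endpoints, ports, and topology metadata described above:

// Illustrative only: an EndpointSlice for a hypothetical Service "example".
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	discoveryv1 "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	portName, portNum, proto := "http", int32(8080), corev1.ProtocolTCP
	ready := true
	node, zone := "node-1", "zone-a"

	slice := discoveryv1.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "example-abc12",
			Namespace: "default",
			// This label ties the slice back to its Service.
			Labels: map[string]string{discoveryv1.LabelServiceName: "example"},
		},
		// All addresses in one slice share a single address type.
		AddressType: discoveryv1.AddressTypeIPv4,
		Endpoints: []discoveryv1.Endpoint{{
			Addresses:  []string{"10.244.0.183"},
			Conditions: discoveryv1.EndpointConditions{Ready: &ready},
			NodeName:   &node,
			Zone:       &zone, // topology metadata
		}},
		Ports: []discoveryv1.EndpointPort{{
			Name:     &portName,
			Port:     &portNum,
			Protocol: &proto,
		}},
	}
	fmt.Printf("%s: %d endpoints\n", slice.Name, len(slice.Endpoints))
}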

Comparison with Endpoints
The original Endpoints API provided a simple and straightforward way to track network endpoints in Kubernetes. As Kubernetes clusters and Services grew to handle and route traffic to more backend Pods, the limitations of that API became increasingly apparent, most notably the challenges of handling large numbers of network endpoints.

Because all of a Service's network endpoints are stored in a single Endpoints object, those objects could become quite large. For Services that stay stable (using the same set of endpoints over a long period) the impact is less noticeable, but even then several Kubernetes use cases were not well served.

When a Service has many backend endpoints and its workload scales frequently or rolls out changes often, every update to that Service's single Endpoints object means a lot of traffic between Kubernetes cluster components (within the control plane, and between nodes and the API server). This extra traffic also has a cost in CPU usage.

With EndpointSlices, adding or removing a single Pod triggers the same number of updates for clients watching for changes, but the size of those update messages is much smaller at scale.

EndpointSlices have also enabled innovation around new capabilities such as dual-stack networking and topology-aware routing.


You can create a Service with the following YAML and observe the corresponding EndpointSlice.

cat > ep.yaml <<EOF
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ep
spec:
  replicas: 1
  selector:
    matchLabels:
      app: ep
  template:
    metadata:
      labels:
        app: ep
    spec:
      containers:
      - name: ep
        image: hysyeah/my-curl:v1
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 80

---
apiVersion: v1
kind: Service
metadata:
  name: ep
spec:
  selector:
    app: ep
  ports:
  - name: http
    port: 8080
    targetPort: 8080
  type: NodePort
EOF

➜  kubernetes git:(master) k get svc --show-labels
NAME   TYPE       CLUSTER-IP     EXTERNAL-IP   PORT(S)          AGE   LABELS
ep     NodePort   10.96.112.72   <none>        8080:30143/TCP   23h   <none>
➜  kubernetes git:(master) k get endpointslice --show-labels
NAME       ADDRESSTYPE   PORTS   ENDPOINTS      AGE   LABELS
ep-rsnnk   IPv4          8080    10.244.0.183   23h   endpointslice.kubernetes.io/managed-by=endpointslice-controller.k8s.io,kubernetes.io/service-name=ep
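
The kubernetes.io/service-name label shown above is how consumers find the slices that belong to a Service. As a minimal sketch (assuming a standard client-go setup; listSlicesForService is an illustrative helper, not part of the controller), listing a Service's EndpointSlices programmatically looks like this:

// Sketch: list the EndpointSlices belonging to a Service via the
// kubernetes.io/service-name label, the same label visible in the
// kubectl output above.
package main

import (
	"context"
	"fmt"

	discoveryv1 "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func listSlicesForService(client kubernetes.Interface, namespace, service string) ([]discoveryv1.EndpointSlice, error) {
	slices, err := client.DiscoveryV1().EndpointSlices(namespace).List(context.TODO(), metav1.ListOptions{
		LabelSelector: discoveryv1.LabelServiceName + "=" + service,
	})
	if err != nil {
		return nil, err
	}
	return slices.Items, nil
}

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	slices, err := listSlicesForService(client, "default", "ep")
	if err != nil {
		panic(err)
	}
	for _, s := range slices {
		fmt.Printf("%s addressType=%s endpoints=%d\n", s.Name, s.AddressType, len(s.Endpoints))
	}
}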

// staging/src/k8s.io/api/discovery/v1/types.go
// EndpointSlice represents a subset of the endpoints that implement a service.
// For a given service there may be multiple EndpointSlice objects, selected by
// labels, which must be joined to produce the full set of endpoints.
// i.e. an EndpointSlice holds part of a Service's endpoints; a Service may have
// several EndpointSlice objects, selected by labels, which are merged to form
// the complete endpoint set.
type EndpointSlice struct {
	metav1.TypeMeta `json:",inline"`

	// Standard object's metadata.
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// addressType specifies the type of address carried by this EndpointSlice.
	// All addresses in this slice must be the same type. This field is
	// immutable after creation. The following address types are currently
	// supported:
	// * IPv4: Represents an IPv4 Address.
	// * IPv6: Represents an IPv6 Address.
	// * FQDN: Represents a Fully Qualified Domain Name.
	AddressType AddressType `json:"addressType" protobuf:"bytes,4,rep,name=addressType"`

	// endpoints is a list of unique endpoints in this slice. Each slice may
	// include a maximum of 1000 endpoints.
	// +listType=atomic
	Endpoints []Endpoint `json:"endpoints" protobuf:"bytes,2,rep,name=endpoints"`

	// ports specifies the list of network ports exposed by each endpoint in
	// this slice. Each port must have a unique name. When ports is empty, it
	// indicates that there are no defined ports. When a port is defined with a
	// nil port value, it indicates "all ports". Each slice may include a
	// maximum of 100 ports.
	// +optional
	// +listType=atomic
	Ports []EndpointPort `json:"ports" protobuf:"bytes,3,rep,name=ports"`
}
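
The Endpoints field above refers to the discovery/v1 Endpoint type, which is not shown in this excerpt. An abridged sketch of its shape (field names from the same package; several fields such as Hostname, TargetRef, and DeprecatedTopology are omitted) helps when reading the reconciler code below:

// Abridged sketch of the discovery/v1 Endpoint element referenced by
// EndpointSlice.Endpoints above.
type Endpoint struct {
	// Addresses of this endpoint; all must match the slice's AddressType.
	Addresses []string
	// Conditions such as Ready / Serving / Terminating.
	Conditions EndpointConditions
	// NodeName of the Node hosting this endpoint.
	NodeName *string
	// Zone of this endpoint; used for topology-aware routing.
	Zone *string
	// Hints describe which zones should consume this endpoint.
	Hints *EndpointHints
}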


// pkg/controller/endpointslice/endpointslice_controller.go
// One difference from the endpoints controller: the endpointslice controller also watches Nodes.
func NewController(podInformer coreinformers.PodInformer,
	serviceInformer coreinformers.ServiceInformer,
	nodeInformer coreinformers.NodeInformer,
	endpointSliceInformer discoveryinformers.EndpointSliceInformer,
	maxEndpointsPerSlice int32,
	client clientset.Interface,
	endpointUpdatesBatchPeriod time.Duration,
) *Controller {
	broadcaster := record.NewBroadcaster()
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-controller"})

	endpointslicemetrics.RegisterMetrics()

	c := &Controller{
		client: client,
		// This is similar to the DefaultControllerRateLimiter, just with a
		// significantly higher default backoff (1s vs 5ms). This controller
		// processes events that can require significant EndpointSlice changes,
		// such as an update to a Service or Deployment. A more significant
		// rate limit back off here helps ensure that the Controller does not
		// overwhelm the API Server.
		queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
			workqueue.NewItemExponentialFailureRateLimiter(defaultSyncBackOff, maxSyncBackOff),
			// 10 qps, 100 bucket size. This is only for retry speed and its
			// only the overall factor (not per item).
			&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
		), "endpoint_slice"),
		workerLoopPeriod: time.Second,
	}

	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: c.onServiceUpdate,
		UpdateFunc: func(old, cur interface{}) {
			c.onServiceUpdate(cur)
		},
		DeleteFunc: c.onServiceDelete,
	})
	c.serviceLister = serviceInformer.Lister()
	c.servicesSynced = serviceInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addPod,
		UpdateFunc: c.updatePod,
		DeleteFunc: c.deletePod,
	})
	c.podLister = podInformer.Lister()
	c.podsSynced = podInformer.Informer().HasSynced

	c.nodeLister = nodeInformer.Lister()
	c.nodesSynced = nodeInformer.Informer().HasSynced

	endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.onEndpointSliceAdd,
		UpdateFunc: c.onEndpointSliceUpdate,
		DeleteFunc: c.onEndpointSliceDelete,
	})

	c.endpointSliceLister = endpointSliceInformer.Lister()
	c.endpointSlicesSynced = endpointSliceInformer.Informer().HasSynced
	c.endpointSliceTracker = endpointsliceutil.NewEndpointSliceTracker()

	c.maxEndpointsPerSlice = maxEndpointsPerSlice

	c.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()

	c.eventBroadcaster = broadcaster
	c.eventRecorder = recorder

	c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod

	if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc:    c.addNode,
			UpdateFunc: c.updateNode,
			DeleteFunc: c.deleteNode,
		})

		c.topologyCache = topologycache.NewTopologyCache()
	}

	c.reconciler = &reconciler{
		client:               c.client,
		nodeLister:           c.nodeLister,
		maxEndpointsPerSlice: c.maxEndpointsPerSlice,
		endpointSliceTracker: c.endpointSliceTracker,
		metricsCache:         endpointslicemetrics.NewCache(maxEndpointsPerSlice),
		topologyCache:        c.topologyCache,
		eventRecorder:        c.eventRecorder,
	}

	return c
}

syncService is the per-item handler: worker goroutines pull Service keys off the workqueue in a loop and hand each key to syncService for processing.
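
Before walking through syncService itself, here is a minimal sketch of the standard client-go worker loop that drains the queue and invokes it. The method names (worker, processNextWorkItem) follow the usual controller pattern and are paraphrased, not copied from the endpointslice controller:

// Sketch only: the usual workqueue-draining pattern assumed by this controller.
// c.queue is the rate-limited queue populated by the informer event handlers above.
func (c *Controller) worker() {
	for c.processNextWorkItem() {
	}
}

func (c *Controller) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	if err := c.syncService(key.(string)); err != nil {
		// On error, requeue with backoff so the Service is retried later.
		c.queue.AddRateLimited(key)
		return true
	}
	// Success: reset the rate-limiting history for this key.
	c.queue.Forget(key)
	return true
}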

// pkg/controller/endpointslice/endpointslice_controller.go
func (c *Controller) syncService(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing service %q endpoint slices. (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	service, err := c.serviceLister.Services(namespace).Get(name)
	if err != nil {
		if !apierrors.IsNotFound(err) {
			return err
		}

		c.triggerTimeTracker.DeleteService(namespace, name)
		c.reconciler.deleteService(namespace, name)
		c.endpointSliceTracker.DeleteService(namespace, name)
		// The service has been deleted, return nil so that it won't be retried.
		return nil
	}

	if service.Spec.Type == v1.ServiceTypeExternalName {
		// services with Type ExternalName receive no endpoints from this controller;
		// Ref: https://issues.k8s.io/105986
		return nil
	}

	if service.Spec.Selector == nil {
		// services without a selector receive no endpoint slices from this controller;
		// these services will receive endpoint slices that are created out-of-band via the REST API.
		return nil
	}

	klog.V(5).Infof("About to update endpoint slices for service %q", key)

	podLabelSelector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
	pods, err := c.podLister.Pods(service.Namespace).List(podLabelSelector)
	if err != nil {
		// Since we're getting stuff from a local cache, it is basically
		// impossible to get this error.
		c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToListPods",
			"Error listing Pods for Service %s/%s: %v", service.Namespace, service.Name, err)
		return err
	}

	esLabelSelector := labels.Set(map[string]string{
		discovery.LabelServiceName: service.Name,
		discovery.LabelManagedBy:   controllerName,
	}).AsSelectorPreValidated()
	endpointSlices, err := c.endpointSliceLister.EndpointSlices(service.Namespace).List(esLabelSelector)

	if err != nil {
		// Since we're getting stuff from a local cache, it is basically
		// impossible to get this error.
		c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToListEndpointSlices",
			"Error listing Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
		return err
	}

	// Drop EndpointSlices that have been marked for deletion to prevent the controller from getting stuck.
	endpointSlices = dropEndpointSlicesPendingDeletion(endpointSlices)

	if c.endpointSliceTracker.StaleSlices(service, endpointSlices) {
		return endpointsliceutil.NewStaleInformerCache("EndpointSlice informer cache is out of date")
	}

	// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
	// state of the trigger time tracker gets updated even if the sync turns out
	// to be no-op and we don't update the EndpointSlice objects.
	lastChangeTriggerTime := c.triggerTimeTracker.
		ComputeEndpointLastChangeTriggerTime(namespace, service, pods)

	err = c.reconciler.reconcile(service, pods, endpointSlices, lastChangeTriggerTime)
	if err != nil {
		c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToUpdateEndpointSlices",
			"Error updating Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
		return err
	}

	return nil
}
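
The esLabelSelector built above matches exactly the two labels visible in the earlier kubectl output. A small standalone sketch (values hard-coded from that output; not part of the controller) of what the selector expands to:

// Sketch: the equivalent of syncService's esLabelSelector for the demo Service "ep".
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	sel := labels.Set(map[string]string{
		"kubernetes.io/service-name":             "ep",
		"endpointslice.kubernetes.io/managed-by": "endpointslice-controller.k8s.io",
	}).AsSelectorPreValidated()
	fmt.Println(sel.String())
	// Roughly the same filter as:
	//   kubectl get endpointslice -l kubernetes.io/service-name=ep
}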


// reconcile takes a set of pods currently matching a service selector and
// compares them with the endpoints already present in any existing endpoint
// slices for the given service. It creates, updates, or deletes endpoint slices
// to ensure the desired set of pods are represented by endpoint slices.
// pkg/controller/endpointslice/reconciler.go
// Inputs: the Service, the Pods matching its selector, and the EndpointSlices that already exist.
// Purpose: create, update, or delete EndpointSlices so that they cover exactly the desired set of Pods.
func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error {
	slicesToDelete := []*discovery.EndpointSlice{}                                    // slices that are no longer matching any address the service has
	errs := []error{}                                                                 // all errors generated in the process of reconciling
	slicesByAddressType := make(map[discovery.AddressType][]*discovery.EndpointSlice) // slices by address type

	// addresses that this service supports [o(1) find]
	// Get the address types this Service supports; serviceSupportedAddressesTypes is a set implemented as a map.
	// * IPv4: Represents an IPv4 Address.
	// * IPv6: Represents an IPv6 Address.
	// * FQDN: Represents a Fully Qualified Domain Name.
	serviceSupportedAddressesTypes := getAddressTypesForService(service)

	// loop through slices identifying their address type.
	// slices that no longer match address type supported by services
	// go to delete, other slices goes to the reconciler machinery
	// for further adjustment
	for _, existingSlice := range existingSlices {
		// service no longer supports that address type, add it to deleted slices
		if _, ok := serviceSupportedAddressesTypes[existingSlice.AddressType]; !ok {
			if r.topologyCache != nil {
				svcKey, err := serviceControllerKey(existingSlice)
				if err != nil {
					klog.Warningf("Couldn't get key to remove EndpointSlice from topology cache %+v: %v", existingSlice, err)
				} else {
					r.topologyCache.RemoveHints(svcKey, existingSlice.AddressType)
				}
			}
			// The slice's AddressType is no longer supported by this Service, so add it to the deletion list.
			slicesToDelete = append(slicesToDelete, existingSlice)
			continue
		}

		// add list if it is not on our map
		if _, ok := slicesByAddressType[existingSlice.AddressType]; !ok {
			slicesByAddressType[existingSlice.AddressType] = make([]*discovery.EndpointSlice, 0, 1)
		}

		slicesByAddressType[existingSlice.AddressType] = append(slicesByAddressType[existingSlice.AddressType], existingSlice)
	}

	// reconcile for existing.
	for addressType := range serviceSupportedAddressesTypes {
		existingSlices := slicesByAddressType[addressType]
		// Reconcile each supported address type separately.
		err := r.reconcileByAddressType(service, pods, existingSlices, triggerTime, addressType)
		if err != nil {
			errs = append(errs, err)
		}
	}

	// delete those which are of addressType that is no longer supported
	// by the service
	for _, sliceToDelete := range slicesToDelete {
		err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Delete(context.TODO(), sliceToDelete.Name, metav1.DeleteOptions{})
		if err != nil {
			errs = append(errs, fmt.Errorf("error deleting %s EndpointSlice for Service %s/%s: %w", sliceToDelete.Name, service.Namespace, service.Name, err))
		} else {
			r.endpointSliceTracker.ExpectDeletion(sliceToDelete)
			metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
		}
	}

	return utilerrors.NewAggregate(errs)
}
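
getAddressTypesForService is not shown in this excerpt. As a rough sketch (an assumption based on how it is used here, not a copy of the upstream implementation), it derives the supported address types from the Service's IP families, roughly like this:

// Rough sketch (assumption): derive the set of supported address types from
// Service.Spec.IPFamilies. The real getAddressTypesForService also handles
// legacy and headless corner cases that are omitted here.
func supportedAddressTypes(service *corev1.Service) map[discovery.AddressType]struct{} {
	types := map[discovery.AddressType]struct{}{}
	for _, family := range service.Spec.IPFamilies {
		switch family {
		case corev1.IPv4Protocol:
			types[discovery.AddressTypeIPv4] = struct{}{}
		case corev1.IPv6Protocol:
			types[discovery.AddressTypeIPv6] = struct{}{}
		}
	}
	return types
}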


// reconcileByAddressType takes a set of pods currently matching a service selector and
// compares them with the endpoints already present in any existing endpoint
// slices (by address type) for the given service. It creates, updates, or deletes endpoint slices
// to ensure the desired set of pods are represented by endpoint slices.
func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error {
	errs := []error{}

	slicesToCreate := []*discovery.EndpointSlice{}
	slicesToUpdate := []*discovery.EndpointSlice{}
	slicesToDelete := []*discovery.EndpointSlice{}
	events := []*topologycache.EventBuilder{}

	// Build data structures for existing state.
	existingSlicesByPortMap := map[endpointutil.PortMapKey][]*discovery.EndpointSlice{}
	for _, existingSlice := range existingSlices {
		if ownedBy(existingSlice, service) {
			epHash := endpointutil.NewPortMapKey(existingSlice.Ports)
			existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice)
		} else {
			slicesToDelete = append(slicesToDelete, existingSlice)
		}
	}

	// Build data structures for desired state.
	desiredMetaByPortMap := map[endpointutil.PortMapKey]*endpointMeta{}
	desiredEndpointsByPortMap := map[endpointutil.PortMapKey]endpointsliceutil.EndpointSet{}

	for _, pod := range pods {
		if !endpointutil.ShouldPodBeInEndpoints(pod, true) {
			continue
		}

		endpointPorts := getEndpointPorts(service, pod)
		epHash := endpointutil.NewPortMapKey(endpointPorts)
		if _, ok := desiredEndpointsByPortMap[epHash]; !ok {
			desiredEndpointsByPortMap[epHash] = endpointsliceutil.EndpointSet{}
		}

		if _, ok := desiredMetaByPortMap[epHash]; !ok {
			desiredMetaByPortMap[epHash] = &endpointMeta{
				addressType: addressType,
				ports:       endpointPorts,
			}
		}

		node, err := r.nodeLister.Get(pod.Spec.NodeName)
		if err != nil {
			// we are getting the information from the local informer,
			// an error different than IsNotFound should not happen
			if !errors.IsNotFound(err) {
				return err
			}
			// If the Node specified by the Pod doesn't exist we want to requeue the Service so we
			// retry later, but also update the EndpointSlice without the problematic Pod.
			// Theoretically, the pod Garbage Collector will remove the Pod, but we want to avoid
			// situations where a reference from a Pod to a missing node can leave the EndpointSlice
			// stuck forever.
			// On the other side, if the service.Spec.PublishNotReadyAddresses is set we just add the
			// Pod, since the user is explicitly indicating that the Pod address should be published.
			if !service.Spec.PublishNotReadyAddresses {
				klog.Warningf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName)
				errs = append(errs, fmt.Errorf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName))
				continue
			}
		}
		endpoint := podToEndpoint(pod, node, service, addressType)
		if len(endpoint.Addresses) > 0 {
			desiredEndpointsByPortMap[epHash].Insert(&endpoint)
		}
	}

	spMetrics := metrics.NewServicePortCache()
	totalAdded := 0
	totalRemoved := 0

	// Determine changes necessary for each group of slices by port map.
	for portMap, desiredEndpoints := range desiredEndpointsByPortMap {
		numEndpoints := len(desiredEndpoints)
		pmSlicesToCreate, pmSlicesToUpdate, pmSlicesToDelete, added, removed := r.reconcileByPortMapping(
			service, existingSlicesByPortMap[portMap], desiredEndpoints, desiredMetaByPortMap[portMap])

		totalAdded += added
		totalRemoved += removed

		spMetrics.Set(portMap, metrics.EfficiencyInfo{
			Endpoints: numEndpoints,
			Slices:    len(existingSlicesByPortMap[portMap]) + len(pmSlicesToCreate) - len(pmSlicesToDelete),
		})

		slicesToCreate = append(slicesToCreate, pmSlicesToCreate...)
		slicesToUpdate = append(slicesToUpdate, pmSlicesToUpdate...)
		slicesToDelete = append(slicesToDelete, pmSlicesToDelete...)
	}

	// If there are unique sets of ports that are no longer desired, mark
	// the corresponding endpoint slices for deletion.
	for portMap, existingSlices := range existingSlicesByPortMap {
		if _, ok := desiredEndpointsByPortMap[portMap]; !ok {
			slicesToDelete = append(slicesToDelete, existingSlices...)
		}
	}

	// When no endpoint slices would usually exist, we need to add a placeholder.
	if len(existingSlices) == len(slicesToDelete) && len(slicesToCreate) < 1 {
		// Check for existing placeholder slice outside of the core control flow
		placeholderSlice := newEndpointSlice(service, &endpointMeta{ports: []discovery.EndpointPort{}, addressType: addressType})
		if len(slicesToDelete) == 1 && placeholderSliceCompare.DeepEqual(slicesToDelete[0], placeholderSlice) {
			// We are about to unnecessarily delete/recreate the placeholder, remove it now.
			slicesToDelete = slicesToDelete[:0]
		} else {
			slicesToCreate = append(slicesToCreate, placeholderSlice)
		}
		spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
			Endpoints: 0,
			Slices:    1,
		})
	}

	metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totalAdded))
	metrics.EndpointsRemovedPerSync.WithLabelValues().Observe(float64(totalRemoved))

	serviceNN := types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
	r.metricsCache.UpdateServicePortCache(serviceNN, spMetrics)

	// Topology hints are assigned per address type. This means it is
	// theoretically possible for endpoints of one address type to be assigned
	// hints while another endpoints of another address type are not.
	si := &topologycache.SliceInfo{
		ServiceKey:  fmt.Sprintf("%s/%s", service.Namespace, service.Name),
		AddressType: addressType,
		ToCreate:    slicesToCreate,
		ToUpdate:    slicesToUpdate,
		Unchanged:   unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete),
	}

	if r.topologyCache != nil && hintsEnabled(service.Annotations) {
		slicesToCreate, slicesToUpdate, events = r.topologyCache.AddHints(si)
	} else {
		if r.topologyCache != nil {
			if r.topologyCache.HasPopulatedHints(si.ServiceKey) {
				klog.InfoS("TopologyAwareHints annotation has changed, removing hints", "serviceKey", si.ServiceKey, "addressType", si.AddressType)
				events = append(events, &topologycache.EventBuilder{
					EventType: corev1.EventTypeWarning,
					Reason:    "TopologyAwareHintsDisabled",
					Message:   topologycache.FormatWithAddressType(topologycache.TopologyAwareHintsDisabled, si.AddressType),
				})
			}
			r.topologyCache.RemoveHints(si.ServiceKey, addressType)
		}
		slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
	}
	err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
	if err != nil {
		errs = append(errs, err)
	}
	for _, event := range events {
		r.eventRecorder.Event(service, event.EventType, event.Reason, event.Message)
	}
	return utilerrors.NewAggregate(errs)
}

To manage how a Service's EndpointSlices are distributed, the controller maintains an EndpointSlice cache called the TopologyCache. The TopologyCache tracks the state of all of a Service's EndpointSlices and lets Kubernetes route traffic to the appropriate endpoints quickly and efficiently.

The TopologyCache is built from endpoint topology information gathered by the controller: based on the Service's labels and selector, the controller determines the topology of the Service's endpoints, generates a set of EndpointSlices that carry that topology information as hints, and updates the TopologyCache with the new slices.
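
Topology-aware hints are opt-in per Service. As a minimal sketch, the snippet below uses client-go to opt the demo Service into hints so that the AddHints path above is taken. The annotation key is an assumption based on the TopologyAwareHints feature as documented for this era of code; newer releases use service.kubernetes.io/topology-mode instead:

// Sketch: opt a Service into topology-aware hints.
// Assumption: the annotation key/value accepted by hintsEnabled is
// "service.kubernetes.io/topology-aware-hints" = "auto".
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	svc, err := client.CoreV1().Services("default").Get(context.TODO(), "ep", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	if svc.Annotations == nil {
		svc.Annotations = map[string]string{}
	}
	svc.Annotations["service.kubernetes.io/topology-aware-hints"] = "auto"

	if _, err := client.CoreV1().Services("default").Update(context.TODO(), svc, metav1.UpdateOptions{}); err != nil {
		panic(err)
	}
	fmt.Println("topology-aware hints enabled for Service default/ep")
}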


REF:
1. https://kubernetes.io/zh-cn/docs/concepts/services-networking/endpoint-slices/