kube-controller之namespace

Kubernetes Namespace Controller是Kubernetes中的一个控制器,负责管理Kubernetes中的命名空间。命名空间是Kubernetes资源的一种逻辑分组,用于将资源隔离在不同的命名空间中,从而实现更好的资源管理和隔离。

Namespace Controller主要负责以下任务:
在删除命名空间时,确保所有在该命名空间中创建的资源都被正确地清理和回收。

// Entry point that starts the namespace controller.
// cmd/kube-controller-manager/app/core.go

// startNamespaceController builds a dedicated, high-throughput client for the
// namespace controller and hands it to startModifiedNamespaceController.
func startNamespaceController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
	// The namespace cleanup controller is very chatty: it issues many discovery
	// calls followed by many delete calls, and the default client-side rate
	// limiter slows it down badly. Deleting 100 total items in a namespace (only
	// a few of each resource, including events) takes ~10 seconds by default.
	const (
		qpsMultiplier   = 20
		burstMultiplier = 100
	)

	cfg := controllerContext.ClientBuilder.ConfigOrDie("namespace-controller")
	cfg.QPS *= qpsMultiplier
	cfg.Burst *= burstMultiplier

	client := clientset.NewForConfigOrDie(cfg)
	return startModifiedNamespaceController(ctx, controllerContext, client, cfg)
}

// startModifiedNamespaceController wires the namespace controller to the given
// client and rest config and launches it in a background goroutine. It returns
// a nil controller.Interface with started=true on success. If the metadata
// client cannot be built from nsKubeconfig, it returns that error.
func startModifiedNamespaceController(ctx context.Context, controllerContext ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) {
	mdClient, err := metadata.NewForConfig(nsKubeconfig)
	if err != nil {
		return nil, true, err
	}

	// Settings for this controller from the component configuration.
	nsCfg := &controllerContext.ComponentConfig.NamespaceController

	nc := namespacecontroller.NewNamespaceController(
		ctx,
		namespaceKubeClient,
		mdClient,
		// Namespaced resource discovery drives what gets deleted.
		namespaceKubeClient.Discovery().ServerPreferredNamespacedResources,
		controllerContext.InformerFactory.Core().V1().Namespaces(),
		nsCfg.NamespaceSyncPeriod.Duration,
		v1.FinalizerKubernetes,
	)
	go nc.Run(ctx, int(nsCfg.ConcurrentNamespaceSyncs))

	return nil, true, nil
}
// pkg/controller/namespace/namespace_controller.go

// namespaceDeletionGracePeriod is how long to wait before processing a received
// namespace event (i.e. the delay applied when enqueueing). The pause gives:
//   - lifecycle admission plugins on HA apiservers time to observe the namespace
//     deletion and prevent new objects from being created in the terminating
//     namespace, and
//   - non-leader etcd servers time to observe last-minute object creations in
//     the namespace, so this controller's cleanup can actually remove them all.
const namespaceDeletionGracePeriod = 5 * time.Second

// NamespaceController is responsible for performing actions dependent upon a namespace phase.
// In practice it only acts on namespaces that carry a deletion timestamp: those are
// queued and their contents are removed via namespacedResourcesDeleter.
type NamespaceController struct {
	// lister that can list namespaces from a shared cache
	lister corelisters.NamespaceLister
	// returns true when the namespace cache is ready
	listerSynced cache.InformerSynced
	// namespaces (by key) that have been queued up for processing by workers
	queue workqueue.RateLimitingInterface
	// helper to delete all resources in the namespace when the namespace is deleted.
	namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface
}

// NewNamespaceController creates a new NamespaceController.
//
// The controller registers add/update handlers (with the given resyncPeriod)
// on namespaceInformer. Every observed namespace is passed to enqueueNamespace,
// which only queues namespaces that are being deleted. Queued namespaces are
// cleaned up by a NamespacedResourcesDeleter built from kubeClient,
// metadataClient, discoverResourcesFn and finalizerToken.
func NewNamespaceController(
	ctx context.Context,
	kubeClient clientset.Interface,
	metadataClient metadata.Interface,
	discoverResourcesFn func() ([]*metav1.APIResourceList, error),
	namespaceInformer coreinformers.NamespaceInformer,
	resyncPeriod time.Duration,
	finalizerToken v1.FinalizerName) *NamespaceController {

	queue := workqueue.NewNamedRateLimitingQueue(nsControllerRateLimiter(), "namespace")
	coreClient := kubeClient.CoreV1()

	// Build the controller first so the event handlers below can capture it.
	nc := &NamespaceController{
		queue:                      queue,
		namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(ctx, coreClient.Namespaces(), metadataClient, coreClient, discoverResourcesFn, finalizerToken),
	}

	// Add and update events share the same handling: hand the namespace to enqueueNamespace.
	enqueue := func(obj interface{}) {
		nc.enqueueNamespace(obj.(*v1.Namespace))
	}

	informer := namespaceInformer.Informer()
	informer.AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    enqueue,
			UpdateFunc: func(_, newObj interface{}) { enqueue(newObj) },
		},
		resyncPeriod,
	)
	nc.lister = namespaceInformer.Lister()
	nc.listerSynced = informer.HasSynced

	return nc
}

// enqueueNamespace queues the key of a namespace that is being deleted, after
// namespaceDeletionGracePeriod. Namespaces without a deletion timestamp are
// ignored, so this controller only ever handles deletions. obj must be a
// *v1.Namespace.
func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
	key, keyErr := controller.KeyFunc(obj)
	if keyErr != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, keyErr))
		return
	}

	ns := obj.(*v1.Namespace)
	deleting := ns.DeletionTimestamp != nil && !ns.DeletionTimestamp.IsZero()
	if !deleting {
		return
	}

	// Delay processing so HA API servers can observe the namespace deletion and
	// HA etcd servers can observe last-minute object creations inside it.
	nm.queue.AddAfter(key, namespaceDeletionGracePeriod)
}

// Run waits for the namespace cache to sync, then starts `workers` goroutines
// that each run nm.worker through wait.UntilWithContext with a one-second
// period. It blocks until ctx is done. The work queue is shut down when Run
// returns, including the early return when the cache never syncs.
func (nm *NamespaceController) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash()
	defer nm.queue.ShutDown()

	logger := klog.FromContext(ctx)
	logger.Info("Starting namespace controller")
	defer logger.Info("Shutting down namespace controller")

	if synced := cache.WaitForNamedCacheSync("namespace", ctx.Done(), nm.listerSynced); !synced {
		return
	}

	logger.V(5).Info("Starting workers of namespace controller")
	for remaining := workers; remaining > 0; remaining-- {
		go wait.UntilWithContext(ctx, nm.worker, time.Second)
	}

	<-ctx.Done()
}

// worker processes namespace keys from the queue until the queue is shut down.
// For each key:
//   - a successful sync calls Forget on the key;
//   - a ResourcesRemainingError re-adds the key after Estimate/2+1 seconds;
//   - any other error re-adds the key through the rate limiter and is reported.
func (nm *NamespaceController) worker(ctx context.Context) {
	// processNext handles a single queue item and returns true once the queue is shut down.
	processNext := func(ctx context.Context) bool {
		item, shutdown := nm.queue.Get()
		if shutdown {
			return true
		}
		defer nm.queue.Done(item)

		err := nm.syncNamespaceFromKey(ctx, item.(string))
		if err == nil {
			// No error: forget this entry.
			nm.queue.Forget(item)
			return false
		}

		remaining, contentLeft := err.(*deletion.ResourcesRemainingError)
		if !contentLeft {
			// Rather than wait for a full resync, re-add the namespace to the queue to be processed.
			nm.queue.AddRateLimited(item)
			utilruntime.HandleError(fmt.Errorf("deletion of namespace %v failed: %v", item, err))
			return false
		}

		// Some content is still being removed: check again after about half the estimate.
		waitSeconds := remaining.Estimate/2 + 1
		klog.FromContext(ctx).V(4).Info("Content remaining in namespace", "namespace", item, "waitSeconds", waitSeconds)
		nm.queue.AddAfter(item, time.Duration(waitSeconds)*time.Second)
		return false
	}

	for {
		if processNext(ctx) {
			return
		}
	}
}

// syncNamespaceFromKey looks up the namespace with the given key in the lister
// cache and runs the namespaced-resources deleter on it. A namespace that is
// no longer in the cache counts as success. Any other lookup error is reported
// and returned. The time spent is logged at V(4).
func (nm *NamespaceController) syncNamespaceFromKey(ctx context.Context, key string) (err error) {
	logger := klog.FromContext(ctx)
	defer func(began time.Time) {
		logger.V(4).Info("Finished syncing namespace", "namespace", key, "duration", time.Since(began))
	}(time.Now())

	ns, err := nm.lister.Get(key)
	switch {
	case errors.IsNotFound(err):
		logger.Info("Namespace has been deleted", "namespace", key)
		return nil
	case err != nil:
		utilruntime.HandleError(fmt.Errorf("Unable to retrieve namespace %v from store: %v", key, err))
		return err
	default:
		return nm.namespacedResourcesDeleter.Delete(ctx, ns.Name)
	}
}
// pkg/controller/namespace/deletion/namespaced_resources_deleter.go

// Delete deletes all resources in the given namespace.
// Before deleting resources:
//   - It ensures that deletion timestamp is set on the namespace
//     (does nothing if deletion timestamp is missing).
//   - Verifies that the namespace is in the "terminating" phase
//     (updates the namespace phase if it is not yet marked terminating).
//
// After deleting the resources:
//   - It removes the finalizer token from the given namespace.
//
// Returns an error if any of those steps fail.
// Returns ResourcesRemainingError if it deleted some resources but needs
// to wait for them to go away.
// Caller is expected to keep calling this until it succeeds.
func (d *namespacedResourcesDeleter) Delete(ctx context.Context, nsName string) error {
	// Multiple controllers may edit a namespace during termination, so first
	// get the latest state of the namespace through the namespace client.
	// If the namespace was deleted already, don't do anything.
	// ctx (not context.TODO()) is used so the request is abandoned when the
	// controller shuts down.
	namespace, err := d.nsClient.Get(ctx, nsName, metav1.GetOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}
	if namespace.DeletionTimestamp == nil {
		// Not being deleted: nothing to clean up.
		return nil
	}

	klog.FromContext(ctx).V(5).Info("Namespace controller - syncNamespace", "namespace", namespace.Name, "finalizerToken", d.finalizerToken)

	// Ensure that the status is up to date on the namespace.
	// If we get a not found error, we assume the namespace is truly gone.
	namespace, err = d.retryOnConflictError(namespace, d.updateNamespaceStatusFunc)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}

	// The latest view of the namespace asserts that the namespace is no longer deleting.
	if namespace.DeletionTimestamp.IsZero() {
		return nil
	}

	// Return if it is already finalized.
	// NOTE(review): presumably finalized() checks len(Spec.Finalizers) == 0 — confirm.
	if finalized(namespace) {
		return nil
	}

	// There may still be content for us to remove.
	estimate, err := d.deleteAllContent(ctx, namespace)
	if err != nil {
		return err
	}

	// Some resources have not finished deleting yet; the caller should retry later.
	if estimate > 0 {
		return &ResourcesRemainingError{estimate}
	}

	// We have removed all content, so mark the namespace finalized by us.
	if _, err := d.retryOnConflictError(namespace, d.finalizeNamespace); err != nil {
		// In normal practice this should not be possible. However, if a deployment
		// runs two controllers doing namespace deletion that share a common finalizer
		// token, a not found can occur because the other controller finished the delete.
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}
	return nil
}

// deleteAllContent deletes every resource in namespace ns, one discovered
// GroupVersionResource (those supporting the "delete" verb) at a time.
//
// It returns the largest finalizerEstimateSeconds reported across all
// resources, in seconds. A positive value means some content is still waiting
// on finalizers. Discovery and per-resource errors do not stop processing.
// They are collected and returned as a single aggregate error (nil when none
// occurred). The namespace's status conditions are updated whenever they
// change. A failure of that update is only reported, not returned.
//
// ns.DeletionTimestamp must be non-nil. Delete only calls this after checking it.
func (d *namespacedResourcesDeleter) deleteAllContent(ctx context.Context, ns *v1.Namespace) (int64, error) {
	namespace := ns.Name
	namespaceDeletedAt := *ns.DeletionTimestamp
	var errs []error
	conditionUpdater := namespaceConditionUpdater{}
	estimate := int64(0)
	logger := klog.FromContext(ctx)
	logger.V(4).Info("namespace controller - deleteAllContent", "namespace", namespace)

	resources, err := d.discoverResourcesFn()
	if err != nil {
		// Discovery errors are not fatal. We often have some set of resources we
		// can operate against even if we don't have a complete list.
		errs = append(errs, err)
		conditionUpdater.ProcessDiscoverResourcesErr(err)
	}
	// TODO(sttts): get rid of opCache and pass the verbs (especially "deletecollection") down into the deleter
	deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)
	groupVersionResources, err := discovery.GroupVersionResources(deletableResources)
	if err != nil {
		// Discovery errors are not fatal. We often have some set of resources we
		// can operate against even if we don't have a complete list.
		errs = append(errs, err)
		conditionUpdater.ProcessGroupVersionErr(err)
	}

	numRemainingTotals := allGVRDeletionMetadata{
		gvrToNumRemaining:        map[schema.GroupVersionResource]int{},
		finalizersToNumRemaining: map[string]int{},
	}
	for gvr := range groupVersionResources {
		gvrDeletionMetadata, err := d.deleteAllContentForGroupVersionResource(ctx, gvr, namespace, namespaceDeletedAt)
		if err != nil {
			// If there is an error, hold on to it but proceed with all the
			// remaining groupVersionResources.
			errs = append(errs, err)
			conditionUpdater.ProcessDeleteContentErr(err)
		}
		// The overall estimate is the slowest resource's estimate.
		if gvrDeletionMetadata.finalizerEstimateSeconds > estimate {
			estimate = gvrDeletionMetadata.finalizerEstimateSeconds
		}
		if gvrDeletionMetadata.numRemaining > 0 {
			numRemainingTotals.gvrToNumRemaining[gvr] = gvrDeletionMetadata.numRemaining
			for finalizer, numRemaining := range gvrDeletionMetadata.finalizersToNumRemaining {
				if numRemaining == 0 {
					continue
				}
				numRemainingTotals.finalizersToNumRemaining[finalizer] += numRemaining
			}
		}
	}
	conditionUpdater.ProcessContentTotals(numRemainingTotals)

	// We always want to update the conditions because if we have set a condition
	// to "it worked" after it was previously "it didn't work", we need to reflect
	// that information. Recall that additional finalizers can be set on namespaces,
	// so this finalizer may clear itself and NOT remove the resource instance.
	if hasChanged := conditionUpdater.Update(ns); hasChanged {
		// Use ctx so the update is abandoned on controller shutdown. Keep the error
		// local so it cannot leak into the aggregate returned below.
		if _, updateErr := d.nsClient.UpdateStatus(ctx, ns, metav1.UpdateOptions{}); updateErr != nil {
			utilruntime.HandleError(fmt.Errorf("couldn't update status condition for namespace %q: %v", namespace, updateErr))
		}
	}

	// If len(errs) == 0, NewAggregate returns nil.
	err = utilerrors.NewAggregate(errs)
	logger.V(4).Info("namespace controller - deleteAllContent", "namespace", namespace, "estimate", estimate, "err", err)
	return estimate, err
}

总结:
NamespaceController相对比较简单:它只处理正在被删除(DeletionTimestamp已设置)的命名空间,调谐逻辑就是删除该命名空间下的所有资源,待资源清理完毕后再移除命名空间上的finalizer。


REF:
1.cmd/kube-controller-manager/app/core.go
2.pkg/controller/namespace/namespace_controller.go
3.pkg/controller/namespace/deletion/namespaced_resources_deleter.go