kube-controller之ServiceAccounts

ServiceAccountsController 是 Kubernetes 中的一个控制器,负责管理和维护 Service Account 资源。

Service Account 是 Kubernetes 中用于身份验证和授权的一种机制。它与 Pod 关联,为 Pod 提供一个身份标识。通过 Service Account,Pod 可以与 API 服务器进行身份验证,并根据其与 Service Account 关联的角色和权限来访问集群中的资源。

ServiceAccountsController 的主要职责包括以下几个方面:

  1. 创建Service Account:当创建Namespace 时,ServiceAccountsController 会检查该Namespace下是否存在”default” ServiceAccounts,如果不存在则创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142

func DefaultServiceAccountsControllerOptions() ServiceAccountsControllerOptions {
return ServiceAccountsControllerOptions{
ServiceAccounts: []v1.ServiceAccount{
{ObjectMeta: metav1.ObjectMeta{Name: "default"}},
},
}
}


// pkg/controller/serviceaccount/serviceaccounts_controller.go
type ServiceAccountsController struct {
client clientset.Interface
serviceAccountsToEnsure []v1.ServiceAccount

// To allow injection for testing.
syncHandler func(ctx context.Context, key string) error

saLister corelisters.ServiceAccountLister
saListerSynced cache.InformerSynced

nsLister corelisters.NamespaceLister
nsListerSynced cache.InformerSynced

queue workqueue.RateLimitingInterface
}

// NewServiceAccountsController 监听两种资源对象
// ServiceAccounts 监听删除事件,将key入队
// Namespaces 监听创建和更新事件,将key入队
func NewServiceAccountsController(saInformer coreinformers.ServiceAccountInformer, nsInformer coreinformers.NamespaceInformer, cl clientset.Interface, options ServiceAccountsControllerOptions) (*ServiceAccountsController, error) {
e := &ServiceAccountsController{
client: cl,
// serviceAccountsToEnsure, 有一个名为"default"的serviceAccount
serviceAccountsToEnsure: options.ServiceAccounts,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount"),
}

saHandler, _ := saInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
DeleteFunc: e.serviceAccountDeleted,
}, options.ServiceAccountResync)
e.saLister = saInformer.Lister()
e.saListerSynced = saHandler.HasSynced

nsHandler, _ := nsInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: e.namespaceAdded,
UpdateFunc: e.namespaceUpdated,
}, options.NamespaceResync)
e.nsLister = nsInformer.Lister()
e.nsListerSynced = nsHandler.HasSynced

// syncHandler为syncNamespace
e.syncHandler = e.syncNamespace

return e, nil
}

func (c *ServiceAccountsController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.FromContext(ctx).Info("Starting service account controller")
defer klog.FromContext(ctx).Info("Shutting down service account controller")

if !cache.WaitForNamedCacheSync("service account", ctx.Done(), c.saListerSynced, c.nsListerSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}

<-ctx.Done()
}

func (c *ServiceAccountsController) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}

// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (c *ServiceAccountsController) processNextWorkItem(ctx context.Context) bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)

err := c.syncHandler(ctx, key.(string))
if err == nil {
c.queue.Forget(key)
return true
}

utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
c.queue.AddRateLimited(key)

return true
}
func (c *ServiceAccountsController) syncNamespace(ctx context.Context, key string) error {
startTime := time.Now()
defer func() {
klog.FromContext(ctx).V(4).Info("Finished syncing namespace", "namespace", key, "duration", time.Since(startTime))
}()

ns, err := c.nsLister.Get(key)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
// Namespace状态不是Active,不做任何事
if ns.Status.Phase != v1.NamespaceActive {
return nil
}

createFailures := []error{}
for _, sa := range c.serviceAccountsToEnsure {
// 判断对应的命名空间下有没有名为"default"的ServiceAccounts
switch _, err := c.saLister.ServiceAccounts(ns.Name).Get(sa.Name); {
case err == nil:
continue
// 如果err是NotFound,则会在下面的逻辑中创建新的ServiceAccounts
case apierrors.IsNotFound(err):
case err != nil:
return err
}
// this is only safe because we never read it and we always write it
// TODO eliminate this once the fake client can handle creation without NS
sa.Namespace = ns.Name

if _, err := c.client.CoreV1().ServiceAccounts(ns.Name).Create(ctx, &sa, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
// we can safely ignore terminating namespace errors
if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
createFailures = append(createFailures, err)
}
}
}

return utilerrors.Flatten(utilerrors.NewAggregate(createFailures))
}

启动入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// cmd/kube-controller-manager/app/core.go
func startServiceAccountController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
sac, err := serviceaccountcontroller.NewServiceAccountsController(
controllerContext.InformerFactory.Core().V1().ServiceAccounts(),
controllerContext.InformerFactory.Core().V1().Namespaces(),
controllerContext.ClientBuilder.ClientOrDie("service-account-controller"),
serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
)
if err != nil {
return nil, true, fmt.Errorf("error creating ServiceAccount controller: %v", err)
}
go sac.Run(ctx, 1)
return nil, true, nil
}

当 Pod 使用 ServiceAccount 运行时,Kubernetes 会自动为该 ServiceAccount 创建一个与之关联的 Token。这个 Token 通常存储在 Pod 的文件系统中的 /var/run/secrets/kubernetes.io/serviceaccount/token 文件中。Pod 中的容器可以通过读取该文件来获取与 ServiceAccount 相关联的 Token。

Token 是一种用于身份验证的凭据,它可以用于与 Kubernetes API 服务器进行身份验证和授权操作。Pod 中的容器可以使用 Token 来与 Kubernetes API 服务器交互,例如获取或修改资源对象。

通过 ServiceAccount 和 Token 的组合,Kubernetes 提供了一种安全的身份验证和授权机制。Pod 可以使用与其关联的 ServiceAccount 的 Token 来证明其身份,并在需要访问受保护资源时进行授权。这种机制确保了集群中的各个组件和应用程序具有适当的访问权限,并提供了更细粒度的权限控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
type TokensControllerOptions struct {
// TokenGenerator is the generator to use to create new tokens
// GenerateToken生成一个令牌,用于标识给定的ServiceAccount
TokenGenerator serviceaccount.TokenGenerator
// ServiceAccountResync is the time.Duration at which to fully re-list service accounts.
// If zero, re-list will be delayed as long as possible
ServiceAccountResync time.Duration
// SecretResync is the time.Duration at which to fully re-list secrets.
// If zero, re-list will be delayed as long as possible
SecretResync time.Duration
// This CA will be added in the secrets of service accounts
RootCA []byte

// MaxRetries controls the maximum number of times a particular key is retried before giving up
// If zero, a default max is used
MaxRetries int
}


// pkg/controller/serviceaccount/tokens_controller.go
func NewTokensController(serviceAccounts informers.ServiceAccountInformer, secrets informers.SecretInformer, cl clientset.Interface, options TokensControllerOptions) (*TokensController, error) {
maxRetries := options.MaxRetries
if maxRetries == 0 {
maxRetries = 10
}

e := &TokensController{
client: cl,
token: options.TokenGenerator,
rootCA: options.RootCA,

syncServiceAccountQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount_tokens_service"),
syncSecretQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount_tokens_secret"),

maxRetries: maxRetries,
}

e.serviceAccounts = serviceAccounts.Lister()
e.serviceAccountSynced = serviceAccounts.Informer().HasSynced
serviceAccounts.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
// 加入syncServiceAccountQueue
AddFunc: e.queueServiceAccountSync,
UpdateFunc: e.queueServiceAccountUpdateSync,
DeleteFunc: e.queueServiceAccountSync,
},
options.ServiceAccountResync,
)

secretCache := secrets.Informer().GetIndexer()
e.updatedSecrets = cache.NewIntegerResourceVersionMutationCache(secretCache, secretCache, 60*time.Second, true)
e.secretSynced = secrets.Informer().HasSynced
secrets.Informer().AddEventHandlerWithResyncPeriod(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Secret:
return t.Type == v1.SecretTypeServiceAccountToken
default:
utilruntime.HandleError(fmt.Errorf("object passed to %T that is not expected: %T", e, obj))
return false
}
},
// 加入syncSecretQueue
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: e.queueSecretSync,
UpdateFunc: e.queueSecretUpdateSync,
DeleteFunc: e.queueSecretSync,
},
},
options.SecretResync,
)

return e, nil
}

func (e *TokensController) queueServiceAccountSync(obj interface{}) {
if serviceAccount, ok := obj.(*v1.ServiceAccount); ok {
e.syncServiceAccountQueue.Add(makeServiceAccountKey(serviceAccount))
}
}

func (e *TokensController) queueServiceAccountUpdateSync(oldObj interface{}, newObj interface{}) {
if serviceAccount, ok := newObj.(*v1.ServiceAccount); ok {
e.syncServiceAccountQueue.Add(makeServiceAccountKey(serviceAccount))
}
}

func (e *TokensController) queueSecretSync(obj interface{}) {
if secret, ok := obj.(*v1.Secret); ok {
e.syncSecretQueue.Add(makeSecretQueueKey(secret))
}
}

func (e *TokensController) queueSecretUpdateSync(oldObj interface{}, newObj interface{}) {
if secret, ok := newObj.(*v1.Secret); ok {
e.syncSecretQueue.Add(makeSecretQueueKey(secret))
}
}

// Run runs controller blocks until stopCh is closed
func (e *TokensController) Run(ctx context.Context, workers int) {
// Shut down queues
defer utilruntime.HandleCrash()
defer e.syncServiceAccountQueue.ShutDown()
defer e.syncSecretQueue.ShutDown()

if !cache.WaitForNamedCacheSync("tokens", ctx.Done(), e.serviceAccountSynced, e.secretSynced) {
return
}

logger := klog.FromContext(ctx)
logger.V(5).Info("Starting workers")
for i := 0; i < workers; i++ {
// 启动了两个sync方法,syncServiceAccount,syncSecret
go wait.UntilWithContext(ctx, e.syncServiceAccount, 0)
go wait.UntilWithContext(ctx, e.syncSecret, 0)
}
<-ctx.Done()
logger.V(1).Info("Shutting down")
}

func (e *TokensController) syncServiceAccount(ctx context.Context) {
logger := klog.FromContext(ctx)
key, quit := e.syncServiceAccountQueue.Get()
if quit {
return
}
defer e.syncServiceAccountQueue.Done(key)

retry := false
defer func() {
e.retryOrForget(logger, e.syncServiceAccountQueue, key, retry)
}()

// 获取serviceAccountQueueKey
// type serviceAccountQueueKey struct {
// namespace string
// name string
// uid types.UID
// }
saInfo, err := parseServiceAccountKey(key)
if err != nil {
logger.Error(err, "Parsing service account key")
return
}

sa, err := e.getServiceAccount(saInfo.namespace, saInfo.name, saInfo.uid, false)
switch {
case err != nil:
logger.Error(err, "Getting service account")
retry = true
case sa == nil:
// serviceaccount已经不存在,删除对应的token
logger.V(4).Info("Service account deleted, removing tokens", "namespace", saInfo.namespace, "serviceaccount", saInfo.name)
sa = &v1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: saInfo.namespace, Name: saInfo.name, UID: saInfo.uid}}
retry, err = e.deleteTokens(sa)
if err != nil {
logger.Error(err, "Error deleting serviceaccount tokens", "namespace", saInfo.namespace, "serviceaccount", saInfo.name)
}
}
}

func (e *TokensController) syncSecret(ctx context.Context) {
key, quit := e.syncSecretQueue.Get()
if quit {
return
}
defer e.syncSecretQueue.Done(key)

logger := klog.FromContext(ctx)
// Track whether or not we should retry this sync
retry := false
defer func() {
e.retryOrForget(logger, e.syncSecretQueue, key, retry)
}()
// type secretQueueKey struct {
// namespace string
// name string
// uid types.UID
// saName string
// // optional, will be blank when syncing tokens missing the service account uid annotation
// saUID types.UID
// }
secretInfo, err := parseSecretQueueKey(key)
if err != nil {
logger.Error(err, "Parsing secret queue key")
return
}
// 获取对应的Secret
secret, err := e.getSecret(secretInfo.namespace, secretInfo.name, secretInfo.uid, false)
switch {
case err != nil:
logger.Error(err, "Getting secret")
retry = true
case secret == nil:
// If the service account exists

if sa, saErr := e.getServiceAccount(secretInfo.namespace, secretInfo.saName, secretInfo.saUID, false); saErr == nil && sa != nil {
// secret no longer exists, so delete references to this secret from the service account
// serviceaccount存在,secret不存在,删除对secret的引用
if err := clientretry.RetryOnConflict(RemoveTokenBackoff, func() error {
return e.removeSecretReference(secretInfo.namespace, secretInfo.saName, secretInfo.saUID, secretInfo.name)
}); err != nil {
logger.Error(err, "Removing secret reference")
}
}
default:
// Ensure service account exists
// secret不为nil的情况
sa, saErr := e.getServiceAccount(secretInfo.namespace, secretInfo.saName, secretInfo.saUID, true)
switch {
case saErr != nil:
logger.Error(saErr, "Getting service account")
retry = true
case sa == nil:
// Delete token
// 如果serviceaccount为nil, 则删除对应的token
logger.V(4).Info("Service account does not exist, deleting token", "secret", klog.KRef(secretInfo.namespace, secretInfo.name))
if retriable, err := e.deleteToken(secretInfo.namespace, secretInfo.name, secretInfo.uid); err != nil {
logger.Error(err, "Deleting serviceaccount token", "secret", klog.KRef(secretInfo.namespace, secretInfo.name), "serviceAccount", klog.KRef(secretInfo.namespace, secretInfo.saName))
retry = retriable
}
default:
// 更新token
if retriable, err := e.generateTokenIfNeeded(logger, sa, secret); err != nil {
logger.Error(err, "Populating serviceaccount token", "secret", klog.KRef(secretInfo.namespace, secretInfo.name), "serviceAccount", klog.KRef(secretInfo.namespace, secretInfo.saName))
retry = retriable
}
}
}
}

REF:
1.pkg/controller/serviceaccount/serviceaccounts_controller.go
2.cmd/kube-controller-manager/app/core.go
3.pkg/controller/serviceaccount/tokens_controller.go