k8s源码中的一些实现

本文主要记录k8s中源码中一些函数实现。持续更新。

函数批量调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// pkg/controller/replicaset/replica_set.go
// slowStartBatch 尝试调用fn总共count次,刚开始以较小的并发batchSize调用fn函数(用于检查是否发生错误)
// 如果未发生错误则增加batchSize

// 将函数进行分批调用,起始值为initialBatchSize和count之间的最小值,在每一批的函数调用是并发执行的
// 如果一次批调用都成功了,一下次批调用将会对batchSize翻倍。
// 如果在一次批调用过程中发生了错误,会等待当前批次的调用执行完。但余下的批次将不会执行。

// 如果initialBatchSize为1,则batchSize变化为
// 1, 2, 4, 8, 16...

// 返回成功调用fn的次数
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
// remaining统计fn剩余调用的次数
remaining := count
successes := 0
for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
// errCh用于统计调用失败的次数
errCh := make(chan error, batchSize)
var wg sync.WaitGroup
wg.Add(batchSize)
for i := 0; i < batchSize; i++ {
go func() {
defer wg.Done()
if err := fn(); err != nil {
errCh <- err
}
}()
}
wg.Wait()
curSuccesses := batchSize - len(errCh)
successes += curSuccesses
if len(errCh) > 0 {
return successes, <-errCh
}
// 更新remaining
remaining -= batchSize
}
return successes, nil
}

// vendor/k8s.io/utils/integer/integer.go
// IntMin returns the minimum of the params
func IntMin(a, b int) int {
if b < a {
return b
}
return a
}
FindTailLineStartIndex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// pkg/util/tail/tail.go

// 返回倒数第n行的起始字节数
// If n < 0, 返回文件起始位置
// If n >=0, 返回倒数第n行的起始字节数

// 如果最后一行没有换行符将不会统计
func FindTailLineStartIndex(f io.ReadSeeker, n int64) (int64, error) {
if n < 0 {
return 0, nil
}
// 获取文件大小
size, err := f.Seek(0, io.SeekEnd)
if err != nil {
return 0, err
}
var left, cnt int64
buf := make([]byte, blockSize)
for right := size; right > 0 && cnt <= n; right -= blockSize {
left = right - blockSize
if left < 0 {
left = 0
buf = make([]byte, right)
}
if _, err := f.Seek(left, io.SeekStart); err != nil {
return 0, err
}
if _, err := f.Read(buf); err != nil {
return 0, err
}
// 统计行数
cnt += int64(bytes.Count(buf, eol))
}
// 假设文件为"hello\nworld\n",则size=12
// cnt = 2, 此时left为0
// 假设n=1,则cnt>n进入循环
for ; cnt > n; cnt-- {
// idx=6
idx := bytes.Index(buf, eol) + 1
buf = buf[idx:]
left += int64(idx)
}
// left=6
return left, nil
}

const (
blockSize = 1024
)

var (
// eol is the end-of-line sign in the log.
eol = []byte{'\n'}
)
并行拉取镜像

最大并行镜像拉取数量:
特性状态: Kubernetes v1.27 [alpha]
当 serializeImagePulls 被设置为 false 时,kubelet 默认对同时拉取的最大镜像数量没有限制。 如果你想限制并行镜像拉取的数量,可以在 kubelet 配置中设置字段 maxParallelImagePulls。 当 maxParallelImagePulls 设置为 n 时,只能同时拉取 n 个镜像, 超过 n 的任何镜像都必须等到至少一个正在进行拉取的镜像拉取完成后,才能拉取。

当启用并行镜像拉取时,限制并行镜像拉取的数量可以防止镜像拉取消耗过多的网络带宽或磁盘 I/O。

你可以将 maxParallelImagePulls 设置为大于或等于 1 的正数。 如果将 maxParallelImagePulls 设置为大于等于 2,则必须将 serializeImagePulls 设置为 false。 kubelet 在无效的 maxParallelImagePulls 设置下会启动失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

// pkg/kubelet/images/puller.go
type parallelImagePuller struct {
imageService kubecontainer.ImageService
tokens chan struct{}
}

func newParallelImagePuller(imageService kubecontainer.ImageService, maxParallelImagePulls *int32) imagePuller {
if maxParallelImagePulls == nil || *maxParallelImagePulls < 1 {
return &parallelImagePuller{imageService, nil}
}
// tokens的大小为maxParallelImagePulls
// maxParallelImagePulls并发的goroutine数
return &parallelImagePuller{imageService, make(chan struct{}, *maxParallelImagePulls)}
}

func (pip *parallelImagePuller) pullImage(ctx context.Context, spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig) {
go func() {
if pip.tokens != nil {
// 每启动一个goroutine拉取镜像,往tokens写入struct{}{}
// 如果tokens已经满了,则会阻塞
pip.tokens <- struct{}{}
// 镜像拉取成功后,释放空间
defer func() { <-pip.tokens }()
}
startTime := time.Now()
imageRef, err := pip.imageService.PullImage(ctx, spec, pullSecrets, podSandboxConfig)
pullChan <- pullResult{
imageRef: imageRef,
err: err,
pullDuration: time.Since(startTime),
}
}()
}

func (m *imageManager) EnsureImageExists(ctx context.Context, pod *v1.Pod, container *v1.Container, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, string, error) {
...
pullChan := make(chan pullResult)
// 调用pullImage并行拉取镜像
m.puller.pullImage(ctx, spec, pullSecrets, pullChan, podSandboxConfig)
// 串行获取镜像拉取结果
imagePullResult := <-pullChan
...
}
串行镜像拉取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// pkg/kubelet/images/puller.go

// 可以进入排队的最大镜像拉取请求数
const maxImagePullRequests = 10

type serialImagePuller struct {
imageService kubecontainer.ImageService
pullRequests chan *imagePullRequest
}

func newSerialImagePuller(imageService kubecontainer.ImageService) imagePuller {
imagePuller := &serialImagePuller{imageService, make(chan *imagePullRequest, maxImagePullRequests)}
// 启动一个不会退出的goroutine, 不断的从pullRequests中取出信息,然后拉取镜像
go wait.Until(imagePuller.processImagePullRequests, time.Second, wait.NeverStop)
return imagePuller
}

type imagePullRequest struct {
ctx context.Context
spec kubecontainer.ImageSpec
pullSecrets []v1.Secret
pullChan chan<- pullResult
podSandboxConfig *runtimeapi.PodSandboxConfig
}

func (sip *serialImagePuller) pullImage(ctx context.Context, spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig) {
// 这里只是把镜像拉取请求发送到pullRequests,实现的拉取镜像动作是在processImagePullRequests
sip.pullRequests <- &imagePullRequest{
ctx: ctx,
spec: spec,
pullSecrets: pullSecrets,
pullChan: pullChan,
podSandboxConfig: podSandboxConfig,
}
}

func (sip *serialImagePuller) processImagePullRequests() {
// 遍历pullRequests,串行拉取镜像
for pullRequest := range sip.pullRequests {
startTime := time.Now()
imageRef, err := sip.imageService.PullImage(pullRequest.ctx, pullRequest.spec, pullRequest.pullSecrets, pullRequest.podSandboxConfig)
pullRequest.pullChan <- pullResult{
imageRef: imageRef,
err: err,
pullDuration: time.Since(startTime),
}
}
}

REF:
1.pkg/controller/replicaset/replica_set.go
2.vendor/k8s.io/utils/integer/integer.go
3.pkg/util/tail/tail.go
4.pkg/kubelet/images/puller.go
5.https://kubernetes.io/zh-cn/docs/concepts/containers/images/#maximum-parallel-image-pulls
6.pkg/kubelet/images/image_manager.go