kube-state-metrics源码分析

代码结构

下面介绍几个比较重要的目录：

| 目录 | 描述 |
| --- | --- |
| internal/store | 定义了各种资源的 metric family，用于生成 metrics |
| pkg/builder | 使用 builder 模式构建 store |
| pkg/metric_generator | 生成 metrics |
| pkg/metrics_store | 定义了 MetricsStore，它实现了 cache.Store 接口，用于存储基于 reflector 获取到的对象所生成的 metrics |
| pkg/metricshandler | 实现了 /metrics 的 handler 函数 |

MetricsStore

// pkg/metrics_store/metrics_store.go
// MetricsStore implements the k8s.io/client-go/tools/cache.Store interface.
// It does not keep the Kubernetes objects themselves, only the metrics
// generated from them.
type MetricsStore struct {
// Protects metrics (Add takes the write lock before updating the map).
mutex sync.RWMutex
// metrics is a map indexed by Kubernetes object id (UID). Each value holds
// one []byte per metric family: the family's serialized output as produced
// by FamilyInterface.ByteSlice() in Add. We need to keep metrics grouped by
// metric families in order to zip families with their help text in
// MetricsStore.WriteAll().
metrics map[types.UID][][]byte
// headers contains the header (TYPE and HELP) of each metric family. It is
// later on zipped with their corresponding metric families in
// MetricsStore.WriteAll().
headers []string

// generateMetricsFunc generates metrics based on a given Kubernetes object
// and returns them grouped by metric family.
generateMetricsFunc func(interface{}) []metric.FamilyInterface
}

// Add implements cache.Store.Add for MetricsStore and is called by the
// reflector. It regenerates all metric families for obj, serializes each
// family to bytes and stores the result under the object's UID, replacing
// any entry previously stored for that UID. It returns an error only when
// obj's metadata cannot be accessed.
func (s *MetricsStore) Add(obj interface{}) error {
	accessor, err := meta.Accessor(obj)
	if err != nil {
		return err
	}

	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Build the Prometheus metrics for obj. For a resource such as ConfigMap,
	// generateMetricsFunc is the function returned by ComposeMetricGenFuncs.
	generated := s.generateMetricsFunc(obj)
	serialized := make([][]byte, 0, len(generated))
	for _, family := range generated {
		serialized = append(serialized, family.ByteSlice())
	}

	// Entries are keyed by the Kubernetes object's UID.
	s.metrics[accessor.GetUID()] = serialized
	return nil
}

MetricsGenerator

// pkg/metric_generator/generator.go
// FamilyGenerator describes one metric family (name, help text, type) and
// holds the function that generates that family's metrics for a Kubernetes
// object.
type FamilyGenerator struct {
Name string
Help string
Type metric.Type
// OptIn is set to false by NewFamilyGenerator; presumably it marks families
// that must be explicitly enabled — TODO confirm against the opt-in filter.
OptIn bool
// DeprecatedVersion, when non-empty, makes NewFamilyGenerator prefix Help
// with "(Deprecated since <version>)".
DeprecatedVersion string
// GenerateFunc builds the metric.Family for a given object; Generate then
// overwrites the returned family's Name and Type with this generator's values.
GenerateFunc func(obj interface{}) *metric.Family
}

// NewFamilyGenerator creates a FamilyGenerator with OptIn set to false.
// When deprecatedVersion is non-empty, the help text is prefixed with
// "(Deprecated since <deprecatedVersion>) ".
func NewFamilyGenerator(name string, help string, metricType metric.Type, deprecatedVersion string, generateFunc func(obj interface{}) *metric.Family) *FamilyGenerator {
	helpText := help
	if deprecatedVersion != "" {
		helpText = fmt.Sprintf("(Deprecated since %s) %s", deprecatedVersion, help)
	}

	return &FamilyGenerator{
		Name:              name,
		Help:              helpText,
		Type:              metricType,
		OptIn:             false,
		DeprecatedVersion: deprecatedVersion,
		GenerateFunc:      generateFunc,
	}
}

// Generate runs GenerateFunc on obj and stamps the resulting family with the
// generator's Name and Type before returning it. GenerateFunc must not return
// nil: the result is dereferenced unconditionally.
func (g *FamilyGenerator) Generate(obj interface{}) *metric.Family {
	f := g.GenerateFunc(obj)
	f.Name, f.Type = g.Name, g.Type
	return f
}
// generateHeader renders the text-format header for this family: a
// "# HELP <name> <help>" line followed by a "# TYPE <name> <type>" line.
// The two lines are separated by '\n'; no newline follows the TYPE line.
func (g *FamilyGenerator) generateHeader() string {
	helpLine := "# HELP " + g.Name + " " + g.Help
	typeLine := "# TYPE " + g.Name + " " + string(g.Type)
	return strings.Join([]string{helpLine, typeLine}, "\n")
}

// ExtractMetricFamilyHeaders returns the HELP/TYPE header of every family
// generator, in the same order as families. The result is never nil.
func ExtractMetricFamilyHeaders(families []FamilyGenerator) []string {
	headers := make([]string, 0, len(families))
	// Index instead of ranging by value so each FamilyGenerator is not copied.
	for i := range families {
		headers = append(headers, families[i].generateHeader())
	}
	return headers
}

// ComposeMetricGenFuncs returns a single function that, for a given object,
// runs every family generator in familyGens (via Generate, which calls the
// generator's GenerateFunc) and returns the families in the same order.
// Each object type supplies its own generators, so the metrics produced
// depend on the object type.
func ComposeMetricGenFuncs(familyGens []FamilyGenerator) func(obj interface{}) []metric.FamilyInterface {
	composed := func(obj interface{}) []metric.FamilyInterface {
		out := make([]metric.FamilyInterface, len(familyGens))
		for i := range familyGens {
			out[i] = familyGens[i].Generate(obj)
		}
		return out
	}
	return composed
}

MetricsHandler

// pkg/metricshandler/metrics_handler.go

// ServeHTTP implements the http.Handler interface for the /metrics endpoint.
// It writes the output of every configured metrics writer to the response
// body. The body is gzip-compressed when GZIP encoding is enabled on the
// handler and the request's Accept-Encoding header lists gzip.
//
// The handler's read lock is held for the whole response, presumably so the
// set of metrics writers cannot be swapped mid-write — TODO confirm against
// the code that takes m.mtx for writing.
func (m *MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	resHeader := w.Header()
	var writer io.Writer = w

	resHeader.Set("Content-Type", `text/plain; version=`+"0.0.4")

	if m.enableGZIPEncoding {
		// Gzip response if requested. Adapted from
		// github.com/prometheus/client_golang/prometheus/promhttp.decorateWriter,
		// which stops at the first gzip entry. Without the break, a header such
		// as "gzip, gzip;q=1" would wrap the writer twice. That double-compresses
		// the body and leaves the inner gzip.Writer unclosed, so its data is
		// never flushed.
		for _, part := range strings.Split(r.Header.Get("Accept-Encoding"), ",") {
			part = strings.TrimSpace(part)
			if part == "gzip" || strings.HasPrefix(part, "gzip;") {
				writer = gzip.NewWriter(writer)
				resHeader.Set("Content-Encoding", "gzip")
				break
			}
		}
	}

	// Named mw so it does not shadow the http.ResponseWriter w.
	for _, mw := range m.metricsWriters {
		// WriteAll produces the data Prometheus scrapes from this writer.
		mw.WriteAll(writer)
	}

	// In case we gzipped the response, we have to close the writer so the
	// remaining compressed data and the gzip footer are flushed.
	if closer, ok := writer.(io.Closer); ok {
		closer.Close()
	}
}

启动流程

// main is the kube-state-metrics entry point. It parses the command-line
// options and configures a store.Builder with:
//   - the enabled resources and namespaces;
//   - the metric filters;
//   - the Kubernetes clients;
//   - sharding and the label/annotation allowlists.
//
// It then runs three actors in a run.Group until one of them returns:
//   - the MetricsHandler;
//   - the self-telemetry HTTP server;
//   - the /metrics HTTP server.
func main() {
	opts := options.NewOptions()
	// Register the command-line flags; Parse below fills opts from them and
	// applies defaults for unset flags.
	opts.AddFlags()

	promLogger := promLogger{}

	ctx := context.Background()

	// Parse the command-line arguments into opts.
	err := opts.Parse()
	if err != nil {
		klog.Fatalf("Error: %s", err)
	}

	if opts.Version {
		fmt.Printf("%s\n", version.Print("kube-state-metrics"))
		os.Exit(0)
	}

	if opts.Help {
		opts.Usage()
		os.Exit(0)
	}

	// The store builder is configured step by step (builder pattern) below.
	storeBuilder := store.NewBuilder()

	// Registry for kube-state-metrics' own metrics (served by the telemetry
	// server), as opposed to the metrics about Kubernetes objects.
	ksmMetricsRegistry := prometheus.NewRegistry()
	ksmMetricsRegistry.MustRegister(version.NewCollector("kube_state_metrics"))
	durationVec := promauto.With(ksmMetricsRegistry).NewHistogramVec(
		prometheus.HistogramOpts{
			Name:        "http_request_duration_seconds",
			Help:        "A histogram of requests for kube-state-metrics metrics handler.",
			Buckets:     prometheus.DefBuckets,
			ConstLabels: prometheus.Labels{"handler": "metrics"},
		}, []string{"method"},
	)
	storeBuilder.WithMetrics(ksmMetricsRegistry)

	var resources []string
	if len(opts.Resources) == 0 {
		klog.Info("Using default resources")
		resources = options.DefaultResources.AsSlice()
	} else {
		klog.Infof("Using resources %s", opts.Resources.String())
		resources = opts.Resources.AsSlice()
	}

	// Enable the selected resources.
	if err := storeBuilder.WithEnabledResources(resources); err != nil {
		klog.Fatalf("Failed to set up resources: %v", err)
	}

	// Configure the namespaces; denylisted ones are excluded via a field selector.
	namespaces := opts.Namespaces.GetNamespaces()
	nsFieldSelector := namespaces.GetExcludeNSFieldSelector(opts.NamespacesDenylist)
	storeBuilder.WithNamespaces(namespaces, nsFieldSelector)

	allowDenyList, err := allowdenylist.New(opts.MetricAllowlist, opts.MetricDenylist)
	if err != nil {
		klog.Fatal(err)
	}

	err = allowDenyList.Parse()
	if err != nil {
		klog.Fatalf("error initializing the allowdeny list : %v", err)
	}

	klog.Infof("metric allow-denylisting: %v", allowDenyList.Status())

	// Build the opt-in metric family filter from opts.MetricOptInList.
	optInMetricFamilyFilter, err := optin.NewMetricFamilyFilter(opts.MetricOptInList)
	if err != nil {
		klog.Fatalf("error initializing the opt-in metric list : %v", err)
	}

	if optInMetricFamilyFilter.Count() > 0 {
		klog.Infof("metrics which were opted into: %v", optInMetricFamilyFilter.Status())
	}

	// Combine the allow/deny list and the opt-in filter into one family
	// generator filter.
	storeBuilder.WithFamilyGeneratorFilter(generator.NewCompositeFamilyGeneratorFilter(
		allowDenyList,
		optInMetricFamilyFilter,
	))

	// Select the store-generation function. Per the original walkthrough, the
	// reflectors are started by startReflector inside DefaultGenerateStoresFunc().
	storeBuilder.WithGenerateStoresFunc(storeBuilder.DefaultGenerateStoresFunc(), opts.UseAPIServerCache)

	// Per the original walkthrough: on Linux, PID 1 is normally init/systemd,
	// which reaps orphaned child processes. When this process itself runs as
	// PID 1, the reaper starts a goroutine that handles SIGCHLD and collects
	// the exit status of child processes.
	// https://man7.org/linux/man-pages/man2/waitpid.2.html
	proc.StartReaper()

	kubeClient, vpaClient, userClient, err := createKubeClient(opts.Apiserver, opts.Kubeconfig)
	if err != nil {
		klog.Fatalf("Failed to create client: %v", err)
	}

	// Hand the clients and the remaining settings to the builder.
	storeBuilder.WithKubeClient(kubeClient)
	storeBuilder.WithVPAClient(vpaClient)
	storeBuilder.WithUserClient(userClient)
	storeBuilder.WithSharding(opts.Shard, opts.TotalShards)
	storeBuilder.WithAllowAnnotations(opts.AnnotationsAllowList)
	storeBuilder.WithAllowLabels(opts.LabelsAllowList)

	ksmMetricsRegistry.MustRegister(
		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
		collectors.NewGoCollector(),
	)

	var g run.Group

	m := metricshandler.New(
		opts,
		kubeClient,
		storeBuilder,
		opts.EnableGZIPEncoding,
	)
	// Run MetricsHandler
	{
		ctxMetricsHandler, cancel := context.WithCancel(ctx)
		g.Add(func() error {
			return m.Run(ctxMetricsHandler)
		}, func(error) {
			cancel()
		})
	}

	tlsConfig := opts.TLSConfig

	telemetryMux := buildTelemetryServer(ksmMetricsRegistry)
	telemetryListenAddress := net.JoinHostPort(opts.TelemetryHost, strconv.Itoa(opts.TelemetryPort))
	telemetryServer := http.Server{Handler: telemetryMux, Addr: telemetryListenAddress}

	// The metrics server exposes the endpoint Prometheus scrapes.
	metricsMux := buildMetricsServer(m, durationVec)
	metricsServerListenAddress := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
	metricsServer := http.Server{Handler: metricsMux, Addr: metricsServerListenAddress}

	// Run Telemetry server
	{
		g.Add(func() error {
			klog.Infof("Starting kube-state-metrics self metrics server: %s", telemetryListenAddress)
			return web.ListenAndServe(&telemetryServer, tlsConfig, promLogger)
		}, func(error) {
			ctxShutDown, cancel := context.WithTimeout(ctx, 3*time.Second)
			defer cancel()
			// Shutdown returns an error e.g. when the deadline expires before
			// active connections finish; report it rather than dropping it.
			if err := telemetryServer.Shutdown(ctxShutDown); err != nil {
				klog.Errorf("Failed to shut down telemetry server: %v", err)
			}
		})
	}
	// Run Metrics server
	{
		g.Add(func() error {
			klog.Infof("Starting metrics server: %s", metricsServerListenAddress)
			return web.ListenAndServe(&metricsServer, tlsConfig, promLogger)
		}, func(error) {
			ctxShutDown, cancel := context.WithTimeout(ctx, 3*time.Second)
			defer cancel()
			// Same as above: surface shutdown failures instead of ignoring them.
			if err := metricsServer.Shutdown(ctxShutDown); err != nil {
				klog.Errorf("Failed to shut down metrics server: %v", err)
			}
		})
	}

	if err := g.Run(); err != nil {
		klog.Fatalf("RunGroup Error: %v", err)
	}
	klog.Info("Exiting")
}