kube-state-metrics

kube-state-metrics(KSM)是一个简单的服务,它监听Kubernetes API服务器并生成关于对象状态的指标(请参见下面“指标”部分中的示例)。它的重点不在于个别Kubernetes组件的健康状况,而是关注其中各种对象(如部署、节点和Pod)的健康状况。

kube-state-metrics通过从Kubernetes API对象生成指标,而无需修改这些对象,确保了其提供的功能与Kubernetes API对象本身具有相同的稳定性。这意味着在某些情况下,kube-state-metrics可能不会显示与kubectl完全相同的值,因为kubectl会应用一定的启发式方法来显示可理解的消息。kube-state-metrics提供了未经修改的来自Kubernetes API的原始数据,这样用户就可以获得他们所需的所有数据,并根据需要执行启发式方法。

这些指标通过HTTP端点 /metrics 在监听端口(默认为8080)上以纯文本形式导出。它们旨在被Prometheus本身或与抓取Prometheus客户端端点兼容的抓取器消费。您也可以在浏览器中打开 /metrics 来查看原始指标数据。请注意,/metrics 端点上公开的指标反映了Kubernetes集群的当前状态。当Kubernetes对象被删除时,它们将不再在 /metrics 端点上可见。


kube-state-metrics新增自定义的crd指标
指标如下

Metric Name 描述
kube_user_cpu_total 用户可以使用的cpu(核数)
kube_user_memory_total 用户可以使用的memory(字节数)

kube-state-metrics 的 internal/store 目录下记录了所有的 metricFamily,所以我们需要新增的指标也写在这个目录下面。

internal/store下面新增了三个文件

1
2
3
user_types.go
user_test.go
user.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// internal/store/user_types.go

// User mirrors the KubeSphere User custom resource, which is the CRD watched
// here; the type definition is copied over directly from KubeSphere.
// A typed struct is not strictly required: in some cases
// unstructured.Unstructured{} can be used instead.
type User struct {
metav1.TypeMeta `json:",inline"`
// +optional
metav1.ObjectMeta `json:"metadata,omitempty"`

// Spec is the desired state of the user.
Spec UserSpec `json:"spec"`
// Status is the observed state of the user.
// +optional
Status UserStatus `json:"status,omitempty"`
}

// FinalizerName is a named string type carried over with the copied User
// definitions; nothing else in this excerpt references it.
type FinalizerName string

// UserSpec defines the desired state of User
type UserSpec struct {
// Unique email address(https://www.ietf.org/rfc/rfc5322.txt).
Email string `json:"email"`
// The preferred written or spoken language for the user.
// +optional
Lang string `json:"lang,omitempty"`
// Description of the user.
// +optional
Description string `json:"description,omitempty"`
// DisplayName is serialized as "displayName".
// NOTE(review): presumably a human-friendly name for the user; confirm.
// +optional
DisplayName string `json:"displayName,omitempty"`
// Groups is serialized as "groups".
// NOTE(review): presumably the names of groups the user belongs to; confirm.
// +optional
Groups []string `json:"groups,omitempty"`

// EncryptedPassword is serialized under the JSON key "password" (not
// "encryptedPassword") and is omitted when empty.
EncryptedPassword string `json:"password,omitempty"`
}

// UserState is the string type used for UserStatus.State.
type UserState string

// These are the valid phases of a user.
const (
// UserActive means the user is available.
UserActive UserState = "Active"
// UserDisabled means the user is disabled.
UserDisabled UserState = "Disabled"
// UserAuthLimitExceeded means restrict user login.
UserAuthLimitExceeded UserState = "AuthLimitExceeded"

// AuthenticatedSuccessfully is an untyped string constant (it is not a
// UserState value).
// NOTE(review): presumably used as a status reason/message; confirm against callers.
AuthenticatedSuccessfully = "authenticated successfully"
)

// UserStatus defines the observed state of User
type UserStatus struct {
// The user status
// +optional
State UserState `json:"state,omitempty"`
// Reason is serialized as "reason".
// NOTE(review): presumably explains the current State; confirm.
// +optional
Reason string `json:"reason,omitempty"`
// LastTransitionTime is serialized as "lastTransitionTime"; nil when unset.
// NOTE(review): presumably the time State last changed; confirm.
// +optional
LastTransitionTime *metav1.Time `json:"lastTransitionTime,omitempty"`
// Last login attempt timestamp
// +optional
LastLoginTime *metav1.Time `json:"lastLoginTime,omitempty"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// internal/store/user.go
import (
"context"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/kube-state-metrics/v2/pkg/metric"
generator "k8s.io/kube-state-metrics/v2/pkg/metric_generator"
)

// Package-level settings for the user store.
var (
	// userAnnotationCpuLimitKey is the annotation key holding a user's CPU limit.
	userAnnotationCpuLimitKey = "user-cpu-limit"

	// userAnnotationMemoryLimitKey is the annotation key holding a user's memory limit.
	userAnnotationMemoryLimitKey = "user-memory-limit"

	// schemeGroupVersionResource identifies the "users" resource that is
	// listed and watched through the dynamic client.
	schemeGroupVersionResource = schema.GroupVersionResource{
		Group:    "iam.kubesphere.io",
		Version:  "v1alpha2",
		Resource: "users",
	}

	// descUserDefaultLabels are the label keys prepended to every user metric.
	descUserDefaultLabels = []string{"user"}
)

// CpuOrMemory names which of the two per-user limits a caller is asking for.
type CpuOrMemory string

// Selector values accepted by getUserLimit.
var (
	// Cpu selects the CPU limit.
	Cpu = CpuOrMemory("cpu")
	// Memory selects the memory limit.
	Memory = CpuOrMemory("memory")
)

// userMetricFamilies returns the metric family generators for the User
// resource: kube_user_cpu_total and kube_user_memory_total, both gauges with
// a single sample per user.
func userMetricFamilies() []generator.FamilyGenerator {
	// singleSample adapts a "compute one value for this user" function into
	// a generate function that yields a family holding exactly that sample.
	singleSample := func(value func(*User) float64) func(interface{}) *metric.Family {
		return wrapUserFunc(func(u *User) *metric.Family {
			sample := &metric.Metric{Value: value(u)}
			return &metric.Family{Metrics: []*metric.Metric{sample}}
		})
	}

	// Value of kube_user_cpu_total.
	cpuTotal := *generator.NewFamilyGenerator(
		"kube_user_cpu_total",
		"The number of user's cpu core.",
		metric.Gauge,
		"",
		singleSample(func(u *User) float64 { return getUserLimit(u, Cpu) }),
	)

	// Value of kube_user_memory_total.
	memoryTotal := *generator.NewFamilyGenerator(
		"kube_user_memory_total",
		"The byte of user's memory.",
		metric.Gauge,
		"",
		singleSample(func(u *User) float64 { return getUserLimit(u, Memory) }),
	)

	return []generator.FamilyGenerator{cpuTotal, memoryTotal}
}

// getUserLimit reads the limit annotation selected by metricType from u and
// returns it as a float64.
//
// The memory annotation is used when metricType equals Memory; every other
// value falls back to the CPU annotation. The annotation value (the empty
// string when the key is absent) is handed to resource.ParseQuantity, and
// any parse error results in 0.
func getUserLimit(u *User, metricType CpuOrMemory) float64 {
	key := userAnnotationCpuLimitKey
	if metricType == Memory {
		key = userAnnotationMemoryLimitKey
	}

	quantity, err := resource.ParseQuantity(u.Annotations[key])
	if err != nil {
		return 0
	}
	return quantity.AsApproximateFloat64()
}

// wrapUserFunc adapts a typed generate function f into the untyped form used
// by the metric generator, and prepends the default "user" label (with the
// user's name as its value) to every metric f produces.
//
// obj may be either a *User or a *unstructured.Unstructured. The store is
// built on top of the dynamic client and therefore delivers unstructured
// objects, which are converted to a User here; typed *User values (as passed
// by the unit test) are used as-is. If the object has an unexpected type or
// cannot be converted, the problem is reported on stderr and an empty family
// is returned, so that no sample is emitted from a partially populated User.
func wrapUserFunc(f func(*User) *metric.Family) func(interface{}) *metric.Family {
	return func(obj interface{}) *metric.Family {
		var user *User
		switch o := obj.(type) {
		case *User:
			user = o
		case *unstructured.Unstructured:
			user = &User{}
			if err := runtime.DefaultUnstructuredConverter.FromUnstructured(o.UnstructuredContent(), user); err != nil {
				// The builtin print functions do not format interface
				// values, so the message has to be extracted explicitly.
				println("transfer from unstructured to object error:", err.Error())
				return &metric.Family{}
			}
		default:
			println("transfer from unstructured to object error: unexpected object type")
			return &metric.Family{}
		}

		metricFamily := f(user)

		for _, m := range metricFamily.Metrics {
			m.LabelKeys = append(descUserDefaultLabels, m.LabelKeys...)
			m.LabelValues = append([]string{user.Name}, m.LabelValues...)
		}
		return metricFamily
	}
}

// createUserListWatchFunc builds the list/watch factory for User objects.
//
// Returning a function (rather than a concrete struct) means the result only
// has to match the signature the store builder expects. The returned factory
// ignores all of its own arguments and always lists and watches
// schemeGroupVersionResource through the supplied dynamic userClient.
func createUserListWatchFunc(userClient dynamic.Interface) func(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher {
	listUsers := func(opts metav1.ListOptions) (runtime.Object, error) {
		return userClient.Resource(schemeGroupVersionResource).List(context.TODO(), opts)
	}
	watchUsers := func(opts metav1.ListOptions) (watch.Interface, error) {
		return userClient.Resource(schemeGroupVersionResource).Watch(context.TODO(), opts)
	}

	return func(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher {
		return &cache.ListWatch{
			ListFunc:  listUsers,
			WatchFunc: watchUsers,
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// internal/store/user_test.go
// TestUserStore feeds a User carrying both limit annotations through the user
// metric families and compares the rendered output with the expected
// exposition text.
func TestUserStore(t *testing.T) {
	const metadata = `
# HELP kube_user_cpu_total The number of user's cpu core.
# TYPE kube_user_cpu_total gauge
# HELP kube_user_memory_total The byte of user's memory.
# TYPE kube_user_memory_total gauge
`
	// Fixture: user "bob" with a CPU limit of 1 and a memory limit of 1G.
	bob := &User{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "bob",
			Namespace: "ns1",
			Annotations: map[string]string{
				userAnnotationCpuLimitKey:    "1",
				userAnnotationMemoryLimitKey: "1G",
			},
		},
	}

	cases := []generateMetricsTestCase{
		{
			AllowAnnotationsList: []string{userAnnotationCpuLimitKey, userAnnotationMemoryLimitKey},
			Obj:                  bob,
			Want: metadata + `
kube_user_cpu_total{user="bob"} 1
kube_user_memory_total{user="bob"} 1e+09
`,
		},
	}

	for idx := range cases {
		tc := cases[idx]
		tc.Func = generator.ComposeMetricGenFuncs(userMetricFamilies())
		tc.Headers = generator.ExtractMetricFamilyHeaders(userMetricFamilies())
		if err := tc.run(); err != nil {
			t.Errorf("unexpected collecting result in %vth run:\n%s", idx, err)
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// internal/store/builder.go

// Builder holds the clients and settings used when building the metric
// stores (fields as shown in this excerpt).
type Builder struct {
kubeClient clientset.Interface
vpaClient vpaclientset.Interface
// The newly watched object is a CRD, so a separate client is needed to
// fetch that resource.
// Added: a dynamic.Interface used to list and watch the CRD.
+ userClient dynamic.Interface
namespaces options.NamespaceList
namespaceFilter string
ctx context.Context
enabledResources []string
familyGeneratorFilter generator.FamilyGeneratorFilter
listWatchMetrics *watch.ListWatchMetrics
shardingMetrics *sharding.Metrics
shard int32
totalShards int
buildStoresFunc ksmtypes.BuildStoresFunc
allowAnnotationsList map[string][]string
allowLabelsList map[string][]string
useAPIServerCache bool
}


+ // WithUserClient sets the userClient property of a Builder, i.e. the dynamic client that buildUserStores passes to createUserListWatchFunc.
+ func (b *Builder) WithUserClient(c dynamic.Interface) {
+ b.userClient = c
+ }


// availableStores maps a resource name to the function that builds the
// stores for that resource. The pre-existing entries are elided ("...") in
// this excerpt; only the newly added "users" entry is shown.
var availableStores = map[string]func(f *Builder) []cache.Store{
...
+ "users": func(b *Builder) []cache.Store { return b.buildUserStores() },
}


// buildUserStores builds the stores for the User resource by calling
// buildStoresFunc with the user metric families, an
// *unstructured.Unstructured as the expected object type, and a list/watch
// factory backed by b.userClient.
+ func (b *Builder) buildUserStores() []cache.Store {
return b.buildStoresFunc(userMetricFamilies(), &unstructured.Unstructured{}, createUserListWatchFunc(b.userClient), b.useAPIServerCache)
+ }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// main.go
// main (excerpt): only the lines touched by the user-metrics change are
// shown; the rest of the function is elided ("...").
func main() {
// Added: createKubeClient now also returns a dynamic userClient.
kubeClient, vpaClient, userClient, err := createKubeClient(opts.Apiserver, opts.Kubeconfig)
...
// Hand the dynamic client to the store builder so it can be used for the user stores.
storeBuilder.WithUserClient(userClient)

}

// createKubeClient builds a client configuration from apiserver and
// kubeconfig and creates three clients from it: the Kubernetes clientset,
// the VPA clientset and (newly added) a dynamic client.
//
// Before returning, it queries the server version through the Kubernetes
// clientset to verify that the API server is reachable. On any failure all
// three clients are returned as nil together with the error.
func createKubeClient(apiserver string, kubeconfig string) (clientset.Interface, vpaclientset.Interface, dynamic.Interface, error) {
config, err := clientcmd.BuildConfigFromFlags(apiserver, kubeconfig)
if err != nil {
return nil, nil, nil, err
}

// Identify this client by its version and prefer protobuf, with JSON
// also accepted in responses.
config.UserAgent = version.Version
config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
config.ContentType = "application/vnd.kubernetes.protobuf"

kubeClient, err := clientset.NewForConfig(config)
if err != nil {
return nil, nil, nil, err
}

vpaClient, err := vpaclientset.NewForConfig(config)
if err != nil {
return nil, nil, nil, err
}

// Added: the dynamic client that is returned to main as userClient.
+ userClient, err := dynamic.NewForConfig(config)
+ if err != nil {
+ return nil, nil, nil, err
+ }
// Informers don't seem to do a good job logging error messages when it
// can't reach the server, making debugging hard. This makes it easier to
// figure out if apiserver is configured incorrectly.
klog.Infof("Testing communication with server")
v, err := kubeClient.Discovery().ServerVersion()
if err != nil {
return nil, nil, nil, errors.Wrap(err, "error while trying to communicate with apiserver")
}
klog.Infof("Running with Kubernetes cluster version: v%s.%s. git version: %s. git tree state: %s. commit: %s. platform: %s",
v.Major, v.Minor, v.GitVersion, v.GitTreeState, v.GitCommit, v.Platform)
klog.Infof("Communication with server successful")

return kubeClient, vpaClient, userClient, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// pkg/options/resource.go

var (
// DefaultNamespaces is the default namespace selector for selecting and filtering across all namespaces.
DefaultNamespaces = NamespaceList{metav1.NamespaceAll}

// DefaultResources represents the default set of resources in kube-state-metrics.
DefaultResources = ResourceSet{
"certificatesigningrequests": struct{}{},
"configmaps": struct{}{},
"cronjobs": struct{}{},
"daemonsets": struct{}{},
"deployments": struct{}{},
"endpoints": struct{}{},
"horizontalpodautoscalers": struct{}{},
"ingresses": struct{}{},
"jobs": struct{}{},
"leases": struct{}{},
"limitranges": struct{}{},
"mutatingwebhookconfigurations": struct{}{},
"namespaces": struct{}{},
"networkpolicies": struct{}{},
"nodes": struct{}{},
"persistentvolumes": struct{}{},
"persistentvolumeclaims": struct{}{},
"poddisruptionbudgets": struct{}{},
"pods": struct{}{},
"replicasets": struct{}{},
"replicationcontrollers": struct{}{},
"resourcequotas": struct{}{},
"secrets": struct{}{},
"services": struct{}{},
"statefulsets": struct{}{},
"storageclasses": struct{}{},
"validatingwebhookconfigurations": struct{}{},
"volumeattachments": struct{}{},
// Added: puts the custom "users" resource into the default set; the
// key matches the "users" entry registered in availableStores.
+ "users": struct{}{},
}
)

REF:
1.https://github.com/kubernetes/kube-state-metrics