The kubelet prober

In Kubernetes, the kubelet is the component that manages and runs the containers on each node. It contains a subcomponent called prober, which is part of the container health-checking machinery.

prober is the part of the kubelet that executes container health checks. A health check is a mechanism for determining whether a container is running correctly: prober periodically issues check requests against the container and uses the responses to decide its state.

prober supports several kinds of health checks:

  • HTTP check: sends an HTTP request to a specified container port and judges the container's health from the response status code.
  • TCP check: attempts to open a TCP connection to a specified container port and judges health by whether the connection succeeds.
  • Exec check: runs a specified command inside the container and judges health from the command's exit status.
  • gRPC check: connects to a gRPC service inside the container and issues the standard health-check RPC; from the server's response, prober determines whether the service is serving.

By running these checks periodically, prober monitors the container's state and can react promptly, for example by restarting the container or reporting its health to the Kubernetes control plane. A sketch of how these probes are declared on a container follows below.
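
The probe handlers above correspond one-to-one to fields of the Pod API. Below is a minimal, illustrative sketch (not taken from the kubelet source) of declaring them on containers with the k8s.io/api/core/v1 types; the container names, images, ports, and thresholds are assumptions, and the GRPC field requires a Kubernetes release that supports gRPC probes.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// examplePod declares one container with HTTP, TCP and exec probes, and a
// second container with a gRPC probe. All names and numbers are illustrative.
func examplePod() *v1.Pod {
	grpcService := "my-service" // maps to spec.grpc.service; when unset, prober asks for ""
	return &v1.Pod{
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:  "app",
					Image: "example.com/app:latest",
					// Liveness via HTTP GET: status codes in [200, 400) count as healthy.
					LivenessProbe: &v1.Probe{
						ProbeHandler: v1.ProbeHandler{
							HTTPGet: &v1.HTTPGetAction{Path: "/healthz", Port: intstr.FromInt(8080)},
						},
						InitialDelaySeconds: 5,
						PeriodSeconds:       10,
					},
					// Readiness via TCP: healthy if the port accepts a connection.
					ReadinessProbe: &v1.Probe{
						ProbeHandler: v1.ProbeHandler{
							TCPSocket: &v1.TCPSocketAction{Port: intstr.FromInt(8080)},
						},
						PeriodSeconds: 10,
					},
					// Startup via exec: healthy once the command exits with status 0.
					StartupProbe: &v1.Probe{
						ProbeHandler: v1.ProbeHandler{
							Exec: &v1.ExecAction{Command: []string{"cat", "/tmp/ready"}},
						},
						FailureThreshold: 30,
						PeriodSeconds:    2,
					},
				},
				{
					Name:  "grpc-app",
					Image: "example.com/grpc-app:latest",
					// Liveness via gRPC: the server must implement grpc.health.v1.Health.
					LivenessProbe: &v1.Probe{
						ProbeHandler: v1.ProbeHandler{
							GRPC: &v1.GRPCAction{Port: 9090, Service: &grpcService},
						},
						PeriodSeconds: 10,
					},
				},
			},
		},
	}
}

func main() {
	pod := examplePod()
	fmt.Println("containers with probes:", len(pod.Spec.Containers))
}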

prober
// pkg/kubelet/prober/prober.go

// Prober helps to check the liveness/readiness/startup of a container.
type prober struct {
exec execprobe.Prober
http httpprobe.Prober
tcp tcpprobe.Prober
grpc grpcprobe.Prober
runner kubecontainer.CommandRunner

recorder record.EventRecorder
}

func newProber(
runner kubecontainer.CommandRunner,
recorder record.EventRecorder) *prober {

const followNonLocalRedirects = false
return &prober{
exec: execprobe.New(),
http: httpprobe.New(followNonLocalRedirects),
tcp: tcpprobe.New(),
grpc: grpcprobe.New(),
runner: runner,
recorder: recorder,
}
}

func (pb *prober) probe(ctx context.Context, probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
var probeSpec *v1.Probe
switch probeType {
case readiness:
probeSpec = container.ReadinessProbe
case liveness:
probeSpec = container.LivenessProbe
case startup:
probeSpec = container.StartupProbe
default:
return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
}

if probeSpec == nil {
klog.InfoS("Probe is nil", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
return results.Success, nil
}

result, output, err := pb.runProbeWithRetries(ctx, probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
if err != nil || (result != probe.Success && result != probe.Warning) {
// Probe failed in one way or another.
if err != nil {
klog.V(1).ErrorS(err, "Probe errored", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
} else { // result != probe.Success
klog.V(1).InfoS("Probe failed", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "probeResult", result, "output", output)
pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
}
return results.Failure, err
}
if result == probe.Warning {
pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerProbeWarning, "%s probe warning: %s", probeType, output)
klog.V(3).InfoS("Probe succeeded with a warning", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "output", output)
} else {
klog.V(3).InfoS("Probe succeeded", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
}
return results.Success, nil
}

// runProbeWithRetries retries the probe and returns immediately on success;
// if every attempt fails, it returns the result of the last attempt.
func (pb *prober) runProbeWithRetries(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
var err error
var result probe.Result
var output string
for i := 0; i < retries; i++ {
result, output, err = pb.runProbe(ctx, probeType, p, pod, status, container, containerID)
if err == nil {
return result, output, nil
}
}
return result, output, err
}

func (pb *prober) runProbe(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
timeout := time.Duration(p.TimeoutSeconds) * time.Second
// Dispatch to the handler that matches the configured probe type.
if p.Exec != nil {
klog.V(4).InfoS("Exec-Probe runProbe", "pod", klog.KObj(pod), "containerName", container.Name, "execCommand", p.Exec.Command)
command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
return pb.exec.Probe(pb.newExecInContainer(ctx, container, containerID, command, timeout))
}
if p.HTTPGet != nil {
req, err := httpprobe.NewRequestForHTTPGetAction(p.HTTPGet, &container, status.PodIP, "probe")
if err != nil {
return probe.Unknown, "", err
}
if klogV4 := klog.V(4); klogV4.Enabled() {
port := req.URL.Port()
host := req.URL.Hostname()
path := req.URL.Path
scheme := req.URL.Scheme
headers := p.HTTPGet.HTTPHeaders
klogV4.InfoS("HTTP-Probe", "scheme", scheme, "host", host, "port", port, "path", path, "timeout", timeout, "headers", headers)
}
return pb.http.Probe(req, timeout)
}
if p.TCPSocket != nil {
port, err := probe.ResolveContainerPort(p.TCPSocket.Port, &container)
if err != nil {
return probe.Unknown, "", err
}
host := p.TCPSocket.Host
if host == "" {
host = status.PodIP
}
klog.V(4).InfoS("TCP-Probe", "host", host, "port", port, "timeout", timeout)
return pb.tcp.Probe(host, port, timeout)
}

if p.GRPC != nil {
host := status.PodIP
service := ""
if p.GRPC.Service != nil {
service = *p.GRPC.Service
}
klog.V(4).InfoS("GRPC-Probe", "host", host, "service", service, "port", p.GRPC.Port, "timeout", timeout)
return pb.grpc.Probe(host, service, int(p.GRPC.Port), timeout)
}

klog.InfoS("Failed to find probe builder for container", "containerName", container.Name)
return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
}
// pkg/kubelet/prober/prober_manager.go
// Manager manages pod probing. For every container that specifies a probe, the
// Manager creates a probe worker. Each worker periodically probes its assigned
// container and caches the result. When requested (via the UpdatePodStatus
// method), the Manager uses the cached results to set the appropriate Ready
// state in the PodStatus.
type Manager interface {
// AddPod creates new probe workers for every container probe. This should be called for every pod created.
AddPod(pod *v1.Pod)

// StopLivenessAndStartup handles stopping liveness and startup probes during termination.
StopLivenessAndStartup(pod *v1.Pod)


// RemovePod cleans up the removed pod's state, including terminating its probe workers and deleting cached results.
RemovePod(pod *v1.Pod)

// CleanupPods handles cleaning up pods which should no longer be running.
// It takes a map of "desired pods" which should not be cleaned up.
CleanupPods(desiredPods map[types.UID]sets.Empty)

// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
// container based on container running status, cached probe results and worker states.
UpdatePodStatus(types.UID, *v1.PodStatus)
}


// manager implements the Manager interface.
type manager struct {
// Map of active workers for probes
workers map[probeKey]*worker
// Lock for accessing & mutating workers
workerLock sync.RWMutex

// The statusManager cache provides pod IP and container IDs for probing.
statusManager status.Manager

// readinessManager manages the results of readiness probes
readinessManager results.Manager

// livenessManager manages the results of liveness probes
livenessManager results.Manager

// startupManager manages the results of startup probes
startupManager results.Manager

// prober executes the probe actions.
prober *prober

start time.Time
}

func (m *manager) AddPod(pod *v1.Pod) {
m.workerLock.Lock()
defer m.workerLock.Unlock()

key := probeKey{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name

if c.StartupProbe != nil {
key.probeType = startup
if _, ok := m.workers[key]; ok {
klog.V(8).ErrorS(nil, "Startup probe already exists for container",
"pod", klog.KObj(pod), "containerName", c.Name)
return
}
w := newWorker(m, startup, pod, c)
m.workers[key] = w
go w.run()
}

if c.ReadinessProbe != nil {
key.probeType = readiness
if _, ok := m.workers[key]; ok {
klog.V(8).ErrorS(nil, "Readiness probe already exists for container",
"pod", klog.KObj(pod), "containerName", c.Name)
return
}
w := newWorker(m, readiness, pod, c)
m.workers[key] = w
go w.run()
}

if c.LivenessProbe != nil {
key.probeType = liveness
if _, ok := m.workers[key]; ok {
klog.V(8).ErrorS(nil, "Liveness probe already exists for container",
"pod", klog.KObj(pod), "containerName", c.Name)
return
}
w := newWorker(m, liveness, pod, c)
m.workers[key] = w
go w.run()
}
}
}

func (m *manager) StopLivenessAndStartup(pod *v1.Pod) {
m.workerLock.RLock()
defer m.workerLock.RUnlock()

key := probeKey{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name
for _, probeType := range [...]probeType{liveness, startup} {
key.probeType = probeType
if worker, ok := m.workers[key]; ok {
worker.stop()
}
}
}
}

func (m *manager) RemovePod(pod *v1.Pod) {
m.workerLock.RLock()
defer m.workerLock.RUnlock()

key := probeKey{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name
for _, probeType := range [...]probeType{readiness, liveness, startup} {
key.probeType = probeType
if worker, ok := m.workers[key]; ok {
worker.stop()
}
}
}
}

func (m *manager) CleanupPods(desiredPods map[types.UID]sets.Empty) {
m.workerLock.RLock()
defer m.workerLock.RUnlock()

for key, worker := range m.workers {
if _, ok := desiredPods[key.podUID]; !ok {
worker.stop()
}
}
}

func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) {
for i, c := range podStatus.ContainerStatuses {
var started bool
if c.State.Running == nil {
started = false
} else if result, ok := m.startupManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
started = result == results.Success
} else {
// Check whether there is a probe which hasn't run yet.
_, exists := m.getWorker(podUID, c.Name, startup)
started = !exists
}
podStatus.ContainerStatuses[i].Started = &started

if started {
var ready bool
if c.State.Running == nil {
ready = false
} else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok && result == results.Success {
ready = true
} else {
// Check whether there is a probe which hasn't run yet.
w, exists := m.getWorker(podUID, c.Name, readiness)
ready = !exists // no readinessProbe -> always ready
if exists {
// Trigger an immediate run of the readinessProbe to update ready state
select {
case w.manualTriggerCh <- struct{}{}:
default: // Non-blocking.
klog.InfoS("Failed to trigger a manual run", "probe", w.probeType.String())
}
}
}
podStatus.ContainerStatuses[i].Ready = ready
}
}
// init containers are ready if they have exited with success or if a readiness probe has
// succeeded.
for i, c := range podStatus.InitContainerStatuses {
var ready bool
if c.State.Terminated != nil && c.State.Terminated.ExitCode == 0 {
ready = true
}
podStatus.InitContainerStatuses[i].Ready = ready
}
}


// pkg/kubelet/prober/worker.go
// run periodically executes the configured probe for this worker's container.
func (w *worker) run() {
ctx := context.Background()
probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second

// If kubelet restarted the probes could be started in rapid succession.
// Let the worker wait for a random portion of tickerPeriod before probing.
// Do it only if the kubelet has started recently.
if probeTickerPeriod > time.Since(w.probeManager.start) {
time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
}

probeTicker := time.NewTicker(probeTickerPeriod)

defer func() {
// Clean up.
probeTicker.Stop()
if !w.containerID.IsEmpty() {
w.resultsManager.Remove(w.containerID)
}

w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
ProberResults.Delete(w.proberResultsFailedMetricLabels)
ProberResults.Delete(w.proberResultsUnknownMetricLabels)
ProberDuration.Delete(w.proberDurationSuccessfulMetricLabels)
ProberDuration.Delete(w.proberDurationUnknownMetricLabels)
}()

probeLoop:
for w.doProbe(ctx) {
// Wait for next probe tick.
select {
case <-w.stopCh:
break probeLoop
case <-probeTicker.C:
case <-w.manualTriggerCh:
// continue
}
}
}
execProbe
// pkg/probe/exec/exec.go
// New returns an execProber value, which implements the Prober interface.
func New() Prober {
return execProber{}
}

// Prober is an interface defining the Probe object for container readiness/liveness checks.
type Prober interface {
Probe(e exec.Cmd) (probe.Result, string, error)
}

type execProber struct{}

func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) {
var dataBuffer bytes.Buffer
writer := ioutils.LimitWriter(&dataBuffer, maxReadLength)

e.SetStderr(writer)
e.SetStdout(writer)
// Start the command without waiting for it to finish here.
err := e.Start()
// If Start succeeded, Wait blocks until the command exits and releases its resources.
if err == nil {
err = e.Wait()
}
data := dataBuffer.Bytes()

klog.V(4).Infof("Exec probe response: %q", string(data))
if err != nil {
exit, ok := err.(exec.ExitError)
if ok {
// ExitStatus() == 0 means the command exited successfully.
if exit.ExitStatus() == 0 {
return probe.Success, string(data), nil
}
return probe.Failure, string(data), nil
}

timeoutErr, ok := err.(*TimeoutError)
if ok {
if utilfeature.DefaultFeatureGate.Enabled(features.ExecProbeTimeout) {
// When exec probe timeout, data is empty, so we should return timeoutErr.Error() as the stdout.
return probe.Failure, timeoutErr.Error(), nil
}

klog.Warningf("Exec probe timed out after %s but ExecProbeTimeout feature gate was disabled", timeoutErr.Timeout())
}
// The error is neither an ExitError nor a handled timeout, so report Unknown.
return probe.Unknown, "", err
}
// The command completed without error, so report Success.
return probe.Success, string(data), nil
}
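
For illustration, here is a minimal sketch of driving the exec prober directly with a command that runs on the local host. The import paths assume you are building inside the Kubernetes tree (k8s.io/kubernetes is not meant as an external dependency); in the kubelet itself the command is wrapped by newExecInContainer so it executes inside the container via the runtime.

package main

import (
	"fmt"

	execprobe "k8s.io/kubernetes/pkg/probe/exec"
	utilexec "k8s.io/utils/exec"
)

func main() {
	// Illustrative only: this runs on the local host, not inside a container.
	cmd := utilexec.New().Command("cat", "/etc/hostname")
	result, output, err := execprobe.New().Probe(cmd)
	fmt.Printf("result=%v output=%q err=%v\n", result, output, err)
}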
tcpProbe
// pkg/probe/tcp/tcp.go
func New() Prober {
return tcpProber{}
}

// Prober is an interface that defines the Probe function for doing TCP readiness/liveness checks.
type Prober interface {
Probe(host string, port int, timeout time.Duration) (probe.Result, string, error)
}

type tcpProber struct{}

// Probe checks that a TCP connection to the address can be opened.
func (pr tcpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) {
return DoTCPProbe(net.JoinHostPort(host, strconv.Itoa(port)), timeout)
}

// DoTCPProbe checks that a TCP socket to the address can be opened.
// If the socket can be opened, it returns Success
// If the socket fails to open, it returns Failure.
func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
d := probe.ProbeDialer()
d.Timeout = timeout
conn, err := d.Dial("tcp", addr)
if err != nil {
// Convert errors to failures to handle timeouts.
return probe.Failure, err.Error(), nil
}
err = conn.Close()
if err != nil {
klog.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err)
}
return probe.Success, "", nil
}
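
Since tcpProber and DoTCPProbe are exported, a direct TCP probe outside the kubelet looks like the sketch below; the address and timeout are illustrative, and the import path again assumes the Kubernetes tree.

package main

import (
	"fmt"
	"time"

	tcpprobe "k8s.io/kubernetes/pkg/probe/tcp"
)

func main() {
	// Probe a local port; expect Failure plus the dial error string if nothing is listening.
	result, output, err := tcpprobe.New().Probe("127.0.0.1", 8080, 1*time.Second)
	fmt.Printf("result=%v output=%q err=%v\n", result, output, err)
}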
httpProbe
// pkg/probe/http/http.go
// New creates Prober that will skip TLS verification while probing.
// followNonLocalRedirects configures whether the prober should follow redirects to a different hostname.
// If disabled, redirects to other hosts will trigger a warning result.
func New(followNonLocalRedirects bool) Prober {
tlsConfig := &tls.Config{InsecureSkipVerify: true}
return NewWithTLSConfig(tlsConfig, followNonLocalRedirects)
}

// NewWithTLSConfig takes tls config as parameter.
// followNonLocalRedirects configures whether the prober should follow redirects to a different hostname.
// If disabled, redirects to other hosts will trigger a warning result.
func NewWithTLSConfig(config *tls.Config, followNonLocalRedirects bool) Prober {
// We do not want the probe use node's local proxy set.
transport := utilnet.SetTransportDefaults(
&http.Transport{
TLSClientConfig: config,
DisableKeepAlives: true,
Proxy: http.ProxyURL(nil),
DisableCompression: true, // removes Accept-Encoding header
// DialContext creates unencrypted TCP connections
// and is also used by the transport for HTTPS connection
DialContext: probe.ProbeDialer().DialContext,
})

return httpProber{transport, followNonLocalRedirects}
}

// Prober is an interface that defines the Probe function for doing HTTP readiness/liveness checks.
type Prober interface {
Probe(req *http.Request, timeout time.Duration) (probe.Result, string, error)
}

type httpProber struct {
transport *http.Transport
followNonLocalRedirects bool
}

// Probe returns a ProbeRunner capable of running an HTTP check.
func (pr httpProber) Probe(req *http.Request, timeout time.Duration) (probe.Result, string, error) {
client := &http.Client{
Timeout: timeout,
Transport: pr.transport,
CheckRedirect: RedirectChecker(pr.followNonLocalRedirects),
}
return DoHTTPProbe(req, client)
}

// GetHTTPInterface is an interface for making HTTP requests, that returns a response and error.
type GetHTTPInterface interface {
Do(req *http.Request) (*http.Response, error)
}

// DoHTTPProbe checks if a GET request to the url succeeds.
// If the HTTP response code is successful (i.e. 400 > code >= 200), it returns Success.
// If the HTTP response code is unsuccessful or HTTP communication fails, it returns Failure.
// This is exported because some other packages may want to do direct HTTP probes.
func DoHTTPProbe(req *http.Request, client GetHTTPInterface) (probe.Result, string, error) {
url := req.URL
headers := req.Header
res, err := client.Do(req)
if err != nil {
// Convert errors into failures to catch timeouts.
return probe.Failure, err.Error(), nil
}
defer res.Body.Close()
b, err := utilio.ReadAtMost(res.Body, maxRespBodyLength)
if err != nil {
if err == utilio.ErrLimitReached {
klog.V(4).Infof("Non fatal body truncation for %s, Response: %v", url.String(), *res)
} else {
return probe.Failure, "", err
}
}
body := string(b)
if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest {
if res.StatusCode >= http.StatusMultipleChoices { // Redirect
klog.V(4).Infof("Probe terminated redirects for %s, Response: %v", url.String(), *res)
return probe.Warning, fmt.Sprintf("Probe terminated redirects, Response body: %v", body), nil
}
klog.V(4).Infof("Probe succeeded for %s, Response: %v", url.String(), *res)
return probe.Success, body, nil
}
klog.V(4).Infof("Probe failed for %s with request headers %v, response body: %v", url.String(), headers, body)
// Note: Until https://issue.k8s.io/99425 is addressed, this user-facing failure message must not contain the response body.
failureMsg := fmt.Sprintf("HTTP probe failed with statuscode: %d", res.StatusCode)
return probe.Failure, failureMsg, nil
}
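
DoHTTPProbe is likewise exported for direct use. A minimal sketch, assuming the Kubernetes tree for the import path and an illustrative URL: build a plain GET request and hand it to the prober with a timeout (inside the kubelet the request comes from NewRequestForHTTPGetAction instead, with probe-specific headers).

package main

import (
	"fmt"
	"net/http"
	"time"

	httpprobe "k8s.io/kubernetes/pkg/probe/http"
)

func main() {
	req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8080/healthz", nil)
	if err != nil {
		panic(err)
	}
	// false: redirects to a different host produce a Warning instead of being followed.
	prober := httpprobe.New(false)
	result, body, err := prober.Probe(req, time.Second)
	fmt.Printf("result=%v body=%q err=%v\n", result, body, err)
}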
grpcProbe
// pkg/probe/grpc/grpc.go
// Prober is an interface that defines the Probe function for doing GRPC readiness/liveness/startup checks.
type Prober interface {
Probe(host, service string, port int, timeout time.Duration) (probe.Result, string, error)
}

type grpcProber struct {
}

// New creates a Prober for executing gRPC probes.
func New() Prober {
return grpcProber{}
}

// Probe executes a grpc call to check the liveness/readiness/startup of container.
// Returns the Result status, command output, and errors if any.
// Any failure is considered as a probe failure to mimic grpc_health_probe tool behavior.
// err is always nil
func (p grpcProber) Probe(host, service string, port int, timeout time.Duration) (probe.Result, string, error) {
v := version.Get()

opts := []grpc.DialOption{
grpc.WithUserAgent(fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor)),
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()), //credentials are currently not supported
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return probe.ProbeDialer().DialContext(ctx, "tcp", addr)
}),
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)

defer cancel()

addr := net.JoinHostPort(host, fmt.Sprintf("%d", port))
conn, err := grpc.DialContext(ctx, addr, opts...)

if err != nil {
if err == context.DeadlineExceeded {
klog.V(4).ErrorS(err, "failed to connect grpc service due to timeout", "addr", addr, "service", service, "timeout", timeout)
return probe.Failure, fmt.Sprintf("timeout: failed to connect service %q within %v: %+v", addr, timeout, err), nil
} else {
klog.V(4).ErrorS(err, "failed to connect grpc service", "service", addr)
return probe.Failure, fmt.Sprintf("error: failed to connect service at %q: %+v", addr, err), nil
}
}

defer func() {
_ = conn.Close()
}()

client := grpchealth.NewHealthClient(conn)

resp, err := client.Check(metadata.NewOutgoingContext(ctx, make(metadata.MD)), &grpchealth.HealthCheckRequest{
Service: service,
})

if err != nil {
stat, ok := status.FromError(err)
if ok {
switch stat.Code() {
case codes.Unimplemented:
klog.V(4).ErrorS(err, "server does not implement the grpc health protocol (grpc.health.v1.Health)", "addr", addr, "service", service)
return probe.Failure, fmt.Sprintf("error: this server does not implement the grpc health protocol (grpc.health.v1.Health): %s", stat.Message()), nil
case codes.DeadlineExceeded:
klog.V(4).ErrorS(err, "rpc request not finished within timeout", "addr", addr, "service", service, "timeout", timeout)
return probe.Failure, fmt.Sprintf("timeout: health rpc did not complete within %v", timeout), nil
default:
klog.V(4).ErrorS(err, "rpc probe failed")
}
} else {
klog.V(4).ErrorS(err, "health rpc probe failed")
}

return probe.Failure, fmt.Sprintf("error: health rpc probe failed: %+v", err), nil
}

if resp.GetStatus() != grpchealth.HealthCheckResponse_SERVING {
return probe.Failure, fmt.Sprintf("service unhealthy (responded with %q)", resp.GetStatus().String()), nil
}

return probe.Success, fmt.Sprintf("service healthy"), nil
}
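
For the gRPC probe to return Success, the container has to serve the standard grpc.health.v1.Health API. A minimal sketch of such a server using the stock google.golang.org/grpc/health implementation (the port 9090 and the empty service name are illustrative):

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	lis, err := net.Listen("tcp", ":9090")
	if err != nil {
		log.Fatal(err)
	}
	srv := grpc.NewServer()
	hs := health.NewServer()
	// "" is the service name the prober asks for when spec.grpc.service is not set.
	hs.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
	healthpb.RegisterHealthServer(srv, hs)
	log.Fatal(srv.Serve(lis))
}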
Startup entry point
// pkg/kubelet/kubelet.go
// Initialization; parts of the NewMainKubelet signature are omitted here.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,...) {
...
if kubeDeps.ProbeManager != nil {
klet.probeManager = kubeDeps.ProbeManager
} else {
klet.probeManager = prober.NewManager(
klet.statusManager,
klet.livenessManager,
klet.readinessManager,
klet.startupManager,
klet.runner,
kubeDeps.Recorder)
}
...
}
func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
...
// Record the pod and start its probe workers.
kl.probeManager.AddPod(pod)
...
}

REF:
1. pkg/kubelet/prober/prober.go
2. pkg/kubelet/prober/prober_manager.go
3. pkg/kubelet/prober/worker.go
4. pkg/probe/exec/exec.go
5. pkg/probe/tcp/tcp.go
6. pkg/probe/http/http.go
7. pkg/probe/grpc/grpc.go
8. pkg/kubelet/kubelet.go