kube-controller之job

Job

k8s中,Job是用于运行批处理或一次性任务的资源,它代表一个任务或一组并行任务,这些任务会一直运行直到成功完成。Job通常用于数据处理、备份等只需执行一次的任务;需要定期执行的任务则应使用CronJob。

以下是k8s中Job的一些关键特性:

  1. 执行:Job会确保指定数量的任务成功完成后才将其标记为“完成”。每个任务都会一直执行直到完成,可以由一个单独的Pod或一组并行的Pod承担。

  2. Pod创建:Job会创建一个或多个Pod来执行任务,这些Pod由Job控制器自动管理。

  3. 任务并行性:Job可以配置为并行运行任务,可以通过指定并行Pod的数量或定义具有多个工作器的工作队列来实现。

  4. 重启策略:Job通过Pod模板的重启策略(restartPolicy)确定容器失败或Pod终止时的处理方式。对于Job,restartPolicy只能设置为“Never”(从不重启)或“OnFailure”(仅在失败时重启),不允许设置为“Always”。

  5. 任务完成条件:Job具有完成条件,定义了何时认为Job成功完成。完成条件通常基于成功完成的Pod数量(.spec.completions),也可以结合完成模式等其他设置。

  6. 清理:Job可以配置为在任务完成后清理已完成的Pod(例如通过ttlSecondsAfterFinished自动清理整个Job)。这样可以确保在Job完成后不会留下未释放的资源。

Job提供了在k8s中执行批处理和一次性任务的方式。它们是执行非交互式工作负载的基本构建块,可以与其他k8s资源(例如PersistentVolume、ConfigMap和Secret)结合使用,完成更复杂的任务。
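
结合上面的特性,下面给出一个用Go编写的示意性Job定义(基于k8s.io/api与k8s.io/utils/pointer的公开类型;名称、镜像等取值仅为演示用的假设,并非官方示例),用来说明completions、parallelism、backoffLimit、restartPolicy和ttlSecondsAfterFinished这些字段如何组合在一起:

package main

import (
    "encoding/json"
    "fmt"

    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/utils/pointer"
)

func main() {
    job := batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{Name: "demo-batch", Namespace: "default"},
        Spec: batchv1.JobSpec{
            // 共需 5 次成功完成,最多 2 个 Pod 并行执行
            Completions: pointer.Int32(5),
            Parallelism: pointer.Int32(2),
            // 失败重试次数上限,超过后 Job 会被标记为 Failed(BackoffLimitExceeded)
            BackoffLimit: pointer.Int32(3),
            // Job 结束(成功或失败)100 秒后由 TTL 控制器自动清理
            TTLSecondsAfterFinished: pointer.Int32(100),
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    // Job 的 Pod 只允许 Never 或 OnFailure
                    RestartPolicy: corev1.RestartPolicyNever,
                    Containers: []corev1.Container{{
                        Name:    "worker",
                        Image:   "busybox:1.36",
                        Command: []string{"sh", "-c", "echo processing && sleep 5"},
                    }},
                },
            },
        },
    }

    // 以 JSON 形式打印,便于确认生成的 spec
    out, _ := json.MarshalIndent(job, "", "  ")
    fmt.Println(string(out))
}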

详细信息可查看官方文档


源码分析
// pkg/controller/job/job_controller.go
var (
    // DefaultJobBackOff is the default backoff period. Exported for tests.
    DefaultJobBackOff = 10 * time.Second
    // MaxJobBackOff is the max backoff period. Exported for tests.
    MaxJobBackOff = 360 * time.Second
    // MaxUncountedPods is the maximum size the slices in
    // .status.uncountedTerminatedPods should have to keep their representation
    // roughly below 20 KB. Exported for tests
    MaxUncountedPods = 500
    // MaxPodCreateDeletePerSync is the maximum number of pods that can be
    // created or deleted in a single sync call. Exported for tests.
    MaxPodCreateDeletePerSync = 500
)
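
// 说明:DefaultJobBackOff 和 MaxJobBackOff 会在下面的 newControllerWithClock 中
// 作为 workqueue.NewItemExponentialFailureRateLimiter 的初始/最大退避时间使用:
// 同一个 Job key 每次 sync 失败重新入队时,延迟大致按 10s、20s、40s、80s ... 指数增长,
// 最终被封顶在 360s(以上数字是按该限速器的语义推算的,仅作说明)。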

type Controller struct {
    kubeClient clientset.Interface
    podControl controller.PodControlInterface

    // To allow injection of the following for testing.
    updateStatusHandler func(ctx context.Context, job *batch.Job) (*batch.Job, error)
    patchJobHandler     func(ctx context.Context, job *batch.Job, patch []byte) error
    syncHandler         func(ctx context.Context, jobKey string) error
    // podStoreSynced returns true if the pod store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    // 判断是否至少出现一次全量同步
    podStoreSynced cache.InformerSynced
    // jobStoreSynced returns true if the job store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    jobStoreSynced cache.InformerSynced

    // A TTLCache of pod creates/deletes each rc expects to see
    expectations controller.ControllerExpectationsInterface

    // finalizerExpectations tracks the Pod UIDs for which the controller
    // expects to observe the tracking finalizer removed.
    finalizerExpectations *uidTrackingExpectations

    // A store of jobs
    jobLister batchv1listers.JobLister

    // A store of pods, populated by the podController
    podStore corelisters.PodLister

    // Jobs that need to be updated
    queue workqueue.RateLimitingInterface

    // Orphan deleted pods that still have a Job tracking finalizer to be removed
    orphanQueue workqueue.RateLimitingInterface

    broadcaster record.EventBroadcaster
    recorder    record.EventRecorder

    podUpdateBatchPeriod time.Duration

    clock clock.WithTicker

    backoffRecordStore *backoffStore
}

func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
    return newControllerWithClock(podInformer, jobInformer, kubeClient, &clock.RealClock{})
}

func newControllerWithClock(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface, clock clock.WithTicker) *Controller {
    eventBroadcaster := record.NewBroadcaster()

    jm := &Controller{
        kubeClient: kubeClient,
        podControl: controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
        },
        expectations:          controller.NewControllerExpectations(),
        finalizerExpectations: newUIDTrackingExpectations(),
        queue:                 workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff)),
        orphanQueue:           workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job_orphan_pod"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff)),
        broadcaster:           eventBroadcaster,
        recorder:              eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
        clock:                 clock,
        backoffRecordStore:    newBackoffRecordStore(),
    }
    if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
        jm.podUpdateBatchPeriod = podUpdateBatchPeriod
    }

    jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
        UpdateFunc: jm.updateJob,
        DeleteFunc: jm.deleteJob,
    })
    jm.jobLister = jobInformer.Lister()
    jm.jobStoreSynced = jobInformer.Informer().HasSynced

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    jm.addPod,
        UpdateFunc: jm.updatePod,
        DeleteFunc: func(obj interface{}) {
            jm.deletePod(obj, true)
        },
    })
    jm.podStore = podInformer.Lister()
    jm.podStoreSynced = podInformer.Informer().HasSynced

    // 设置对应的Handler函数
    // 向apiserver更新job的状态
    jm.updateStatusHandler = jm.updateJobStatus
    // 对job进行patch操作
    jm.patchJobHandler = jm.patchJob
    // 调谐函数
    jm.syncHandler = jm.syncJob

    metrics.Register()

    return jm
}

// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
    startTime := jm.clock.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing job %q (%v)", key, jm.clock.Since(startTime))
    }()

    ns, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    if len(ns) == 0 || len(name) == 0 {
        return fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
    }
    // 获取对应的Job
    sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
    if err != nil {
        if apierrors.IsNotFound(err) {
            // 如果Job不存在,说明Job已经被删除
            // 删除对应的expectations
            klog.V(4).Infof("Job has been deleted: %v", key)
            jm.expectations.DeleteExpectations(key)
            jm.finalizerExpectations.deleteExpectations(key)

            err := jm.backoffRecordStore.removeBackoffRecord(key)
            if err != nil {
                // re-syncing here as the record has to be removed for finished/deleted jobs
                return fmt.Errorf("error removing backoff record %w", err)
            }
            return nil
        }
        return err
    }
    // make a copy so we don't mutate the shared cache
    job := *sharedJob.DeepCopy()

    // if job was finished previously, we don't want to redo the termination
    if IsJobFinished(&job) {
        // 从backoff缓存中删除
        err := jm.backoffRecordStore.removeBackoffRecord(key)
        if err != nil {
            // re-syncing here as the record has to be removed for finished/deleted jobs
            return fmt.Errorf("error removing backoff record %w", err)
        }
        return nil
    }
    // 目前有两种CompletionMode: 1. NonIndexed(default),2. Indexed
    // NonIndexed: 当成功完成的pod数量等于.spec.completions时,表示此job已经完成,每个pod都是等价的
    // Indexed: Job的每个pod会关联一个completion index(0到.spec.completions-1),当每个index上都有
    // 一个成功完成的pod时,表示此job已完成
    // 当CompletionMode="Indexed"时,必须指定.spec.completions,且.spec.parallelism必须<=100000
    // pod名称为$(job-name)-$(index)-$(random-string)
    // pod hostname为$(job-name)-$(index)
    if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode != batch.NonIndexedCompletion && *job.Spec.CompletionMode != batch.IndexedCompletion {
        jm.recorder.Event(&job, v1.EventTypeWarning, "UnknownCompletionMode", "Skipped Job sync because completion mode is unknown")
        return nil
    }

    // 获取CompletionMode
    completionMode := getCompletionMode(&job)
    action := metrics.JobSyncActionReconciling

    defer func() {
        result := "success"
        if rErr != nil {
            result = "error"
        }

        metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result, action).Observe(jm.clock.Since(startTime).Seconds())
        metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc()
    }()

    // UncountedTerminatedPods 记录terminated状态但是job controller并没有记录的pod的UID
    if job.Status.UncountedTerminatedPods == nil {
        job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
    }
    uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
    expectedRmFinalizers := jm.finalizerExpectations.getExpectedUIDs(key)

    // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
    // and update the expectations after we've retrieved active pods from the store. If a new pod enters
    // the store after we've checked the expectation, the job sync is just deferred till the next relist.
    satisfiedExpectations := jm.expectations.SatisfiedExpectations(key)

    pods, err := jm.getPodsForJob(ctx, &job)
    if err != nil {
        return err
    }
    // activePod: p.Status.Phase != v1.PodSucceeded && p.Status.Phase != v1.PodFailed && p.DeletionTimestamp == nil
    activePods := controller.FilterActivePods(pods)
    active := int32(len(activePods))

    // 返回未记录的最近成功和失败的pod
    newSucceededPods, newFailedPods := getNewFinishedPods(&job, pods, uncounted, expectedRmFinalizers)
    // 更新成功的pod数量
    succeeded := job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(uncounted.succeeded))
    // 更新失败的pod数量
    failed := job.Status.Failed + int32(len(newFailedPods)) + int32(len(uncounted.failed))
    var ready *int32
    if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
        ready = pointer.Int32(countReadyPods(activePods))
    }

    // Job first start. Set StartTime only if the job is not in the suspended state.
    if job.Status.StartTime == nil && !jobSuspended(&job) {
        now := metav1.NewTime(jm.clock.Now())
        job.Status.StartTime = &now
    }

    newBackoffInfo := jm.backoffRecordStore.newBackoffRecord(jm.clock, key, newSucceededPods, newFailedPods)

    var manageJobErr error
    var finishedCondition *batch.JobCondition

    jobHasNewFailure := failed > job.Status.Failed
    // new failures happen when status does not reflect the failures and active
    // is different than parallelism, otherwise the previous controller loop
    // failed updating status so even if we pick up failure it is not a new one
    exceedsBackoffLimit := jobHasNewFailure && (active != *job.Spec.Parallelism) &&
        (failed > *job.Spec.BackoffLimit)

    if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
        if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
            finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
        } else if failJobMessage := getFailJobMessage(&job, pods); failJobMessage != nil {
            // Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed.
            finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
        }
    }
    if finishedCondition == nil {
        if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
            // check if the number of pod restart exceeds backoff (for restart OnFailure only)
            // OR if the number of failed jobs increased since the last syncJob
            finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit", jm.clock.Now())
        } else if jm.pastActiveDeadline(&job) {
            finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now())
        } else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
            syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
            klog.V(2).InfoS("Job has activeDeadlineSeconds configuration. Will sync this job again", "job", key, "nextSyncIn", syncDuration)
            jm.queue.AddAfter(key, syncDuration)
        }
    }

    var prevSucceededIndexes, succeededIndexes orderedIntervals
    // 如果是IndexedJob,统计成功的indexes
    if isIndexedJob(&job) {
        prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(&job, pods)
        succeeded = int32(succeededIndexes.total())
    }
    suspendCondChanged := false
    // Remove active pods if Job failed.
    if finishedCondition != nil {
        deleted, err := jm.deleteActivePods(ctx, &job, activePods)
        if deleted != active || !satisfiedExpectations {
            // Can't declare the Job as finished yet, as there might be remaining
            // pod finalizers or pods that are not in the informer's cache yet.
            finishedCondition = nil
        }
        active -= deleted
        manageJobErr = err
    } else {
        manageJobCalled := false
        if satisfiedExpectations && job.DeletionTimestamp == nil {
            active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes, newBackoffInfo)
            manageJobCalled = true
        }
        complete := false
        if job.Spec.Completions == nil {
            // This type of job is complete when any pod exits with success.
            // Each pod is capable of
            // determining whether or not the entire Job is done. Subsequent pods are
            // not expected to fail, but if they do, the failure is ignored. Once any
            // pod succeeds, the controller waits for remaining pods to finish, and
            // then the job is complete.
            complete = succeeded > 0 && active == 0
        } else {
            // Job specifies a number of completions. This type of job signals
            // success by having that number of successes. Since we do not
            // start more pods than there are remaining completions, there should
            // not be any remaining active pods once this count is reached.
            complete = succeeded >= *job.Spec.Completions && active == 0
        }
        if complete {
            finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
        } else if manageJobCalled {
            // Update the conditions / emit events only if manageJob was called in
            // this syncJob. Otherwise wait for the right syncJob call to make
            // updates.
            if job.Spec.Suspend != nil && *job.Spec.Suspend {
                // Job can be in the suspended state only if it is NOT completed.
                var isUpdated bool
                job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", jm.clock.Now())
                if isUpdated {
                    suspendCondChanged = true
                    jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended", "Job suspended")
                }
            } else {
                // Job not suspended.
                var isUpdated bool
                job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "JobResumed", "Job resumed", jm.clock.Now())
                if isUpdated {
                    suspendCondChanged = true
                    jm.recorder.Event(&job, v1.EventTypeNormal, "Resumed", "Job resumed")
                    // Resumed jobs will always reset StartTime to current time. This is
                    // done because the ActiveDeadlineSeconds timer shouldn't go off
                    // whilst the Job is still suspended and resetting StartTime is
                    // consistent with resuming a Job created in the suspended state.
                    // (ActiveDeadlineSeconds is interpreted as the number of seconds a
                    // Job is continuously active.)
                    now := metav1.NewTime(jm.clock.Now())
                    job.Status.StartTime = &now
                }
            }
        }
    }

    needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready)
    job.Status.Active = active
    job.Status.Ready = ready
    err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate, newBackoffInfo)
    if err != nil {
        if apierrors.IsConflict(err) {
            // we probably have a stale informer cache
            // so don't return an error to avoid backoff
            jm.enqueueController(&job, false)
            return nil
        }
        return fmt.Errorf("tracking status: %w", err)
    }

    // 判断Job是否已经完成
    jobFinished := IsJobFinished(&job)
    if jobHasNewFailure && !jobFinished {
        // returning an error will re-enqueue Job after the backoff period
        return fmt.Errorf("failed pod(s) detected for job key %q", key)
    }

    return manageJobErr
}

// pkg/controller/controller_utils.go
func IsJobFinished(j *batch.Job) bool {
    for _, c := range j.Status.Conditions {
        if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
            return true
        }
    }
    return false
}
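
作为补充,下面给出一个简化的示意代码,展示Job控制器大致如何按上文NewController的签名被实例化并启动。其中kubeconfig的获取方式、informer工厂的同步周期、worker数量以及Run(ctx, workers)的具体签名都是假设,实际以kube-controller-manager的源码为准:

package main

import (
    "context"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    jobcontroller "k8s.io/kubernetes/pkg/controller/job"
)

func main() {
    // 仅作示意:从本地 kubeconfig 构造 clientset
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    // 共享 informer 工厂,Job 控制器依赖 Pod 与 Job 两个 informer
    factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    jm := jobcontroller.NewController(
        factory.Core().V1().Pods(),
        factory.Batch().V1().Jobs(),
        client,
    )

    ctx := context.Background()
    factory.Start(ctx.Done())

    // 假设 Run(ctx, workers) 是控制器的启动入口,内部会等待缓存同步并启动 worker
    jm.Run(ctx, 4)
}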

REF:
1.pkg/controller/job/job_controller.go
2.pkg/controller/controller_utils.go