Skip to content

Commit

Permalink
[batch/job] Allow unsuspending a job with a parallelism equal or less…
Browse files Browse the repository at this point in the history
… than min. (#3152)

The minimum can be reached in the usual partial admission operation.
Values less than the minimum can be reached when readmitting jobs
which have marked a good part of their pods as reclaimable.
  • Loading branch information
trasc authored Sep 30, 2024
1 parent 71af438 commit 3e3f45d
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 1 deletion.
13 changes: 12 additions & 1 deletion pkg/controller/jobs/job/job_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func (w *JobWebhook) validateCreate(job *Job) field.ErrorList {
var allErrs field.ErrorList
allErrs = append(allErrs, jobframework.ValidateJobOnCreate(job)...)
allErrs = append(allErrs, w.validatePartialAdmissionCreate(job)...)
allErrs = append(allErrs, w.validateSyncCompletionCreate(job)...)
return allErrs
}

Expand All @@ -137,6 +138,11 @@ func (w *JobWebhook) validatePartialAdmissionCreate(job *Job) field.ErrorList {
allErrs = append(allErrs, field.Invalid(minPodsCountAnnotationsPath, v, fmt.Sprintf("should be between 0 and %d", job.podsCount()-1)))
}
}
return allErrs
}

func (w *JobWebhook) validateSyncCompletionCreate(job *Job) field.ErrorList {
var allErrs field.ErrorList
if strVal, found := job.Annotations[JobCompletionsEqualParallelismAnnotation]; found {
enabled, err := strconv.ParseBool(strVal)
if err != nil {
Expand Down Expand Up @@ -170,7 +176,12 @@ func (w *JobWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.
}

func (w *JobWebhook) validateUpdate(oldJob, newJob *Job) field.ErrorList {
allErrs := w.validateCreate(newJob)
var allErrs field.ErrorList
allErrs = append(allErrs, jobframework.ValidateJobOnCreate(newJob)...)
if newJob.Annotations[JobMinParallelismAnnotation] != oldJob.Annotations[JobMinParallelismAnnotation] {
allErrs = append(allErrs, w.validatePartialAdmissionCreate(newJob)...)
}
allErrs = append(allErrs, w.validateSyncCompletionCreate(newJob)...)
allErrs = append(allErrs, jobframework.ValidateJobOnUpdate(oldJob, newJob)...)
allErrs = append(allErrs, validatePartialAdmissionUpdate(oldJob, newJob)...)
return allErrs
Expand Down
30 changes: 30 additions & 0 deletions pkg/controller/jobs/job/job_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,36 @@ func TestValidateUpdate(t *testing.T) {
Obj(),
wantErr: nil,
},
{
name: "can update the kueue.x-k8s.io/job-min-parallelism annotation",
oldJob: testingutil.MakeJob("job", "default").
Parallelism(4).
Completions(6).
SetAnnotation(JobMinParallelismAnnotation, "3").
Obj(),
newJob: testingutil.MakeJob("job", "default").
Parallelism(4).
Completions(6).
SetAnnotation(JobMinParallelismAnnotation, "2").
Obj(),
wantErr: nil,
},
{
name: "validates kueue.x-k8s.io/job-min-parallelism annotation value (bad format)",
oldJob: testingutil.MakeJob("job", "default").
Parallelism(4).
Completions(6).
SetAnnotation(JobMinParallelismAnnotation, "3").
Obj(),
newJob: testingutil.MakeJob("job", "default").
Parallelism(4).
Completions(6).
SetAnnotation(JobMinParallelismAnnotation, "NaN").
Obj(),
wantErr: field.ErrorList{
field.Invalid(minPodsCountAnnotationsPath, "NaN", "strconv.Atoi: parsing \"NaN\": invalid syntax"),
},
},
}

for _, tc := range testcases {
Expand Down
39 changes: 39 additions & 0 deletions test/integration/webhook/jobs/job_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,43 @@ var _ = ginkgo.Describe("Job Webhook with manageJobsWithoutQueueName disabled",
createdJob.Spec.Suspend = ptr.To(false)
gomega.Expect(k8sClient.Update(ctx, createdJob)).ShouldNot(gomega.Succeed())
})

ginkgo.It("should allow unsuspending a partially admissible job with its minimum parallelism", func() {
job := testingjob.MakeJob("job-with-queue-name", ns.Name).Queue("queue").
Parallelism(6).
Completions(6).
SetAnnotation(job.JobMinParallelismAnnotation, "4").
Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())

lookupKey := types.NamespacedName{Name: job.Name, Namespace: job.Namespace}
createdJob := &batchv1.Job{}
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())

createdJob.Spec.Parallelism = ptr.To[int32](4)
createdJob.Spec.Suspend = ptr.To(false)
gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed())
})

ginkgo.It("should allow unsuspending a partially admissible job with a parallelism lower then minimum", func() {
// This can happen if the job:
// 1. Is admitted
// 2. Makes progress and increments the reclaimable counts
// 3. Is evicted
// 4. Is re-admitted (the parallelism being less than min due to reclaim)
job := testingjob.MakeJob("job-with-queue-name", ns.Name).Queue("queue").
Parallelism(6).
Completions(6).
SetAnnotation(job.JobMinParallelismAnnotation, "4").
Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())

lookupKey := types.NamespacedName{Name: job.Name, Namespace: job.Namespace}
createdJob := &batchv1.Job{}
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())

createdJob.Spec.Parallelism = ptr.To[int32](3)
createdJob.Spec.Suspend = ptr.To(false)
gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed())
})
})

0 comments on commit 3e3f45d

Please sign in to comment.