From 7ad04e3d28f57f742a6330cd86c35164de22c244 Mon Sep 17 00:00:00 2001 From: Mark Sumner Date: Thu, 7 Sep 2023 11:20:24 +0100 Subject: [PATCH] Remove gang job cardinality submit check. Add placeholder for min gang size --- internal/armada/configuration/constants.go | 5 +- internal/armada/server/lease.go | 2 +- internal/common/validation/job.go | 49 +++++++------- internal/common/validation/job_test.go | 64 ++++++++++++++----- internal/scheduler/common.go | 33 +++++++--- .../scheduler/preempting_queue_scheduler.go | 2 +- internal/scheduler/queue_scheduler.go | 2 +- internal/scheduler/queue_scheduler_test.go | 2 +- internal/scheduler/scheduling_algo.go | 2 +- 9 files changed, 103 insertions(+), 58 deletions(-) diff --git a/internal/armada/configuration/constants.go b/internal/armada/configuration/constants.go index 51669f2aec6..5275264e2c8 100644 --- a/internal/armada/configuration/constants.go +++ b/internal/armada/configuration/constants.go @@ -5,8 +5,11 @@ const ( // All jobs in a gang are guaranteed to be scheduled onto the same cluster at the same time. GangIdAnnotation = "armadaproject.io/gangId" // GangCardinalityAnnotation All jobs in a gang must specify the total number of jobs in the gang via this annotation. - // The cardinality should be expressed as an integer, e.g., "3". + // The cardinality should be expressed as a positive integer, e.g., "3". GangCardinalityAnnotation = "armadaproject.io/gangCardinality" + // GangMinimumCardinalityAnnotation All jobs in a gang must specify the minimum size for the gang to be schedulable via this annotation. + // The cardinality should be expressed as a positive integer, e.g., "3". + GangMinimumCardinalityAnnotation = "armadaproject.io/gangMinimumCardinality" // The jobs that make up a gang may be constrained to be scheduled across a set of uniform nodes. // Specifically, if provided, all gang jobs are scheduled onto nodes for which the value of the provided label is equal. // Used to ensure, e.g., that all gang jobs are scheduled onto the same cluster or rack. diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 508206d9758..d2f8a063c5c 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -380,7 +380,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL // Group gangs. for _, job := range jobs { - gangId, _, isGangJob, err := scheduler.GangIdAndCardinalityFromLegacySchedulerJob(job) + gangId, _, _, isGangJob, err := scheduler.GangIdAndCardinalityFromLegacySchedulerJob(job) if err != nil { return nil, err } diff --git a/internal/common/validation/job.go b/internal/common/validation/job.go index 5b50f6d8715..60e89778a4e 100644 --- a/internal/common/validation/job.go +++ b/internal/common/validation/job.go @@ -12,7 +12,7 @@ import ( ) func ValidateApiJobs(jobs []*api.Job, config configuration.SchedulingConfig) error { - if err := validateGangs(jobs); err != nil { + if _, err := validateGangs(jobs); err != nil { return err } for _, job := range jobs { @@ -23,51 +23,58 @@ func ValidateApiJobs(jobs []*api.Job, config configuration.SchedulingConfig) err return nil } -func validateGangs(jobs []*api.Job) error { - gangDetailsByGangId := make(map[string]struct { - actualCardinality int - expectedCardinality int - expectedPriorityClassName string - expectedNodeUniformityLabel string - }) +type gangDetails = struct { + expectedCardinality int + expectedMinimumCardinality int + expectedPriorityClassName string + expectedNodeUniformityLabel string +} + +func validateGangs(jobs []*api.Job) (map[string]gangDetails, error) { + gangDetailsByGangId := make(map[string]gangDetails) for i, job := range jobs { annotations := job.Annotations - gangId, gangCardinality, isGangJob, err := scheduler.GangIdAndCardinalityFromAnnotations(annotations) + gangId, gangCardinality, gangMinimumCardinality, isGangJob, err := scheduler.GangIdAndCardinalityFromAnnotations(annotations) nodeUniformityLabel := annotations[configuration.GangNodeUniformityLabelAnnotation] if err != nil { - return errors.WithMessagef(err, "%d-th job with id %s in gang %s", i, job.Id, gangId) + return nil, errors.WithMessagef(err, "%d-th job with id %s in gang %s", i, job.Id, gangId) } if !isGangJob { continue } if gangId == "" { - return errors.Errorf("empty gang id for %d-th job with id %s", i, job.Id) + return nil, errors.Errorf("empty gang id for %d-th job with id %s", i, job.Id) } podSpec := util.PodSpecFromJob(job) if details, ok := gangDetailsByGangId[gangId]; ok { if details.expectedCardinality != gangCardinality { - return errors.Errorf( + return nil, errors.Errorf( "inconsistent gang cardinality for %d-th job with id %s in gang %s: expected %d but got %d", i, job.Id, gangId, details.expectedCardinality, gangCardinality, ) } + if details.expectedMinimumCardinality != gangMinimumCardinality { + return nil, errors.Errorf( + "inconsistent gang minimum cardinality for %d-th job with id %s in gang %s: expected %d but got %d", + i, job.Id, gangId, details.expectedMinimumCardinality, gangMinimumCardinality, + ) + } if podSpec != nil && details.expectedPriorityClassName != podSpec.PriorityClassName { - return errors.Errorf( + return nil, errors.Errorf( "inconsistent PriorityClassName for %d-th job with id %s in gang %s: expected %s but got %s", i, job.Id, gangId, details.expectedPriorityClassName, podSpec.PriorityClassName, ) } if nodeUniformityLabel != details.expectedNodeUniformityLabel { - return errors.Errorf( + return nil, errors.Errorf( "inconsistent nodeUniformityLabel for %d-th job with id %s in gang %s: expected %s but got %s", i, job.Id, gangId, details.expectedNodeUniformityLabel, nodeUniformityLabel, ) } - details.actualCardinality++ gangDetailsByGangId[gangId] = details } else { - details.actualCardinality = 1 details.expectedCardinality = gangCardinality + details.expectedMinimumCardinality = gangMinimumCardinality if podSpec != nil { details.expectedPriorityClassName = podSpec.PriorityClassName } @@ -75,15 +82,7 @@ func validateGangs(jobs []*api.Job) error { gangDetailsByGangId[gangId] = details } } - for gangId, details := range gangDetailsByGangId { - if details.expectedCardinality != details.actualCardinality { - return errors.Errorf( - "unexpected number of jobs for gang %s: expected %d jobs but got %d", - gangId, details.expectedCardinality, details.actualCardinality, - ) - } - } - return nil + return gangDetailsByGangId, nil } func ValidateApiJob(job *api.Job, config configuration.SchedulingConfig) error { diff --git a/internal/common/validation/job_test.go b/internal/common/validation/job_test.go index 08b54a9c719..420bea9301f 100644 --- a/internal/common/validation/job_test.go +++ b/internal/common/validation/job_test.go @@ -107,14 +107,15 @@ func Test_ValidateJobSubmitRequestItem_WithPortRepeatedInSeperateConfig(t *testi func TestValidateGangs(t *testing.T) { tests := map[string]struct { - Jobs []*api.Job - ExpectSuccess bool + Jobs []*api.Job + ExpectSuccess bool + ExpectedGangMinimumCardinalityByGangId map[string]int }{ "no gang jobs": { Jobs: []*api.Job{{}, {}}, ExpectSuccess: true, }, - "complete gang job of cardinality 1": { + "complete gang job of cardinality 1 with no minimum cardinality provided": { Jobs: []*api.Job{ { Annotations: map[string]string{ @@ -123,7 +124,21 @@ func TestValidateGangs(t *testing.T) { }, }, }, - ExpectSuccess: true, + ExpectSuccess: true, + ExpectedGangMinimumCardinalityByGangId: map[string]int{"foo": 1}, + }, + "complete gang job of cardinality 2 with minimum cardinality of 1": { + Jobs: []*api.Job{ + { + Annotations: map[string]string{ + configuration.GangIdAnnotation: "foo", + configuration.GangCardinalityAnnotation: strconv.Itoa(2), + configuration.GangMinimumCardinalityAnnotation: strconv.Itoa(1), + }, + }, + }, + ExpectSuccess: true, + ExpectedGangMinimumCardinalityByGangId: map[string]int{"foo": 1}, }, "empty gangId": { Jobs: []*api.Job{ @@ -134,7 +149,8 @@ func TestValidateGangs(t *testing.T) { }, }, }, - ExpectSuccess: false, + ExpectSuccess: false, + ExpectedGangMinimumCardinalityByGangId: nil, }, "complete gang job of cardinality 3": { Jobs: []*api.Job{ @@ -157,7 +173,8 @@ func TestValidateGangs(t *testing.T) { }, }, }, - ExpectSuccess: true, + ExpectSuccess: true, + ExpectedGangMinimumCardinalityByGangId: map[string]int{"foo": 3}, }, "two complete gangs": { Jobs: []*api.Job{ @@ -192,9 +209,10 @@ func TestValidateGangs(t *testing.T) { }, }, }, - ExpectSuccess: true, + ExpectSuccess: true, + ExpectedGangMinimumCardinalityByGangId: map[string]int{"foo": 3, "bar": 2}, }, - "one complete and one incomplete gang": { + "one complete and one incomplete gang are passed through": { Jobs: []*api.Job{ { Annotations: map[string]string{ @@ -221,7 +239,8 @@ func TestValidateGangs(t *testing.T) { }, }, }, - ExpectSuccess: false, + ExpectSuccess: true, + ExpectedGangMinimumCardinalityByGangId: map[string]int{"foo": 3, "bar": 2}, }, "missing cardinality": { Jobs: []*api.Job{ @@ -237,7 +256,8 @@ func TestValidateGangs(t *testing.T) { }, }, }, - ExpectSuccess: false, + ExpectSuccess: false, + ExpectedGangMinimumCardinalityByGangId: nil, }, "invalid cardinality": { Jobs: []*api.Job{ @@ -253,7 +273,8 @@ func TestValidateGangs(t *testing.T) { }, }, }, - ExpectSuccess: false, + ExpectSuccess: false, + ExpectedGangMinimumCardinalityByGangId: nil, }, "zero cardinality": { Jobs: []*api.Job{ @@ -264,7 +285,8 @@ func TestValidateGangs(t *testing.T) { }, }, }, - ExpectSuccess: false, + ExpectSuccess: false, + ExpectedGangMinimumCardinalityByGangId: nil, }, "negative cardinality": { Jobs: []*api.Job{ @@ -275,7 +297,8 @@ func TestValidateGangs(t *testing.T) { }, }, }, - ExpectSuccess: false, + ExpectSuccess: false, + ExpectedGangMinimumCardinalityByGangId: nil, }, "inconsistent cardinality": { Jobs: []*api.Job{ @@ -310,7 +333,8 @@ func TestValidateGangs(t *testing.T) { }, }, }, - ExpectSuccess: false, + ExpectSuccess: false, + ExpectedGangMinimumCardinalityByGangId: nil, }, "inconsistent PriorityClassName": { Jobs: []*api.Job{ @@ -333,7 +357,8 @@ func TestValidateGangs(t *testing.T) { }, }, }, - ExpectSuccess: false, + ExpectSuccess: false, + ExpectedGangMinimumCardinalityByGangId: nil, }, "inconsistent NodeUniformityLabel": { Jobs: []*api.Job{ @@ -354,17 +379,22 @@ func TestValidateGangs(t *testing.T) { PodSpec: &v1.PodSpec{}, }, }, - ExpectSuccess: false, + ExpectSuccess: false, + ExpectedGangMinimumCardinalityByGangId: nil, }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { - err := validateGangs(tc.Jobs) + gangDetailsById, err := validateGangs(tc.Jobs) if tc.ExpectSuccess { assert.NoError(t, err) } else { assert.Error(t, err) } + + for id, e := range gangDetailsById { + assert.Equal(t, tc.ExpectedGangMinimumCardinalityByGangId[id], e.expectedMinimumCardinality) + } }) } } diff --git a/internal/scheduler/common.go b/internal/scheduler/common.go index 9b7f63c48b2..4b0bc6e2940 100644 --- a/internal/scheduler/common.go +++ b/internal/scheduler/common.go @@ -124,30 +124,43 @@ func targetNodeIdFromNodeSelector(nodeSelector map[string]string) (string, bool) return nodeId, ok } -// GangIdAndCardinalityFromLegacySchedulerJob returns a tuple (gangId, gangCardinality, isGangJob, error). -func GangIdAndCardinalityFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) (string, int, bool, error) { +// GangIdAndCardinalityFromLegacySchedulerJob returns a tuple (gangId, gangCardinality, gangMinimumCardinality, isGangJob, error). +func GangIdAndCardinalityFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) (string, int, int, bool, error) { return GangIdAndCardinalityFromAnnotations(job.GetAnnotations()) } -// GangIdAndCardinalityFromAnnotations returns a tuple (gangId, gangCardinality, isGangJob, error). -func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string, int, bool, error) { +// GangIdAndCardinalityFromAnnotations returns a tuple (gangId, gangCardinality, gangMinimumCardinality, isGangJob, error). +func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string, int, int, bool, error) { if annotations == nil { - return "", 0, false, nil + return "", 0, 0, false, nil } gangId, ok := annotations[configuration.GangIdAnnotation] if !ok { - return "", 0, false, nil + return "", 0, 0, false, nil } gangCardinalityString, ok := annotations[configuration.GangCardinalityAnnotation] if !ok { - return "", 0, false, errors.Errorf("missing annotation %s", configuration.GangCardinalityAnnotation) + return "", 0, 0, false, errors.Errorf("missing annotation %s", configuration.GangCardinalityAnnotation) } gangCardinality, err := strconv.Atoi(gangCardinalityString) if err != nil { - return "", 0, false, errors.WithStack(err) + return "", 0, 0, false, errors.WithStack(err) } if gangCardinality <= 0 { - return "", 0, false, errors.Errorf("gang cardinality is non-positive %d", gangCardinality) + return "", 0, 0, false, errors.Errorf("gang cardinality is non-positive %d", gangCardinality) + } + gangMinimumCardinalityString, ok := annotations[configuration.GangMinimumCardinalityAnnotation] + if !ok { + // If this is not set, default the minimum gang size to gangCardinality + return gangId, gangCardinality, gangCardinality, true, nil + } else { + gangMinimumCardinality, err := strconv.Atoi(gangMinimumCardinalityString) + if err != nil { + return "", 0, 0, false, errors.WithStack(err) + } + if gangMinimumCardinality <= 0 { + return "", 0, 0, false, errors.Errorf("gang minimum cardinality is non-positive %d", gangMinimumCardinality) + } + return gangId, gangCardinality, gangMinimumCardinality, true, nil } - return gangId, gangCardinality, true, nil } diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index ba753af0368..ebf2c35b390 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -626,7 +626,7 @@ func (sch *PreemptingQueueScheduler) updateGangAccounting(preemptedJobs, schedul } } for _, job := range scheduledJobs { - gangId, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job) + gangId, _, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job) if err != nil { return err } diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index ba6c223f49a..f262f924b8b 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -211,7 +211,7 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, e } } - gangId, gangCardinality, isGangJob, err := GangIdAndCardinalityFromAnnotations(job.GetAnnotations()) + gangId, gangCardinality, _, isGangJob, err := GangIdAndCardinalityFromAnnotations(job.GetAnnotations()) if err != nil { // TODO: Get from context passed in. log := logrus.NewEntry(logrus.New()) diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index 02ea3929aaa..979ec21df38 100644 --- a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -600,7 +600,7 @@ func TestQueueScheduler(t *testing.T) { continue } assert.Equal(t, nodeDb.NumNodes(), pctx.NumNodes) - _, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(jctx.Job) + _, _, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(jctx.Job) require.NoError(t, err) if !isGangJob { numExcludedNodes := 0 diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 2c925d465a5..2c8f26fdd65 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -283,7 +283,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx context.Context, t } jobsByExecutorId[executorId] = append(jobsByExecutorId[executorId], job) nodeIdByJobId[job.Id()] = nodeId - gangId, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job) + gangId, _, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job) if err != nil { return nil, err }