Skip to content

Commit

Permalink
Merge branch 'master' into severinson-patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
severinson authored Sep 7, 2023
2 parents 0f979fc + 7ad04e3 commit e2afdec
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 58 deletions.
5 changes: 4 additions & 1 deletion internal/armada/configuration/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ const (
// All jobs in a gang are guaranteed to be scheduled onto the same cluster at the same time.
GangIdAnnotation = "armadaproject.io/gangId"
// GangCardinalityAnnotation All jobs in a gang must specify the total number of jobs in the gang via this annotation.
// The cardinality should be expressed as an integer, e.g., "3".
// The cardinality should be expressed as a positive integer, e.g., "3".
GangCardinalityAnnotation = "armadaproject.io/gangCardinality"
// GangMinimumCardinalityAnnotation All jobs in a gang must specify the minimum size for the gang to be schedulable via this annotation.
// The cardinality should be expressed as a positive integer, e.g., "3".
GangMinimumCardinalityAnnotation = "armadaproject.io/gangMinimumCardinality"
// The jobs that make up a gang may be constrained to be scheduled across a set of uniform nodes.
// Specifically, if provided, all gang jobs are scheduled onto nodes for which the value of the provided label is equal.
// Used to ensure, e.g., that all gang jobs are scheduled onto the same cluster or rack.
Expand Down
2 changes: 1 addition & 1 deletion internal/armada/server/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL

// Group gangs.
for _, job := range jobs {
gangId, _, isGangJob, err := scheduler.GangIdAndCardinalityFromLegacySchedulerJob(job)
gangId, _, _, isGangJob, err := scheduler.GangIdAndCardinalityFromLegacySchedulerJob(job)
if err != nil {
return nil, err
}
Expand Down
49 changes: 24 additions & 25 deletions internal/common/validation/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func ValidateApiJobs(jobs []*api.Job, config configuration.SchedulingConfig) error {
if err := validateGangs(jobs); err != nil {
if _, err := validateGangs(jobs); err != nil {
return err
}
for _, job := range jobs {
Expand All @@ -23,67 +23,66 @@ func ValidateApiJobs(jobs []*api.Job, config configuration.SchedulingConfig) err
return nil
}

func validateGangs(jobs []*api.Job) error {
gangDetailsByGangId := make(map[string]struct {
actualCardinality int
expectedCardinality int
expectedPriorityClassName string
expectedNodeUniformityLabel string
})
type gangDetails = struct {
expectedCardinality int
expectedMinimumCardinality int
expectedPriorityClassName string
expectedNodeUniformityLabel string
}

func validateGangs(jobs []*api.Job) (map[string]gangDetails, error) {
gangDetailsByGangId := make(map[string]gangDetails)
for i, job := range jobs {
annotations := job.Annotations
gangId, gangCardinality, isGangJob, err := scheduler.GangIdAndCardinalityFromAnnotations(annotations)
gangId, gangCardinality, gangMinimumCardinality, isGangJob, err := scheduler.GangIdAndCardinalityFromAnnotations(annotations)
nodeUniformityLabel := annotations[configuration.GangNodeUniformityLabelAnnotation]
if err != nil {
return errors.WithMessagef(err, "%d-th job with id %s in gang %s", i, job.Id, gangId)
return nil, errors.WithMessagef(err, "%d-th job with id %s in gang %s", i, job.Id, gangId)
}
if !isGangJob {
continue
}
if gangId == "" {
return errors.Errorf("empty gang id for %d-th job with id %s", i, job.Id)
return nil, errors.Errorf("empty gang id for %d-th job with id %s", i, job.Id)
}
podSpec := util.PodSpecFromJob(job)
if details, ok := gangDetailsByGangId[gangId]; ok {
if details.expectedCardinality != gangCardinality {
return errors.Errorf(
return nil, errors.Errorf(
"inconsistent gang cardinality for %d-th job with id %s in gang %s: expected %d but got %d",
i, job.Id, gangId, details.expectedCardinality, gangCardinality,
)
}
if details.expectedMinimumCardinality != gangMinimumCardinality {
return nil, errors.Errorf(
"inconsistent gang minimum cardinality for %d-th job with id %s in gang %s: expected %d but got %d",
i, job.Id, gangId, details.expectedMinimumCardinality, gangMinimumCardinality,
)
}
if podSpec != nil && details.expectedPriorityClassName != podSpec.PriorityClassName {
return errors.Errorf(
return nil, errors.Errorf(
"inconsistent PriorityClassName for %d-th job with id %s in gang %s: expected %s but got %s",
i, job.Id, gangId, details.expectedPriorityClassName, podSpec.PriorityClassName,
)
}
if nodeUniformityLabel != details.expectedNodeUniformityLabel {
return errors.Errorf(
return nil, errors.Errorf(
"inconsistent nodeUniformityLabel for %d-th job with id %s in gang %s: expected %s but got %s",
i, job.Id, gangId, details.expectedNodeUniformityLabel, nodeUniformityLabel,
)
}
details.actualCardinality++
gangDetailsByGangId[gangId] = details
} else {
details.actualCardinality = 1
details.expectedCardinality = gangCardinality
details.expectedMinimumCardinality = gangMinimumCardinality
if podSpec != nil {
details.expectedPriorityClassName = podSpec.PriorityClassName
}
details.expectedNodeUniformityLabel = nodeUniformityLabel
gangDetailsByGangId[gangId] = details
}
}
for gangId, details := range gangDetailsByGangId {
if details.expectedCardinality != details.actualCardinality {
return errors.Errorf(
"unexpected number of jobs for gang %s: expected %d jobs but got %d",
gangId, details.expectedCardinality, details.actualCardinality,
)
}
}
return nil
return gangDetailsByGangId, nil
}

func ValidateApiJob(job *api.Job, config configuration.SchedulingConfig) error {
Expand Down
64 changes: 47 additions & 17 deletions internal/common/validation/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,15 @@ func Test_ValidateJobSubmitRequestItem_WithPortRepeatedInSeperateConfig(t *testi

func TestValidateGangs(t *testing.T) {
tests := map[string]struct {
Jobs []*api.Job
ExpectSuccess bool
Jobs []*api.Job
ExpectSuccess bool
ExpectedGangMinimumCardinalityByGangId map[string]int
}{
"no gang jobs": {
Jobs: []*api.Job{{}, {}},
ExpectSuccess: true,
},
"complete gang job of cardinality 1": {
"complete gang job of cardinality 1 with no minimum cardinality provided": {
Jobs: []*api.Job{
{
Annotations: map[string]string{
Expand All @@ -123,7 +124,21 @@ func TestValidateGangs(t *testing.T) {
},
},
},
ExpectSuccess: true,
ExpectSuccess: true,
ExpectedGangMinimumCardinalityByGangId: map[string]int{"foo": 1},
},
"complete gang job of cardinality 2 with minimum cardinality of 1": {
Jobs: []*api.Job{
{
Annotations: map[string]string{
configuration.GangIdAnnotation: "foo",
configuration.GangCardinalityAnnotation: strconv.Itoa(2),
configuration.GangMinimumCardinalityAnnotation: strconv.Itoa(1),
},
},
},
ExpectSuccess: true,
ExpectedGangMinimumCardinalityByGangId: map[string]int{"foo": 1},
},
"empty gangId": {
Jobs: []*api.Job{
Expand All @@ -134,7 +149,8 @@ func TestValidateGangs(t *testing.T) {
},
},
},
ExpectSuccess: false,
ExpectSuccess: false,
ExpectedGangMinimumCardinalityByGangId: nil,
},
"complete gang job of cardinality 3": {
Jobs: []*api.Job{
Expand All @@ -157,7 +173,8 @@ func TestValidateGangs(t *testing.T) {
},
},
},
ExpectSuccess: true,
ExpectSuccess: true,
ExpectedGangMinimumCardinalityByGangId: map[string]int{"foo": 3},
},
"two complete gangs": {
Jobs: []*api.Job{
Expand Down Expand Up @@ -192,9 +209,10 @@ func TestValidateGangs(t *testing.T) {
},
},
},
ExpectSuccess: true,
ExpectSuccess: true,
ExpectedGangMinimumCardinalityByGangId: map[string]int{"foo": 3, "bar": 2},
},
"one complete and one incomplete gang": {
"one complete and one incomplete gang are passed through": {
Jobs: []*api.Job{
{
Annotations: map[string]string{
Expand All @@ -221,7 +239,8 @@ func TestValidateGangs(t *testing.T) {
},
},
},
ExpectSuccess: false,
ExpectSuccess: true,
ExpectedGangMinimumCardinalityByGangId: map[string]int{"foo": 3, "bar": 2},
},
"missing cardinality": {
Jobs: []*api.Job{
Expand All @@ -237,7 +256,8 @@ func TestValidateGangs(t *testing.T) {
},
},
},
ExpectSuccess: false,
ExpectSuccess: false,
ExpectedGangMinimumCardinalityByGangId: nil,
},
"invalid cardinality": {
Jobs: []*api.Job{
Expand All @@ -253,7 +273,8 @@ func TestValidateGangs(t *testing.T) {
},
},
},
ExpectSuccess: false,
ExpectSuccess: false,
ExpectedGangMinimumCardinalityByGangId: nil,
},
"zero cardinality": {
Jobs: []*api.Job{
Expand All @@ -264,7 +285,8 @@ func TestValidateGangs(t *testing.T) {
},
},
},
ExpectSuccess: false,
ExpectSuccess: false,
ExpectedGangMinimumCardinalityByGangId: nil,
},
"negative cardinality": {
Jobs: []*api.Job{
Expand All @@ -275,7 +297,8 @@ func TestValidateGangs(t *testing.T) {
},
},
},
ExpectSuccess: false,
ExpectSuccess: false,
ExpectedGangMinimumCardinalityByGangId: nil,
},
"inconsistent cardinality": {
Jobs: []*api.Job{
Expand Down Expand Up @@ -310,7 +333,8 @@ func TestValidateGangs(t *testing.T) {
},
},
},
ExpectSuccess: false,
ExpectSuccess: false,
ExpectedGangMinimumCardinalityByGangId: nil,
},
"inconsistent PriorityClassName": {
Jobs: []*api.Job{
Expand All @@ -333,7 +357,8 @@ func TestValidateGangs(t *testing.T) {
},
},
},
ExpectSuccess: false,
ExpectSuccess: false,
ExpectedGangMinimumCardinalityByGangId: nil,
},
"inconsistent NodeUniformityLabel": {
Jobs: []*api.Job{
Expand All @@ -354,17 +379,22 @@ func TestValidateGangs(t *testing.T) {
PodSpec: &v1.PodSpec{},
},
},
ExpectSuccess: false,
ExpectSuccess: false,
ExpectedGangMinimumCardinalityByGangId: nil,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
err := validateGangs(tc.Jobs)
gangDetailsById, err := validateGangs(tc.Jobs)
if tc.ExpectSuccess {
assert.NoError(t, err)
} else {
assert.Error(t, err)
}

for id, e := range gangDetailsById {
assert.Equal(t, tc.ExpectedGangMinimumCardinalityByGangId[id], e.expectedMinimumCardinality)
}
})
}
}
33 changes: 23 additions & 10 deletions internal/scheduler/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,30 +124,43 @@ func targetNodeIdFromNodeSelector(nodeSelector map[string]string) (string, bool)
return nodeId, ok
}

// GangIdAndCardinalityFromLegacySchedulerJob returns a tuple (gangId, gangCardinality, isGangJob, error).
func GangIdAndCardinalityFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) (string, int, bool, error) {
// GangIdAndCardinalityFromLegacySchedulerJob returns a tuple (gangId, gangCardinality, gangMinimumCardinality, isGangJob, error).
func GangIdAndCardinalityFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) (string, int, int, bool, error) {
return GangIdAndCardinalityFromAnnotations(job.GetAnnotations())
}

// GangIdAndCardinalityFromAnnotations returns a tuple (gangId, gangCardinality, isGangJob, error).
func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string, int, bool, error) {
// GangIdAndCardinalityFromAnnotations returns a tuple (gangId, gangCardinality, gangMinimumCardinality, isGangJob, error).
func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string, int, int, bool, error) {
if annotations == nil {
return "", 0, false, nil
return "", 0, 0, false, nil
}
gangId, ok := annotations[configuration.GangIdAnnotation]
if !ok {
return "", 0, false, nil
return "", 0, 0, false, nil
}
gangCardinalityString, ok := annotations[configuration.GangCardinalityAnnotation]
if !ok {
return "", 0, false, errors.Errorf("missing annotation %s", configuration.GangCardinalityAnnotation)
return "", 0, 0, false, errors.Errorf("missing annotation %s", configuration.GangCardinalityAnnotation)
}
gangCardinality, err := strconv.Atoi(gangCardinalityString)
if err != nil {
return "", 0, false, errors.WithStack(err)
return "", 0, 0, false, errors.WithStack(err)
}
if gangCardinality <= 0 {
return "", 0, false, errors.Errorf("gang cardinality is non-positive %d", gangCardinality)
return "", 0, 0, false, errors.Errorf("gang cardinality is non-positive %d", gangCardinality)
}
gangMinimumCardinalityString, ok := annotations[configuration.GangMinimumCardinalityAnnotation]
if !ok {
// If this is not set, default the minimum gang size to gangCardinality
return gangId, gangCardinality, gangCardinality, true, nil
} else {
gangMinimumCardinality, err := strconv.Atoi(gangMinimumCardinalityString)
if err != nil {
return "", 0, 0, false, errors.WithStack(err)
}
if gangMinimumCardinality <= 0 {
return "", 0, 0, false, errors.Errorf("gang minimum cardinality is non-positive %d", gangMinimumCardinality)
}
return gangId, gangCardinality, gangMinimumCardinality, true, nil
}
return gangId, gangCardinality, true, nil
}
2 changes: 1 addition & 1 deletion internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func (sch *PreemptingQueueScheduler) updateGangAccounting(preemptedJobs, schedul
}
}
for _, job := range scheduledJobs {
gangId, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job)
gangId, _, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, e
}
}

gangId, gangCardinality, isGangJob, err := GangIdAndCardinalityFromAnnotations(job.GetAnnotations())
gangId, gangCardinality, _, isGangJob, err := GangIdAndCardinalityFromAnnotations(job.GetAnnotations())
if err != nil {
// TODO: Get from context passed in.
log := logrus.NewEntry(logrus.New())
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ func TestQueueScheduler(t *testing.T) {
continue
}
assert.Equal(t, nodeDb.NumNodes(), pctx.NumNodes)
_, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(jctx.Job)
_, _, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(jctx.Job)
require.NoError(t, err)
if !isGangJob {
numExcludedNodes := 0
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/scheduling_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx context.Context, t
}
jobsByExecutorId[executorId] = append(jobsByExecutorId[executorId], job)
nodeIdByJobId[job.Id()] = nodeId
gangId, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job)
gangId, _, _, isGangJob, err := GangIdAndCardinalityFromLegacySchedulerJob(job)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit e2afdec

Please sign in to comment.