Scheduler: refactor: replace obsolete job.ResourceRequirements (#4051)
* Scheduler: refactor: replace obsolete job.ResourceRequirements

Signed-off-by: Robert Smith <robertdavidsmith@yahoo.com>

* refactor preempting_queue_scheduler_test.go

Signed-off-by: Robert Smith <robertdavidsmith@yahoo.com>

---------

Signed-off-by: Robert Smith <robertdavidsmith@yahoo.com>
robertdavidsmith authored Nov 14, 2024
1 parent 47e0927 commit 00c123a
Showing 6 changed files with 26 additions and 46 deletions.
internal/scheduler/jobdb/job.go (4 changes: 2 additions & 2 deletions)

@@ -503,7 +503,7 @@ func (job *Job) Tolerations() []v1.Toleration {
 
 // ResourceRequirements returns the resource requirements of the Job
 // KubernetesResourceRequirements below is preferred
-func (job *Job) ResourceRequirements() v1.ResourceRequirements {
+func (job *Job) resourceRequirements() v1.ResourceRequirements {
 	if req := job.PodRequirements(); req != nil {
 		return req.ResourceRequirements
 	}
@@ -831,7 +831,7 @@ func SchedulingKeyFromJob(skg *schedulerobjects.SchedulingKeyGenerator, job *Job
 		job.NodeSelector(),
 		job.Affinity(),
 		job.Tolerations(),
-		job.ResourceRequirements().Requests,
+		job.resourceRequirements().Requests,
 		job.PriorityClassName(),
 	)
 }
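
With the accessor unexported, code outside jobdb can no longer reach the v1-typed requirements and must go through the internaltypes-based accessors instead. A minimal sketch of a migrated caller, assuming (as the metrics change below suggests) that GetResourceByNameZeroIfMissing returns a resource.Quantity; the helper itself is hypothetical:

package example // hypothetical package, for illustration only

import (
	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/armadaproject/armada/internal/scheduler/jobdb"
)

// cpuRequest reads a job's CPU request via the internaltypes accessor
// rather than the now-unexported v1 accessor.
func cpuRequest(job *jobdb.Job) resource.Quantity {
	// Before: job.ResourceRequirements().Requests[v1.ResourceCPU]
	return job.AllResourceRequirements().GetResourceByNameZeroIfMissing("cpu")
}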
internal/scheduler/metrics/state_metrics.go (4 changes: 2 additions & 2 deletions)

@@ -209,7 +209,7 @@ func (m *jobStateMetrics) updateStateDuration(job *jobdb.Job, state string, prio
 	}
 
 	queue := job.Queue()
-	requests := job.ResourceRequirements().Requests
+	requests := job.AllResourceRequirements()
 	latestRun := job.LatestRun()
 	pool := ""
 	node := ""
@@ -236,7 +236,7 @@
 
 	// Resource Seconds
 	for _, res := range m.trackedResourceNames {
-		resQty := requests[res]
+		resQty := requests.GetResourceByNameZeroIfMissing(string(res))
 		resSeconds := duration * float64(resQty.MilliValue()) / 1000
 		m.jobStateResourceSecondsByQueue.
 			WithLabelValues(queue, pool, state, priorState, res.String()).Add(resSeconds)
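
The resource-seconds line above converts the quantity to milli-units and divides by 1000 so that fractional CPU requests are not rounded away. A self-contained sketch of that arithmetic using the upstream resource package (values are illustrative):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// A job requesting half a CPU, observed in a state for 30 seconds.
	cpu := resource.MustParse("500m")
	duration := 30.0

	// Same formula as updateStateDuration above: MilliValue()/1000
	// keeps the fractional part that Value() would round away.
	resSeconds := duration * float64(cpu.MilliValue()) / 1000
	fmt.Println(resSeconds) // 15 CPU-seconds
}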
internal/scheduler/nodedb/nodedb_test.go (7 changes: 2 additions & 5 deletions)

@@ -12,7 +12,6 @@ import (
 
 	armadamaps "github.com/armadaproject/armada/internal/common/maps"
 	"github.com/armadaproject/armada/internal/common/util"
-	"github.com/armadaproject/armada/internal/scheduler/adapters"
 	schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
 	"github.com/armadaproject/armada/internal/scheduler/internaltypes"
 	"github.com/armadaproject/armada/internal/scheduler/jobdb"
@@ -132,8 +131,6 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) {
 	jobFilter := func(job *jobdb.Job) bool { return true }
 	job := testfixtures.Test1GpuJob("A", testfixtures.PriorityClass0)
 	request := job.KubernetesResourceRequirements()
-	requestInternalRl, err := nodeDb.resourceListFactory.FromJobResourceListFailOnUnknown(adapters.K8sResourceListToMap(job.ResourceRequirements().Requests))
-	assert.Nil(t, err)
 
 	jobId := job.Id()
 
@@ -177,14 +174,14 @@
 	assert.True(
 		t,
 		armadamaps.DeepEqual(
-			map[string]internaltypes.ResourceList{jobId: requestInternalRl},
+			map[string]internaltypes.ResourceList{jobId: request},
 			boundNode.AllocatedByJobId,
 		),
 	)
 	assert.True(
 		t,
 		armadamaps.DeepEqual(
-			map[string]internaltypes.ResourceList{jobId: requestInternalRl},
+			map[string]internaltypes.ResourceList{jobId: request},
 			evictedNode.AllocatedByJobId,
 		),
 	)
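
The deleted lines existed only to convert the v1 requests into an internaltypes.ResourceList, with an error check for unknown resources; KubernetesResourceRequirements already returns that type, so both the round-trip and its error path disappear. The resulting assertion shape, as a sketch:

	// request comes straight from the job as an internaltypes.ResourceList,
	// so no factory conversion (and no conversion error to assert on) is needed.
	request := job.KubernetesResourceRequirements()
	assert.True(t, armadamaps.DeepEqual(
		map[string]internaltypes.ResourceList{jobId: request},
		boundNode.AllocatedByJobId,
	))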
internal/scheduler/scheduling/context/job.go (10 changes: 5 additions & 5 deletions)

@@ -218,10 +218,10 @@ func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*JobSche
 	)
 	resourcesByQueue := armadamaps.MapValues(
 		jobsByQueue,
-		func(jobs []*jobdb.Job) schedulerobjects.ResourceList {
-			rv := schedulerobjects.NewResourceListWithDefaultSize()
+		func(jobs []*jobdb.Job) internaltypes.ResourceList {
+			rv := internaltypes.ResourceList{}
 			for _, job := range jobs {
-				rv.AddV1ResourceList(job.ResourceRequirements().Requests)
+				rv = rv.Add(job.AllResourceRequirements())
 			}
 			return rv
 		},
@@ -247,8 +247,8 @@
 		maps.Keys(jobsByQueue),
 		armadamaps.MapValues(
 			resourcesByQueue,
-			func(rl schedulerobjects.ResourceList) string {
-				return rl.CompactString()
+			func(rl internaltypes.ResourceList) string {
+				return rl.String()
 			},
 		),
 		jobCountPerQueue,
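
The rewritten accumulator leans on two properties this diff assumes of internaltypes.ResourceList: its zero value behaves as an empty list, and Add returns a new value rather than mutating the receiver (hence rv = rv.Add(...), where the old AddV1ResourceList mutated in place). The idiom in isolation, as a hedged fragment:

	// Summing demand with a value-semantics resource list: the zero value
	// stands in for "no resources yet", and each Add yields a fresh list.
	total := internaltypes.ResourceList{}
	for _, job := range jobs {
		total = total.Add(job.AllResourceRequirements())
	}
	summary := total.String() // replaces schedulerobjects' CompactString()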
internal/scheduler/scheduling/preempting_queue_scheduler_test.go (38 changes: 11 additions & 27 deletions)

@@ -1920,7 +1920,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
 	// Accounting across scheduling rounds.
 	roundByJobId := make(map[string]int)
 	indexByJobId := make(map[string]int)
-	allocatedByQueueAndPriorityClass := make(map[string]schedulerobjects.QuantityByTAndResourceType[string])
+	allocatedByQueueAndPriorityClass := make(map[string]map[string]internaltypes.ResourceList)
 	nodeIdByJobId := make(map[string]string)
 	var jobIdsByGangId map[string]map[string]bool
 	var gangIdByJobId map[string]string
@@ -1941,7 +1941,7 @@
 		)
 	}
 
-	demandByQueue := map[string]schedulerobjects.ResourceList{}
+	demandByQueue := map[string]internaltypes.ResourceList{}
 
 	// Run the scheduler.
 	cordonedNodes := map[int]bool{}
@@ -1978,12 +1978,7 @@
 				queuedJobs = append(queuedJobs, job.WithQueued(true))
 				roundByJobId[job.Id()] = i
 				indexByJobId[job.Id()] = j
-				r, ok := demandByQueue[job.Queue()]
-				if !ok {
-					r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests))
-					demandByQueue[job.Queue()] = r
-				}
-				r.AddV1ResourceList(job.PodRequirements().ResourceRequirements.Requests)
+				demandByQueue[job.Queue()] = demandByQueue[job.Queue()].Add(job.AllResourceRequirements())
 			}
 		}
@@ -2005,12 +2000,7 @@
 					delete(gangIdByJobId, job.Id())
 					delete(jobIdsByGangId[gangId], job.Id())
 				}
-				r, ok := demandByQueue[job.Queue()]
-				if !ok {
-					r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests))
-					demandByQueue[job.Queue()] = r
-				}
-				r.SubV1ResourceList(job.PodRequirements().ResourceRequirements.Requests)
+				demandByQueue[job.Queue()] = demandByQueue[job.Queue()].Subtract(job.AllResourceRequirements())
 			}
 		}
 	}
@@ -2049,11 +2039,11 @@
 
 	for queue, priorityFactor := range tc.PriorityFactorByQueue {
 		weight := 1 / priorityFactor
-		queueDemand := testfixtures.TestResourceListFactory.FromJobResourceListIgnoreUnknown(demandByQueue[queue].Resources)
+		queueDemand := demandByQueue[queue]
 		err := sctx.AddQueueSchedulingContext(
 			queue,
 			weight,
-			internaltypes.RlMapFromJobSchedulerObjects(allocatedByQueueAndPriorityClass[queue], testfixtures.TestResourceListFactory),
+			allocatedByQueueAndPriorityClass[queue],
 			queueDemand,
 			queueDemand,
 			limiterByQueue[queue],
@@ -2092,28 +2082,22 @@
 			job := jctx.Job
 			m := allocatedByQueueAndPriorityClass[job.Queue()]
 			if m == nil {
-				m = make(schedulerobjects.QuantityByTAndResourceType[string])
+				m = make(map[string]internaltypes.ResourceList)
 				allocatedByQueueAndPriorityClass[job.Queue()] = m
 			}
-			m.SubV1ResourceList(
-				job.PriorityClassName(),
-				job.ResourceRequirements().Requests,
-			)
+			m[job.PriorityClassName()] = m[job.PriorityClassName()].Subtract(job.AllResourceRequirements())
 		}
 		for _, jctx := range result.ScheduledJobs {
 			job := jctx.Job
 			m := allocatedByQueueAndPriorityClass[job.Queue()]
 			if m == nil {
-				m = make(schedulerobjects.QuantityByTAndResourceType[string])
+				m = make(map[string]internaltypes.ResourceList)
 				allocatedByQueueAndPriorityClass[job.Queue()] = m
 			}
-			m.AddV1ResourceList(
-				job.PriorityClassName(),
-				job.ResourceRequirements().Requests,
-			)
+			m[job.PriorityClassName()] = m[job.PriorityClassName()].Add(job.AllResourceRequirements())
 		}
 		for queue, qctx := range sctx.QueueSchedulingContexts {
-			m := internaltypes.RlMapFromJobSchedulerObjects(allocatedByQueueAndPriorityClass[queue], testfixtures.TestResourceListFactory)
+			m := allocatedByQueueAndPriorityClass[queue]
 			assert.Equal(t, internaltypes.RlMapRemoveZeros(m), internaltypes.RlMapRemoveZeros(qctx.AllocatedByPriorityClass))
 		}
 
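
Note that the rewritten accounting reads m[job.PriorityClassName()] before anything has been written under that key, so it relies on the zero ResourceList being a valid receiver for Add and Subtract. A sketch of the pattern, with hypothetical queue and priority-class keys:

	alloc := map[string]map[string]internaltypes.ResourceList{}

	// Ensure the inner per-priority-class map exists for the queue.
	m := alloc["queue-a"]
	if m == nil {
		m = make(map[string]internaltypes.ResourceList)
		alloc["queue-a"] = m
	}

	// A missing key yields the zero ResourceList, which Add treats as
	// empty (assumed semantics, mirrored from the test above).
	m["pc-default"] = m["pc-default"].Add(job.AllResourceRequirements())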
internal/scheduler/simulator/sink/job_writer.go (9 changes: 4 additions & 5 deletions)

@@ -4,7 +4,6 @@ import (
 	"os"
 
 	parquetWriter "github.com/xitongsys/parquet-go/writer"
-	v1 "k8s.io/api/core/v1"
 
 	"github.com/armadaproject/armada/internal/common/armadacontext"
 	protoutil "github.com/armadaproject/armada/internal/common/proto"
@@ -77,10 +76,10 @@ func (j *JobWriter) createJobRunRow(st *model.StateTransition) ([]*JobRunRow, er
 		associatedJob := jobsList[i]
 		if event.GetCancelledJob() != nil || event.GetJobSucceeded() != nil || event.GetJobRunPreempted() != nil {
 			// Resource requirements
-			cpuLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceCPU]
-			memoryLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceMemory]
-			ephemeralStorageLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceEphemeralStorage]
-			gpuLimit := associatedJob.ResourceRequirements().Requests["nvidia.com/gpu"]
+			cpuLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("cpu")
+			memoryLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("memory")
+			ephemeralStorageLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("ephemeral-storage")
+			gpuLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("nvidia.com/gpu")
			eventTime := protoutil.ToStdTime(event.Created)
 
 			rows = append(rows, &JobRunRow{
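
Dropping the k8s.io/api/core/v1 import is safe because the string literals passed to GetResourceByNameZeroIfMissing are exactly the values of the typed constants they replace. A runnable check:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	// The refactor's plain strings line up with the v1 constants.
	fmt.Println(v1.ResourceCPU == "cpu")                            // true
	fmt.Println(v1.ResourceMemory == "memory")                      // true
	fmt.Println(v1.ResourceEphemeralStorage == "ephemeral-storage") // true
}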
