Skip to content

Commit

Permalink
WIP: Executor cordoning implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafai-gr committed Aug 30, 2024
1 parent 523771f commit 996b692
Show file tree
Hide file tree
Showing 16 changed files with 357 additions and 2 deletions.
7 changes: 7 additions & 0 deletions internal/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func NewExecutorApi(publisher pulsarutils.Publisher,
// 1. Stores job and capacity information received from the executor to make it available to the scheduler.
// 2. Notifies the executor if any of its jobs are no longer active, e.g., due to being preempted by the scheduler.
// 3. Transfers any jobs scheduled on this executor cluster that the executor doesn't already have.
// 4. Maintains the executor cordoned status from the scheduler's perspective.
func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRunsServer) error {
// Receive once to get info necessary to get jobs to lease.
req, err := stream.Recv()
Expand All @@ -88,6 +89,12 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
ctx := armadacontext.WithLogField(armadacontext.FromGrpcCtx(stream.Context()), "executor", req.ExecutorId)

executor := srv.executorFromLeaseRequest(ctx, req)
storedExecutor, err := srv.executorRepository.GetExecutor(ctx, executor.Id)
if err == nil {
// Since this is a lease request, we don't want to alter the cordoned status of the executor
executor.Cordoned = storedExecutor.Cordoned

Check failure on line 95 in internal/scheduler/api.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

executor.Cordoned undefined (type *schedulerobjects.Executor has no field or method Cordoned)

Check failure on line 95 in internal/scheduler/api.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

storedExecutor.Cordoned undefined (type *schedulerobjects.Executor has no field or method Cordoned)
}

if err := srv.executorRepository.StoreExecutor(ctx, executor); err != nil {
return err
}
Expand Down
19 changes: 19 additions & 0 deletions internal/scheduler/database/executor_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (

// ExecutorRepository is an interface to be implemented by structs which provide executor information
type ExecutorRepository interface {
// GetExecutor returns the stored executor matching the provided id, or an error if the executor doesn't exist.
GetExecutor(ctx *armadacontext.Context, id string) (*schedulerobjects.Executor, error)
// GetExecutors returns all known executors, regardless of their last heartbeat time
GetExecutors(ctx *armadacontext.Context) ([]*schedulerobjects.Executor, error)
// GetLastUpdateTimes returns a map of executor name -> last heartbeat time
Expand All @@ -39,6 +41,23 @@ func NewPostgresExecutorRepository(db *pgxpool.Pool) *PostgresExecutorRepository
}
}

// GetExecutor fetches the stored row for the executor with the given id via
// SelectExecutor and populates a fresh schedulerobjects.Executor from the
// row's LastRequest using decompressAndMarshall.
// If either step fails, a nil executor is returned together with the error;
// only the query error is wrapped with a stack trace here.
func (r *PostgresExecutorRepository) GetExecutor(ctx *armadacontext.Context, id string) (*schedulerobjects.Executor, error) {
	row, err := New(r.db).SelectExecutor(ctx, id)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	result := new(schedulerobjects.Executor)
	if err := decompressAndMarshall(row.LastRequest, r.decompressor, result); err != nil {
		return nil, err
	}
	return result, nil
}

// GetExecutors returns all known executors, regardless of their last heartbeat time
func (r *PostgresExecutorRepository) GetExecutors(ctx *armadacontext.Context) ([]*schedulerobjects.Executor, error) {
queries := New(r.db)
Expand Down
23 changes: 23 additions & 0 deletions internal/scheduler/database/query.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions internal/scheduler/mocks/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions internal/scheduler/mocks/executor_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,10 @@ type testExecutorRepository struct {
shouldError bool
}

// GetExecutor is an unimplemented stub: it ignores its arguments and always
// panics with "not implemented".
// NOTE(review): presumably present only so testExecutorRepository keeps
// satisfying the ExecutorRepository interface — confirm no test calls it.
func (t testExecutorRepository) GetExecutor(ctx *armadacontext.Context, id string) (*schedulerobjects.Executor, error) {
panic("not implemented")
}

// GetExecutors is an unimplemented stub: it ignores its argument and always
// panics with "not implemented".
func (t testExecutorRepository) GetExecutors(ctx *armadacontext.Context) ([]*schedulerobjects.Executor, error) {
panic("not implemented")
}
Expand Down
2 changes: 2 additions & 0 deletions internal/scheduler/schedulerobjects/schedulerobjects.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ message Executor {
google.protobuf.Timestamp lastUpdateTime = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
// Jobs that are owned by the cluster but are not assigned to any node.
repeated string unassigned_job_runs = 9;
// Whether this executor is cordoned, i.e. whether scheduling onto it is disabled.
bool cordoned = 10;
}

// Node represents a node in a worker cluster.
Expand Down
14 changes: 14 additions & 0 deletions internal/scheduler/scheduling_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con
return nil, err
}
executors = l.filterStaleExecutors(ctx, executors)
executors = l.filterCordonedExecutors(ctx, executors)

queues, err := l.queueCache.GetAll(ctx)
if err != nil {
Expand Down Expand Up @@ -621,6 +622,19 @@ func (l *FairSchedulingAlgo) filterStaleExecutors(ctx *armadacontext.Context, ex
return activeExecutors
}

// filterCordonedExecutors returns the executors whose Cordoned flag is false,
// in the same order in which they appear in the input.
// Every executor whose Cordoned flag is true is left out of the result and a
// message naming it is logged through ctx.Infof.
// The returned slice is always non-nil, even if no executor is kept.
func (l *FairSchedulingAlgo) filterCordonedExecutors(ctx *armadacontext.Context, executors []*schedulerobjects.Executor) []*schedulerobjects.Executor {
activeExecutors := make([]*schedulerobjects.Executor, 0)
for _, executor := range executors {
if !executor.Cordoned {

Check failure on line 629 in internal/scheduler/scheduling_algo.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

executor.Cordoned undefined (type *schedulerobjects.Executor has no field or method Cordoned)
activeExecutors = append(activeExecutors, executor)
} else {
ctx.Infof("Ignoring executor %s because it's cordoned status is %t", executor.Id, executor.Cordoned)

Check failure on line 632 in internal/scheduler/scheduling_algo.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

executor.Cordoned undefined (type *schedulerobjects.Executor has no field or method Cordoned)) (typecheck)
}
}
return activeExecutors
}

// filterLaggingExecutors returns all executors with <= l.schedulingConfig.MaxUnacknowledgedJobsPerExecutor unacknowledged jobs,
// where unacknowledged means the executor has not echoed the job since it was scheduled.
//
Expand Down
40 changes: 40 additions & 0 deletions internal/scheduleringester/dbops.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ type (
InsertPartitionMarker struct {
markers []*schedulerdb.Marker
}
CordonExecutors map[string]bool
UncordonExecutors map[string]bool
)

type JobSetOperation interface {
Expand Down Expand Up @@ -282,6 +284,28 @@ func (a *InsertPartitionMarker) Merge(b DbOperation) bool {
return false
}

// Merge folds b into a when b is itself a CordonExecutors operation: every
// entry of b is written into a, replacing any entry a already holds for the
// same executor key.
// It returns true if b was merged, and false (leaving a untouched) when b is
// any other kind of operation.
func (a CordonExecutors) Merge(b DbOperation) bool {
	other, sameKind := b.(CordonExecutors)
	if !sameKind {
		return false
	}
	for name, flag := range other {
		a[name] = flag
	}
	return true
}

// Merge folds b into a when b is itself an UncordonExecutors operation: every
// entry of b is written into a, replacing any entry a already holds for the
// same executor key.
// It returns true if b was merged, and false (leaving a untouched) when b is
// any other kind of operation.
func (a UncordonExecutors) Merge(b DbOperation) bool {
	other, sameKind := b.(UncordonExecutors)
	if !sameKind {
		return false
	}
	for name, flag := range other {
		a[name] = flag
	}
	return true
}

// mergeInMap merges an op b into a, provided that b is of the same type as a.
// For example, if a is of type MarkJobSetsCancelRequested, b is only merged if also of type MarkJobSetsCancelRequested.
// Returns true if the ops were merged and false otherwise.
Expand Down Expand Up @@ -431,6 +455,22 @@ func (a MarkJobsValidated) CanBeAppliedBefore(b DbOperation) bool {
return !definesJob(a, b)
}

// CanBeAppliedBefore reports whether a can be applied before b.
// The answer is false only when b is an UncordonExecutors operation; for
// every other operation it is true. The contents of a and b are not inspected.
func (a CordonExecutors) CanBeAppliedBefore(b DbOperation) bool {
	_, isUncordon := b.(UncordonExecutors)
	return !isUncordon
}

// CanBeAppliedBefore reports whether a can be applied before b.
// The answer is false only when b is a CordonExecutors operation; for every
// other operation it is true. The contents of a and b are not inspected.
func (a UncordonExecutors) CanBeAppliedBefore(b DbOperation) bool {
	_, isCordon := b.(CordonExecutors)
	return !isCordon
}

// definesJobInSet returns true if b is an InsertJobs operation
// that inserts at least one job in any of the job sets that make
// up the keys of a.
Expand Down
18 changes: 18 additions & 0 deletions internal/scheduleringester/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func (c *InstructionConverter) dbOperationsFromEventSequence(es *armadaevents.Ev
operationsFromEvent, err = c.handleJobRunAssigned(event.GetJobRunAssigned(), eventTime)
case *armadaevents.EventSequence_Event_JobValidated:
operationsFromEvent, err = c.handleJobValidated(event.GetJobValidated())
case *armadaevents.EventSequence_Event_CordonExecutor:

Check failure on line 112 in internal/scheduleringester/instructions.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

undefined: armadaevents.EventSequence_Event_CordonExecutor

Check failure on line 112 in internal/scheduleringester/instructions.go

View workflow job for this annotation

GitHub Actions / build / prepare

undefined: armadaevents.EventSequence_Event_CordonExecutor

Check failure on line 112 in internal/scheduleringester/instructions.go

View workflow job for this annotation

GitHub Actions / test / Golang Unit Tests

undefined: armadaevents.EventSequence_Event_CordonExecutor
operationsFromEvent, err = c.handleCordonExecutor(event.GetCordonExecutor())

Check failure on line 113 in internal/scheduleringester/instructions.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

event.GetCordonExecutor undefined (type *armadaevents.EventSequence_Event has no field or method GetCordonExecutor)

Check failure on line 113 in internal/scheduleringester/instructions.go

View workflow job for this annotation

GitHub Actions / build / prepare

event.GetCordonExecutor undefined (type *armadaevents.EventSequence_Event has no field or method GetCordonExecutor)

Check failure on line 113 in internal/scheduleringester/instructions.go

View workflow job for this annotation

GitHub Actions / test / Golang Unit Tests

event.GetCordonExecutor undefined (type *armadaevents.EventSequence_Event has no field or method GetCordonExecutor)
case *armadaevents.EventSequence_Event_UncordonExecutor:

Check failure on line 114 in internal/scheduleringester/instructions.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

undefined: armadaevents.EventSequence_Event_UncordonExecutor

Check failure on line 114 in internal/scheduleringester/instructions.go

View workflow job for this annotation

GitHub Actions / build / prepare

undefined: armadaevents.EventSequence_Event_UncordonExecutor

Check failure on line 114 in internal/scheduleringester/instructions.go

View workflow job for this annotation

GitHub Actions / test / Golang Unit Tests

undefined: armadaevents.EventSequence_Event_UncordonExecutor
operationsFromEvent, err = c.handleUncordonExecutor(event.GetUncordonExecutor())

Check failure on line 115 in internal/scheduleringester/instructions.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

event.GetUncordonExecutor undefined (type *armadaevents.EventSequence_Event has no field or method GetUncordonExecutor)

Check failure on line 115 in internal/scheduleringester/instructions.go

View workflow job for this annotation

GitHub Actions / build / prepare

event.GetUncordonExecutor undefined (type *armadaevents.EventSequence_Event has no field or method GetUncordonExecutor)

Check failure on line 115 in internal/scheduleringester/instructions.go

View workflow job for this annotation

GitHub Actions / test / Golang Unit Tests

event.GetUncordonExecutor undefined (type *armadaevents.EventSequence_Event has no field or method GetUncordonExecutor)
case *armadaevents.EventSequence_Event_ReprioritisedJob,
*armadaevents.EventSequence_Event_ResourceUtilisation,
*armadaevents.EventSequence_Event_JobRunCancelled,
Expand Down Expand Up @@ -417,6 +421,20 @@ func (c *InstructionConverter) handleJobValidated(checked *armadaevents.JobValid
}, nil
}

// handleCordonExecutor turns a CordonExecutor event into a one-element slice
// holding a CordonExecutors operation that maps the name returned by
// checked.GetName() to true. The returned error is always nil.
func (c *InstructionConverter) handleCordonExecutor(checked *armadaevents.CordonExecutor) ([]DbOperation, error) {

Check failure on line 424 in internal/scheduleringester/instructions.go

View workflow job for this annotation

GitHub Actions / build / prepare

undefined: armadaevents.CordonExecutor
executor := checked.GetName()
return []DbOperation{
CordonExecutors{executor: true},
}, nil
}

// handleUncordonExecutor turns an UncordonExecutor event into a one-element
// slice holding an UncordonExecutors operation that maps the name returned by
// checked.GetName() to true. The returned error is always nil.
func (c *InstructionConverter) handleUncordonExecutor(checked *armadaevents.UncordonExecutor) ([]DbOperation, error) {

Check failure on line 431 in internal/scheduleringester/instructions.go

View workflow job for this annotation

GitHub Actions / build / prepare

undefined: armadaevents.UncordonExecutor
executor := checked.GetName()
return []DbOperation{
UncordonExecutors{executor: true},
}, nil
}

// schedulingInfoFromSubmitJob returns a minimal representation of a job containing only the info needed by the scheduler.
func (c *InstructionConverter) schedulingInfoFromSubmitJob(submitJob *armadaevents.SubmitJob, submitTime time.Time) (*schedulerobjects.JobSchedulingInfo, error) {
return SchedulingInfoFromSubmitJob(submitJob, submitTime)
Expand Down
Loading

0 comments on commit 996b692

Please sign in to comment.