From ce7e6c5444184d9ee69e5f1ce07263086787697d Mon Sep 17 00:00:00 2001 From: frrist Date: Thu, 25 Apr 2024 10:42:58 -0700 Subject: [PATCH] fix: require connected and approved nodes for scheduling - fixes #3784 --- pkg/node/requester.go | 53 +++++++++++-------- pkg/orchestrator/scheduler/batch_job_test.go | 16 +++--- .../scheduler/batch_service_job.go | 39 +++++++------- pkg/orchestrator/scheduler/daemon_job.go | 25 ++++----- pkg/orchestrator/scheduler/daemon_job_test.go | 14 +++-- pkg/orchestrator/scheduler/ops_job.go | 29 +++++----- pkg/orchestrator/scheduler/ops_job_test.go | 14 +++-- .../scheduler/service_job_test.go | 16 +++--- 8 files changed, 115 insertions(+), 91 deletions(-) diff --git a/pkg/node/requester.go b/pkg/node/requester.go index cf1a88b655..5103bd1d27 100644 --- a/pkg/node/requester.go +++ b/pkg/node/requester.go @@ -99,12 +99,6 @@ func NewRequesterNode( }), ) - // node selector - nodeSelector := selector.NewNodeSelector(selector.NodeSelectorParams{ - NodeDiscoverer: nodeInfoStore, - NodeRanker: nodeRankerChain, - }) - // evaluation broker evalBroker, err := evaluation.NewInMemoryBroker(evaluation.InMemoryBrokerParams{ VisibilityTimeout: requesterConfig.EvalBrokerVisibilityTimeout, @@ -151,26 +145,41 @@ func NewRequesterNode( retryStrategy = retryStrategyChain } - // scheduler provider - batchServiceJobScheduler := scheduler.NewBatchServiceJobScheduler(scheduler.BatchServiceJobSchedulerParams{ - JobStore: jobStore, - Planner: planners, - NodeSelector: nodeSelector, - RetryStrategy: retryStrategy, + // TODO(forrest): [refactor] the selector constraints ought to be a parameter to the node selector. + // node selector + nodeSelector := selector.NewNodeSelector(selector.NodeSelectorParams{ + NodeDiscoverer: nodeInfoStore, + NodeRanker: nodeRankerChain, }) + // selector constraints: require nodes be online and approved to schedule + selectorConstraints := orchestrator.NodeSelectionConstraints{ + RequireConnected: true, + RequireApproval: true, + } + + // scheduler provider + batchServiceJobScheduler := scheduler.NewBatchServiceJobScheduler( + jobStore, + planners, + nodeSelector, + retryStrategy, + selectorConstraints, + ) schedulerProvider := orchestrator.NewMappedSchedulerProvider(map[string]orchestrator.Scheduler{ models.JobTypeBatch: batchServiceJobScheduler, models.JobTypeService: batchServiceJobScheduler, - models.JobTypeOps: scheduler.NewOpsJobScheduler(scheduler.OpsJobSchedulerParams{ - JobStore: jobStore, - Planner: planners, - NodeSelector: nodeSelector, - }), - models.JobTypeDaemon: scheduler.NewDaemonJobScheduler(scheduler.DaemonJobSchedulerParams{ - JobStore: jobStore, - Planner: planners, - NodeSelector: nodeSelector, - }), + models.JobTypeOps: scheduler.NewOpsJobScheduler( + jobStore, + planners, + nodeSelector, + selectorConstraints, + ), + models.JobTypeDaemon: scheduler.NewDaemonJobScheduler( + jobStore, + planners, + nodeSelector, + selectorConstraints, + ), }) workers := make([]*orchestrator.Worker, 0, requesterConfig.WorkerCount) diff --git a/pkg/orchestrator/scheduler/batch_job_test.go b/pkg/orchestrator/scheduler/batch_job_test.go index bc34c88148..d286e4cf93 100644 --- a/pkg/orchestrator/scheduler/batch_job_test.go +++ b/pkg/orchestrator/scheduler/batch_job_test.go @@ -49,12 +49,16 @@ func (s *BatchJobSchedulerTestSuite) SetupTest() { s.nodeSelector = orchestrator.NewMockNodeSelector(ctrl) s.retryStrategy = retry.NewFixedStrategy(retry.FixedStrategyParams{ShouldRetry: true}) - s.scheduler = NewBatchServiceJobScheduler(BatchServiceJobSchedulerParams{ - JobStore: s.jobStore, - Planner: s.planner, - NodeSelector: s.nodeSelector, - RetryStrategy: s.retryStrategy, - }) + s.scheduler = NewBatchServiceJobScheduler( + s.jobStore, + s.planner, + s.nodeSelector, + s.retryStrategy, + orchestrator.NodeSelectionConstraints{ + RequireConnected: false, + RequireApproval: false, + }, + ) } func TestBatchSchedulerTestSuite(t *testing.T) { diff --git a/pkg/orchestrator/scheduler/batch_service_job.go b/pkg/orchestrator/scheduler/batch_service_job.go index bc4010cc0d..85adc03fef 100644 --- a/pkg/orchestrator/scheduler/batch_service_job.go +++ b/pkg/orchestrator/scheduler/batch_service_job.go @@ -18,25 +18,26 @@ import ( // - batch jobs that run until completion on N number of nodes // - service jobs than run until stopped on N number of nodes type BatchServiceJobScheduler struct { - jobStore jobstore.Store - planner orchestrator.Planner - nodeSelector orchestrator.NodeSelector - retryStrategy orchestrator.RetryStrategy + jobStore jobstore.Store + planner orchestrator.Planner + nodeSelector orchestrator.NodeSelector + retryStrategy orchestrator.RetryStrategy + selectorConstraints orchestrator.NodeSelectionConstraints } -type BatchServiceJobSchedulerParams struct { - JobStore jobstore.Store - Planner orchestrator.Planner - NodeSelector orchestrator.NodeSelector - RetryStrategy orchestrator.RetryStrategy -} - -func NewBatchServiceJobScheduler(params BatchServiceJobSchedulerParams) *BatchServiceJobScheduler { +func NewBatchServiceJobScheduler( + store jobstore.Store, + planner orchestrator.Planner, + selector orchestrator.NodeSelector, + strategy orchestrator.RetryStrategy, + constraints orchestrator.NodeSelectionConstraints, +) *BatchServiceJobScheduler { return &BatchServiceJobScheduler{ - jobStore: params.JobStore, - planner: params.Planner, - nodeSelector: params.NodeSelector, - retryStrategy: params.RetryStrategy, + jobStore: store, + planner: planner, + nodeSelector: selector, + retryStrategy: strategy, + selectorConstraints: constraints, } } @@ -155,15 +156,11 @@ func (b *BatchServiceJobScheduler) createMissingExecs( // placeExecs places the executions func (b *BatchServiceJobScheduler) placeExecs(ctx context.Context, execs execSet, job *models.Job) error { if len(execs) > 0 { - // TODO: Remove the options once we are ready to enforce that only connected/approved nodes can be used selectedNodes, err := b.nodeSelector.TopMatchingNodes( ctx, job, len(execs), - &orchestrator.NodeSelectionConstraints{ - RequireApproval: false, - RequireConnected: false, - }, + &b.selectorConstraints, ) if err != nil { return err diff --git a/pkg/orchestrator/scheduler/daemon_job.go b/pkg/orchestrator/scheduler/daemon_job.go index da0cc3ed75..522938e3e6 100644 --- a/pkg/orchestrator/scheduler/daemon_job.go +++ b/pkg/orchestrator/scheduler/daemon_job.go @@ -18,19 +18,20 @@ type DaemonJobScheduler struct { jobStore jobstore.Store planner orchestrator.Planner nodeSelector orchestrator.NodeSelector + constraints orchestrator.NodeSelectionConstraints } -type DaemonJobSchedulerParams struct { - JobStore jobstore.Store - Planner orchestrator.Planner - NodeSelector orchestrator.NodeSelector -} - -func NewDaemonJobScheduler(params DaemonJobSchedulerParams) *DaemonJobScheduler { +func NewDaemonJobScheduler( + store jobstore.Store, + planner orchestrator.Planner, + selector orchestrator.NodeSelector, + constraints orchestrator.NodeSelectionConstraints, +) *DaemonJobScheduler { return &DaemonJobScheduler{ - jobStore: params.JobStore, - planner: params.Planner, - nodeSelector: params.NodeSelector, + jobStore: store, + planner: planner, + nodeSelector: selector, + constraints: constraints, } } @@ -86,11 +87,11 @@ func (b *DaemonJobScheduler) createMissingExecs( ctx context.Context, job *models.Job, plan *models.Plan, existingExecs execSet) (execSet, error) { newExecs := execSet{} - // Require approval when selecting nodes, but do not require them to be connected. + // Require nodes to be approved and connected to schedule work. nodes, err := b.nodeSelector.AllMatchingNodes( ctx, job, - &orchestrator.NodeSelectionConstraints{RequireApproval: true, RequireConnected: false}, + &orchestrator.NodeSelectionConstraints{RequireApproval: true, RequireConnected: true}, ) if err != nil { return newExecs, err diff --git a/pkg/orchestrator/scheduler/daemon_job_test.go b/pkg/orchestrator/scheduler/daemon_job_test.go index 5f74a87077..6d854c1a1a 100644 --- a/pkg/orchestrator/scheduler/daemon_job_test.go +++ b/pkg/orchestrator/scheduler/daemon_job_test.go @@ -30,11 +30,15 @@ func (s *DaemonJobSchedulerTestSuite) SetupTest() { s.planner = orchestrator.NewMockPlanner(ctrl) s.nodeSelector = orchestrator.NewMockNodeSelector(ctrl) - s.scheduler = NewDaemonJobScheduler(DaemonJobSchedulerParams{ - JobStore: s.jobStore, - Planner: s.planner, - NodeSelector: s.nodeSelector, - }) + s.scheduler = NewDaemonJobScheduler( + s.jobStore, + s.planner, + s.nodeSelector, + orchestrator.NodeSelectionConstraints{ + RequireConnected: true, + RequireApproval: true, + }, + ) } func TestDaemonJobSchedulerTestSuite(t *testing.T) { diff --git a/pkg/orchestrator/scheduler/ops_job.go b/pkg/orchestrator/scheduler/ops_job.go index 57d3a27b9b..72aa8970ee 100644 --- a/pkg/orchestrator/scheduler/ops_job.go +++ b/pkg/orchestrator/scheduler/ops_job.go @@ -16,22 +16,23 @@ import ( // OpsJobScheduler is a scheduler for batch jobs that run until completion type OpsJobScheduler struct { - jobStore jobstore.Store - planner orchestrator.Planner - nodeSelector orchestrator.NodeSelector + jobStore jobstore.Store + planner orchestrator.Planner + nodeSelector orchestrator.NodeSelector + selectorConstraints orchestrator.NodeSelectionConstraints } -type OpsJobSchedulerParams struct { - JobStore jobstore.Store - Planner orchestrator.Planner - NodeSelector orchestrator.NodeSelector -} - -func NewOpsJobScheduler(params OpsJobSchedulerParams) *OpsJobScheduler { +func NewOpsJobScheduler( + store jobstore.Store, + planner orchestrator.Planner, + selector orchestrator.NodeSelector, + constraints orchestrator.NodeSelectionConstraints, +) *OpsJobScheduler { return &OpsJobScheduler{ - jobStore: params.JobStore, - planner: params.Planner, - nodeSelector: params.NodeSelector, + jobStore: store, + planner: planner, + nodeSelector: selector, + selectorConstraints: constraints, } } @@ -107,7 +108,7 @@ func (b *OpsJobScheduler) createMissingExecs( ctx, job, &orchestrator.NodeSelectionConstraints{ RequireApproval: true, - RequireConnected: false, + RequireConnected: true, }) if err != nil { return newExecs, err diff --git a/pkg/orchestrator/scheduler/ops_job_test.go b/pkg/orchestrator/scheduler/ops_job_test.go index c9813071c3..7da41caed7 100644 --- a/pkg/orchestrator/scheduler/ops_job_test.go +++ b/pkg/orchestrator/scheduler/ops_job_test.go @@ -30,11 +30,15 @@ func (s *OpsJobSchedulerTestSuite) SetupTest() { s.planner = orchestrator.NewMockPlanner(ctrl) s.nodeSelector = orchestrator.NewMockNodeSelector(ctrl) - s.scheduler = NewOpsJobScheduler(OpsJobSchedulerParams{ - JobStore: s.jobStore, - Planner: s.planner, - NodeSelector: s.nodeSelector, - }) + s.scheduler = NewOpsJobScheduler( + s.jobStore, + s.planner, + s.nodeSelector, + orchestrator.NodeSelectionConstraints{ + RequireConnected: false, + RequireApproval: false, + }, + ) } func TestOpsJobSchedulerTestSuite(t *testing.T) { diff --git a/pkg/orchestrator/scheduler/service_job_test.go b/pkg/orchestrator/scheduler/service_job_test.go index 9fcac97b15..a3b6d173bd 100644 --- a/pkg/orchestrator/scheduler/service_job_test.go +++ b/pkg/orchestrator/scheduler/service_job_test.go @@ -41,12 +41,16 @@ func (s *ServiceJobSchedulerTestSuite) SetupTest() { s.nodeSelector = orchestrator.NewMockNodeSelector(ctrl) s.retryStrategy = retry.NewFixedStrategy(retry.FixedStrategyParams{ShouldRetry: true}) - s.scheduler = NewBatchServiceJobScheduler(BatchServiceJobSchedulerParams{ - JobStore: s.jobStore, - Planner: s.planner, - NodeSelector: s.nodeSelector, - RetryStrategy: s.retryStrategy, - }) + s.scheduler = NewBatchServiceJobScheduler( + s.jobStore, + s.planner, + s.nodeSelector, + s.retryStrategy, + orchestrator.NodeSelectionConstraints{ + RequireConnected: false, + RequireApproval: false, + }, + ) } func TestServiceSchedulerTestSuite(t *testing.T) {