diff --git a/pkg/orchestrator/interfaces.go b/pkg/orchestrator/interfaces.go index b75032a0df..2d65555d78 100644 --- a/pkg/orchestrator/interfaces.go +++ b/pkg/orchestrator/interfaces.go @@ -102,11 +102,13 @@ type NodeRanker interface { type NodeSelector interface { // AllNodes returns all nodes in the network. AllNodes(ctx context.Context) ([]models.NodeInfo, error) + // AllMatchingNodes returns all nodes that match the job constrains and selection criteria. - AllMatchingNodes(ctx context.Context, job *models.Job) ([]models.NodeInfo, error) + AllMatchingNodes(ctx context.Context, job *models.Job, options ...NodeSelectionOption) ([]models.NodeInfo, error) + // TopMatchingNodes return the top ranked desiredCount number of nodes that match job constraints // ordered in descending order based on their rank, or error if not enough nodes match. - TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int) ([]models.NodeInfo, error) + TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int, options ...NodeSelectionOption) ([]models.NodeInfo, error) } type RetryStrategy interface { diff --git a/pkg/orchestrator/mocks.go b/pkg/orchestrator/mocks.go index 268f6704ea..9821de8f56 100644 --- a/pkg/orchestrator/mocks.go +++ b/pkg/orchestrator/mocks.go @@ -398,18 +398,18 @@ func (mr *MockNodeSelectorMockRecorder) AllNodes(ctx any) *gomock.Call { } // TopMatchingNodes mocks base method. -func (m *MockNodeSelector) TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int) ([]models.NodeInfo, error) { +func (m *MockNodeSelector) TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int, options ...any) ([]models.NodeInfo, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TopMatchingNodes", ctx, job, desiredCount) + ret := m.ctrl.Call(m, "TopMatchingNodes", ctx, job, desiredCount, options) ret0, _ := ret[0].([]models.NodeInfo) ret1, _ := ret[1].(error) return ret0, ret1 } // TopMatchingNodes indicates an expected call of TopMatchingNodes. -func (mr *MockNodeSelectorMockRecorder) TopMatchingNodes(ctx, job, desiredCount any) *gomock.Call { +func (mr *MockNodeSelectorMockRecorder) TopMatchingNodes(ctx, job, desiredCount any, options ...any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TopMatchingNodes", reflect.TypeOf((*MockNodeSelector)(nil).TopMatchingNodes), ctx, job, desiredCount) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TopMatchingNodes", reflect.TypeOf((*MockNodeSelector)(nil).TopMatchingNodes), ctx, job, desiredCount, options) } // MockRetryStrategy is a mock of RetryStrategy interface. diff --git a/pkg/orchestrator/scheduler/batch_service_job.go b/pkg/orchestrator/scheduler/batch_service_job.go index 98be071685..6e7ba92aa4 100644 --- a/pkg/orchestrator/scheduler/batch_service_job.go +++ b/pkg/orchestrator/scheduler/batch_service_job.go @@ -153,7 +153,14 @@ func (b *BatchServiceJobScheduler) createMissingExecs( // placeExecs places the executions func (b *BatchServiceJobScheduler) placeExecs(ctx context.Context, execs execSet, job *models.Job) error { if len(execs) > 0 { - selectedNodes, err := b.nodeSelector.TopMatchingNodes(ctx, job, len(execs)) + // TODO: Remove the options once we are ready to enforce that only connected/approved nodes can be used + selectedNodes, err := b.nodeSelector.TopMatchingNodes( + ctx, + job, + len(execs), + orchestrator.WithApproval(false), + orchestrator.WithConnected(false), + ) if err != nil { return err } diff --git a/pkg/orchestrator/scheduler/daemon_job.go b/pkg/orchestrator/scheduler/daemon_job.go index eda155359a..cb8036aa44 100644 --- a/pkg/orchestrator/scheduler/daemon_job.go +++ b/pkg/orchestrator/scheduler/daemon_job.go @@ -85,7 +85,9 @@ func (b *DaemonJobScheduler) Process(ctx context.Context, evaluation *models.Eva func (b *DaemonJobScheduler) createMissingExecs( ctx context.Context, job *models.Job, plan *models.Plan, existingExecs execSet) (execSet, error) { newExecs := execSet{} - nodes, err := b.nodeSelector.AllMatchingNodes(ctx, job) + + // Require approval when selecting nodes, but do not require them to be connected. + nodes, err := b.nodeSelector.AllMatchingNodes(ctx, job, orchestrator.WithApproval(true), orchestrator.WithConnected(false)) if err != nil { return newExecs, err } diff --git a/pkg/orchestrator/scheduler/ops_job.go b/pkg/orchestrator/scheduler/ops_job.go index 8815f0cbd5..35f6c68da5 100644 --- a/pkg/orchestrator/scheduler/ops_job.go +++ b/pkg/orchestrator/scheduler/ops_job.go @@ -103,7 +103,7 @@ func (b *OpsJobScheduler) Process(ctx context.Context, evaluation *models.Evalua func (b *OpsJobScheduler) createMissingExecs( ctx context.Context, job *models.Job, plan *models.Plan) (execSet, error) { newExecs := execSet{} - nodes, err := b.nodeSelector.AllMatchingNodes(ctx, job) + nodes, err := b.nodeSelector.AllMatchingNodes(ctx, job, orchestrator.WithApproval(true), orchestrator.WithConnected(false)) if err != nil { return newExecs, err } diff --git a/pkg/orchestrator/selection/selector/node_selector.go b/pkg/orchestrator/selection/selector/node_selector.go index c820b9a085..c9547a7972 100644 --- a/pkg/orchestrator/selection/selector/node_selector.go +++ b/pkg/orchestrator/selection/selector/node_selector.go @@ -35,8 +35,10 @@ func (n NodeSelector) AllNodes(ctx context.Context) ([]models.NodeInfo, error) { return n.nodeDiscoverer.ListNodes(ctx) } -func (n NodeSelector) AllMatchingNodes(ctx context.Context, job *models.Job) ([]models.NodeInfo, error) { - filteredNodes, _, err := n.rankAndFilterNodes(ctx, job) +func (n NodeSelector) AllMatchingNodes(ctx context.Context, + job *models.Job, + options ...orchestrator.NodeSelectionOption) ([]models.NodeInfo, error) { + filteredNodes, _, err := n.rankAndFilterNodes(ctx, job, options...) if err != nil { return nil, err } @@ -44,11 +46,15 @@ func (n NodeSelector) AllMatchingNodes(ctx context.Context, job *models.Job) ([] nodeInfos := generic.Map(filteredNodes, func(nr orchestrator.NodeRank) models.NodeInfo { return nr.NodeInfo }) return nodeInfos, nil } -func (n NodeSelector) TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int) ([]models.NodeInfo, error) { - possibleNodes, rejectedNodes, err := n.rankAndFilterNodes(ctx, job) + +func (n NodeSelector) TopMatchingNodes(ctx context.Context, + job *models.Job, desiredCount int, + options ...orchestrator.NodeSelectionOption) ([]models.NodeInfo, error) { + possibleNodes, rejectedNodes, err := n.rankAndFilterNodes(ctx, job, options...) if err != nil { return nil, err } + if len(possibleNodes) < desiredCount { // TODO: evaluate if we should run the job if some nodes where found err = orchestrator.NewErrNotEnoughNodes(desiredCount, append(possibleNodes, rejectedNodes...)) @@ -64,18 +70,38 @@ func (n NodeSelector) TopMatchingNodes(ctx context.Context, job *models.Job, des return selectedInfos, nil } -func (n NodeSelector) rankAndFilterNodes(ctx context.Context, job *models.Job) (selected, rejected []orchestrator.NodeRank, err error) { +func (n NodeSelector) rankAndFilterNodes(ctx context.Context, + job *models.Job, + options ...orchestrator.NodeSelectionOption) (selected, rejected []orchestrator.NodeRank, err error) { listed, err := n.nodeDiscoverer.ListNodes(ctx) if err != nil { return nil, nil, err } + // Apply constraints on the state of the nodes we want to select, but allow + // the caller to override them. + constraints := &orchestrator.NodeSelectionConstraint{ + RequireApproval: true, + RequireConnected: true, + } + for _, opt := range options { + opt(constraints) + } + nodeIDs := lo.Filter(listed, func(nodeInfo models.NodeInfo, index int) bool { - // Filter out nodes that are not compute nodes, or nodes that are not currently - // connected or approved. We only want to consider nodes that are ready to run jobs. - return nodeInfo.NodeType == models.NodeTypeCompute && - nodeInfo.State == models.NodeStates.CONNECTED && - nodeInfo.Approval == models.NodeApprovals.APPROVED + if nodeInfo.NodeType != models.NodeTypeCompute { + return false + } + + if constraints.RequireApproval && nodeInfo.Approval != models.NodeApprovals.APPROVED { + return false + } + + if constraints.RequireConnected && nodeInfo.State != models.NodeStates.CONNECTED { + return false + } + + return true }) if len(nodeIDs) == 0 { diff --git a/pkg/orchestrator/types.go b/pkg/orchestrator/types.go index 1a5aad7cc8..5ce65b9501 100644 --- a/pkg/orchestrator/types.go +++ b/pkg/orchestrator/types.go @@ -78,3 +78,22 @@ func (r NodeRank) MarshalZerologObject(e *zerolog.Event) { type RetryRequest struct { JobID string } + +type NodeSelectionConstraint struct { + RequireConnected bool + RequireApproval bool +} + +type NodeSelectionOption func(*NodeSelectionConstraint) + +func WithConnected(required bool) NodeSelectionOption { + return func(c *NodeSelectionConstraint) { + c.RequireConnected = required + } +} + +func WithApproval(required bool) NodeSelectionOption { + return func(c *NodeSelectionConstraint) { + c.RequireApproval = required + } +}