Commit 4c55e38

Allow node selection to specify whether nodes must be connected/approved

rossjones committed Apr 9, 2024
1 parent bb8a8cd commit 4c55e38
Showing 7 changed files with 75 additions and 19 deletions.
6 changes: 4 additions & 2 deletions pkg/orchestrator/interfaces.go
@@ -102,11 +102,13 @@ type NodeRanker interface {
type NodeSelector interface {
// AllNodes returns all nodes in the network.
AllNodes(ctx context.Context) ([]models.NodeInfo, error)

// AllMatchingNodes returns all nodes that match the job constraints and selection criteria.
AllMatchingNodes(ctx context.Context, job *models.Job) ([]models.NodeInfo, error)
AllMatchingNodes(ctx context.Context, job *models.Job, options ...NodeSelectionOption) ([]models.NodeInfo, error)

// TopMatchingNodes returns the top-ranked desiredCount nodes that match the job constraints,
// ordered in descending order of rank, or an error if not enough nodes match.
TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int) ([]models.NodeInfo, error)
TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int, options ...NodeSelectionOption) ([]models.NodeInfo, error)
}

type RetryStrategy interface {
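Because the new options parameter is variadic, every existing caller of AllMatchingNodes and TopMatchingNodes compiles unchanged and gets the defaults; only call sites that want to relax a requirement pass options. A hypothetical call-site sketch, not part of the diff (selector, ctx, and job are assumed to be in scope):

// Unchanged call site: strict defaults apply (approved and connected required).
nodes, err := selector.AllMatchingNodes(ctx, job)

// Relaxed call site: accept disconnected nodes but still require approval.
nodes, err = selector.AllMatchingNodes(ctx, job,
	orchestrator.WithApproval(true),
	orchestrator.WithConnected(false),
)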
8 changes: 4 additions & 4 deletions pkg/orchestrator/mocks.go

Generated mocks file; diff not rendered.

9 changes: 8 additions & 1 deletion pkg/orchestrator/scheduler/batch_service_job.go
@@ -153,7 +153,14 @@ func (b *BatchServiceJobScheduler) createMissingExecs(
// placeExecs places the executions
func (b *BatchServiceJobScheduler) placeExecs(ctx context.Context, execs execSet, job *models.Job) error {
if len(execs) > 0 {
selectedNodes, err := b.nodeSelector.TopMatchingNodes(ctx, job, len(execs))
// TODO: Remove the options once we are ready to enforce that only connected/approved nodes can be used
selectedNodes, err := b.nodeSelector.TopMatchingNodes(
ctx,
job,
len(execs),
orchestrator.WithApproval(false),
orchestrator.WithConnected(false),
)
if err != nil {
return err
}
4 changes: 3 additions & 1 deletion pkg/orchestrator/scheduler/daemon_job.go
@@ -85,7 +85,9 @@ func (b *DaemonJobScheduler) Process(ctx context.Context, evaluation *models.Eva
func (b *DaemonJobScheduler) createMissingExecs(
ctx context.Context, job *models.Job, plan *models.Plan, existingExecs execSet) (execSet, error) {
newExecs := execSet{}
nodes, err := b.nodeSelector.AllMatchingNodes(ctx, job)

// Require approval when selecting nodes, but do not require them to be connected.
nodes, err := b.nodeSelector.AllMatchingNodes(ctx, job, orchestrator.WithApproval(true), orchestrator.WithConnected(false))
if err != nil {
return newExecs, err
}
2 changes: 1 addition & 1 deletion pkg/orchestrator/scheduler/ops_job.go
@@ -103,7 +103,7 @@ func (b *OpsJobScheduler) Process(ctx context.Context, evaluation *models.Evalua
func (b *OpsJobScheduler) createMissingExecs(
ctx context.Context, job *models.Job, plan *models.Plan) (execSet, error) {
newExecs := execSet{}
nodes, err := b.nodeSelector.AllMatchingNodes(ctx, job)
nodes, err := b.nodeSelector.AllMatchingNodes(ctx, job, orchestrator.WithApproval(true), orchestrator.WithConnected(false))
if err != nil {
return newExecs, err
}
46 changes: 36 additions & 10 deletions pkg/orchestrator/selection/selector/node_selector.go
@@ -35,20 +35,26 @@ func (n NodeSelector) AllNodes(ctx context.Context) ([]models.NodeInfo, error) {
return n.nodeDiscoverer.ListNodes(ctx)
}

func (n NodeSelector) AllMatchingNodes(ctx context.Context, job *models.Job) ([]models.NodeInfo, error) {
filteredNodes, _, err := n.rankAndFilterNodes(ctx, job)
func (n NodeSelector) AllMatchingNodes(ctx context.Context,
job *models.Job,
options ...orchestrator.NodeSelectionOption) ([]models.NodeInfo, error) {
filteredNodes, _, err := n.rankAndFilterNodes(ctx, job, options...)
if err != nil {
return nil, err
}

nodeInfos := generic.Map(filteredNodes, func(nr orchestrator.NodeRank) models.NodeInfo { return nr.NodeInfo })
return nodeInfos, nil
}
func (n NodeSelector) TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int) ([]models.NodeInfo, error) {
possibleNodes, rejectedNodes, err := n.rankAndFilterNodes(ctx, job)

func (n NodeSelector) TopMatchingNodes(ctx context.Context,
job *models.Job, desiredCount int,
options ...orchestrator.NodeSelectionOption) ([]models.NodeInfo, error) {
possibleNodes, rejectedNodes, err := n.rankAndFilterNodes(ctx, job, options...)
if err != nil {
return nil, err
}

if len(possibleNodes) < desiredCount {
// TODO: evaluate if we should run the job if some nodes were found
err = orchestrator.NewErrNotEnoughNodes(desiredCount, append(possibleNodes, rejectedNodes...))
@@ -64,18 +70,38 @@ func (n NodeSelector) TopMatchingNodes(ctx context.Context, job *models.Job, des
return selectedInfos, nil
}

func (n NodeSelector) rankAndFilterNodes(ctx context.Context, job *models.Job) (selected, rejected []orchestrator.NodeRank, err error) {
func (n NodeSelector) rankAndFilterNodes(ctx context.Context,
job *models.Job,
options ...orchestrator.NodeSelectionOption) (selected, rejected []orchestrator.NodeRank, err error) {
listed, err := n.nodeDiscoverer.ListNodes(ctx)
if err != nil {
return nil, nil, err
}

// Apply constraints on the state of the nodes we want to select, but allow
// the caller to override them.
constraints := &orchestrator.NodeSelectionConstraint{
RequireApproval: true,
RequireConnected: true,
}
for _, opt := range options {
opt(constraints)
}

nodeIDs := lo.Filter(listed, func(nodeInfo models.NodeInfo, index int) bool {
// Filter out nodes that are not compute nodes and, depending on the active
// constraints, nodes that are not currently connected or approved.
return nodeInfo.NodeType == models.NodeTypeCompute &&
nodeInfo.State == models.NodeStates.CONNECTED &&
nodeInfo.Approval == models.NodeApprovals.APPROVED
if nodeInfo.NodeType != models.NodeTypeCompute {
return false
}

if constraints.RequireApproval && nodeInfo.Approval != models.NodeApprovals.APPROVED {
return false
}

if constraints.RequireConnected && nodeInfo.State != models.NodeStates.CONNECTED {
return false
}

return true
})

if len(nodeIDs) == 0 {
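For reference, lo.Filter comes from github.com/samber/lo and keeps the elements for which the predicate returns true; the index parameter is unused in the code above. A minimal standalone illustration (the even-number example is ours, not from the commit):

package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	// lo.Filter passes each element and its index to the predicate and
	// keeps the elements for which the predicate returns true.
	evens := lo.Filter([]int{1, 2, 3, 4}, func(n int, _ int) bool {
		return n%2 == 0
	})
	fmt.Println(evens) // [2 4]
}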
19 changes: 19 additions & 0 deletions pkg/orchestrator/types.go
@@ -78,3 +78,22 @@ func (r NodeRank) MarshalZerologObject(e *zerolog.Event) {
type RetryRequest struct {
JobID string
}

type NodeSelectionConstraint struct {
RequireConnected bool
RequireApproval bool
}

type NodeSelectionOption func(*NodeSelectionConstraint)

func WithConnected(required bool) NodeSelectionOption {
return func(c *NodeSelectionConstraint) {
c.RequireConnected = required
}
}

func WithApproval(required bool) NodeSelectionOption {
return func(c *NodeSelectionConstraint) {
c.RequireApproval = required
}
}
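Together these types form the standard Go functional-options pattern: rankAndFilterNodes builds a constraint struct with strict defaults, then lets each option mutate it. A self-contained sketch of that flow, with the types copied from this diff and a hypothetical main function standing in for the selector:

package main

import "fmt"

// Copied from pkg/orchestrator/types.go as added in this commit.
type NodeSelectionConstraint struct {
	RequireConnected bool
	RequireApproval  bool
}

type NodeSelectionOption func(*NodeSelectionConstraint)

func WithConnected(required bool) NodeSelectionOption {
	return func(c *NodeSelectionConstraint) { c.RequireConnected = required }
}

func WithApproval(required bool) NodeSelectionOption {
	return func(c *NodeSelectionConstraint) { c.RequireApproval = required }
}

func main() {
	// Strict defaults, as in rankAndFilterNodes.
	constraints := &NodeSelectionConstraint{
		RequireApproval:  true,
		RequireConnected: true,
	}

	// The options chosen by the daemon and ops schedulers in this commit.
	opts := []NodeSelectionOption{WithApproval(true), WithConnected(false)}
	for _, opt := range opts {
		opt(constraints)
	}

	fmt.Printf("%+v\n", constraints)
	// Output: &{RequireConnected:false RequireApproval:true}
}

Passing no options leaves both requirements enabled, which is why the batch/service scheduler must disable them explicitly until enforcement is ready, per its TODO.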
