Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split the predicate function to solve the resource filtering problem encountered during preemption #2818

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
predicateResourceFn is specially used to filter resource-related indicators, and predicateFn filters inherent properties of nodes

Signed-off-by: wangyang <wangyang8126@gmail.com>
  • Loading branch information
wangyang0616 committed Jun 2, 2023
commit 292ca12f5ce616a0e0900c48c45b7d632067c1c4
11 changes: 8 additions & 3 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,16 @@ func (alloc *Action) Execute(ssn *framework.Session) {
allNodes := ssn.NodeList
predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
// Check for Resource Predicate
if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok {
return api.NewFitError(task, node, reason)
if err := ssn.PredicateResourceFn(task, node); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PredicateResource is just a resource comparison; do we have to make it a session API?
It seems there is no need to implement the function in different ways in different plugins.

make it as a session API.

return err
}

return ssn.PredicateFn(task, node)
// Check for predicate
if err := ssn.PredicateFn(task, node); err != nil {
return err
}

return nil
}

// To pick <namespace, queue> tuple for job, we choose to pick namespace firstly.
Expand Down
9 changes: 7 additions & 2 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,13 @@ func (backfill *Action) Execute(ssn *framework.Session) {
// As task did not request resources, so it only need to meet predicates.
// TODO (k82cn): need to prioritize nodes to avoid pod hole.
for _, node := range ssn.Nodes {
// TODO (k82cn): predicates did not consider pod number for now, there'll
// be ping-pong case here.
if err := ssn.PredicateResourceFn(task, node); err != nil {
klog.V(3).Infof("Predicate resource information failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
fe.SetNodeError(node.Name, err)
continue
}

if err := ssn.PredicateFn(task, node); err != nil {
klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ type JobEnqueuedFn func(interface{})
// PredicateFn is the function signature used to predicate a node for a task.
// It receives the task and a candidate node and returns an error; the callers
// visible alongside this declaration treat a non-nil error as the node having
// failed the predicate for that task.
type PredicateFn func(*TaskInfo, *NodeInfo) error

// PredicateResourceFn is the function signature used to predicate a node's
// resource information for a task. It has the same shape as PredicateFn
// (task, node) -> error but is declared as a distinct type so resource checks
// can be registered and invoked separately from the other predicates.
type PredicateResourceFn func(*TaskInfo, *NodeInfo) error

// PrePredicateFn is the function signature used to pre-predicate a task.
// Unlike PredicateFn it receives only the task (no node) and returns an error.
// NOTE(review): presumably a non-nil error means the task should not proceed
// to per-node predicates — confirm against the callers.
type PrePredicateFn func(*TaskInfo) error

Expand Down
6 changes: 4 additions & 2 deletions pkg/scheduler/conf/scheduler_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ type PluginOption struct {
EnabledReclaimable *bool `yaml:"enableReclaimable"`
// EnabledQueueOrder defines whether queueOrderFn is enabled
EnabledQueueOrder *bool `yaml:"enableQueueOrder"`
// EnabledPredicate defines whether predicateFn is enabled
EnabledClusterOrder *bool `yaml:"EnabledClusterOrder"`
// EnabledClusterOrder defines whether clusterOrderFn is enabled
EnabledClusterOrder *bool `yaml:"EnabledClusterOrder"`
// EnabledPredicate defines whether predicateFn is enabled
EnabledPredicate *bool `yaml:"enablePredicate"`
// EnabledPredicateResource defines whether predicateResourceFn is enabled
EnabledPredicateResource *bool `yaml:"enablePredicateResource"`
// EnabledBestNode defines whether bestNodeFn is enabled
EnabledBestNode *bool `yaml:"enableBestNode"`
// EnabledNodeOrder defines whether NodeOrderFn is enabled
Expand Down
104 changes: 53 additions & 51 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,32 +72,33 @@ type Session struct {
Configurations []conf.Configuration
NodeList []*api.NodeInfo

plugins map[string]Plugin
eventHandlers []*EventHandler
jobOrderFns map[string]api.CompareFn
queueOrderFns map[string]api.CompareFn
taskOrderFns map[string]api.CompareFn
clusterOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
prePredicateFns map[string]api.PrePredicateFn
bestNodeFns map[string]api.BestNodeFn
nodeOrderFns map[string]api.NodeOrderFn
batchNodeOrderFns map[string]api.BatchNodeOrderFn
nodeMapFns map[string]api.NodeMapFn
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
allocatableFns map[string]api.AllocatableFn
jobReadyFns map[string]api.ValidateFn
jobPipelinedFns map[string]api.VoteFn
jobValidFns map[string]api.ValidateExFn
jobEnqueueableFns map[string]api.VoteFn
jobEnqueuedFns map[string]api.JobEnqueuedFn
targetJobFns map[string]api.TargetJobFn
reservedNodesFns map[string]api.ReservedNodesFn
victimTasksFns map[string][]api.VictimTasksFn
jobStarvingFns map[string]api.ValidateFn
plugins map[string]Plugin
eventHandlers []*EventHandler
jobOrderFns map[string]api.CompareFn
queueOrderFns map[string]api.CompareFn
taskOrderFns map[string]api.CompareFn
clusterOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
predicateResourceFns map[string]api.PredicateResourceFn
prePredicateFns map[string]api.PrePredicateFn
bestNodeFns map[string]api.BestNodeFn
nodeOrderFns map[string]api.NodeOrderFn
batchNodeOrderFns map[string]api.BatchNodeOrderFn
nodeMapFns map[string]api.NodeMapFn
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
allocatableFns map[string]api.AllocatableFn
jobReadyFns map[string]api.ValidateFn
jobPipelinedFns map[string]api.VoteFn
jobValidFns map[string]api.ValidateExFn
jobEnqueueableFns map[string]api.VoteFn
jobEnqueuedFns map[string]api.JobEnqueuedFn
targetJobFns map[string]api.TargetJobFn
reservedNodesFns map[string]api.ReservedNodesFn
victimTasksFns map[string][]api.VictimTasksFn
jobStarvingFns map[string]api.ValidateFn
}

func openSession(cache cache.Cache) *Session {
Expand All @@ -118,31 +119,32 @@ func openSession(cache cache.Cache) *Session {
RevocableNodes: map[string]*api.NodeInfo{},
Queues: map[api.QueueID]*api.QueueInfo{},

plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
taskOrderFns: map[string]api.CompareFn{},
clusterOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
prePredicateFns: map[string]api.PrePredicateFn{},
bestNodeFns: map[string]api.BestNodeFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
nodeMapFns: map[string]api.NodeMapFn{},
nodeReduceFns: map[string]api.NodeReduceFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
allocatableFns: map[string]api.AllocatableFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.VoteFn{},
jobValidFns: map[string]api.ValidateExFn{},
jobEnqueueableFns: map[string]api.VoteFn{},
jobEnqueuedFns: map[string]api.JobEnqueuedFn{},
targetJobFns: map[string]api.TargetJobFn{},
reservedNodesFns: map[string]api.ReservedNodesFn{},
victimTasksFns: map[string][]api.VictimTasksFn{},
jobStarvingFns: map[string]api.ValidateFn{},
plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
taskOrderFns: map[string]api.CompareFn{},
clusterOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
predicateResourceFns: map[string]api.PredicateResourceFn{},
prePredicateFns: map[string]api.PrePredicateFn{},
bestNodeFns: map[string]api.BestNodeFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
nodeMapFns: map[string]api.NodeMapFn{},
nodeReduceFns: map[string]api.NodeReduceFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
allocatableFns: map[string]api.AllocatableFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.VoteFn{},
jobValidFns: map[string]api.ValidateExFn{},
jobEnqueueableFns: map[string]api.VoteFn{},
jobEnqueuedFns: map[string]api.JobEnqueuedFn{},
targetJobFns: map[string]api.TargetJobFn{},
reservedNodesFns: map[string]api.ReservedNodesFn{},
victimTasksFns: map[string][]api.VictimTasksFn{},
jobStarvingFns: map[string]api.ValidateFn{},
}

snapshot := cache.Snapshot()
Expand Down
27 changes: 27 additions & 0 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ func (ssn *Session) AddPredicateFn(name string, pf api.PredicateFn) {
ssn.predicateFns[name] = pf
}

// AddPredicateResourceFn registers pf as the resource-predicate function for
// the plugin identified by name. The function is stored in the session's
// predicateResourceFns map keyed by name, so a later call with the same name
// overwrites the earlier entry.
func (ssn *Session) AddPredicateResourceFn(name string, pf api.PredicateResourceFn) {
ssn.predicateResourceFns[name] = pf
}

// AddPrePredicateFn add PrePredicate function
func (ssn *Session) AddPrePredicateFn(name string, pf api.PrePredicateFn) {
ssn.prePredicateFns[name] = pf
Expand Down Expand Up @@ -620,6 +625,28 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
return nil
}

// PredicateResourceFn runs the resource predicate functions registered by the
// plugins against the given task and node.
//
// Resource predicates are intended to cover resource information such as CPU,
// memory and extended resources (e.g. GPU), as well as the number of pods.
//
// Tiers are visited in the order they appear in ssn.Tiers, and plugins in the
// order they appear within each tier. A plugin is skipped when its
// EnabledPredicateResource option is not enabled or when it has not registered
// a function via AddPredicateResourceFn. The first non-nil error returned by a
// plugin's function stops the walk and is returned unchanged; nil is returned
// when every invoked function succeeds (or none is invoked).
func (ssn *Session) PredicateResourceFn(task *api.TaskInfo, node *api.NodeInfo) error {
	for _, t := range ssn.Tiers {
		for _, opt := range t.Plugins {
			// Honour the per-plugin switch before looking anything up.
			if !isEnabled(opt.EnabledPredicateResource) {
				continue
			}

			// Only plugins that registered a resource predicate take part.
			if check, ok := ssn.predicateResourceFns[opt.Name]; ok {
				if err := check(task, node); err != nil {
					return err
				}
			}
		}
	}

	return nil
}

// PrePredicateFn invoke predicate function of the plugins
func (ssn *Session) PrePredicateFn(task *api.TaskInfo) error {
for _, tier := range ssn.Tiers {
Expand Down