Skip to content

Commit

Permalink
Abstract the definition and member method of the predicate status
Browse files Browse the repository at this point in the history
Signed-off-by: wangyang <wangyang8126@gmail.com>
  • Loading branch information
wangyang0616 committed Jul 12, 2023
1 parent 461742b commit 886e438
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 42 deletions.
14 changes: 8 additions & 6 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,19 @@ func (alloc *Action) Execute(ssn *framework.Session) {
if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok {
return nil, api.NewFitError(task, node, reason)
}

predicateStatus, err := ssn.PredicateFn(task, node)
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, fmt.Errorf("allocate predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
}
// Only nodes whose status is success after predicate filtering can be scheduled.
admitStatus := map[int]struct{}{
api.Success: {},

if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
return nil, fmt.Errorf("predicates failed in allocate for task <%s/%s> on node <%s>, status is not success",
task.Namespace, task.Name, node.Name)
}
return nil, util.CheckPredicateStatus(predicateStatus, admitStatus)
return nil, nil
}

// To pick <namespace, queue> tuple for job, we choose to pick namespace firstly.
Expand Down
17 changes: 9 additions & 8 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package backfill

import (
"fmt"
"time"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -74,20 +75,20 @@ func (backfill *Action) Execute(ssn *framework.Session) {
// TODO (k82cn): predicates did not consider pod number for now, there'll
// be ping-pong case here.
// Only nodes whose status is success after predicate filtering can be scheduled.
admitStatus := map[int]struct{}{
api.Success: {},
}
predicateStatus, err := ssn.PredicateFn(task, node)
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
fe.SetNodeError(node.Name, err)
continue
}
err = util.CheckPredicateStatus(predicateStatus, admitStatus)
if err != nil {
klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)

if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
err := fmt.Errorf("predicates failed in backfill for task <%s/%s> on node <%s>, status is not success",
task.Namespace, task.Name, node.Name)
klog.V(3).Infof("err: %v", err)
fe.SetNodeError(node.Name, err)
continue
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,18 @@ func preempt(

predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
// Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate.
admitStatus := map[int]struct{}{
api.Success: {},
api.Unschedulable: {},
}
predicateStatus, err := ssn.PredicateFn(task, node)
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
}
return nil, util.CheckPredicateStatus(predicateStatus, admitStatus)

if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
return nil, fmt.Errorf("predicates failed in preempt for task <%s/%s> on node <%s>, status is not success or unschedulable",
task.Namespace, task.Name, node.Name)
}
return nil, nil
}

predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true)
Expand Down
18 changes: 7 additions & 11 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,24 +123,20 @@ func (ra *Action) Execute(ssn *framework.Session) {

assigned := false
for _, n := range ssn.Nodes {
// Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate.
admitStatus := map[int]struct{}{
api.Success: {},
api.Unschedulable: {},
}
predicateStatus, err := ssn.PredicateFn(task, n)
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, n)
if err != nil {
klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, n.Name, err)
continue
}
// If predicates failed, next node.
if err := util.CheckPredicateStatus(predicateStatus, admitStatus); err != nil {
klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, n.Name, err)

// Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate.
if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
klog.V(3).Infof("predicates failed in reclaim for task <%s/%s> on node <%s>, status is not success or unschedulable.",
task.Namespace, task.Name, n.Name)
continue
}

klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
task.Namespace, task.Name, n.Name)

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/framework/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (nl *NodeLister) List() ([]*v1.Node, error) {
return nodes, nil
}

// The state of the k8s prefile is converted to the internal state of the volcano
// ConvertPredicateStatus return predicate status from k8sframework status
func ConvertPredicateStatus(status *k8sframework.Status) (*api.Status, error) {
internalStatus := &api.Status{}
if status.Code() == k8sframework.Success {
Expand Down
51 changes: 41 additions & 10 deletions pkg/scheduler/util/predicate_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,54 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI
return predicateNodes, fe
}

func CheckPredicateStatus(predicateStatus []*api.Status, admitStatus map[int]struct{}) error {
for _, status := range predicateStatus {
func taskGroupID(task *api.TaskInfo) string {
return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey())
}

func NewPredicateHelper() PredicateHelper {
return &predicateHelper{taskPredicateErrorCache: map[string]map[string]error{}}
}

type PredicateStatus interface {
IsContainsUnschedulable() bool
IsContainsUnschedulableAndUnresolvable() bool
IsContainsErrorSkipOrWait() bool
}

type StatusSets []*api.Status

func (s StatusSets) ContainsUnschedulable() bool {
for _, status := range s {
if status == nil {
continue
}
if _, ok := admitStatus[status.Code]; !ok {
return fmt.Errorf("Predicates status (code: %d) does not meet the expectation (admit status: %v), message: %s",
status.Code, admitStatus, status.Reason)
if status.Code == api.Unschedulable {
return true
}
}
return nil
return false
}

func taskGroupID(task *api.TaskInfo) string {
return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey())
func (s StatusSets) ContainsUnschedulableAndUnresolvable() bool {
for _, status := range s {
if status == nil {
continue
}
if status.Code == api.UnschedulableAndUnresolvable {
return true
}
}
return false
}

func NewPredicateHelper() PredicateHelper {
return &predicateHelper{taskPredicateErrorCache: map[string]map[string]error{}}
func (s StatusSets) ContainsErrorSkipOrWait() bool {
for _, status := range s {
if status == nil {
continue
}
if status.Code == api.Error || status.Code == api.Skip || status.Code == api.Wait {
return true
}
}
return false
}

0 comments on commit 886e438

Please sign in to comment.