Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor PredicateFn for allocate and preempt actions #2916

Merged
21 changes: 20 additions & 1 deletion docs/design/device-sharing.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,26 @@ type Devices interface {
//HasDeviceRequest checks if the 'pod' request this device
HasDeviceRequest(pod *v1.Pod) bool
//FiltreNode checks if the 'pod' fit in current node
FilterNode(pod *v1.Pod) (bool, error)
// The first return value represents the filtering result, and the value range is "0, 1, 2, 3"
// 0: Success
// Success means that plugin ran correctly and found pod schedulable.
// 1: Error
// Error is used for internal plugin errors, unexpected input, etc.
// 2: Unschedulable
// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the
// scheduler skip preemption.
// The accompanying status message should explain why the pod is unschedulable.
// 3: UnschedulableAndUnresolvable
// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
// preemption would not change anything. Plugins should return Unschedulable if it is possible
// that the pod can get scheduled with preemption.
// The accompanying status message should explain why the pod is unschedulable.
FilterNode(pod *v1.Pod) (int, string, error)
//Allocate action in predicate
Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error
//Release action in predicate
Expand Down
20 changes: 16 additions & 4 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package allocate

import (
"fmt"
"time"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -96,13 +97,24 @@ func (alloc *Action) Execute(ssn *framework.Session) {
pendingTasks := map[api.JobID]*util.PriorityQueue{}

allNodes := ssn.NodeList
predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangyang0616 Whether it is enough to use *api.Status?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The predicate plugin integrates multiple filter plug-ins of kube-scheduler. It is necessary to collect the filtering results of each plug-in and judge on the action side.

At present, it is necessary to return the list list. If there is a more elegant solution in the future, we can use it in optimize.

// Check for Resource Predicate
if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok {
return api.NewFitError(task, node, reason)
return nil, api.NewFitError(task, node, reason)
}
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, fmt.Errorf("predicates failed in allocate for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
}

return ssn.PredicateFn(task, node)
if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
return nil, fmt.Errorf("predicates failed in allocate for task <%s/%s> on node <%s>, status is not success",
task.Namespace, task.Name, node.Name)
}
return nil, nil
}

// To pick <namespace, queue> tuple for job, we choose to pick namespace firstly.
Expand Down Expand Up @@ -217,7 +229,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
}
} else {
klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s> with limited resources",
klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
task.Namespace, task.Name, node.Name)

// Allocate releasing resource to the task if any.
Expand Down
18 changes: 16 additions & 2 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ limitations under the License.
package backfill

import (
"fmt"
"time"

"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/metrics"
"volcano.sh/volcano/pkg/scheduler/util"
)

type Action struct{}
Expand Down Expand Up @@ -72,13 +74,25 @@ func (backfill *Action) Execute(ssn *framework.Session) {
for _, node := range ssn.Nodes {
// TODO (k82cn): predicates did not consider pod number for now, there'll
// be ping-pong case here.
if err := ssn.PredicateFn(task, node); err != nil {
klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
// Only nodes whose status is success after predicate filtering can be scheduled.
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
klog.V(3).Infof("predicates failed in backfill for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
fe.SetNodeError(node.Name, err)
continue
}

if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
err := fmt.Errorf("predicates failed in backfill for task <%s/%s> on node <%s>, status is not success",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please combine line #89 and #90

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used to generate the errors structure. The log output has been updated.

task.Namespace, task.Name, node.Name)
klog.V(3).Infof("%v", err)
fe.SetNodeError(node.Name, err)
continue
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
Expand Down
21 changes: 18 additions & 3 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,28 @@ func preempt(
predicateHelper util.PredicateHelper,
) (bool, error) {
assigned := false

allNodes := ssn.NodeList

if err := ssn.PrePredicateFn(preemptor); err != nil {
return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err)
}
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, ssn.PredicateFn, true)

predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
// Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate.
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
}

if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
return nil, fmt.Errorf("predicates failed in preempt for task <%s/%s> on node <%s>, status is not success or unschedulable",
task.Namespace, task.Name, node.Name)
}
return nil, nil
}

predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true)

nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

Expand Down
13 changes: 11 additions & 2 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,20 @@ func (ra *Action) Execute(ssn *framework.Session) {

assigned := false
for _, n := range ssn.Nodes {
// If predicates failed, next node.
if err := ssn.PredicateFn(task, n); err != nil {
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, n)
if err != nil {
klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, n.Name, err)
continue
}

// Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate.
if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
klog.V(3).Infof("predicates failed in reclaim for task <%s/%s> on node <%s>, status is not success or unschedulable.",
task.Namespace, task.Name, n.Name)
continue
}
klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
task.Namespace, task.Name, n.Name)

Expand Down
14 changes: 8 additions & 6 deletions pkg/scheduler/api/devices/nvidia/gpushare/device_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package gpushare

import (
"context"
"fmt"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
Expand All @@ -26,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api/devices"
"volcano.sh/volcano/pkg/scheduler/plugins/util/nodelock"
)

Expand Down Expand Up @@ -146,24 +148,24 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro
return nil
}

func (gs *GPUDevices) FilterNode(pod *v1.Pod) (bool, error) {
func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
klog.V(4).Infoln("DeviceSharing:Into FitInPod", pod.Name)
if GpuSharingEnable {
fit, err := checkNodeGPUSharingPredicate(pod, gs)
if err != nil {
if err != nil || !fit {
klog.Errorln("deviceSharing err=", err.Error())
return fit, err
return devices.Unschedulable, fmt.Sprintf("GpuShare %s", err.Error()), err
}
}
if GpuNumberEnable {
fit, err := checkNodeGPUNumberPredicate(pod, gs)
if err != nil {
if err != nil || !fit {
klog.Errorln("deviceSharing err=", err.Error())
return fit, err
return devices.Unschedulable, fmt.Sprintf("GpuNumber %s", err.Error()), err
}
}
klog.V(4).Infoln("DeviceSharing:FitInPod successed")
return true, nil
return devices.Success, "", nil
}

func (gs *GPUDevices) GetStatus() string {
Expand Down
10 changes: 6 additions & 4 deletions pkg/scheduler/api/devices/nvidia/vgpu/device_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vgpu

import (
"fmt"
"strconv"
"strings"
"time"
Expand All @@ -26,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api/devices"
"volcano.sh/volcano/pkg/scheduler/plugins/util/nodelock"
)

Expand Down Expand Up @@ -177,17 +179,17 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro
return nil
}

func (gs *GPUDevices) FilterNode(pod *v1.Pod) (bool, error) {
func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
klog.V(3).Infoln("4pdvgpuDeviceSharing:Into FitInPod", pod.Name)
if VGPUEnable {
fit, _, err := checkNodeGPUSharingPredicate(pod, gs, true)
if err != nil {
if err != nil || !fit {
klog.Errorln("deviceSharing err=", err.Error())
return fit, err
return devices.Unschedulable, fmt.Sprintf("4pdvgpuDeviceSharing %s", err.Error()), err
}
}
klog.V(3).Infoln("4pdvgpu DeviceSharing:FitInPod successed")
return true, nil
return devices.Success, "", nil
}

func (gs *GPUDevices) GetStatus() string {
Expand Down
38 changes: 38 additions & 0 deletions pkg/scheduler/api/devices/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package devices

// These are predefined codes used in a Status.
const (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These same status are defined in device package and api package. Is it possible to combine two to one?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Volcano API package references the devices package. If the status definition in the API is used, cyclic dependency occurs.

// Success means that plugin ran correctly and found pod schedulable.
// NOTE: A nil status is also considered as "Success".
Success int = iota
// Error is used for internal plugin errors, unexpected input, etc.
Error
// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the
// scheduler skip preemption.
// The accompanying status message should explain why the pod is unschedulable.
Unschedulable
// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
// preemption would not change anything. Plugins should return Unschedulable if it is possible
// that the pod can get scheduled with preemption.
// The accompanying status message should explain why the pod is unschedulable.
UnschedulableAndUnresolvable
// Wait is used when a Permit plugin finds a pod scheduling should wait.
Wait
// Skip is used when a Bind plugin chooses to skip binding.
Skip
)
20 changes: 19 additions & 1 deletion pkg/scheduler/api/shared_device_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,25 @@ type Devices interface {
//HasDeviceRequest checks if the 'pod' request this device
HasDeviceRequest(pod *v1.Pod) bool
//FiltreNode checks if the 'pod' fit in current node
FilterNode(pod *v1.Pod) (bool, error)
// The first return value represents the filtering result, and the value range is "0, 1, 2, 3"
// 0: Success
// Success means that plugin ran correctly and found pod schedulable.

// 1: Error
// Error is used for internal plugin errors, unexpected input, etc.

// 2: Unschedulable
// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the
// scheduler skip preemption.
// The accompanying status message should explain why the pod is unschedulable.

// 3: UnschedulableAndUnresolvable
// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
// preemption would not change anything. Plugins should return Unschedulable if it is possible
// that the pod can get scheduled with preemption.
// The accompanying status message should explain why the pod is unschedulable.
FilterNode(pod *v1.Pod) (int, string, error)
//Allocate action in predicate
Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error
//Release action in predicate
Expand Down
30 changes: 29 additions & 1 deletion pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,34 @@ type ValidateResult struct {
Message string
}

// These are predefined codes used in a Status.
const (
// Success means that plugin ran correctly and found pod schedulable.
// NOTE: A nil status is also considered as "Success".
Success int = iota
// Error is used for internal plugin errors, unexpected input, etc.
Error
// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the
// scheduler skip preemption.
// The accompanying status message should explain why the pod is unschedulable.
Unschedulable
// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
// preemption would not change anything. Plugins should return Unschedulable if it is possible
// that the pod can get scheduled with preemption.
// The accompanying status message should explain why the pod is unschedulable.
UnschedulableAndUnresolvable
// Wait is used when a Permit plugin finds a pod scheduling should wait.
Wait
// Skip is used when a Bind plugin chooses to skip binding.
Skip
)

type Status struct {
Code int
Reason string
}

// ValidateExFn is the func declaration used to validate the result.
type ValidateExFn func(interface{}) *ValidateResult

Expand All @@ -134,7 +162,7 @@ type VoteFn func(interface{}) int
type JobEnqueuedFn func(interface{})

// PredicateFn is the func declaration used to predicate node for task.
type PredicateFn func(*TaskInfo, *NodeInfo) error
type PredicateFn func(*TaskInfo, *NodeInfo) ([]*Status, error)

// PrePredicateFn is the func declaration used to pre-predicate node for task.
type PrePredicateFn func(*TaskInfo) error
Expand Down
10 changes: 6 additions & 4 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,8 @@ func (ssn *Session) TaskOrderFn(l, r interface{}) bool {
}

// PredicateFn invoke predicate function of the plugins
func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
predicateStatus := make([]*api.Status, 0)
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledPredicate) {
Expand All @@ -611,13 +612,14 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
if !found {
continue
}
err := pfn(task, node)
status, err := pfn(task, node)
predicateStatus = append(predicateStatus, status...)
if err != nil {
return err
return predicateStatus, err
}
}
}
return nil
return predicateStatus, nil
}

// PrePredicateFn invoke predicate function of the plugins
Expand Down
Loading