Skip to content

Commit

Permalink
Transform the predicate interface to support the outer judgment of al…
Browse files Browse the repository at this point in the history
…locate and preempt at the same time

Signed-off-by: wangyang <wangyang8126@gmail.com>
  • Loading branch information
wangyang0616 committed Jun 15, 2023
1 parent 927be8c commit 39d87b0
Show file tree
Hide file tree
Showing 22 changed files with 451 additions and 125 deletions.
21 changes: 20 additions & 1 deletion docs/design/device-sharing.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,26 @@ type Devices interface {
//HasDeviceRequest checks if the 'pod' request this device
HasDeviceRequest(pod *v1.Pod) bool
//FiltreNode checks if the 'pod' fit in current node
FilterNode(pod *v1.Pod) (bool, error)
// The first return value represents the filtering result, and the value range is "0, 1, 2, 3"
// 0: Success
// Success means that plugin ran correctly and found pod schedulable.
// 1: Error
// Error is used for internal plugin errors, unexpected input, etc.
// 2: Unschedulable
// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the
// scheduler skip preemption.
// The accompanying status message should explain why the pod is unschedulable.
// 3: UnschedulableAndUnresolvable
// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
// preemption would not change anything. Plugins should return Unschedulable if it is possible
// that the pod can get scheduled with preemption.
// The accompanying status message should explain why the pod is unschedulable.
FilterNode(pod *v1.Pod) (int, string, error)
//Allocate action in predicate
Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error
//Release action in predicate
Expand Down
51 changes: 41 additions & 10 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package allocate

import (
"fmt"
"time"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -96,13 +97,22 @@ func (alloc *Action) Execute(ssn *framework.Session) {
pendingTasks := map[api.JobID]*util.PriorityQueue{}

allNodes := ssn.NodeList
predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
// Check for Resource Predicate
if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok {
return api.NewFitError(task, node, reason)
return nil, api.NewFitError(task, node, reason)
}
predicateStatus, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, err
}
for _, status := range predicateStatus {
if status != nil && status.Code != api.Success {
return nil, api.NewFitError(task, node, status.Reason)
}
}

return ssn.PredicateFn(task, node)
return nil, nil
}

// To pick <namespace, queue> tuple for job, we choose to pick namespace firstly.
Expand Down Expand Up @@ -167,13 +177,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {

klog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(ssn.Nodes), job.Namespace, job.Name)

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
fitErrors := api.NewFitErrors()
for _, ni := range allNodes {
fitErrors.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fitErrors
if err := prePredicateforAllocate(ssn, task, allNodes, job); err != nil {
break
}

Expand Down Expand Up @@ -250,4 +254,31 @@ func (alloc *Action) Execute(ssn *framework.Session) {
}
}

func prePredicateforAllocate(ssn *framework.Session, task *api.TaskInfo, allNodes []*api.NodeInfo, job *api.JobInfo) error {
prePredicateStatus, err := ssn.PrePredicateFn(task)
if err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
fitErrors := api.NewFitErrors()
for _, ni := range allNodes {
fitErrors.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fitErrors
return fmt.Errorf("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
}

for _, status := range prePredicateStatus {
if status != nil && status.Code == api.Success {
klog.V(3).Infof("PrePredicate for task %s/%s failed, Code is %d, reason is %s",
task.Namespace, task.Name, status.Code, status.Reason)
fitErrors := api.NewFitErrors()
for _, ni := range allNodes {
fitErrors.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fitErrors
return fmt.Errorf("PrePredicate for task %s/%s, %s", task.Namespace, task.Name, status.Reason)
}
}
return nil
}

func (alloc *Action) UnInitialize() {}
61 changes: 48 additions & 13 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package backfill

import (
"fmt"
"time"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -56,26 +57,20 @@ func (backfill *Action) Execute(ssn *framework.Session) {
for _, task := range job.TaskStatusIndex[api.Pending] {
if task.InitResreq.IsEmpty() {
allocated := false
fe := api.NewFitErrors()

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
for _, ni := range ssn.Nodes {
fe.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fe
break
if err := prePredicateforBackfill(ssn, task, job); err != nil {
klog.V(3).Infof("backfill %s", err.Error())
continue
}

fe := api.NewFitErrors()
// As task did not request resources, so it only need to meet predicates.
// TODO (k82cn): need to prioritize nodes to avoid pod hole.
for _, node := range ssn.Nodes {
// TODO (k82cn): predicates did not consider pod number for now, there'll
// be ping-pong case here.
if err := ssn.PredicateFn(task, node); err != nil {
klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
fe.SetNodeError(node.Name, err)
err := predicateforBackfill(ssn, task, node, fe)
if err != nil {
klog.V(3).Infof("backfill %s", err.Error())
continue
}

Expand All @@ -101,4 +96,44 @@ func (backfill *Action) Execute(ssn *framework.Session) {
}
}

func predicateforBackfill(ssn *framework.Session, task *api.TaskInfo, node *api.NodeInfo, fe *api.FitErrors) error {
predicateStatus, err := ssn.PredicateFn(task, node)
if err != nil {
fe.SetNodeError(node.Name, err)
return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
}
for _, status := range predicateStatus {
if status != nil && status.Code != api.Success {
return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %s",
task.Namespace, task.Name, node.Name, status.Reason)
}
}
return nil
}

func prePredicateforBackfill(ssn *framework.Session, task *api.TaskInfo, job *api.JobInfo) error {
prePredicateStatus, err := ssn.PrePredicateFn(task)
if err != nil {
fitErrors := api.NewFitErrors()
for _, ni := range ssn.Nodes {
fitErrors.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fitErrors
return fmt.Errorf("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
}

for _, status := range prePredicateStatus {
if status != nil && status.Code != api.Success {
fitErrors := api.NewFitErrors()
for _, ni := range ssn.Nodes {
fitErrors.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fitErrors
return fmt.Errorf("PrePredicate for task %s/%s failed, %s", task.Namespace, task.Name, status.Reason)
}
}
return nil
}

func (backfill *Action) UnInitialize() {}
27 changes: 23 additions & 4 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,32 @@ func preempt(
predicateHelper util.PredicateHelper,
) (bool, error) {
assigned := false

allNodes := ssn.NodeList

if err := ssn.PrePredicateFn(preemptor); err != nil {
prePredicateStatus, err := ssn.PrePredicateFn(preemptor)
if err != nil {
return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err)
}
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, ssn.PredicateFn, true)
for _, status := range prePredicateStatus {
if status != nil && status.Code != api.Success && status.Code != api.Unschedulable {
return false, fmt.Errorf("PrePredicate for task %s/%s failed, %v", preemptor.Namespace, preemptor.Name, status.Reason)
}
}

predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
predicateStatus, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, err
}
for _, status := range predicateStatus {
if status != nil && status.Code != api.Success && status.Code != api.Unschedulable {
return nil, api.NewFitError(task, node, status.Reason)
}
}

return nil, nil
}

predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true)

nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

Expand Down
38 changes: 35 additions & 3 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package reclaim

import (
"fmt"

"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api"
Expand Down Expand Up @@ -116,15 +118,16 @@ func (ra *Action) Execute(ssn *framework.Session) {
continue
}

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
if err := prePredicateforReclaim(ssn, task); err != nil {
klog.V(3).Infof("reclaim %s", err.Error())
continue
}

assigned := false
for _, n := range ssn.Nodes {
// If predicates failed, next node.
if err := ssn.PredicateFn(task, n); err != nil {
if err := predicateforReclaim(ssn, task, n); err != nil {
klog.V(3).Infof("reclaim %s", err.Error())
continue
}

Expand Down Expand Up @@ -207,5 +210,34 @@ func (ra *Action) Execute(ssn *framework.Session) {
}
}

func predicateforReclaim(ssn *framework.Session, task *api.TaskInfo, n *api.NodeInfo) error {
predicateStatus, err := ssn.PredicateFn(task, n)
if err != nil {
return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, n.Name, err)
}
for _, status := range predicateStatus {
if status != nil && status.Code != api.Success && status.Code != api.Unschedulable {
return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, n.Name, status.Reason)
}
}
return nil
}

func prePredicateforReclaim(ssn *framework.Session, task *api.TaskInfo) error {
prePredicateStatus, err := ssn.PrePredicateFn(task)
if err != nil {
return fmt.Errorf("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
}

for _, status := range prePredicateStatus {
if status != nil && status.Code != api.Success && status.Code != api.Unschedulable {
return fmt.Errorf("PrePredicate for task %s/%s failed, %v", task.Namespace, task.Name, status.Reason)
}
}
return nil
}

func (ra *Action) UnInitialize() {
}
14 changes: 8 additions & 6 deletions pkg/scheduler/api/devices/nvidia/gpushare/device_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package gpushare

import (
"context"
"fmt"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
Expand All @@ -26,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api/devices"
"volcano.sh/volcano/pkg/scheduler/plugins/util/nodelock"
)

Expand Down Expand Up @@ -146,24 +148,24 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro
return nil
}

func (gs *GPUDevices) FilterNode(pod *v1.Pod) (bool, error) {
func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
klog.V(4).Infoln("DeviceSharing:Into FitInPod", pod.Name)
if GpuSharingEnable {
fit, err := checkNodeGPUSharingPredicate(pod, gs)
if err != nil {
if err != nil || !fit {
klog.Errorln("deviceSharing err=", err.Error())
return fit, err
return devices.Unschedulable, fmt.Sprintf("GpuShare %s", err.Error()), err
}
}
if GpuNumberEnable {
fit, err := checkNodeGPUNumberPredicate(pod, gs)
if err != nil {
if err != nil || !fit {
klog.Errorln("deviceSharing err=", err.Error())
return fit, err
return devices.Unschedulable, fmt.Sprintf("GpuNumber %s", err.Error()), err
}
}
klog.V(4).Infoln("DeviceSharing:FitInPod successed")
return true, nil
return devices.Success, "", nil
}

func (gs *GPUDevices) GetStatus() string {
Expand Down
10 changes: 6 additions & 4 deletions pkg/scheduler/api/devices/nvidia/vgpu/device_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vgpu

import (
"fmt"
"strconv"
"strings"
"time"
Expand All @@ -26,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api/devices"
"volcano.sh/volcano/pkg/scheduler/plugins/util/nodelock"
)

Expand Down Expand Up @@ -177,17 +179,17 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro
return nil
}

func (gs *GPUDevices) FilterNode(pod *v1.Pod) (bool, error) {
func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
klog.V(3).Infoln("4pdvgpuDeviceSharing:Into FitInPod", pod.Name)
if VGPUEnable {
fit, _, err := checkNodeGPUSharingPredicate(pod, gs, true)
if err != nil {
if err != nil || !fit {
klog.Errorln("deviceSharing err=", err.Error())
return fit, err
return devices.Unschedulable, fmt.Sprintf("4pdvgpuDeviceSharing %s", err.Error()), err
}
}
klog.V(3).Infoln("4pdvgpu DeviceSharing:FitInPod successed")
return true, nil
return devices.Success, "", nil
}

func (gs *GPUDevices) GetStatus() string {
Expand Down
Loading

0 comments on commit 39d87b0

Please sign in to comment.