Skip to content

Commit

Permalink
The part of the fixed resource filtering check in the plugin is migra…
Browse files Browse the repository at this point in the history
…ted from predicateFn to predicateResourceFn

Signed-off-by: wangyang <wangyang8126@gmail.com>
  • Loading branch information
wangyang0616 committed May 5, 2023
1 parent 8d5d61b commit 71385c9
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 45 deletions.
45 changes: 45 additions & 0 deletions docs/design/predicate-plugin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
## Introduction

Before scheduling and scoring, filter out the nodes that do not meet the requirements. The filter items mainly include: node availability, ports, taints, node affinity, pod affinity, volumes, pod topology spread, idle resources, etc.

## Design
- PrePredicate
It mainly corresponds to the Prefilter stage of kube-scheduler. The current filter content is:
- nodePort
- podAffinity
- podTopologySpread

- Predicate
It mainly corresponds to the Filter stage of kube-scheduler. The current filtering content is:
- nodeUnschedulable
- nodeAffinity
- taintToleration
- nodePort
- podAffinity
- nodeVolumeLimitsCSI
- volumeZone
- podTopologySpread

- PredicateResource
This function is split out of Predicate as an independent step. It is responsible for filtering based on the node's idle resources, such as the number of pods, CPU, memory, and user-defined resources.

Because both allocation and preemption (preempt and reclaim) require predicate operations, if resource filtering is also processed in the predicate function, the allocation and preemption actions become incompatible; see [#2739](https://github.com/volcano-sh/volcano/issues/2739). After the split, the allocation action (allocate) evaluates both Predicate and PredicateResource, while the preemption actions (preempt, reclaim) evaluate only Predicate.

Filter content mainly includes:
- podNumber
- CPU
- Memory
- ScalarResources

## Recommend practice
Configure the predicate plugin by:
```
actions: "enqueue, allocate, preempt, backfill"
tiers:
- plugins:
- name: priority
- name: gang
- name: predicate
enablePredicate: false
```
`enablePredicate` is the switch of the predicate plugin; it defaults to true. You can turn off the filtering function of the predicate plugin as shown above. `PrePredicate`, `Predicate` and `PredicateResource` are all controlled by the `enablePredicate` switch.
2 changes: 0 additions & 2 deletions pkg/scheduler/conf/scheduler_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ type PluginOption struct {
EnabledClusterOrder *bool `yaml:"EnabledClusterOrder"`
// EnabledPredicate defines whether predicateFn is enabled
EnabledPredicate *bool `yaml:"enablePredicate"`
// EnabledPredicateResource defines whether predicateResourceFn is enabled
EnabledPredicateResource *bool `yaml:"enablePredicateResource"`
// EnabledBestNode defines whether bestNodeFn is enabled
EnabledBestNode *bool `yaml:"enableBestNode"`
// EnabledNodeOrder defines whether NodeOrderFn is enabled
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
func (ssn *Session) PredicateResourceFn(task *api.TaskInfo, node *api.NodeInfo) error {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledPredicateResource) {
if !isEnabled(plugin.EnabledPredicate) {
continue
}
pfn, found := ssn.predicateResourceFns[plugin.Name]
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/plugins/extender/argument.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type PredicateResponse struct {
ErrorMessage string `json:"errorMessage"`
}

// PredicateResourceResponse is the response payload an HTTP extender returns
// for a PredicateResource (node idle-resource filtering) call.
// An empty ErrorMessage means the node passed the resource predicate;
// a non-empty ErrorMessage is surfaced to the caller as an error.
type PredicateResourceResponse struct {
	// ErrorMessage describes why the node was rejected; empty on success.
	ErrorMessage string `json:"errorMessage"`
}

type PrioritizeRequest struct {
Task *api.TaskInfo `json:"task"`
Nodes []*api.NodeInfo `json:"nodes"`
Expand Down
49 changes: 37 additions & 12 deletions pkg/scheduler/plugins/extender/extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
ExtenderOnSessionCloseVerb = "extender.onSessionCloseVerb"
// ExtenderPredicateVerb is the verb of Predicate method
ExtenderPredicateVerb = "extender.predicateVerb"
// ExtenderPredicateResourceVerb is the verb of the PredicateResource method
ExtenderPredicateResourceVerb = "extender.predicateResourceVerb"
// ExtenderPrioritizeVerb is the verb of Prioritize method
ExtenderPrioritizeVerb = "extender.prioritizeVerb"
// ExtenderPreemptableVerb is the verb of Preemptable method
Expand All @@ -63,18 +65,19 @@ const (
)

type extenderConfig struct {
urlPrefix string
httpTimeout time.Duration
onSessionOpenVerb string
onSessionCloseVerb string
predicateVerb string
prioritizeVerb string
preemptableVerb string
reclaimableVerb string
queueOverusedVerb string
jobEnqueueableVerb string
jobReadyVerb string
ignorable bool
urlPrefix string
httpTimeout time.Duration
onSessionOpenVerb string
onSessionCloseVerb string
predicateVerb string
predicateResourceVerb string
prioritizeVerb string
preemptableVerb string
reclaimableVerb string
queueOverusedVerb string
jobEnqueueableVerb string
jobReadyVerb string
ignorable bool
}

type extenderPlugin struct {
Expand All @@ -100,6 +103,7 @@ func parseExtenderConfig(arguments framework.Arguments) *extenderConfig {
extender.onSessionOpenVerb: onSessionOpen
extender.onSessionCloseVerb: onSessionClose
extender.predicateVerb: predicate
extender.predicateResourceVerb: predicateResource
extender.prioritizeVerb: prioritize
extender.preemptableVerb: preemptable
extender.reclaimableVerb: reclaimable
Expand All @@ -114,6 +118,7 @@ func parseExtenderConfig(arguments framework.Arguments) *extenderConfig {
ec.onSessionOpenVerb, _ = arguments[ExtenderOnSessionOpenVerb].(string)
ec.onSessionCloseVerb, _ = arguments[ExtenderOnSessionCloseVerb].(string)
ec.predicateVerb, _ = arguments[ExtenderPredicateVerb].(string)
ec.predicateResourceVerb, _ = arguments[ExtenderPredicateResourceVerb].(string)
ec.prioritizeVerb, _ = arguments[ExtenderPrioritizeVerb].(string)
ec.preemptableVerb, _ = arguments[ExtenderPreemptableVerb].(string)
ec.reclaimableVerb, _ = arguments[ExtenderReclaimableVerb].(string)
Expand Down Expand Up @@ -180,6 +185,26 @@ func (ep *extenderPlugin) OnSessionOpen(ssn *framework.Session) {
})
}

if ep.config.predicateResourceVerb != "" {
ssn.AddPredicateResourceFn(ep.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
resp := &PredicateResourceResponse{}
err := ep.send(ep.config.predicateResourceVerb, &PredicateRequest{Task: task, Node: node}, resp)
if err != nil {
klog.Warningf("Predicate resource failed with error %v", err)

if ep.config.ignorable {
return nil
}
return err
}

if resp.ErrorMessage == "" {
return nil
}
return errors.New(resp.ErrorMessage)
})
}

if ep.config.prioritizeVerb != "" {
ssn.AddBatchNodeOrderFn(ep.Name(), func(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) {
resp := &PrioritizeResponse{}
Expand Down
39 changes: 29 additions & 10 deletions pkg/scheduler/plugins/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,18 +313,44 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
return nil
})

ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
ssn.AddPredicateResourceFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
nodeInfo, found := nodeMap[node.Name]
if !found {
return fmt.Errorf("failed to predicates, node info for %s not found", node.Name)
return fmt.Errorf("failed to predicate resource, node info for %s not found", node.Name)
}

if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok {
klog.V(4).Infof("NodeFutureIdle resource predicates Task <%s/%s> on Node <%s> failed",
task.Namespace, task.Name, node.Name)
return api.NewFitError(task, node, reason)
}

if node.Allocatable.MaxTaskNum <= len(nodeInfo.Pods) {
klog.V(4).Infof("NodePodNumber predicates Task <%s/%s> on Node <%s> failed",
klog.V(4).Infof("NodePodNumber resource predicates Task <%s/%s> on Node <%s> failed",
task.Namespace, task.Name, node.Name)
return api.NewFitError(task, node, api.NodePodNumberExceeded)
}

if gpuDevice, ok := node.Others[api.GPUSharingDevice].(api.Devices); ok {
_, err := gpuDevice.FilterNode(task.Pod)
if err != nil {
klog.V(4).Infof("gpuSharingDevice resource predicates Task <%s/%s> on Node <%s> failed",
task.Namespace, task.Name, node.Name)
return api.NewFitError(task, node, err.Error())
}
} else {
klog.Warning("GpuShare Device assertion conversion failed, skip GpuShare related checks")
}

return nil
})

ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
nodeInfo, found := nodeMap[node.Name]
if !found {
return fmt.Errorf("failed to predicates, node info for %s not found", node.Name)
}

predicateByStablefilter := func(pod *v1.Pod, nodeInfo *k8sframework.NodeInfo) (bool, error) {
// CheckNodeUnschedulable
status := nodeUnscheduleFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
Expand Down Expand Up @@ -395,13 +421,6 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
return fmt.Errorf("plugin %s predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
}

fit, err = node.Others[api.GPUSharingDevice].(api.Devices).FilterNode(task.Pod)
if err != nil {
return err
}
klog.V(4).Infof("checkNodeGPUPredicate predicates Task <%s/%s> on Node <%s>: fit %v",
task.Namespace, task.Name, node.Name, fit)

if predicate.proportionalEnable {
// Check ProportionalPredicate
fit, err := checkNodeResourceIsProportional(task, node, predicate.proportional)
Expand Down
75 changes: 61 additions & 14 deletions pkg/scheduler/plugins/predicates/predicates_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package predicates

import (
"context"
"reflect"
"testing"

"github.com/agiledragon/gomonkey/v2"
"github.com/spf13/pflag"

apiv1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"

schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/kube"
"volcano.sh/volcano/pkg/scheduler/actions/allocate"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
Expand Down Expand Up @@ -55,24 +61,54 @@ func TestEventHandler(t *testing.T) {
})
defer patches.Reset()

patchUpdateQueueStatus := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "UpdateQueueStatus", func(scCache *cache.SchedulerCache, queue *api.QueueInfo) error {
return nil
})
defer patchUpdateQueueStatus.Reset()

interpodaffinityNewPatch := gomonkey.ApplyFunc(interpodaffinity.New, func(plArgs runtime.Object, h k8sframework.Handle) (k8sframework.Plugin, error) {
return &interpodaffinity.InterPodAffinity{}, nil
})
defer interpodaffinityNewPatch.Reset()
podAffinityFilter := &interpodaffinity.InterPodAffinity{}
interpodaffinityPreFilterPatch := gomonkey.ApplyMethod(reflect.TypeOf(podAffinityFilter), "PreFilter", func(_ *interpodaffinity.InterPodAffinity, ctx context.Context, cycleState *k8sframework.CycleState, pod *apiv1.Pod) (*k8sframework.PreFilterResult, *k8sframework.Status) {
return nil, nil
})
defer interpodaffinityPreFilterPatch.Reset()
interpodaffinityFilterPatch := gomonkey.ApplyMethod(reflect.TypeOf(podAffinityFilter), "Filter", func(_ *interpodaffinity.InterPodAffinity, ctx context.Context, cycleState *k8sframework.CycleState, pod *apiv1.Pod, nodeInfo *k8sframework.NodeInfo) *k8sframework.Status {
return nil
})
defer interpodaffinityFilterPatch.Reset()

nodevolumelimitsNewCSIPatch := gomonkey.ApplyFunc(nodevolumelimits.NewCSI, func(_ runtime.Object, handle k8sframework.Handle, fts feature.Features) (k8sframework.Plugin, error) {
return &nodevolumelimits.CSILimits{}, nil
})
defer nodevolumelimitsNewCSIPatch.Reset()
volumezoneNewPatch := gomonkey.ApplyFunc(volumezone.New, func(_ runtime.Object, handle k8sframework.Handle) (k8sframework.Plugin, error) {
return &volumezone.VolumeZone{}, nil
})
defer volumezoneNewPatch.Reset()

podtopologyspreadNewPatch := gomonkey.ApplyFunc(podtopologyspread.New, func(plArgs runtime.Object, h k8sframework.Handle, fts feature.Features) (k8sframework.Plugin, error) {
return &podtopologyspread.PodTopologySpread{}, nil
})
defer podtopologyspreadNewPatch.Reset()
podTopologySpreadFilter := &podtopologyspread.PodTopologySpread{}
podtopologyspreadPreFilterPatch := gomonkey.ApplyMethod(reflect.TypeOf(podTopologySpreadFilter), "PreFilter", func(_ *podtopologyspread.PodTopologySpread, ctx context.Context, cycleState *k8sframework.CycleState, pod *apiv1.Pod) (*k8sframework.PreFilterResult, *k8sframework.Status) {
return nil, nil
})
defer podtopologyspreadPreFilterPatch.Reset()
podtopologyspreadFilterPatch := gomonkey.ApplyMethod(reflect.TypeOf(podTopologySpreadFilter), "Filter", func(_ *podtopologyspread.PodTopologySpread, ctx context.Context, cycleState *k8sframework.CycleState, pod *apiv1.Pod, nodeInfo *k8sframework.NodeInfo) *k8sframework.Status {
return nil
})
defer podtopologyspreadFilterPatch.Reset()

framework.RegisterPluginBuilder(PluginName, New)
framework.RegisterPluginBuilder(gang.PluginName, gang.New)
framework.RegisterPluginBuilder(priority.PluginName, priority.New)
options.ServerOpts = options.NewServerOption()
defer framework.CleanupPluginBuilders()

option := options.NewServerOption()
option.AddFlags(pflag.CommandLine)
option.RegisterOptions()

config, err := kube.BuildConfig(option.KubeClientOptions)
if err != nil {
return
}

sc := cache.New(config, option.SchedulerNames, option.DefaultQueue, option.NodeSelector)
schedulerCache := sc.(*cache.SchedulerCache)

// pending pods
w1 := util.BuildPod("ns1", "worker-1", "", apiv1.PodPending, util.BuildResourceList("3", "3k"), "pg1", map[string]string{"role": "worker"}, map[string]string{"selector": "worker"})
w2 := util.BuildPod("ns1", "worker-2", "", apiv1.PodPending, util.BuildResourceList("5", "5k"), "pg1", map[string]string{"role": "worker"}, map[string]string{})
Expand Down Expand Up @@ -156,6 +192,17 @@ func TestEventHandler(t *testing.T) {
t.Logf("%s: [Event] %s", test.name, event)
}
}()

schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
PriorityClasses: make(map[string]*schedulingv1.PriorityClass),
Queues: make(map[api.QueueID]*api.QueueInfo),
Binder: binder,
StatusUpdater: &util.FakeStatusUpdater{},
VolumeBinder: &util.FakeVolumeBinder{},
Recorder: recorder,
}
for _, node := range test.nodes {
schedulerCache.AddNode(node)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/scheduler/plugins/usage/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,20 +124,20 @@ func (up *usagePlugin) OnSessionOpen(ssn *framework.Session) {
klog.V(4).Infof("Threshold arguments :%v", argsValue)
}

predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
predicateResourceFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
for period, value := range up.threshold.cpuUsageAvg {
klog.V(4).Infof("predicateFn cpuUsageAvg:%v", up.threshold.cpuUsageAvg)
klog.V(4).Infof("predicateResourceFn cpuUsageAvg:%v", up.threshold.cpuUsageAvg)
if node.ResourceUsage.CPUUsageAvg[period] > value {
msg := fmt.Sprintf("Node %s cpu usage %f exceeds the threshold %f", node.Name, node.ResourceUsage.CPUUsageAvg[period], value)
return fmt.Errorf("plugin %s predicates failed %s", up.Name(), msg)
return fmt.Errorf("plugin %s predicates resource failed %s", up.Name(), msg)
}
}

for period, value := range up.threshold.memUsageAvg {
klog.V(4).Infof("predicateFn memUsageAvg:%v", up.threshold.memUsageAvg)
klog.V(4).Infof("predicateResourceFn memUsageAvg:%v", up.threshold.memUsageAvg)
if node.ResourceUsage.MEMUsageAvg[period] > value {
msg := fmt.Sprintf("Node %s mem usage %f exceeds the threshold %f", node.Name, node.ResourceUsage.MEMUsageAvg[period], value)
return fmt.Errorf("plugin %s memory usage predicates failed %s", up.Name(), msg)
return fmt.Errorf("plugin %s memory usage predicates resource failed %s", up.Name(), msg)
}
}
klog.V(4).Infof("Usage plugin filter for task %s/%s on node %s pass.", task.Namespace, task.Name, node.Name)
Expand All @@ -157,7 +157,7 @@ func (up *usagePlugin) OnSessionOpen(ssn *framework.Session) {
return score, nil
}

ssn.AddPredicateFn(up.Name(), predicateFn)
ssn.AddPredicateResourceFn(up.Name(), predicateResourceFn)
ssn.AddNodeOrderFn(up.Name(), nodeOrderFn)
}

Expand Down

0 comments on commit 71385c9

Please sign in to comment.