Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick for release-1.9]Support preempting BestEffort pods when the pods number of nodes reaches the upper limit #3338

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 112 additions & 46 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,62 @@ func (backfill *Action) Execute(ssn *framework.Session) {
}

// TODO (k82cn): When backfill, it's also need to balance between Queues.
pendingTasks := backfill.pickUpPendingTasks(ssn)
for _, task := range pendingTasks {
job := ssn.Jobs[task.Job]
ph := util.NewPredicateHelper()
allocated := false
fe := api.NewFitErrors()

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed in backfill for: %v", task.Namespace, task.Name, err)
for _, ni := range ssn.Nodes {
fe.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fe
break
}

predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
}

node := predicateNodes[0]
if len(predicateNodes) > 1 {
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
node = ssn.BestNodeFn(task, nodeScores)
if node == nil {
node = util.SelectBestNode(nodeScores)
}
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
}

metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
allocated = true

if !allocated {
job.NodesFitErrors[task.UID] = fe
}
// TODO (k82cn): backfill for other case.
}
}

func (backfill *Action) UnInitialize() {}

func (backfill *Action) pickUpPendingTasks(ssn *framework.Session) []*api.TaskInfo {
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
jobs := map[api.QueueID]*util.PriorityQueue{}
tasks := map[api.JobID]*util.PriorityQueue{}
var pendingTasks []*api.TaskInfo
for _, job := range ssn.Jobs {
if job.IsPending() {
continue
Expand All @@ -70,55 +126,65 @@ func (backfill *Action) Execute(ssn *framework.Session) {
continue
}

ph := util.NewPredicateHelper()
queue, found := ssn.Queues[job.Queue]
if !found {
continue
}

for _, task := range job.TaskStatusIndex[api.Pending] {
if task.InitResreq.IsEmpty() {
allocated := false
fe := api.NewFitErrors()

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed in backfill for: %v", task.Namespace, task.Name, err)
for _, ni := range ssn.Nodes {
fe.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fe
break
}

predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
}

node := predicateNodes[0]
if len(predicateNodes) > 1 {
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
node = ssn.BestNodeFn(task, nodeScores)
if node == nil {
node = util.SelectBestNode(nodeScores)
}
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
}

metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
allocated = true

if !allocated {
job.NodesFitErrors[task.UID] = fe
}
if !task.BestEffort {
continue
}
if _, existed := tasks[job.UID]; !existed {
tasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
}
tasks[job.UID].Push(task)
}

for _, task := range job.TaskStatusIndex[api.Pipelined] {
if !task.BestEffort {
continue
}

stmt := framework.NewStatement(ssn)
err := stmt.UnPipeline(task)
if err != nil {
klog.Errorf("Failed to unpipeline task: %s", err.Error())
continue
}
if _, existed := tasks[job.UID]; !existed {
tasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
}
tasks[job.UID].Push(task)
}

if _, existed := tasks[job.UID]; !existed {
continue
}

if _, existed := jobs[queue.UID]; !existed {
queues.Push(queue)
jobs[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}
jobs[job.Queue].Push(job)
}

for !queues.Empty() {
queue, ok := queues.Pop().(*api.QueueInfo)
if !ok {
klog.V(3).Infof("QueueInfo transition failed, ignore it.")
continue
}
for !jobs[queue.UID].Empty() {
job, ok := jobs[queue.UID].Pop().(*api.JobInfo)
if !ok {
klog.Errorf("JobInfo transition failed, ignore it.")
continue
}
for !tasks[job.UID].Empty() {
pendingTasks = append(pendingTasks, tasks[job.UID].Pop().(*api.TaskInfo))
}
// TODO (k82cn): backfill for other case.
}
}
return pendingTasks
}

func (backfill *Action) UnInitialize() {}
162 changes: 162 additions & 0 deletions pkg/scheduler/actions/backfill/backfill_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package backfill

import (
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
schedulingapi "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/drf"
"volcano.sh/volcano/pkg/scheduler/plugins/priority"
"volcano.sh/volcano/pkg/scheduler/util"
)

func TestPickUpPendingTasks(t *testing.T) {
framework.RegisterPluginBuilder("priority", priority.New)
framework.RegisterPluginBuilder("drf", drf.New)
trueValue := true
tilers := []conf.Tier{
{
Plugins: []conf.PluginOption{
{
Name: "priority",
EnabledPreemptable: &trueValue,
EnabledTaskOrder: &trueValue,
EnabledJobOrder: &trueValue,
},
{
Name: "drf",
EnabledQueueOrder: &trueValue,
},
},
},
}

priority4, priority3, priority2, priority1 := int32(4), int32(3), int32(2), int32(1)

testCases := []struct {
name string
pipelinedPods []*v1.Pod
pendingPods []*v1.Pod
queues []*schedulingv1beta1.Queue
podGroups []*schedulingv1beta1.PodGroup
PriorityClasses map[string]*schedulingapi.PriorityClass
expectedResult []string
}{
{
name: "test",
pendingPods: []*v1.Pod{
util.BuildPodWithPriority("default", "pg1-besteffort-task-1", "", v1.PodPending, nil, "pg1", make(map[string]string), make(map[string]string), &priority1),
util.BuildPodWithPriority("default", "pg1-unbesteffort-task-1", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg1", make(map[string]string), make(map[string]string), &priority1),
util.BuildPodWithPriority("default", "pg1-besteffort-task-3", "", v1.PodPending, nil, "pg1", make(map[string]string), make(map[string]string), &priority3),
util.BuildPodWithPriority("default", "pg1-unbesteffort-task-3", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg1", make(map[string]string), make(map[string]string), &priority3),

util.BuildPodWithPriority("default", "pg2-besteffort-task-1", "", v1.PodPending, nil, "pg2", make(map[string]string), make(map[string]string), &priority1),
util.BuildPodWithPriority("default", "pg2-unbesteffort-task-1", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg2", make(map[string]string), make(map[string]string), &priority1),
util.BuildPodWithPriority("default", "pg2-besteffort-task-3", "", v1.PodPending, nil, "pg2", make(map[string]string), make(map[string]string), &priority3),
util.BuildPodWithPriority("default", "pg2-unbesteffort-task-3", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg2", make(map[string]string), make(map[string]string), &priority3),
},
pipelinedPods: []*v1.Pod{
util.BuildPodWithPriority("default", "pg1-besteffort-task-2", "", v1.PodPending, nil, "pg1", make(map[string]string), make(map[string]string), &priority2),
util.BuildPodWithPriority("default", "pg1-unbesteffort-task-2", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg1", make(map[string]string), make(map[string]string), &priority2),
util.BuildPodWithPriority("default", "pg1-besteffort-task-4", "", v1.PodPending, nil, "pg1", make(map[string]string), make(map[string]string), &priority4),
util.BuildPodWithPriority("default", "pg1-unbesteffort-task-4", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg1", make(map[string]string), make(map[string]string), &priority4),

util.BuildPodWithPriority("default", "pg2-besteffort-task-2", "", v1.PodPending, nil, "pg2", make(map[string]string), make(map[string]string), &priority2),
util.BuildPodWithPriority("default", "pg2-unbesteffort-task-2", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg2", make(map[string]string), make(map[string]string), &priority2),
util.BuildPodWithPriority("default", "pg2-besteffort-task-4", "", v1.PodPending, nil, "pg2", make(map[string]string), make(map[string]string), &priority4),
util.BuildPodWithPriority("default", "pg2-unbesteffort-task-4", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg2", make(map[string]string), make(map[string]string), &priority4),
},
queues: []*schedulingv1beta1.Queue{
util.BuildQueue("q1", 1, nil),
},
podGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithPrio("pg1", "default", "q1", 1, map[string]int32{"": 3}, schedulingv1beta1.PodGroupInqueue, "job-priority-1"),
util.BuildPodGroupWithPrio("pg2", "default", "q1", 1, map[string]int32{"": 3}, schedulingv1beta1.PodGroupInqueue, "job-priority-2"),
},
PriorityClasses: map[string]*schedulingapi.PriorityClass{
"job-priority-1": {
ObjectMeta: metav1.ObjectMeta{
Name: "job-priority-1",
},
Value: 1,
},
"job-priority-2": {
ObjectMeta: metav1.ObjectMeta{
Name: "job-priority-2",
},
Value: 2,
},
},

expectedResult: []string{
"pg2-besteffort-task-4",
"pg2-besteffort-task-3",
"pg2-besteffort-task-2",
"pg2-besteffort-task-1",
"pg1-besteffort-task-4",
"pg1-besteffort-task-3",
"pg1-besteffort-task-2",
"pg1-besteffort-task-1",
},
},
}

for _, tc := range testCases {
schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
Queues: make(map[api.QueueID]*api.QueueInfo),
Binder: nil,
StatusUpdater: &util.FakeStatusUpdater{},
VolumeBinder: &util.FakeVolumeBinder{},
Recorder: record.NewFakeRecorder(100),
PriorityClasses: tc.PriorityClasses,
}

for _, q := range tc.queues {
schedulerCache.AddQueueV1beta1(q)
}

for _, ss := range tc.podGroups {
schedulerCache.AddPodGroupV1beta1(ss)
}

for _, pod := range tc.pendingPods {
schedulerCache.AddPod(pod)
}

for _, pod := range tc.pipelinedPods {
schedulerCache.AddPod(pod)
}

ssn := framework.OpenSession(schedulerCache, tilers, []conf.Configuration{})
for _, pod := range tc.pipelinedPods {
jobID := api.NewTaskInfo(pod).Job
stmt := framework.NewStatement(ssn)
task, found := ssn.Jobs[jobID].Tasks[api.PodKey(pod)]
if found {
stmt.Pipeline(task, "node1")
}
}

tasks := New().pickUpPendingTasks(ssn)
var actualResult []string
for _, task := range tasks {
actualResult = append(actualResult, task.Name)
}

if !assert.Equal(t, tc.expectedResult, actualResult) {
t.Errorf("unexpected test; name: %s, expected result: %v, actual result: %v", tc.name, tc.expectedResult, actualResult)
}
}
}
8 changes: 4 additions & 4 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
if !api.PreemptableStatus(task.Status) {
return false
}
// Ignore task with empty resource request.
if task.Resreq.IsEmpty() {
// BestEffort pod is not supported to preempt unBestEffort pod.
if preemptor.BestEffort && !task.BestEffort {
return false
}
if !task.Preemptable {
Expand Down Expand Up @@ -172,8 +172,8 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
if !api.PreemptableStatus(task.Status) {
return false
}
// Ignore task with empty resource request.
if task.Resreq.IsEmpty() {
// BestEffort pod is not supported to preempt unBestEffort pod.
if preemptor.BestEffort && !task.BestEffort {
return false
}
// Preempt tasks within job.
Expand Down
Loading
Loading