Skip to content

Commit

Permalink
Support preempting BestEffort pods when the pods number of nodes reac…
Browse files Browse the repository at this point in the history
…hes the upper limit

Signed-off-by: lili <lili_9309@163.com>
  • Loading branch information
Lily922 committed Mar 8, 2024
1 parent e757ed7 commit 27363e2
Show file tree
Hide file tree
Showing 10 changed files with 436 additions and 84 deletions.
158 changes: 112 additions & 46 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,62 @@ func (backfill *Action) Execute(ssn *framework.Session) {
}

// TODO (k82cn): When backfill, it's also need to balance between Queues.
pendingTasks := backfill.pickUpPendingTasks(ssn)
for _, task := range pendingTasks {
job := ssn.Jobs[task.Job]
ph := util.NewPredicateHelper()
allocated := false
fe := api.NewFitErrors()

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed in backfill for: %v", task.Namespace, task.Name, err)
for _, ni := range ssn.Nodes {
fe.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fe
break
}

predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
}

node := predicateNodes[0]
if len(predicateNodes) > 1 {
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
node = ssn.BestNodeFn(task, nodeScores)
if node == nil {
node = util.SelectBestNode(nodeScores)
}
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
}

metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
allocated = true

if !allocated {
job.NodesFitErrors[task.UID] = fe
}
// TODO (k82cn): backfill for other case.
}
}

func (backfill *Action) UnInitialize() {}

func (backfill *Action) pickUpPendingTasks(ssn *framework.Session) []*api.TaskInfo {
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
jobs := map[api.QueueID]*util.PriorityQueue{}
tasks := map[api.JobID]*util.PriorityQueue{}
var pendingTasks []*api.TaskInfo
for _, job := range ssn.Jobs {
if job.IsPending() {
continue
Expand All @@ -70,55 +126,65 @@ func (backfill *Action) Execute(ssn *framework.Session) {
continue
}

ph := util.NewPredicateHelper()
queue, found := ssn.Queues[job.Queue]
if !found {
continue
}

for _, task := range job.TaskStatusIndex[api.Pending] {
if task.InitResreq.IsEmpty() {
allocated := false
fe := api.NewFitErrors()

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed in backfill for: %v", task.Namespace, task.Name, err)
for _, ni := range ssn.Nodes {
fe.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fe
break
}

predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
}

node := predicateNodes[0]
if len(predicateNodes) > 1 {
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
node = ssn.BestNodeFn(task, nodeScores)
if node == nil {
node = util.SelectBestNode(nodeScores)
}
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
}

metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
allocated = true

if !allocated {
job.NodesFitErrors[task.UID] = fe
}
if !task.BestEffort {
continue
}
if _, existed := tasks[job.UID]; !existed {
tasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
}
tasks[job.UID].Push(task)
}

for _, task := range job.TaskStatusIndex[api.Pipelined] {
if !task.BestEffort {
continue
}

stmt := framework.NewStatement(ssn)
err := stmt.UnPipeline(task)
if err != nil {
klog.Errorf("Failed to unpipeline task: %s", err.Error())
continue
}
if _, existed := tasks[job.UID]; !existed {
tasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
}
tasks[job.UID].Push(task)
}

if _, existed := tasks[job.UID]; !existed {
continue
}

if _, existed := jobs[queue.UID]; !existed {
queues.Push(queue)
jobs[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}
jobs[job.Queue].Push(job)
}

for !queues.Empty() {
queue, ok := queues.Pop().(*api.QueueInfo)
if !ok {
klog.V(3).Infof("QueueInfo transition failed, ignore it.")
continue
}
for !jobs[queue.UID].Empty() {
job, ok := jobs[queue.UID].Pop().(*api.JobInfo)
if !ok {
klog.Errorf("JobInfo transition failed, ignore it.")
continue
}
for !tasks[job.UID].Empty() {
pendingTasks = append(pendingTasks, tasks[job.UID].Pop().(*api.TaskInfo))
}
// TODO (k82cn): backfill for other case.
}
}
return pendingTasks
}

func (backfill *Action) UnInitialize() {}
162 changes: 162 additions & 0 deletions pkg/scheduler/actions/backfill/backfill_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package backfill

import (
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
schedulingapi "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/drf"
"volcano.sh/volcano/pkg/scheduler/plugins/priority"
"volcano.sh/volcano/pkg/scheduler/util"
)

func TestPickUpPendingTasks(t *testing.T) {
framework.RegisterPluginBuilder("priority", priority.New)
framework.RegisterPluginBuilder("drf", drf.New)
trueValue := true
tilers := []conf.Tier{
{
Plugins: []conf.PluginOption{
{
Name: "priority",
EnabledPreemptable: &trueValue,
EnabledTaskOrder: &trueValue,
EnabledJobOrder: &trueValue,
},
{
Name: "drf",
EnabledQueueOrder: &trueValue,
},
},
},
}

priority4, priority3, priority2, priority1 := int32(4), int32(3), int32(2), int32(1)

testCases := []struct {
name string
pipelinedPods []*v1.Pod
pendingPods []*v1.Pod
queues []*schedulingv1beta1.Queue
podGroups []*schedulingv1beta1.PodGroup
PriorityClasses map[string]*schedulingapi.PriorityClass
expectedResult []string
}{
{
name: "test",
pendingPods: []*v1.Pod{
util.BuildPodWithPriority("default", "pg1-besteffort-task-1", "", v1.PodPending, nil, "pg1", make(map[string]string), make(map[string]string), &priority1),
util.BuildPodWithPriority("default", "pg1-unbesteffort-task-1", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg1", make(map[string]string), make(map[string]string), &priority1),
util.BuildPodWithPriority("default", "pg1-besteffort-task-3", "", v1.PodPending, nil, "pg1", make(map[string]string), make(map[string]string), &priority3),
util.BuildPodWithPriority("default", "pg1-unbesteffort-task-3", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg1", make(map[string]string), make(map[string]string), &priority3),

util.BuildPodWithPriority("default", "pg2-besteffort-task-1", "", v1.PodPending, nil, "pg2", make(map[string]string), make(map[string]string), &priority1),
util.BuildPodWithPriority("default", "pg2-unbesteffort-task-1", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg2", make(map[string]string), make(map[string]string), &priority1),
util.BuildPodWithPriority("default", "pg2-besteffort-task-3", "", v1.PodPending, nil, "pg2", make(map[string]string), make(map[string]string), &priority3),
util.BuildPodWithPriority("default", "pg2-unbesteffort-task-3", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg2", make(map[string]string), make(map[string]string), &priority3),
},
pipelinedPods: []*v1.Pod{
util.BuildPodWithPriority("default", "pg1-besteffort-task-2", "", v1.PodPending, nil, "pg1", make(map[string]string), make(map[string]string), &priority2),
util.BuildPodWithPriority("default", "pg1-unbesteffort-task-2", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg1", make(map[string]string), make(map[string]string), &priority2),
util.BuildPodWithPriority("default", "pg1-besteffort-task-4", "", v1.PodPending, nil, "pg1", make(map[string]string), make(map[string]string), &priority4),
util.BuildPodWithPriority("default", "pg1-unbesteffort-task-4", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg1", make(map[string]string), make(map[string]string), &priority4),

util.BuildPodWithPriority("default", "pg2-besteffort-task-2", "", v1.PodPending, nil, "pg2", make(map[string]string), make(map[string]string), &priority2),
util.BuildPodWithPriority("default", "pg2-unbesteffort-task-2", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg2", make(map[string]string), make(map[string]string), &priority2),
util.BuildPodWithPriority("default", "pg2-besteffort-task-4", "", v1.PodPending, nil, "pg2", make(map[string]string), make(map[string]string), &priority4),
util.BuildPodWithPriority("default", "pg2-unbesteffort-task-4", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg2", make(map[string]string), make(map[string]string), &priority4),
},
queues: []*schedulingv1beta1.Queue{
util.BuildQueue("q1", 1, nil),
},
podGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithPrio("pg1", "default", "q1", 1, map[string]int32{"": 3}, schedulingv1beta1.PodGroupInqueue, "job-priority-1"),
util.BuildPodGroupWithPrio("pg2", "default", "q1", 1, map[string]int32{"": 3}, schedulingv1beta1.PodGroupInqueue, "job-priority-2"),
},
PriorityClasses: map[string]*schedulingapi.PriorityClass{
"job-priority-1": {
ObjectMeta: metav1.ObjectMeta{
Name: "job-priority-1",
},
Value: 1,
},
"job-priority-2": {
ObjectMeta: metav1.ObjectMeta{
Name: "job-priority-2",
},
Value: 2,
},
},

expectedResult: []string{
"pg2-besteffort-task-4",
"pg2-besteffort-task-3",
"pg2-besteffort-task-2",
"pg2-besteffort-task-1",
"pg1-besteffort-task-4",
"pg1-besteffort-task-3",
"pg1-besteffort-task-2",
"pg1-besteffort-task-1",
},
},
}

for _, tc := range testCases {
schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
Queues: make(map[api.QueueID]*api.QueueInfo),
Binder: nil,
StatusUpdater: &util.FakeStatusUpdater{},
VolumeBinder: &util.FakeVolumeBinder{},
Recorder: record.NewFakeRecorder(100),
PriorityClasses: tc.PriorityClasses,
}

for _, q := range tc.queues {
schedulerCache.AddQueueV1beta1(q)
}

for _, ss := range tc.podGroups {
schedulerCache.AddPodGroupV1beta1(ss)
}

for _, pod := range tc.pendingPods {
schedulerCache.AddPod(pod)
}

for _, pod := range tc.pipelinedPods {
schedulerCache.AddPod(pod)
}

ssn := framework.OpenSession(schedulerCache, tilers, []conf.Configuration{})
for _, pod := range tc.pipelinedPods {
jobID := api.NewTaskInfo(pod).Job
stmt := framework.NewStatement(ssn)
task, found := ssn.Jobs[jobID].Tasks[api.PodKey(pod)]
if found {
stmt.Pipeline(task, "node1")
}
}

tasks := New().pickUpPendingTasks(ssn)
var actualResult []string
for _, task := range tasks {
actualResult = append(actualResult, task.Name)
}

if !assert.Equal(t, tc.expectedResult, actualResult) {
t.Errorf("unexpected test; name: %s, expected result: %v, actual result: %v", tc.name, tc.expectedResult, actualResult)
}
}
}
8 changes: 4 additions & 4 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
if !api.PreemptableStatus(task.Status) {
return false
}
// Ignore task with empty resource request.
if task.Resreq.IsEmpty() {
// BestEffort pod is not supported to preempt unBestEffort pod.
if preemptor.BestEffort && !task.BestEffort {
return false
}
if !task.Preemptable {
Expand Down Expand Up @@ -172,8 +172,8 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
if !api.PreemptableStatus(task.Status) {
return false
}
// Ignore task with empty resource request.
if task.Resreq.IsEmpty() {
// BestEffort pod is not supported to preempt unBestEffort pod.
if preemptor.BestEffort && !task.BestEffort {
return false
}
// Preempt tasks within job.
Expand Down
Loading

0 comments on commit 27363e2

Please sign in to comment.