Skip to content

Commit

Permalink
Support preempting BestEffort pods when the number of pods on a node reaches the upper limit
Browse files Browse the repository at this point in the history

Signed-off-by: lili <lili_9309@163.com>
  • Loading branch information
Lily922 committed Mar 5, 2024
1 parent e757ed7 commit 3564d64
Show file tree
Hide file tree
Showing 10 changed files with 446 additions and 84 deletions.
150 changes: 104 additions & 46 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,62 @@ func (backfill *Action) Execute(ssn *framework.Session) {
}

// TODO (k82cn): When backfill, it's also need to balance between Queues.
pendingTasks := backfill.pickUpPendingTasks(ssn)
for _, task := range pendingTasks {
job := ssn.Jobs[task.Job]
ph := util.NewPredicateHelper()
allocated := false
fe := api.NewFitErrors()

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed in backfill for: %v", task.Namespace, task.Name, err)
for _, ni := range ssn.Nodes {
fe.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fe
break
}

predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
}

node := predicateNodes[0]
if len(predicateNodes) > 1 {
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
node = ssn.BestNodeFn(task, nodeScores)
if node == nil {
node = util.SelectBestNode(nodeScores)
}
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
}

metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
allocated = true

if !allocated {
job.NodesFitErrors[task.UID] = fe
}
// TODO (k82cn): backfill for other case.
}
}

// UnInitialize releases resources held by the action when the framework tears it down; backfill holds none, so this is a no-op.
func (backfill *Action) UnInitialize() {}

func (backfill *Action) pickUpPendingTasks(ssn *framework.Session) []*api.TaskInfo {
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
jobs := map[api.QueueID]*util.PriorityQueue{}
tasks := map[api.JobID]*util.PriorityQueue{}
var pendingTasks []*api.TaskInfo
for _, job := range ssn.Jobs {
if job.IsPending() {
continue
Expand All @@ -70,55 +126,57 @@ func (backfill *Action) Execute(ssn *framework.Session) {
continue
}

ph := util.NewPredicateHelper()
queue, found := ssn.Queues[job.Queue]
if !found {
continue
}

for _, task := range job.TaskStatusIndex[api.Pending] {
if task.InitResreq.IsEmpty() {
allocated := false
fe := api.NewFitErrors()

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed in backfill for: %v", task.Namespace, task.Name, err)
for _, ni := range ssn.Nodes {
fe.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fe
break
}

predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
}

node := predicateNodes[0]
if len(predicateNodes) > 1 {
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
node = ssn.BestNodeFn(task, nodeScores)
if node == nil {
node = util.SelectBestNode(nodeScores)
}
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
}

metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
allocated = true

if !allocated {
job.NodesFitErrors[task.UID] = fe
}
if !task.BestEffort {
continue
}
if _, existed := tasks[job.UID]; !existed {
tasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
}
tasks[job.UID].Push(task)
}

for _, task := range job.TaskStatusIndex[api.Pipelined] {
if !task.BestEffort {
continue
}

stmt := framework.NewStatement(ssn)
err := stmt.UnPipeline(task)
if err != nil {
klog.Errorf("Failed to unpipeline task: %s", err.Error())
continue
}
if _, existed := tasks[job.UID]; !existed {
tasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
}
tasks[job.UID].Push(task)
}

if _, existed := tasks[job.UID]; !existed {
continue
}

if _, existed := jobs[queue.UID]; !existed {
queues.Push(queue)
jobs[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}
jobs[job.Queue].Push(job)
}

for !queues.Empty() {
queue := queues.Pop().(*api.QueueInfo)
for !jobs[queue.UID].Empty() {
job := jobs[queue.UID].Pop().(*api.JobInfo)
for !tasks[job.UID].Empty() {
pendingTasks = append(pendingTasks, tasks[job.UID].Pop().(*api.TaskInfo))
}
// TODO (k82cn): backfill for other case.
}
}
return pendingTasks
}

// UnInitialize releases resources held by the action when the framework tears it down; backfill holds none, so this is a no-op.
func (backfill *Action) UnInitialize() {}
180 changes: 180 additions & 0 deletions pkg/scheduler/actions/backfill/backfill_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package backfill

import (
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
schedulingapi "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/drf"
"volcano.sh/volcano/pkg/scheduler/plugins/priority"
)

// TestPickUpPendingTasks verifies that pickUpPendingTasks collects the
// BestEffort Pending and Pipelined tasks of schedulable (non-pending) jobs
// and returns them ordered by queue order, then job order (higher podgroup
// priority first), then task order (higher task priority first). Pipelined
// BestEffort tasks are expected back in Pending status, since
// pickUpPendingTasks un-pipelines them before collecting.
func TestPickUpPendingTasks(t *testing.T) {
	framework.RegisterPluginBuilder("priority", priority.New)
	framework.RegisterPluginBuilder("drf", drf.New)
	// Unregister the builders so other tests start from a clean registry.
	defer framework.CleanupPluginBuilders()

	trueValue := true
	// tiers (fixed typo: was "tilers") enables the ordering functions that
	// pickUpPendingTasks relies on: task/job order from "priority", queue
	// order from "drf".
	tiers := []conf.Tier{
		{
			Plugins: []conf.PluginOption{
				{
					Name:               "priority",
					EnabledPreemptable: &trueValue,
					EnabledTaskOrder:   &trueValue,
					EnabledJobOrder:    &trueValue,
				},
				{
					Name:              "drf",
					EnabledQueueOrder: &trueValue,
				},
			},
		},
	}

	// newTaskFunc builds a TaskInfo fixture; a task whose resource request is
	// empty is marked BestEffort, mirroring how the scheduler classifies pods.
	newTaskFunc := func(uid, jobUID types.UID, priority int32, status api.TaskStatus, resources *api.Resource) *api.TaskInfo {
		isBestEffort := resources.IsEmpty()
		return &api.TaskInfo{
			UID:      api.TaskID(uid),
			Job:      api.JobID(jobUID),
			Name:     string(uid),
			Priority: priority,
			TransactionContext: api.TransactionContext{
				Status: status,
			},
			Resreq:     resources,
			InitResreq: resources,
			BestEffort: isBestEffort,
			NumaInfo: &api.TopologyInfo{
				ResMap: map[int]v1.ResourceList{},
			},
		}
	}

	type jobs struct {
		uid                  api.JobID
		jobPriorityClassName string
		phase                scheduling.PodGroupPhase
		tasks                []*api.TaskInfo
	}

	testCases := []struct {
		name           string
		jobs           []jobs
		expectedResult []*api.TaskInfo
	}{
		{
			name: "test",
			jobs: []jobs{
				{
					uid:                  "job-1",
					jobPriorityClassName: "job-priority-1",
					phase:                scheduling.PodGroupInqueue,
					tasks: []*api.TaskInfo{
						newTaskFunc("job-1-besteffort-task-1", "job-1", 1, api.Pending, api.EmptyResource()),
						newTaskFunc("job-1-unbesteffort-task-1", "job-1", 1, api.Pending, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})),
						newTaskFunc("job-1-besteffort-task-2", "job-1", 2, api.Pipelined, api.EmptyResource()),
						newTaskFunc("job-1-unbesteffort-task-2", "job-1", 2, api.Pipelined, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})),
						newTaskFunc("job-1-besteffort-task-3", "job-1", 3, api.Pending, api.EmptyResource()),
						newTaskFunc("job-1-unbesteffort-task-3", "job-1", 3, api.Pending, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})),
						newTaskFunc("job-1-besteffort-task-4", "job-1", 4, api.Pipelined, api.EmptyResource()),
						newTaskFunc("job-1-unbesteffort-task-4", "job-1", 4, api.Pipelined, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})),
					},
				},
				{
					uid:                  "job-2",
					jobPriorityClassName: "job-priority-2",
					phase:                scheduling.PodGroupInqueue,
					tasks: []*api.TaskInfo{
						newTaskFunc("job-2-besteffort-task-1", "job-2", 1, api.Pending, api.EmptyResource()),
						newTaskFunc("job-2-unbesteffort-task-1", "job-2", 1, api.Pending, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})),
						newTaskFunc("job-2-besteffort-task-2", "job-2", 2, api.Pipelined, api.EmptyResource()),
						newTaskFunc("job-2-unbesteffort-task-2", "job-2", 2, api.Pipelined, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})),
						newTaskFunc("job-2-besteffort-task-3", "job-2", 3, api.Pending, api.EmptyResource()),
						newTaskFunc("job-2-unbesteffort-task-3", "job-2", 3, api.Pending, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})),
						newTaskFunc("job-2-besteffort-task-4", "job-2", 4, api.Pipelined, api.EmptyResource()),
						newTaskFunc("job-2-unbesteffort-task-4", "job-2", 4, api.Pipelined, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})),
					},
				},
				{
					// Pending job: its tasks must not be picked up at all.
					uid:                  "job-3",
					jobPriorityClassName: "job-priority-1",
					phase:                scheduling.PodGroupPending,
					tasks: []*api.TaskInfo{
						newTaskFunc("job-3-besteffort-task-1", "job-3", 1, api.Pending, api.EmptyResource()),
						newTaskFunc("job-3-besteffort-task-2", "job-3", 2, api.Pipelined, api.EmptyResource())},
				},
			},
			// job-2 first (priority class value 2 > 1), tasks in descending
			// task priority; formerly Pipelined tasks come back as Pending.
			expectedResult: []*api.TaskInfo{
				newTaskFunc("job-2-besteffort-task-4", "job-2", 4, api.Pending, api.EmptyResource()),
				newTaskFunc("job-2-besteffort-task-3", "job-2", 3, api.Pending, api.EmptyResource()),
				newTaskFunc("job-2-besteffort-task-2", "job-2", 2, api.Pending, api.EmptyResource()),
				newTaskFunc("job-2-besteffort-task-1", "job-2", 1, api.Pending, api.EmptyResource()),
				newTaskFunc("job-1-besteffort-task-4", "job-1", 4, api.Pending, api.EmptyResource()),
				newTaskFunc("job-1-besteffort-task-3", "job-1", 3, api.Pending, api.EmptyResource()),
				newTaskFunc("job-1-besteffort-task-2", "job-1", 2, api.Pending, api.EmptyResource()),
				newTaskFunc("job-1-besteffort-task-1", "job-1", 1, api.Pending, api.EmptyResource()),
			},
		},
	}

	for _, tc := range testCases {
		tc := tc
		// Run each case as a named subtest so failures identify the case.
		t.Run(tc.name, func(t *testing.T) {
			jobInfos := map[api.JobID]*api.JobInfo{}
			for _, job := range tc.jobs {
				jobInfo := api.NewJobInfo(job.uid, job.tasks...)
				jobInfo.Queue = "queue-1"
				jobInfo.PodGroup = &api.PodGroup{
					PodGroup: scheduling.PodGroup{
						Spec: scheduling.PodGroupSpec{
							PriorityClassName: job.jobPriorityClassName,
						},
						Status: scheduling.PodGroupStatus{
							Phase: job.phase,
						},
					},
				}
				jobInfo.Budget = &api.DisruptionBudget{}
				jobInfos[job.uid] = jobInfo
			}
			schedulerCache := &cache.SchedulerCache{
				Jobs: jobInfos,
				Queues: map[api.QueueID]*api.QueueInfo{
					"queue-1": {
						UID: "queue-1",
						Queue: &scheduling.Queue{
							ObjectMeta: metav1.ObjectMeta{
								Name: "queue-1",
							},
						},
					},
				},
				PriorityClasses: map[string]*schedulingapi.PriorityClass{
					"job-priority-1": {
						ObjectMeta: metav1.ObjectMeta{
							Name: "job-priority-1",
						},
						Value: 1,
					},
					"job-priority-2": {
						ObjectMeta: metav1.ObjectMeta{
							Name: "job-priority-2",
						},
						Value: 2,
					},
				},
			}
			ssn := framework.OpenSession(schedulerCache, tiers, []conf.Configuration{})
			// assert.Equal already prints the expected/actual diff on failure;
			// no additional t.Errorf is needed.
			assert.Equal(t, tc.expectedResult, New().pickUpPendingTasks(ssn))
		})
	}
}
8 changes: 4 additions & 4 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
if !api.PreemptableStatus(task.Status) {
return false
}
// Ignore task with empty resource request.
if task.Resreq.IsEmpty() {
// BestEffort pod is not supported to preempt unBestEffort pod.
if preemptor.BestEffort && !task.BestEffort {
return false
}
if !task.Preemptable {
Expand Down Expand Up @@ -172,8 +172,8 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
if !api.PreemptableStatus(task.Status) {
return false
}
// Ignore task with empty resource request.
if task.Resreq.IsEmpty() {
// BestEffort pod is not supported to preempt unBestEffort pod.
if preemptor.BestEffort && !task.BestEffort {
return false
}
// Preempt tasks within job.
Expand Down
Loading

0 comments on commit 3564d64

Please sign in to comment.