diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index e3411e313a6..28b1af0d7b2 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -60,6 +60,62 @@ func (backfill *Action) Execute(ssn *framework.Session) { } // TODO (k82cn): When backfill, it's also need to balance between Queues. + pendingTasks := backfill.pickUpPendingTasks(ssn) + for _, task := range pendingTasks { + job := ssn.Jobs[task.Job] + ph := util.NewPredicateHelper() + allocated := false + fe := api.NewFitErrors() + + if err := ssn.PrePredicateFn(task); err != nil { + klog.V(3).Infof("PrePredicate for task %s/%s failed in backfill for: %v", task.Namespace, task.Name, err) + for _, ni := range ssn.Nodes { + fe.SetNodeError(ni.Name, err) + } + job.NodesFitErrors[task.UID] = fe + break + } + + predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true) + if len(predicateNodes) == 0 { + job.NodesFitErrors[task.UID] = fitErrors + break + } + + node := predicateNodes[0] + if len(predicateNodes) > 1 { + nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) + node = ssn.BestNodeFn(task, nodeScores) + if node == nil { + node = util.SelectBestNode(nodeScores) + } + } + + klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) + if err := ssn.Allocate(task, node); err != nil { + klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID) + fe.SetNodeError(node.Name, err) + continue + } + + metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) + metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) + allocated = true + + if !allocated { + job.NodesFitErrors[task.UID] = fe + } + // TODO (k82cn): backfill for other case. + } +} + +func (backfill *Action) UnInitialize() {} + +func (backfill *Action) pickUpPendingTasks(ssn *framework.Session) []*api.TaskInfo { + queues := util.NewPriorityQueue(ssn.QueueOrderFn) + jobs := map[api.QueueID]*util.PriorityQueue{} + tasks := map[api.JobID]*util.PriorityQueue{} + var pendingTasks []*api.TaskInfo for _, job := range ssn.Jobs { if job.IsPending() { continue @@ -70,55 +126,57 @@ func (backfill *Action) Execute(ssn *framework.Session) { continue } - ph := util.NewPredicateHelper() + queue, found := ssn.Queues[job.Queue] + if !found { + continue + } for _, task := range job.TaskStatusIndex[api.Pending] { - if task.InitResreq.IsEmpty() { - allocated := false - fe := api.NewFitErrors() - - if err := ssn.PrePredicateFn(task); err != nil { - klog.V(3).Infof("PrePredicate for task %s/%s failed in backfill for: %v", task.Namespace, task.Name, err) - for _, ni := range ssn.Nodes { - fe.SetNodeError(ni.Name, err) - } - job.NodesFitErrors[task.UID] = fe - break - } - - predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true) - if len(predicateNodes) == 0 { - job.NodesFitErrors[task.UID] = fitErrors - break - } - - node := predicateNodes[0] - if len(predicateNodes) > 1 { - nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) - node = ssn.BestNodeFn(task, nodeScores) - if node == nil { - node = util.SelectBestNode(nodeScores) - } - } - - klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) - if err := ssn.Allocate(task, node); err != nil { - klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID) - fe.SetNodeError(node.Name, err) - continue - } - - metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) - metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) - allocated = true - - if !allocated { - job.NodesFitErrors[task.UID] = fe - } + if !task.BestEffort { + continue + } + if _, existed := tasks[job.UID]; !existed { + tasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn) + } + tasks[job.UID].Push(task) + } + + for _, task := range job.TaskStatusIndex[api.Pipelined] { + if !task.BestEffort { + continue + } + + stmt := framework.NewStatement(ssn) + err := stmt.UnPipeline(task) + if err != nil { + klog.Errorf("Failed to unpipeline task: %s", err.Error()) + continue + } + if _, existed := tasks[job.UID]; !existed { + tasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn) + } + tasks[job.UID].Push(task) + } + + if _, existed := tasks[job.UID]; !existed { + continue + } + + if _, existed := jobs[queue.UID]; !existed { + queues.Push(queue) + jobs[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn) + } + jobs[job.Queue].Push(job) + } + + for !queues.Empty() { + queue := queues.Pop().(*api.QueueInfo) + for !jobs[queue.UID].Empty() { + job := jobs[queue.UID].Pop().(*api.JobInfo) + for !tasks[job.UID].Empty() { + pendingTasks = append(pendingTasks, tasks[job.UID].Pop().(*api.TaskInfo)) } - // TODO (k82cn): backfill for other case. } } + return pendingTasks } - -func (backfill *Action) UnInitialize() {} diff --git a/pkg/scheduler/actions/backfill/backfill_test.go b/pkg/scheduler/actions/backfill/backfill_test.go new file mode 100644 index 00000000000..0546beb6b4e --- /dev/null +++ b/pkg/scheduler/actions/backfill/backfill_test.go @@ -0,0 +1,180 @@ +package backfill + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + schedulingapi "k8s.io/api/scheduling/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + "volcano.sh/apis/pkg/apis/scheduling" + "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/cache" + "volcano.sh/volcano/pkg/scheduler/conf" + "volcano.sh/volcano/pkg/scheduler/framework" + "volcano.sh/volcano/pkg/scheduler/plugins/drf" + "volcano.sh/volcano/pkg/scheduler/plugins/priority" +) + +func TestPickUpPendingTasks(t *testing.T) { + framework.RegisterPluginBuilder("priority", priority.New) + framework.RegisterPluginBuilder("drf", drf.New) + trueValue := true + tilers := []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: "priority", + EnabledPreemptable: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledJobOrder: &trueValue, + }, + { + Name: "drf", + EnabledQueueOrder: &trueValue, + }, + }, + }, + } + + newTaskFunc := func(uid, jobUid types.UID, priority int32, status api.TaskStatus, resources *api.Resource) *api.TaskInfo { + isBestEffort := resources.IsEmpty() + return &api.TaskInfo{ + UID: api.TaskID(uid), + Job: api.JobID(jobUid), + Name: string(uid), + Priority: priority, + TransactionContext: api.TransactionContext{ + Status: status, + }, + Resreq: resources, + InitResreq: resources, + BestEffort: isBestEffort, + NumaInfo: &api.TopologyInfo{ + ResMap: map[int]v1.ResourceList{}, + }, + } + } + + type jobs struct { + uid api.JobID + jobPriorityClassName string + phase scheduling.PodGroupPhase + tasks []*api.TaskInfo + } + + testCases := []struct { + name string + jobs []jobs + expectedResult []*api.TaskInfo + }{ + { + name: "test", + jobs: []jobs{ + { + uid: "job-1", + jobPriorityClassName: "job-priority-1", + phase: scheduling.PodGroupInqueue, + tasks: []*api.TaskInfo{ + newTaskFunc("job-1-besteffort-task-1", "job-1", 1, api.Pending, api.EmptyResource()), + newTaskFunc("job-1-unbesteffort-task-1", "job-1", 1, api.Pending, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})), + newTaskFunc("job-1-besteffort-task-2", "job-1", 2, api.Pipelined, api.EmptyResource()), + newTaskFunc("job-1-unbesteffort-task-2", "job-1", 2, api.Pipelined, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})), + newTaskFunc("job-1-besteffort-task-3", "job-1", 3, api.Pending, api.EmptyResource()), + newTaskFunc("job-1-unbesteffort-task-3", "job-1", 3, api.Pending, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})), + newTaskFunc("job-1-besteffort-task-4", "job-1", 4, api.Pipelined, api.EmptyResource()), + newTaskFunc("job-1-unbesteffort-task-4", "job-1", 4, api.Pipelined, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})), + }, + }, + { + uid: "job-2", + jobPriorityClassName: "job-priority-2", + phase: scheduling.PodGroupInqueue, + tasks: []*api.TaskInfo{ + newTaskFunc("job-2-besteffort-task-1", "job-2", 1, api.Pending, api.EmptyResource()), + newTaskFunc("job-2-unbesteffort-task-1", "job-2", 1, api.Pending, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})), + newTaskFunc("job-2-besteffort-task-2", "job-2", 2, api.Pipelined, api.EmptyResource()), + newTaskFunc("job-2-unbesteffort-task-2", "job-2", 2, api.Pipelined, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})), + newTaskFunc("job-2-besteffort-task-3", "job-2", 3, api.Pending, api.EmptyResource()), + newTaskFunc("job-2-unbesteffort-task-3", "job-2", 3, api.Pending, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})), + newTaskFunc("job-2-besteffort-task-4", "job-2", 4, api.Pipelined, api.EmptyResource()), + newTaskFunc("job-2-unbesteffort-task-4", "job-2", 4, api.Pipelined, api.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})), + }, + }, + { + uid: "job-3", + jobPriorityClassName: "job-priority-1", + phase: scheduling.PodGroupPending, + tasks: []*api.TaskInfo{ + newTaskFunc("job-3-besteffort-task-1", "job-3", 1, api.Pending, api.EmptyResource()), + newTaskFunc("job-3-besteffort-task-2", "job-3", 2, api.Pipelined, api.EmptyResource())}, + }, + }, + expectedResult: []*api.TaskInfo{ + newTaskFunc("job-2-besteffort-task-4", "job-2", 4, api.Pending, api.EmptyResource()), + newTaskFunc("job-2-besteffort-task-3", "job-2", 3, api.Pending, api.EmptyResource()), + newTaskFunc("job-2-besteffort-task-2", "job-2", 2, api.Pending, api.EmptyResource()), + newTaskFunc("job-2-besteffort-task-1", "job-2", 1, api.Pending, api.EmptyResource()), + newTaskFunc("job-1-besteffort-task-4", "job-1", 4, api.Pending, api.EmptyResource()), + newTaskFunc("job-1-besteffort-task-3", "job-1", 3, api.Pending, api.EmptyResource()), + newTaskFunc("job-1-besteffort-task-2", "job-1", 2, api.Pending, api.EmptyResource()), + newTaskFunc("job-1-besteffort-task-1", "job-1", 1, api.Pending, api.EmptyResource()), + }, + }, + } + + for _, tc := range testCases { + jobInfos := map[api.JobID]*api.JobInfo{} + for _, job := range tc.jobs { + jobInfo := api.NewJobInfo(job.uid, job.tasks...) + jobInfo.Queue = "queue-1" + jobInfo.PodGroup = &api.PodGroup{ + PodGroup: scheduling.PodGroup{ + Spec: scheduling.PodGroupSpec{ + PriorityClassName: job.jobPriorityClassName, + }, + Status: scheduling.PodGroupStatus{ + Phase: job.phase, + }, + }, + } + jobInfo.Budget = &api.DisruptionBudget{} + jobInfos[job.uid] = jobInfo + } + schedulerCache := &cache.SchedulerCache{ + Jobs: jobInfos, + Queues: map[api.QueueID]*api.QueueInfo{ + "queue-1": { + UID: "queue-1", + Queue: &scheduling.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "queue-1", + }, + }, + }, + }, + PriorityClasses: map[string]*schedulingapi.PriorityClass{ + "job-priority-1": { + ObjectMeta: metav1.ObjectMeta{ + Name: "job-priority-1", + }, + Value: 1, + }, + "job-priority-2": { + ObjectMeta: metav1.ObjectMeta{ + Name: "job-priority-2", + }, + Value: 2, + }, + }, + } + ssn := framework.OpenSession(schedulerCache, tilers, []conf.Configuration{}) + actualResult := New().pickUpPendingTasks(ssn) + if !assert.Equal(t, tc.expectedResult, actualResult) { + t.Errorf("unexpected test; name: %s, expected result: %v, actual result: %v", tc.name, tc.expectedResult, actualResult) + } + } +} diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 18105eb43b8..0ff3032b16a 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -117,8 +117,8 @@ func (pmpt *Action) Execute(ssn *framework.Session) { if !api.PreemptableStatus(task.Status) { return false } - // Ignore task with empty resource request. - if task.Resreq.IsEmpty() { + // BestEffort pod is not supported to preempt unBestEffort pod. + if preemptor.BestEffort && !task.BestEffort { return false } if !task.Preemptable { @@ -172,8 +172,8 @@ func (pmpt *Action) Execute(ssn *framework.Session) { if !api.PreemptableStatus(task.Status) { return false } - // Ignore task with empty resource request. - if task.Resreq.IsEmpty() { + // BestEffort pod is not supported to preempt unBestEffort pod. + if preemptor.BestEffort && !task.BestEffort { return false } // Preempt tasks within job. diff --git a/pkg/scheduler/actions/preempt/preempt_test.go b/pkg/scheduler/actions/preempt/preempt_test.go index e6ba1f68eec..1fe870128f1 100644 --- a/pkg/scheduler/actions/preempt/preempt_test.go +++ b/pkg/scheduler/actions/preempt/preempt_test.go @@ -207,6 +207,44 @@ func TestPreempt(t *testing.T) { }, expected: 1, }, + { + // case about #3335 + name: "unBestEffort high-priority pod preempt BestEffort low-priority pod in same queue", + podGroups: []*schedulingv1beta1.PodGroup{ + util.BuildPodGroupWithPrio("pg1", "c1", "q1", 0, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"), + util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"), + }, + pods: []*v1.Pod{ + util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, v1.ResourceList{}, "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)), + util.BuildPod("c1", "preemptor1", "", v1.PodPending, api.BuildResourceList("3", "3G"), "pg2", make(map[string]string), make(map[string]string)), + }, + nodes: []*v1.Node{ + util.BuildNode("n1", api.BuildResourceList("12", "12G", []api.ScalarResource{{Name: "pods", Value: "1"}}...), make(map[string]string)), + }, + queues: []*schedulingv1beta1.Queue{ + util.BuildQueue("q1", 1, api.BuildResourceList("6", "6G")), + }, + expected: 1, + }, + { + // case about #3335 + name: "BestEffort high-priority pod preempt BestEffort low-priority pod in same queue", + podGroups: []*schedulingv1beta1.PodGroup{ + util.BuildPodGroupWithPrio("pg1", "c1", "q1", 0, map[string]int32{}, schedulingv1beta1.PodGroupInqueue, "low-priority"), + util.BuildPodGroupWithPrio("pg2", "c1", "q1", 1, map[string]int32{"": 1}, schedulingv1beta1.PodGroupInqueue, "high-priority"), + }, + pods: []*v1.Pod{ + util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, v1.ResourceList{}, "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)), + util.BuildPod("c1", "preemptor1", "", v1.PodPending, v1.ResourceList{}, "pg2", make(map[string]string), make(map[string]string)), + }, + nodes: []*v1.Node{ + util.BuildNode("n1", api.BuildResourceList("12", "12G", []api.ScalarResource{{Name: "pods", Value: "1"}}...), make(map[string]string)), + }, + queues: []*schedulingv1beta1.Queue{ + util.BuildQueue("q1", 1, api.BuildResourceList("6", "6G")), + }, + expected: 1, + }, } preempt := New() diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go index a9a8d95e36a..7436aba99b3 100644 --- a/pkg/scheduler/api/job_info.go +++ b/pkg/scheduler/api/job_info.go @@ -691,14 +691,6 @@ func (ji *JobInfo) ReadyTaskNum() int32 { occupied += len(ji.TaskStatusIndex[Allocated]) occupied += len(ji.TaskStatusIndex[Succeeded]) - if tasks, found := ji.TaskStatusIndex[Pending]; found { - for _, task := range tasks { - if task.BestEffort { - occupied++ - } - } - } - return int32(occupied) } @@ -707,6 +699,16 @@ func (ji *JobInfo) WaitingTaskNum() int32 { return int32(len(ji.TaskStatusIndex[Pipelined])) } +func (ji *JobInfo) PendingBestEffortTaskNum() int32 { + count := 0 + for _, task := range ji.TaskStatusIndex[Pending] { + if task.BestEffort { + count++ + } + } + return int32(count) +} + // CheckTaskValid returns whether each task of job is valid. func (ji *JobInfo) CheckTaskValid() bool { // if job minAvailable is less than sum of(task minAvailable), skip this check @@ -819,14 +821,6 @@ func (ji *JobInfo) CheckTaskStarving() bool { } continue } - - if status == Pending { - for _, task := range tasks { - if task.InitResreq.IsEmpty() { - occupiedMap[getTaskID(task.Pod)]++ - } - } - } } for taskID, minNum := range ji.TaskMinAvailable { if occupiedMap[taskID] < minNum { @@ -852,11 +846,16 @@ func (ji *JobInfo) ValidTaskNum() int32 { return int32(occupied) } -// Ready returns whether job is ready for run -func (ji *JobInfo) Ready() bool { - occupied := ji.ReadyTaskNum() +func (ji *JobInfo) IsReady() bool { + return ji.ReadyTaskNum()+ji.PendingBestEffortTaskNum() >= ji.MinAvailable +} + +func (ji *JobInfo) IsPipelined() bool { + return ji.WaitingTaskNum()+ji.ReadyTaskNum()+ji.PendingBestEffortTaskNum() >= ji.MinAvailable +} - return occupied >= ji.MinAvailable +func (ji *JobInfo) IsStarving() bool { + return ji.WaitingTaskNum()+ji.ReadyTaskNum() < ji.MinAvailable } // IsPending returns whether job is in pending status diff --git a/pkg/scheduler/api/job_info_test.go b/pkg/scheduler/api/job_info_test.go index ab3ca7c1abc..78ae1e4e649 100644 --- a/pkg/scheduler/api/job_info_test.go +++ b/pkg/scheduler/api/job_info_test.go @@ -17,14 +17,16 @@ limitations under the License. package api import ( + "k8s.io/apimachinery/pkg/api/resource" "reflect" "testing" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "volcano.sh/apis/pkg/apis/scheduling" - schedulingv2 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" ) @@ -292,3 +294,92 @@ func TestTaskSchedulingReason(t *testing.T) { } } } + +func TestJobInfo(t *testing.T) { + newTaskFunc := func(uid, jobUid types.UID, status TaskStatus, resources *Resource) *TaskInfo { + isBestEffort := resources.IsEmpty() + return &TaskInfo{ + UID: TaskID(uid), + Job: JobID(jobUid), + Name: string(uid), + TransactionContext: TransactionContext{ + Status: status, + }, + Resreq: resources, + InitResreq: resources, + BestEffort: isBestEffort, + NumaInfo: &TopologyInfo{ + ResMap: map[int]v1.ResourceList{}, + }, + } + } + + testCases := []struct { + name string + jobUID JobID + jobMinAvailable int32 + tasks []*TaskInfo + expectedPendingBestEffortTaskNum int32 + expectedIsReady bool + expectedIsPipelined bool + expectedIsStarving bool + }{ + { + name: "starving job", + jobUID: "job-1", + jobMinAvailable: 5, + tasks: []*TaskInfo{ + newTaskFunc("pending-besteffort-task-1", "job-1", Pending, EmptyResource()), + newTaskFunc("pipelined-besteffort-task-1", "job-1", Pipelined, EmptyResource()), + newTaskFunc("running-besteffort-task-1", "job-1", Running, EmptyResource()), + newTaskFunc("pending-unbesteffort-task-1", "job-1", Pending, NewResource(v1.ResourceList{"cpu": resource.MustParse("100m")})), + newTaskFunc("pipelined-unbesteffort-task-1", "job-1", Pipelined, NewResource(v1.ResourceList{"cpu": resource.MustParse("100m")})), + newTaskFunc("running-unbesteffort-task-1", "job-1", Running, NewResource(v1.ResourceList{"cpu": resource.MustParse("100m")})), + }, + expectedPendingBestEffortTaskNum: 1, + expectedIsReady: false, + expectedIsPipelined: true, + expectedIsStarving: true, + }, + + { + name: "ready job", + jobUID: "job-1", + jobMinAvailable: 3, + tasks: []*TaskInfo{ + newTaskFunc("pending-besteffort-task-1", "job-1", Pending, EmptyResource()), + newTaskFunc("pipelined-besteffort-task-1", "job-1", Pipelined, EmptyResource()), + newTaskFunc("running-besteffort-task-1", "job-1", Running, EmptyResource()), + newTaskFunc("pending-unbesteffort-task-1", "job-1", Pending, NewResource(v1.ResourceList{"cpu": resource.MustParse("100m")})), + newTaskFunc("pipelined-unbesteffort-task-1", "job-1", Pipelined, NewResource(v1.ResourceList{"cpu": resource.MustParse("100m")})), + newTaskFunc("running-unbesteffort-task-1", "job-1", Running, NewResource(v1.ResourceList{"cpu": resource.MustParse("100m")})), + }, + expectedPendingBestEffortTaskNum: 1, + expectedIsReady: true, + expectedIsPipelined: true, + expectedIsStarving: false, + }, + } + + for _, tc := range testCases { + jobInfo := NewJobInfo(tc.jobUID, tc.tasks...) + jobInfo.MinAvailable = tc.jobMinAvailable + actualPendingBestEffortTaskNum := jobInfo.PendingBestEffortTaskNum() + actualIsReady := jobInfo.IsReady() + actualIsPipelined := jobInfo.IsPipelined() + actualIsStarving := jobInfo.IsStarving() + + if !assert.Equal(t, actualPendingBestEffortTaskNum, tc.expectedPendingBestEffortTaskNum) { + t.Errorf("unexpected PendingBestEffortTaskNum; name: %s, expected result: %v, actual result: %v", tc.name, tc.expectedPendingBestEffortTaskNum, actualPendingBestEffortTaskNum) + } + if !assert.Equal(t, actualIsReady, tc.expectedIsReady) { + t.Errorf("unexpected IsReady; name: %s, expected result: %v, actual result: %v", tc.name, tc.expectedIsReady, actualIsReady) + } + if !assert.Equal(t, actualIsPipelined, tc.expectedIsPipelined) { + t.Errorf("unexpected IsPipelined; name: %s, expected result: %v, actual result: %v", tc.name, tc.expectedIsPipelined, actualIsPipelined) + } + if !assert.Equal(t, actualIsStarving, tc.expectedIsStarving) { + t.Errorf("unexpected IsStarving; name: %s, expected result: %v, actual result: %v", tc.name, tc.expectedIsStarving, actualIsStarving) + } + } +} diff --git a/pkg/scheduler/framework/statement.go b/pkg/scheduler/framework/statement.go index e316deed040..fb834818fd0 100644 --- a/pkg/scheduler/framework/statement.go +++ b/pkg/scheduler/framework/statement.go @@ -187,7 +187,7 @@ func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error { func (s *Statement) pipeline(task *api.TaskInfo) { } -func (s *Statement) unpipeline(task *api.TaskInfo) error { +func (s *Statement) UnPipeline(task *api.TaskInfo) error { job, found := s.ssn.Jobs[task.Job] if found { if err := job.UpdateTaskStatus(task, api.Pending); err != nil { @@ -360,7 +360,7 @@ func (s *Statement) Discard() { klog.Errorf("Failed to unevict task: %s", err.Error()) } case Pipeline: - err := s.unpipeline(op.task) + err := s.UnPipeline(op.task) if err != nil { klog.Errorf("Failed to unpipeline task: %s", err.Error()) } diff --git a/pkg/scheduler/framework/util.go b/pkg/scheduler/framework/util.go index e89a4a7d95e..9f802b107da 100644 --- a/pkg/scheduler/framework/util.go +++ b/pkg/scheduler/framework/util.go @@ -21,7 +21,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/klog/v2" - k8sframework "k8s.io/kubernetes/pkg/scheduler/framework" "volcano.sh/volcano/pkg/scheduler/api" diff --git a/pkg/scheduler/plugins/gang/gang.go b/pkg/scheduler/plugins/gang/gang.go index fb4f13a4472..a1c88c01c95 100644 --- a/pkg/scheduler/plugins/gang/gang.go +++ b/pkg/scheduler/plugins/gang/gang.go @@ -112,8 +112,8 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) { lv := l.(*api.JobInfo) rv := r.(*api.JobInfo) - lReady := lv.Ready() - rReady := rv.Ready() + lReady := lv.IsReady() + rReady := rv.IsReady() klog.V(4).Infof("Gang JobOrderFn: <%v/%v> is ready: %t, <%v/%v> is ready: %t", lv.Namespace, lv.Name, lReady, rv.Namespace, rv.Name, rReady) @@ -136,7 +136,7 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) { ssn.AddJobOrderFn(gp.Name(), jobOrderFn) ssn.AddJobReadyFn(gp.Name(), func(obj interface{}) bool { ji := obj.(*api.JobInfo) - if ji.CheckTaskReady() && ji.Ready() { + if ji.CheckTaskReady() && ji.IsReady() { return true } return false @@ -144,8 +144,7 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) { pipelinedFn := func(obj interface{}) int { ji := obj.(*api.JobInfo) - occupied := ji.WaitingTaskNum() + ji.ReadyTaskNum() - if ji.CheckTaskPipelined() && occupied >= ji.MinAvailable { + if ji.CheckTaskPipelined() && ji.IsPipelined() { return util.Permit } return util.Reject @@ -154,9 +153,8 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) { jobStarvingFn := func(obj interface{}) bool { ji := obj.(*api.JobInfo) - occupied := ji.WaitingTaskNum() + ji.ReadyTaskNum() // In the preemption scenario, the taskMinAvailable configuration is not concerned, only the jobMinAvailable is concerned - return occupied < ji.MinAvailable + return ji.IsStarving() } ssn.AddJobStarvingFns(gp.Name(), jobStarvingFn) } @@ -165,7 +163,7 @@ func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) { var unreadyTaskCount int32 var unScheduleJobCount int for _, job := range ssn.Jobs { - if !job.Ready() { + if !job.IsReady() { schedulableTaskNum := func() (num int32) { for _, task := range job.TaskStatusIndex[api.Pending] { ctx := task.GetTransactionContext() diff --git a/pkg/scheduler/plugins/tdm/tdm.go b/pkg/scheduler/plugins/tdm/tdm.go index f28aac4d180..e0d19af2e66 100644 --- a/pkg/scheduler/plugins/tdm/tdm.go +++ b/pkg/scheduler/plugins/tdm/tdm.go @@ -279,8 +279,7 @@ func (tp *tdmPlugin) OnSessionOpen(ssn *framework.Session) { jobPipelinedFn := func(obj interface{}) int { jobInfo := obj.(*api.JobInfo) - occupied := jobInfo.WaitingTaskNum() + jobInfo.ReadyTaskNum() - if occupied >= jobInfo.MinAvailable { + if jobInfo.IsPipelined() { return tutil.Permit } return tutil.Reject