Commit caa1112

lowang-bh and tenzen-y authored
add volcano gang-scheduler pg min resource calculation (#566)
* add volcano gang-scheduler pg min resource calculation
  Signed-off-by: lowang_bh <lhui_wang@163.com>
* use priorityclass lister
  Signed-off-by: lowang_bh <lhui_wang@163.com>
* Update pkg/controller/podgroup.go
  Co-authored-by: Yuki Iwai <yuki.iwai.tz@gmail.com>

---------

Signed-off-by: lowang_bh <lhui_wang@163.com>
Co-authored-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
1 parent fda0532 · commit caa1112

3 files changed: +153, −32 lines

pkg/controller/mpi_job_controller.go (3 additions & 3 deletions)

@@ -299,12 +299,12 @@ func NewMPIJobControllerWithClock(
 		priorityClassLister schedulinglisters.PriorityClassLister
 		priorityClassSynced cache.InformerSynced
 	)
+	priorityClassLister = priorityClassInformer.Lister()
+	priorityClassSynced = priorityClassInformer.Informer().HasSynced
 	if gangSchedulingName == options.GangSchedulerVolcano {
-		podGroupCtrl = NewVolcanoCtrl(volcanoClient, namespace)
+		podGroupCtrl = NewVolcanoCtrl(volcanoClient, namespace, priorityClassLister)
 	} else if len(gangSchedulingName) != 0 {
 		// Use scheduler-plugins as a default gang-scheduler.
-		priorityClassLister = priorityClassInformer.Lister()
-		priorityClassSynced = priorityClassInformer.Informer().HasSynced
 		podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, namespace, gangSchedulingName, priorityClassLister)
 	}
 	if podGroupCtrl != nil {
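Taken together, this hunk moves the PriorityClass lister and its HasSynced handle out of the scheduler-plugins branch so they are set up for any gang scheduler, and NewVolcanoCtrl gains the lister as a new third argument. A minimal sketch of the resulting call shapes, reusing the variable names from the surrounding function (a summary for orientation, not additional code from the commit):

// Sketch: one PriorityClass lister now feeds both gang-scheduler backends.
priorityClassLister = priorityClassInformer.Lister()
priorityClassSynced = priorityClassInformer.Informer().HasSynced

// Volcano backend: lister passed as the new third argument.
podGroupCtrl = NewVolcanoCtrl(volcanoClient, namespace, priorityClassLister)

// scheduler-plugins backend: signature unchanged, same shared lister.
podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, namespace, gangSchedulingName, priorityClassLister)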

pkg/controller/podgroup.go (41 additions & 23 deletions)

@@ -61,29 +61,31 @@ type PodGroupControl interface {
 	decoratePodTemplateSpec(pts *corev1.PodTemplateSpec, mpiJobName string)
 	// calculatePGMinResources will calculate minResources for podGroup.
 	calculatePGMinResources(minMember *int32, mpiJob *kubeflow.MPIJob) *corev1.ResourceList
-	// podGroupSpecIsDeepEqual will return true if the spec fields of two podGroup are equals.
+	// pgSpecsAreEqual will return true if the spec fields of two podGroup are equals.
 	pgSpecsAreEqual(a, b metav1.Object) bool
 }
 
 // VolcanoCtrl is the implementation fo PodGroupControl with volcano.
 type VolcanoCtrl struct {
-	Client           volcanoclient.Interface
-	InformerFactory  volcanoinformers.SharedInformerFactory
-	PodGroupInformer volcanopodgroupinformer.PodGroupInformer
-	schedulerName    string
+	Client              volcanoclient.Interface
+	InformerFactory     volcanoinformers.SharedInformerFactory
+	PodGroupInformer    volcanopodgroupinformer.PodGroupInformer
+	PriorityClassLister schedulinglisters.PriorityClassLister
+	schedulerName       string
 }
 
-func NewVolcanoCtrl(c volcanoclient.Interface, watchNamespace string) *VolcanoCtrl {
+func NewVolcanoCtrl(c volcanoclient.Interface, watchNamespace string, pcLister schedulinglisters.PriorityClassLister) *VolcanoCtrl {
 	var informerFactoryOpts []volcanoinformers.SharedInformerOption
 	if watchNamespace != metav1.NamespaceAll {
 		informerFactoryOpts = append(informerFactoryOpts, volcanoinformers.WithNamespace(watchNamespace))
 	}
 	informerFactory := volcanoinformers.NewSharedInformerFactoryWithOptions(c, 0, informerFactoryOpts...)
 	return &VolcanoCtrl{
-		Client:           c,
-		InformerFactory:  informerFactory,
-		PodGroupInformer: informerFactory.Scheduling().V1beta1().PodGroups(),
-		schedulerName:    options.GangSchedulerVolcano,
+		Client:              c,
+		InformerFactory:     informerFactory,
+		PodGroupInformer:    informerFactory.Scheduling().V1beta1().PodGroups(),
+		PriorityClassLister: pcLister,
+		schedulerName:       options.GangSchedulerVolcano,
 	}
 }
 
@@ -167,11 +169,21 @@ func (v *VolcanoCtrl) decoratePodTemplateSpec(pts *corev1.PodTemplateSpec, mpiJo
 	pts.Annotations[volcanov1beta1.KubeGroupNameAnnotationKey] = mpiJobName
 }
 
-func (v *VolcanoCtrl) calculatePGMinResources(_ *int32, mpiJob *kubeflow.MPIJob) *corev1.ResourceList {
-	if schedPolicy := mpiJob.Spec.RunPolicy.SchedulingPolicy; schedPolicy != nil {
+// calculatePGMinResources calculates minResources for volcano podGroup.
+// The minMember is task's total MinAvailable or replicas if task's minAvailable is not set in vcJob.
+// PodGroup's MinResources leaves empty now if it is not set. So we calculate the minResources among those first minMember replicas with higher priority.
+// ret: https://github.com/volcano-sh/volcano/blob/1933d46bdc4434772518ebb74c4281671ddeffa1/pkg/webhooks/admission/jobs/mutate/mutate_job.go#L168
+// ref: https://github.com/volcano-sh/volcano/blob/1933d46bdc4434772518ebb74c4281671ddeffa1/pkg/controllers/job/job_controller_actions.go#L761
+func (v *VolcanoCtrl) calculatePGMinResources(minMember *int32, mpiJob *kubeflow.MPIJob) *corev1.ResourceList {
+	if schedPolicy := mpiJob.Spec.RunPolicy.SchedulingPolicy; schedPolicy != nil && schedPolicy.MinResources != nil {
 		return schedPolicy.MinResources
 	}
-	return nil
+	if minMember != nil && *minMember == 0 {
+		return nil
+	}
+
+	// sort task by priorityClasses
+	return calPGMinResource(minMember, mpiJob, v.PriorityClassLister)
 }
 
 func (v *VolcanoCtrl) pgSpecsAreEqual(a, b metav1.Object) bool {

@@ -311,16 +323,30 @@ func (s *SchedulerPluginsCtrl) calculatePGMinResources(minMember *int32, mpiJob
 		return nil
 	}
 
+	return calPGMinResource(minMember, mpiJob, s.PriorityClassLister)
+}
+
+func (s *SchedulerPluginsCtrl) pgSpecsAreEqual(a, b metav1.Object) bool {
+	PGa := a.(*schedv1alpha1.PodGroup)
+	PGb := b.(*schedv1alpha1.PodGroup)
+	return equality.Semantic.DeepEqual(PGa.Spec, PGb.Spec)
+}
+
+var _ PodGroupControl = &SchedulerPluginsCtrl{}
+
+// calPGMinResource returns the minimum resource for mpiJob with minMembers
+func calPGMinResource(minMember *int32, mpiJob *kubeflow.MPIJob, pcLister schedulinglisters.PriorityClassLister) *corev1.ResourceList {
 	var order replicasOrder
 	for rt, replica := range mpiJob.Spec.MPIReplicaSpecs {
 		rp := replicaPriority{
 			priority:    0,
 			replicaType: rt,
 			ReplicaSpec: *replica,
 		}
+
 		pcName := replica.Template.Spec.PriorityClassName
-		if len(pcName) != 0 {
-			if priorityClass, err := s.PriorityClassLister.Get(pcName); err != nil {
+		if len(pcName) != 0 && pcLister != nil {
+			if priorityClass, err := pcLister.Get(pcName); err != nil {
 				klog.Warningf("Ignore replica %q priority class %q: %v", rt, pcName, err)
 			} else {
 				rp.priority = priorityClass.Value

@@ -359,14 +385,6 @@ func (s *SchedulerPluginsCtrl) calculatePGMinResources(minMember *int32, mpiJob
 	return &minResources
 }
 
-func (s *SchedulerPluginsCtrl) pgSpecsAreEqual(a, b metav1.Object) bool {
-	PGa := a.(*schedv1alpha1.PodGroup)
-	PGb := b.(*schedv1alpha1.PodGroup)
-	return equality.Semantic.DeepEqual(PGa.Spec, PGb.Spec)
-}
-
-var _ PodGroupControl = &SchedulerPluginsCtrl{}
-
 // calculateMinAvailable calculates minAvailable for the PodGroup.
 // If the schedulingPolicy.minAvailable is nil, it returns returns `NUM(workers) + 1`; otherwise returns `schedulingPolicy.minAvailable`.
 func calculateMinAvailable(mpiJob *kubeflow.MPIJob) *int32 {
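The hunks above show only the head of calPGMinResource; the loop that actually sums requests over the first minMember pods sits in unchanged context that the diff elides. As a rough sketch of the approach the new comment describes (sort the replica specs by PriorityClass value, then add up the container requests of the highest-priority minMember pods), the stand-alone function below illustrates the accounting; the replicaGroup type and the sumFirstMinMember name are illustrative stand-ins, not the committed code.

package controller

import (
	corev1 "k8s.io/api/core/v1"
)

// replicaGroup is a simplified stand-in for the controller's replicaPriority type,
// used here only to keep the sketch self-contained.
type replicaGroup struct {
	replicas int32
	podSpec  corev1.PodSpec
}

// sumFirstMinMember is a sketch, not the committed calPGMinResource. It adds up the
// resource requests of the first minMember pods, walking replica groups that are
// already sorted with the highest priority first.
func sumFirstMinMember(minMember int32, sorted []replicaGroup) *corev1.ResourceList {
	minResources := corev1.ResourceList{}
	remaining := minMember
	for _, rg := range sorted {
		if remaining <= 0 {
			break
		}
		// Count at most `remaining` pods from this replica type.
		count := rg.replicas
		if count > remaining {
			count = remaining
		}
		remaining -= count

		// Requests of a single pod of this replica type, summed over its containers.
		podRequest := corev1.ResourceList{}
		for _, c := range rg.podSpec.Containers {
			for name, qty := range c.Resources.Requests {
				q := podRequest[name]
				q.Add(qty)
				podRequest[name] = q
			}
		}
		// Account for `count` pods of this replica type.
		for name, qty := range podRequest {
			total := minResources[name]
			for i := int32(0); i < count; i++ {
				total.Add(qty)
			}
			minResources[name] = total
		}
	}
	return &minResources
}

In the committed code the ordering is produced by sort.Sort(sort.Reverse(order)) over replicasOrder, with each replica's priority looked up through the PriorityClassLister as shown in the hunk above, and calculatePGMinResources only falls back to this per-replica accounting when schedulingPolicy.minResources is unset and minMember is non-zero.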

pkg/controller/podgroup_test.go (109 additions & 6 deletions)

@@ -15,6 +15,8 @@
 package controller
 
 import (
+	"reflect"
+	"sort"
 	"testing"
 
 	"github.com/google/go-cmp/cmp"

@@ -183,6 +185,10 @@ func TestNewPodGroup(t *testing.T) {
 					MinMember:         3,
 					Queue:             "project-x",
 					PriorityClassName: "high",
+					MinResources: &corev1.ResourceList{
+						corev1.ResourceCPU:    resource.MustParse("21"),
+						corev1.ResourceMemory: resource.MustParse("42Gi"),
+					},
 				},
 			},
 			wantSchedPG: &schedv1alpha1.PodGroup{

@@ -207,8 +213,10 @@
 	for name, tc := range testCases {
 		t.Run(name, func(t *testing.T) {
 			volcanoFixture := newFixture(t, "default-scheduler")
+			jobController, _, _ := volcanoFixture.newController(clock.RealClock{})
 			volcanoPGCtrl := &VolcanoCtrl{
-				Client: volcanoFixture.volcanoClient,
+				Client:              volcanoFixture.volcanoClient,
+				PriorityClassLister: jobController.priorityClassLister,
 			}
 			volcanoPG := volcanoPGCtrl.newPodGroup(tc.mpiJob)
 			if diff := cmp.Diff(tc.wantVolcanoPG, volcanoPG, ignoreReferences); len(diff) != 0 {

@@ -361,8 +369,10 @@ func TestDecoratePodTemplateSpec(t *testing.T) {
 
 func TestCalculatePGMinResources(t *testing.T) {
 	volcanoTests := map[string]struct {
-		job  *kubeflow.MPIJob
-		want *corev1.ResourceList
+		job             *kubeflow.MPIJob
+		priorityClasses []*schedulingv1.PriorityClass
+		minMember       int32
+		want            *corev1.ResourceList
 	}{
 		"minResources is not empty": {
 			job: &kubeflow.MPIJob{

@@ -388,12 +398,68 @@ func TestCalculatePGMinResources(t *testing.T) {
 			},
 			want: nil,
 		},
+		"without priorityClass": {
+			minMember: 3,
+			job: &kubeflow.MPIJob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test",
+				},
+				Spec: kubeflow.MPIJobSpec{
+					MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{
+						kubeflow.MPIReplicaTypeLauncher: {
+							Replicas: pointer.Int32(1),
+							Template: corev1.PodTemplateSpec{
+								Spec: corev1.PodSpec{
+									Containers: []corev1.Container{
+										{
+											Resources: corev1.ResourceRequirements{
+												Requests: corev1.ResourceList{
+													corev1.ResourceCPU:    resource.MustParse("2"),
+													corev1.ResourceMemory: resource.MustParse("1Gi"),
+												},
+											},
+										},
+									},
+								},
+							},
+						},
+						kubeflow.MPIReplicaTypeWorker: {
+							Replicas: pointer.Int32(2),
+							Template: corev1.PodTemplateSpec{
+								Spec: corev1.PodSpec{
+									Containers: []corev1.Container{
+										{
+											Resources: corev1.ResourceRequirements{
+												Requests: corev1.ResourceList{
+													corev1.ResourceCPU:    resource.MustParse("10"),
+													corev1.ResourceMemory: resource.MustParse("32Gi"),
+												},
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			want: &corev1.ResourceList{
+				corev1.ResourceCPU:    resource.MustParse("22"),
+				corev1.ResourceMemory: resource.MustParse("65Gi"),
+			},
+		},
 	}
 	for name, tc := range volcanoTests {
 		t.Run(name, func(t *testing.T) {
-			f := newFixture(t, "scheduler-plugins-scheduler")
-			pgCtrl := VolcanoCtrl{Client: f.volcanoClient}
-			got := pgCtrl.calculatePGMinResources(pointer.Int32(0), tc.job)
+			f := newFixture(t, "volcano-scheduler")
+			if tc.priorityClasses != nil {
+				for _, pc := range tc.priorityClasses {
+					f.setUpPriorityClass(pc)
+				}
+			}
+			jobController, _, _ := f.newController(clock.RealClock{})
+			pgCtrl := VolcanoCtrl{Client: f.volcanoClient, PriorityClassLister: jobController.priorityClassLister}
+			got := pgCtrl.calculatePGMinResources(&tc.minMember, tc.job)
 			if diff := cmp.Diff(tc.want, got); len(diff) != 0 {
 				t.Fatalf("Unexpected calculatePGMinResources for the volcano (-want,+got):\n%s", diff)
 			}
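For the new "without priorityClass" case, the expected value follows directly from the fixture: with minMember set to 3, the single launcher pod (2 CPU, 1Gi) and both worker pods (10 CPU, 32Gi each) are counted, so the computed minResources is 2 + 2×10 = 22 CPU and 1Gi + 2×32Gi = 65Gi, which is exactly the want list above.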
@@ -752,3 +818,40 @@ func TestCalculateMinAvailable(t *testing.T) {
 		})
 	}
 }
+
+func TestReplicasOrder(t *testing.T) {
+	var lancherReplic, wokerReplic int32 = 1, 2
+	tests := map[string]struct {
+		original replicasOrder
+		expected replicasOrder
+	}{
+		"1-lancher, 2-worker, lancher higher priority": {
+			original: replicasOrder{
+				{priority: 1, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: common.ReplicaSpec{Replicas: &lancherReplic}},
+				{priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: common.ReplicaSpec{Replicas: &wokerReplic}},
+			},
+			expected: replicasOrder{
+				{priority: 1, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: common.ReplicaSpec{Replicas: &lancherReplic}},
+				{priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: common.ReplicaSpec{Replicas: &wokerReplic}},
+			},
+		},
+		"1-lancher, 2-worker, equal priority": {
+			original: replicasOrder{
+				{priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: common.ReplicaSpec{Replicas: &wokerReplic}},
+				{priority: 0, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: common.ReplicaSpec{Replicas: &lancherReplic}},
+			},
+			expected: replicasOrder{
+				{priority: 0, replicaType: kubeflow.MPIReplicaTypeWorker, ReplicaSpec: common.ReplicaSpec{Replicas: &wokerReplic}},
+				{priority: 0, replicaType: kubeflow.MPIReplicaTypeLauncher, ReplicaSpec: common.ReplicaSpec{Replicas: &lancherReplic}},
+			},
+		},
+	}
+	for name, tc := range tests {
+		t.Run(name, func(t *testing.T) {
+			sort.Sort(sort.Reverse(tc.original))
+			if !reflect.DeepEqual(tc.original, tc.expected) {
+				t.Fatalf("Unexpected sort list (-want,+got):\n-want:%v\n+got:%v", tc.expected, tc.original)
+			}
+		})
+	}
+}
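TestReplicasOrder exercises sort.Sort(sort.Reverse(...)) over replicasOrder, whose definition is not part of this diff. A sketch of a sort.Interface implementation consistent with the test and with the replicaPriority literal used in calPGMinResource might look like the following; the struct fields are taken from the diff above, while the method bodies (ascending order by priority, so sort.Reverse puts the highest priority first) are assumptions rather than the committed definition. The kubeflow and common package aliases are the ones already used in the files above.

// Sketch only: a sort.Interface ordering replica groups by PriorityClass value,
// consistent with the behaviour the test above expects.
type replicaPriority struct {
	priority    int32
	replicaType kubeflow.MPIReplicaType
	common.ReplicaSpec
}

type replicasOrder []replicaPriority

func (p replicasOrder) Len() int      { return len(p) }
func (p replicasOrder) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

// Less sorts ascending by priority; wrapped in sort.Reverse, higher priority comes first,
// and the two-element equal-priority case keeps its original order, as the test expects.
func (p replicasOrder) Less(i, j int) bool { return p[i].priority < p[j].priority }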
