Skip to content

Commit 37e9eae

Browse files
committed
Fix a bug that the PodGroupCtrl can not list priorityclass
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
1 parent 7ac5a3f commit 37e9eae

File tree

4 files changed

+44
-21
lines changed

4 files changed

+44
-21
lines changed

cmd/mpi-operator/app/server.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,17 +143,16 @@ func Run(opt *options.ServerOption) error {
143143
kubeflowInformerFactory := informers.NewSharedInformerFactoryWithOptions(mpiJobClientSet, 0, kubeflowInformerFactoryOpts...)
144144

145145
// For the gang scheduling
146-
var podGroupCtrl controllersv1.PodGroupControl
146+
var (
147+
podGroupCtrl controllersv1.PodGroupControl
148+
priorityClassInformer schedulinginformers.PriorityClassInformer
149+
)
147150
if opt.GangSchedulingName == options.GangSchedulerVolcano {
148151
podGroupCtrl = controllersv1.NewVolcanoCtrl(volcanoClientSet, namespace)
149152
} else if len(opt.GangSchedulingName) != 0 {
150153
// Use scheduler-plugins as a default gang-scheduler.
151-
152-
podGroupCtrl = controllersv1.NewSchedulerPluginsCtrl(schedClientSet, namespace, opt.GangSchedulingName)
153-
}
154-
var priorityClassInformer schedulinginformers.PriorityClassInformer
155-
if podGroupCtrl != nil {
156154
priorityClassInformer = kubeInformerFactory.Scheduling().V1().PriorityClasses()
155+
podGroupCtrl = controllersv1.NewSchedulerPluginsCtrl(schedClientSet, namespace, opt.GangSchedulingName, priorityClassInformer.Lister())
157156
}
158157

159158
controller := controllersv1.NewMPIJobController(

pkg/controller/mpi_job_controller_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,15 +164,15 @@ func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, info
164164
i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc())
165165
k8sI := kubeinformers.NewSharedInformerFactory(f.kubeClient, noResyncPeriodFunc())
166166

167-
var podGroupCtrl PodGroupControl
167+
var (
168+
podGroupCtrl PodGroupControl
169+
priorityClassInformer schedulinginformers.PriorityClassInformer
170+
)
168171
if f.gangSchedulingName == options.GangSchedulerVolcano {
169172
podGroupCtrl = NewVolcanoCtrl(f.volcanoClient, metav1.NamespaceAll)
170173
} else if len(f.gangSchedulingName) != 0 {
171-
podGroupCtrl = NewSchedulerPluginsCtrl(f.schedClient, metav1.NamespaceAll, "default-scheduler")
172-
}
173-
var priorityClassInformer schedulinginformers.PriorityClassInformer
174-
if podGroupCtrl != nil {
175174
priorityClassInformer = k8sI.Scheduling().V1().PriorityClasses()
175+
podGroupCtrl = NewSchedulerPluginsCtrl(f.schedClient, metav1.NamespaceAll, "default-scheduler", priorityClassInformer.Lister())
176176
}
177177

178178
c := NewMPIJobControllerWithClock(

pkg/controller/podgroup.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -191,17 +191,22 @@ type SchedulerPluginsCtrl struct {
191191
schedulerName string
192192
}
193193

194-
func NewSchedulerPluginsCtrl(c schedclientset.Interface, watchNamespace, schedulerName string) *SchedulerPluginsCtrl {
194+
func NewSchedulerPluginsCtrl(
195+
c schedclientset.Interface,
196+
watchNamespace, schedulerName string,
197+
pcLister schedulinglisters.PriorityClassLister,
198+
) *SchedulerPluginsCtrl {
195199
var informerFactoryOpts []schedinformers.SharedInformerOption
196200
if watchNamespace != metav1.NamespaceAll {
197201
informerFactoryOpts = append(informerFactoryOpts, schedinformers.WithNamespace(watchNamespace))
198202
}
199-
informerFactory := schedinformers.NewSharedInformerFactoryWithOptions(c, 0, informerFactoryOpts...)
203+
pgInformerFactory := schedinformers.NewSharedInformerFactoryWithOptions(c, 0, informerFactoryOpts...)
200204
return &SchedulerPluginsCtrl{
201-
Client: c,
202-
InformerFactory: informerFactory,
203-
PodGroupInformer: informerFactory.Scheduling().V1alpha1().PodGroups(),
204-
schedulerName: schedulerName,
205+
Client: c,
206+
InformerFactory: pgInformerFactory,
207+
PodGroupInformer: pgInformerFactory.Scheduling().V1alpha1().PodGroups(),
208+
PriorityClassLister: pcLister,
209+
schedulerName: schedulerName,
205210
}
206211
}
207212

test/integration/mpi_job_controller_test.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
common "github.com/kubeflow/common/pkg/apis/common/v1"
2525
batchv1 "k8s.io/api/batch/v1"
2626
corev1 "k8s.io/api/core/v1"
27+
schedulingv1 "k8s.io/api/scheduling/v1"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829
"k8s.io/apimachinery/pkg/labels"
2930
"k8s.io/apimachinery/pkg/util/runtime"
@@ -490,6 +491,7 @@ func TestMPIJobWithSchedulerPlugins(t *testing.T) {
490491
kubeflow.MPIReplicaTypeLauncher: {
491492
Template: corev1.PodTemplateSpec{
492493
Spec: corev1.PodSpec{
494+
PriorityClassName: "test-pc",
493495
Containers: []corev1.Container{
494496
{
495497
Name: "main",
@@ -515,8 +517,24 @@ func TestMPIJobWithSchedulerPlugins(t *testing.T) {
515517
},
516518
},
517519
}
518-
// 1. Create MPIJob
520+
priorityClass := &schedulingv1.PriorityClass{
521+
TypeMeta: metav1.TypeMeta{
522+
APIVersion: schedulingv1.SchemeGroupVersion.String(),
523+
Kind: "PriorityClass",
524+
},
525+
ObjectMeta: metav1.ObjectMeta{
526+
Name: "test-pc",
527+
},
528+
Value: 100_000,
529+
}
530+
// 1. Create PriorityClass
519531
var err error
532+
_, err = s.kClient.SchedulingV1().PriorityClasses().Create(ctx, priorityClass, metav1.CreateOptions{})
533+
if err != nil {
534+
t.Fatalf("Failed sending priorityClass to apiserver: %v", err)
535+
}
536+
537+
// 2. Create MPIJob
520538
mpiJob, err = s.mpiClient.KubeflowV2beta1().MPIJobs(s.namespace).Create(ctx, mpiJob, metav1.CreateOptions{})
521539
if err != nil {
522540
t.Fatalf("Failed sending job to apiserver: %v", err)
@@ -537,7 +555,7 @@ func TestMPIJobWithSchedulerPlugins(t *testing.T) {
537555
}
538556
s.events.verify(t)
539557

540-
// 2. Update SchedulingPolicy of MPIJob
558+
// 3. Update SchedulingPolicy of MPIJob
541559
updatedScheduleTimeSeconds := int32(10)
542560
mpiJob.Spec.RunPolicy.SchedulingPolicy.ScheduleTimeoutSeconds = &updatedScheduleTimeSeconds
543561
mpiJob, err = s.mpiClient.KubeflowV2beta1().MPIJobs(s.namespace).Update(ctx, mpiJob, metav1.UpdateOptions{})
@@ -567,11 +585,12 @@ func startController(
567585
var podGroupCtrl controller.PodGroupControl
568586
var priorityClassInformer schedulinginformers.PriorityClassInformer
569587
if gangSchedulerCfg != nil {
570-
priorityClassInformer = kubeInformerFactory.Scheduling().V1().PriorityClasses()
571588
if gangSchedulerCfg.schedulerName == "volcano" {
572589
podGroupCtrl = controller.NewVolcanoCtrl(gangSchedulerCfg.volcanoClient, metav1.NamespaceAll)
573590
} else if len(gangSchedulerCfg.schedulerName) != 0 {
574-
podGroupCtrl = controller.NewSchedulerPluginsCtrl(gangSchedulerCfg.schedClient, metav1.NamespaceAll, gangSchedulerCfg.schedulerName)
591+
priorityClassInformer = kubeInformerFactory.Scheduling().V1().PriorityClasses()
592+
podGroupCtrl = controller.NewSchedulerPluginsCtrl(
593+
gangSchedulerCfg.schedClient, metav1.NamespaceAll, gangSchedulerCfg.schedulerName, priorityClassInformer.Lister())
575594
}
576595
}
577596

0 commit comments

Comments
 (0)