Skip to content

Commit

Permalink
make namespace parsing and creating informers pluggable
Browse files Browse the repository at this point in the history
  • Loading branch information
emsixteeen committed Feb 6, 2024
1 parent 4c9ac06 commit a4c30dc
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 50 deletions.
38 changes: 38 additions & 0 deletions cmd/mpi-operator/app/options/additional.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package options

import (
mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
schedinformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
volcanoinformers "volcano.sh/apis/pkg/client/informers/externalversions"
)

type NamespaceParserFunc func(namespace string, kubeClient kubeclientset.Interface) ([]string, error)

type NamespaceOptions struct {
Namespaces NamespaceParserFunc
}

func DefaultNamespaceParser(namespace string, kubeClient kubeclientset.Interface) ([]string, error) {
return []string{namespace}, nil
}

type KubeInformerFunc func(namespaces []string, kubeClient kubeclientset.Interface) kubeinformers.SharedInformerFactory
type MpiJobInformerFunc func(namespaces []string, mpiJobClient mpijobclientset.Interface) informers.SharedInformerFactory
type VolcanoInformerFunc func(namespaces []string, volcanoClient volcanoclient.Interface) volcanoinformers.SharedInformerFactory
type SchedulerPluginsInformerFunc func(namespaces []string, schedClient schedclientset.Interface) schedinformers.SharedInformerFactory

type InformerOptions struct {
KubeInformer KubeInformerFunc
MpiJobInformer MpiJobInformerFunc
VolcanoInformer VolcanoInformerFunc
SchedulerPluginsInformer SchedulerPluginsInformerFunc
}
type AdditionalOptions struct {
NamespaceOptions
InformerOptions
}
11 changes: 11 additions & 0 deletions cmd/mpi-operator/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package options

import (
"flag"
"github.com/kubeflow/mpi-operator/pkg/informers"
"os"

"github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
Expand All @@ -38,11 +39,21 @@ type ServerOption struct {
LockNamespace string
QPS int
Burst int

NamespaceOptions
InformerOptions
}

// NewServerOption creates a new CMServer with a default config.
func NewServerOption() *ServerOption {
s := ServerOption{}

s.Namespaces = DefaultNamespaceParser
s.KubeInformer = informers.DefaultKubeInformer
s.MpiJobInformer = informers.DefaultMpiJobInformer
s.VolcanoInformer = informers.DefaultVolcanoInformer
s.SchedulerPluginsInformer = informers.DefaultSchedulerPluginsInformer

return &s
}

Expand Down
46 changes: 23 additions & 23 deletions cmd/mpi-operator/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
kubeapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
clientgokubescheme "k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -45,7 +44,6 @@ import (
"github.com/kubeflow/mpi-operator/cmd/mpi-operator/app/options"
mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
kubeflowscheme "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme"
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
controllersv1 "github.com/kubeflow/mpi-operator/pkg/controller"
"github.com/kubeflow/mpi-operator/pkg/version"
)
Expand Down Expand Up @@ -82,13 +80,6 @@ func Run(opt *options.ServerOption) error {
version.PrintVersionAndExit(apiVersion)
}

namespace := opt.Namespace
if namespace == corev1.NamespaceAll {
klog.Info("Using cluster scoped operator")
} else {
klog.Infof("Scoping operator to namespace %s", namespace)
}

// To help debugging, immediately log version.
klog.Infof("%+v", version.Info(apiVersion))

Expand Down Expand Up @@ -118,9 +109,23 @@ func Run(opt *options.ServerOption) error {
if err != nil {
return err
}
if !checkCRDExists(mpiJobClientSet, namespace) {
klog.Info("CRD doesn't exist. Exiting")
os.Exit(1)

namespaces, err := opt.Namespaces(opt.Namespace, kubeClient)
if err != nil {
return err
}

if namespaces[0] == corev1.NamespaceAll {
klog.Info("Using cluster scoped operator")
} else {
klog.Infof("Scoping operator to namespace %s", namespaces)
}

for _, namespace := range namespaces {
if !checkCRDExists(mpiJobClientSet, namespace) {
klog.Info("CRD doesn't exist. Exiting")
os.Exit(1)
}
}

// Add mpi-job-controller types to the default Kubernetes Scheme so Events
Expand All @@ -132,14 +137,8 @@ func Run(opt *options.ServerOption) error {

// Set leader election start function.
run := func(ctx context.Context) {
var kubeInformerFactoryOpts []kubeinformers.SharedInformerOption
var kubeflowInformerFactoryOpts []informers.SharedInformerOption
if namespace != metav1.NamespaceAll {
kubeInformerFactoryOpts = append(kubeInformerFactoryOpts, kubeinformers.WithNamespace(namespace))
kubeflowInformerFactoryOpts = append(kubeflowInformerFactoryOpts, informers.WithNamespace(namespace))
}
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeInformerFactoryOpts...)
kubeflowInformerFactory := informers.NewSharedInformerFactoryWithOptions(mpiJobClientSet, 0, kubeflowInformerFactoryOpts...)
kubeInformerFactory := opt.KubeInformer(namespaces, kubeClient)
mpiJobInformerFactory := opt.MpiJobInformer(namespaces, mpiJobClientSet)

controller, err := controllersv1.NewMPIJobController(
kubeClient,
Expand All @@ -152,14 +151,15 @@ func Run(opt *options.ServerOption) error {
kubeInformerFactory.Batch().V1().Jobs(),
kubeInformerFactory.Core().V1().Pods(),
kubeInformerFactory.Scheduling().V1().PriorityClasses(),
kubeflowInformerFactory.Kubeflow().V2beta1().MPIJobs(),
namespace, opt.GangSchedulingName)
mpiJobInformerFactory.Kubeflow().V2beta1().MPIJobs(),
opt.VolcanoInformer, opt.SchedulerPluginsInformer,
namespaces, opt.GangSchedulingName)
if err != nil {
klog.Fatalf("Failed to setup the controller")
}

go kubeInformerFactory.Start(ctx.Done())
go kubeflowInformerFactory.Start(ctx.Done())
go mpiJobInformerFactory.Start(ctx.Done())
if controller.PodGroupCtrl != nil {
controller.PodGroupCtrl.StartInformerFactory(ctx.Done())
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,13 @@ func NewMPIJobController(
podInformer coreinformers.PodInformer,
priorityClassInformer schedulinginformers.PriorityClassInformer,
mpiJobInformer informers.MPIJobInformer,
namespace, gangSchedulingName string) (*MPIJobController, error) {
volcanoInformerFunc options.VolcanoInformerFunc, schedulerPluginsInformerFunc options.SchedulerPluginsInformerFunc,
namespaces []string, gangSchedulingName string) (*MPIJobController, error) {
return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClient, schedClient,
configMapInformer, secretInformer, serviceInformer, jobInformer, podInformer,
priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName)
priorityClassInformer, mpiJobInformer, &clock.RealClock{},
volcanoInformerFunc, schedulerPluginsInformerFunc,
namespaces, gangSchedulingName)
}

// NewMPIJobControllerWithClock returns a new MPIJob controller.
Expand All @@ -292,7 +295,8 @@ func NewMPIJobControllerWithClock(
priorityClassInformer schedulinginformers.PriorityClassInformer,
mpiJobInformer informers.MPIJobInformer,
clock clock.WithTicker,
namespace, gangSchedulingName string) (*MPIJobController, error) {
volcanoInformer options.VolcanoInformerFunc, schedulerPluginsInformer options.SchedulerPluginsInformerFunc,
namespaces []string, gangSchedulingName string) (*MPIJobController, error) {

// Create event broadcaster.
klog.V(4).Info("Creating event broadcaster")
Expand All @@ -311,10 +315,12 @@ func NewMPIJobControllerWithClock(
priorityClassLister = priorityClassInformer.Lister()
priorityClassSynced = priorityClassInformer.Informer().HasSynced
if gangSchedulingName == options.GangSchedulerVolcano {
podGroupCtrl = NewVolcanoCtrl(volcanoClient, namespace, priorityClassLister)
volcanoInformer := volcanoInformer(namespaces, volcanoClient)
podGroupCtrl = NewVolcanoCtrl(volcanoClient, volcanoInformer, priorityClassLister)
} else if len(gangSchedulingName) != 0 {
// Use scheduler-plugins as a default gang-scheduler.
podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, namespace, gangSchedulingName, priorityClassLister)
pgInformer := schedulerPluginsInformer(namespaces, schedClient)
podGroupCtrl = NewSchedulerPluginsCtrl(schedClient, pgInformer, gangSchedulingName, priorityClassLister)
}
if podGroupCtrl != nil {
podGroupSynced = podGroupCtrl.PodGroupSharedIndexInformer().HasSynced
Expand Down
16 changes: 11 additions & 5 deletions pkg/controller/mpi_job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package controller

import (
"fmt"
"github.com/kubeflow/mpi-operator/pkg/informers"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -46,7 +47,7 @@ import (
kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
"github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/fake"
"github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme"
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
mpijobinformers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
)

var (
Expand Down Expand Up @@ -86,6 +87,8 @@ type fixture struct {
objects []runtime.Object

gangSchedulingName string

namespaces []string
}

func newFixture(t *testing.T, gangSchedulingName string) *fixture {
Expand All @@ -94,6 +97,7 @@ func newFixture(t *testing.T, gangSchedulingName string) *fixture {
f.objects = []runtime.Object{}
f.kubeObjects = []runtime.Object{}
f.gangSchedulingName = gangSchedulingName
f.namespaces = []string{metav1.NamespaceAll}
return f
}

Expand Down Expand Up @@ -155,11 +159,12 @@ func newMPIJob(name string, replicas *int32, startTime, completionTime *metav1.T
return mpiJob
}

func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, mpijobinformers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
f.client = fake.NewSimpleClientset(f.objects...)
f.kubeClient = k8sfake.NewSimpleClientset(f.kubeObjects...)
i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc())
k8sI := kubeinformers.NewSharedInformerFactory(f.kubeClient, noResyncPeriodFunc())

i := informers.DefaultMpiJobInformer(f.namespaces, f.client)
k8sI := informers.DefaultKubeInformer(f.namespaces, f.kubeClient)

c, err := NewMPIJobControllerWithClock(
f.kubeClient,
Expand All @@ -174,7 +179,8 @@ func (f *fixture) newController(clock clock.WithTicker) (*MPIJobController, info
k8sI.Scheduling().V1().PriorityClasses(),
i.Kubeflow().V2beta1().MPIJobs(),
clock,
metav1.NamespaceAll,
informers.DefaultVolcanoInformer, informers.DefaultSchedulerPluginsInformer,
f.namespaces,
f.gangSchedulingName,
)
if err != nil {
Expand Down
14 changes: 2 additions & 12 deletions pkg/controller/podgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,7 @@ type VolcanoCtrl struct {
schedulerName string
}

func NewVolcanoCtrl(c volcanoclient.Interface, watchNamespace string, pcLister schedulinglisters.PriorityClassLister) *VolcanoCtrl {
var informerFactoryOpts []volcanoinformers.SharedInformerOption
if watchNamespace != metav1.NamespaceAll {
informerFactoryOpts = append(informerFactoryOpts, volcanoinformers.WithNamespace(watchNamespace))
}
informerFactory := volcanoinformers.NewSharedInformerFactoryWithOptions(c, 0, informerFactoryOpts...)
func NewVolcanoCtrl(c volcanoclient.Interface, informerFactory volcanoinformers.SharedInformerFactory, pcLister schedulinglisters.PriorityClassLister) *VolcanoCtrl {
return &VolcanoCtrl{
Client: c,
InformerFactory: informerFactory,
Expand Down Expand Up @@ -204,14 +199,9 @@ type SchedulerPluginsCtrl struct {

func NewSchedulerPluginsCtrl(
c schedclientset.Interface,
watchNamespace, schedulerName string,
pgInformerFactory schedinformers.SharedInformerFactory, schedulerName string,
pcLister schedulinglisters.PriorityClassLister,
) *SchedulerPluginsCtrl {
var informerFactoryOpts []schedinformers.SharedInformerOption
if watchNamespace != metav1.NamespaceAll {
informerFactoryOpts = append(informerFactoryOpts, schedinformers.WithNamespace(watchNamespace))
}
pgInformerFactory := schedinformers.NewSharedInformerFactoryWithOptions(c, 0, informerFactoryOpts...)
return &SchedulerPluginsCtrl{
Client: c,
InformerFactory: pgInformerFactory,
Expand Down
47 changes: 47 additions & 0 deletions pkg/informers/informers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package informers

import (
mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
schedinformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
volcanoinformers "volcano.sh/apis/pkg/client/informers/externalversions"
)

func DefaultKubeInformer(namespaces []string, kubeClient kubeclientset.Interface) kubeinformers.SharedInformerFactory {
var kubeInformerFactoryOpts []kubeinformers.SharedInformerOption
if namespaces[0] != metav1.NamespaceAll {
kubeInformerFactoryOpts = append(kubeInformerFactoryOpts, kubeinformers.WithNamespace(namespaces[0]))
}

return kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeInformerFactoryOpts...)
}

func DefaultMpiJobInformer(namespaces []string, mpiJobClient mpijobclientset.Interface) informers.SharedInformerFactory {
var kubeflowInformerFactoryOpts []informers.SharedInformerOption
if namespaces[0] != metav1.NamespaceAll {
kubeflowInformerFactoryOpts = append(kubeflowInformerFactoryOpts, informers.WithNamespace(namespaces[0]))
}

return informers.NewSharedInformerFactoryWithOptions(mpiJobClient, 0, kubeflowInformerFactoryOpts...)
}

func DefaultVolcanoInformer(namespaces []string, volcanoClient volcanoclient.Interface) volcanoinformers.SharedInformerFactory {
var informerFactoryOpts []volcanoinformers.SharedInformerOption
if namespaces[0] != metav1.NamespaceAll {
informerFactoryOpts = append(informerFactoryOpts, volcanoinformers.WithNamespace(namespaces[0]))
}
return volcanoinformers.NewSharedInformerFactoryWithOptions(volcanoClient, 0, informerFactoryOpts...)
}

func DefaultSchedulerPluginsInformer(namespaces []string, schedClient schedclientset.Interface) schedinformers.SharedInformerFactory {
var informerFactoryOpts []schedinformers.SharedInformerOption
if namespaces[0] != metav1.NamespaceAll {
informerFactoryOpts = append(informerFactoryOpts, schedinformers.WithNamespace(namespaces[0]))
}
return schedinformers.NewSharedInformerFactoryWithOptions(schedClient, 0, informerFactoryOpts...)
}
12 changes: 7 additions & 5 deletions test/integration/mpi_job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package integration
import (
"context"
"fmt"
"github.com/kubeflow/mpi-operator/pkg/informers"
"testing"
"time"

Expand All @@ -29,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/reference"
"k8s.io/utils/pointer"
Expand All @@ -41,7 +41,6 @@ import (
kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
"github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme"
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
"github.com/kubeflow/mpi-operator/pkg/controller"
)

Expand Down Expand Up @@ -828,8 +827,10 @@ func startController(
mpiClient clientset.Interface,
gangSchedulerCfg *gangSchedulerConfig,
) {
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kClient, 0)
mpiInformerFactory := informers.NewSharedInformerFactory(mpiClient, 0)
namespaces := []string{metav1.NamespaceAll}

kubeInformerFactory := informers.DefaultKubeInformer(namespaces, kClient)
mpiInformerFactory := informers.DefaultMpiJobInformer(namespaces, mpiClient)
var (
volcanoClient volcanoclient.Interface
schedClient schedclientset.Interface
Expand All @@ -855,7 +856,8 @@ func startController(
kubeInformerFactory.Core().V1().Pods(),
kubeInformerFactory.Scheduling().V1().PriorityClasses(),
mpiInformerFactory.Kubeflow().V2beta1().MPIJobs(),
metav1.NamespaceAll, schedulerName,
informers.DefaultVolcanoInformer, informers.DefaultSchedulerPluginsInformer,
namespaces, schedulerName,
)
if err != nil {
panic(err)
Expand Down

0 comments on commit a4c30dc

Please sign in to comment.