Skip to content

Commit

Permalink
Merge pull request volcano-sh#1521 from william-wang/mcs-framework2
Browse files Browse the repository at this point in the history
support multi-cluster scheduling in framework
  • Loading branch information
william-wang authored Jun 9, 2021
2 parents c19a67e + d404af9 commit 14e9343
Show file tree
Hide file tree
Showing 36 changed files with 901 additions and 21 deletions.
25 changes: 25 additions & 0 deletions LICENSES/vendor/github.com/agiledragon/gomonkey/v2/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions config/crd/bases/scheduling.volcano.sh_queues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,29 @@ spec:
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity) pairs.
type: object
extendClusters:
description: extendClusters indicates that the jobs in this Queue will be
dispatched to these clusters.
items:
description: ClusterSpec represents the template of Cluster
properties:
capacity:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity)
pairs.
type: object
name:
type: string
weight:
format: int32
type: integer
type: object
type: array
guarantee:
description: Guarantee indicate configuration about resource reservation
properties:
Expand Down
23 changes: 23 additions & 0 deletions config/crd/v1beta1/scheduling.volcano.sh_queues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,29 @@ spec:
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity) pairs.
type: object
extendClusters:
description: extendClusters indicates that the jobs in this Queue will be dispatched
to these clusters.
items:
description: ClusterSpec represents the template of Cluster
properties:
capacity:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity)
pairs.
type: object
name:
type: string
weight:
format: int32
type: integer
type: object
type: array
guarantee:
description: Guarantee indicate configuration about resource reservation
properties:
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module volcano.sh/volcano
go 1.14

require (
github.com/agiledragon/gomonkey/v2 v2.1.0
github.com/fsnotify/fsnotify v1.4.9
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/hashicorp/go-multierror v1.0.0
Expand All @@ -22,7 +23,7 @@ require (
k8s.io/klog v1.0.0
k8s.io/kubernetes v1.19.6
sigs.k8s.io/yaml v1.2.0
volcano.sh/apis v0.0.0-20210528062323-838630df511b
volcano.sh/apis v0.0.0-20210603070204-70005b2d502a
)

replace (
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt
github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/agiledragon/gomonkey/v2 v2.1.0 h1:+5Dbq8a1fn89IgVk35O233R41FH0nBKFPn50wDZpNs0=
github.com/agiledragon/gomonkey/v2 v2.1.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY=
github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -855,5 +857,5 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
vbom.ml/util v0.0.0-20160121211510-db5cfe13f5cc/go.mod h1:so/NYdZXCz+E3ZpW0uAoCj6uzU2+8OWDFv/HxUSs7kI=
volcano.sh/apis v0.0.0-20210528062323-838630df511b h1:Tgg9IcLXQRiTli7KH+xteBTIrawUM07MGvN541/XyMo=
volcano.sh/apis v0.0.0-20210528062323-838630df511b/go.mod h1:UaeJ/s5Hyd+ZhFLc+Kw9YlgM8gRZ/5OzXqHa0yKOoXY=
volcano.sh/apis v0.0.0-20210603070204-70005b2d502a h1:h6c+NZglstX8JGuwLrU6/0/FnBcT26XO1S3PvtykRTk=
volcano.sh/apis v0.0.0-20210603070204-70005b2d502a/go.mod h1:UaeJ/s5Hyd+ZhFLc+Kw9YlgM8gRZ/5OzXqHa0yKOoXY=
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,29 @@ spec:
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity) pairs.
type: object
extendClusters:
description: extendClusters indicates that the jobs in this Queue will be
dispatched to these clusters.
items:
description: ClusterSpec represents the template of Cluster
properties:
capacity:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity)
pairs.
type: object
name:
type: string
weight:
format: int32
type: integer
type: object
type: array
guarantee:
description: Guarantee indicate configuration about resource reservation
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,29 @@ spec:
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity) pairs.
type: object
extendClusters:
description: extendClusters indicates that the jobs in this Queue will be dispatched
to these clusters.
items:
description: ClusterSpec represents the template of Cluster
properties:
capacity:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity)
pairs.
type: object
name:
type: string
weight:
format: int32
type: integer
type: object
type: array
guarantee:
description: Guarantee indicate configuration about resource reservation
properties:
Expand Down
23 changes: 23 additions & 0 deletions installer/volcano-development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7796,6 +7796,29 @@ spec:
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity) pairs.
type: object
extendClusters:
description: extendClusters indicates that the jobs in this Queue will be
dispatched to these clusters.
items:
description: ClusterSpec represents the template of Cluster
properties:
capacity:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity)
pairs.
type: object
name:
type: string
weight:
format: int32
type: integer
type: object
type: array
guarantee:
description: Guarantee indicate configuration about resource reservation
properties:
Expand Down
25 changes: 17 additions & 8 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@ type jobcontroller struct {
kubeClient kubernetes.Interface
vcClient vcclientset.Interface

jobInformer batchinformer.JobInformer
podInformer coreinformers.PodInformer
pvcInformer coreinformers.PersistentVolumeClaimInformer
pgInformer schedulinginformers.PodGroupInformer
svcInformer coreinformers.ServiceInformer
cmdInformer businformer.CommandInformer
pcInformer kubeschedulinginformers.PriorityClassInformer
jobInformer batchinformer.JobInformer
podInformer coreinformers.PodInformer
pvcInformer coreinformers.PersistentVolumeClaimInformer
pgInformer schedulinginformers.PodGroupInformer
svcInformer coreinformers.ServiceInformer
cmdInformer businformer.CommandInformer
pcInformer kubeschedulinginformers.PriorityClassInformer
queueInformer schedulinginformers.QueueInformer

// A store of jobs
jobLister batchlister.JobLister
Expand All @@ -95,6 +96,9 @@ type jobcontroller struct {
pcLister kubeschedulinglisters.PriorityClassLister
pcSynced func() bool

queueLister schedulinglisters.QueueLister
queueSynced func() bool

// queue that need to sync up
queueList []workqueue.RateLimitingInterface
commandQueue workqueue.RateLimitingInterface
Expand Down Expand Up @@ -204,6 +208,10 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {
cc.pcLister = cc.pcInformer.Lister()
cc.pcSynced = cc.pcInformer.Informer().HasSynced

cc.queueInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Scheduling().V1beta1().Queues()
cc.queueLister = cc.queueInformer.Lister()
cc.queueSynced = cc.queueInformer.Informer().HasSynced

// Register actions
state.SyncJob = cc.syncJob
state.KillJob = cc.killJob
Expand All @@ -221,9 +229,10 @@ func (cc *jobcontroller) Run(stopCh <-chan struct{}) {
go cc.svcInformer.Informer().Run(stopCh)
go cc.cmdInformer.Informer().Run(stopCh)
go cc.pcInformer.Informer().Run(stopCh)
go cc.queueInformer.Informer().Run(stopCh)

cache.WaitForCacheSync(stopCh, cc.jobSynced, cc.podSynced, cc.pgSynced,
cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced)
cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced, cc.queueSynced)

go wait.Until(cc.handleCommands, 0, stopCh)
var i uint32
Expand Down
42 changes: 40 additions & 2 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,15 @@ func (cc *jobcontroller) initOnJobUpdate(job *batch.Job) error {
return nil
}

// GetQueueInfo looks up the Queue named queue through the controller's
// queueLister.
//
// On failure the error is logged together with the queue name and returned
// unchanged, and the returned Queue is nil. On success the Queue returned by
// the lister is passed through with a nil error.
func (cc *jobcontroller) GetQueueInfo(queue string) (*scheduling.Queue, error) {
	queueInfo, err := cc.queueLister.Get(queue)
	if err != nil {
		// Include the queue name so a failed lookup can be traced to a job's spec.
		klog.Errorf("Failed to get queue <%s> from queueLister, error: %s", queue, err.Error())
		return nil, err
	}

	return queueInfo, nil
}

func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
job := jobInfo.Job
klog.V(3).Infof("Starting to sync up Job <%s/%s>, current version %d", job.Namespace, job.Name, job.Status.Version)
Expand All @@ -208,7 +217,26 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
// deep copy job to prevent mutate it
job = job.DeepCopy()

var err error
// Find queue that job belongs to, and check if the queue has forwarding metadata
queueInfo, err := cc.GetQueueInfo(job.Spec.Queue)
if err != nil {
return err
}

var jobForwarding bool
if len(queueInfo.Spec.ExtendClusters) != 0 {
jobForwarding = true
if len(job.Annotations) == 0 {
job.Annotations = make(map[string]string)
}
job.Annotations[batch.JobForwardingKey] = "true"
job, err = cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("failed to update job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
return err
}
}

// Skip job initiation if job is already initiated
if !isInitiated(job) {
if job, err = cc.initiateJob(job); err != nil {
Expand All @@ -221,6 +249,16 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
}
}

if len(queueInfo.Spec.ExtendClusters) != 0 {
jobForwarding = true
job.Annotations[batch.JobForwardingKey] = "true"
_, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("failed to update job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
return err
}
}

var syncTask bool
if pg, _ := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); pg != nil {

Expand Down Expand Up @@ -284,7 +322,7 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
for i := 0; i < int(ts.Replicas); i++ {
podName := fmt.Sprintf(jobhelpers.PodNameFmt, job.Name, name, i)
if pod, found := pods[podName]; !found {
newPod := createJobPod(job, tc, ts.TopologyPolicy, i)
newPod := createJobPod(job, tc, ts.TopologyPolicy, i, jobForwarding)
if err := cc.pluginOnPodCreate(job, newPod); err != nil {
return err
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/controllers/job/job_controller_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import (
"context"
"errors"
"fmt"
"reflect"
"testing"

gomonkey "github.com/agiledragon/gomonkey/v2"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -262,6 +265,12 @@ func TestSyncJobFunc(t *testing.T) {
t.Run(testcase.Name, func(t *testing.T) {
fakeController := newFakeController()

patches := gomonkey.ApplyMethod(reflect.TypeOf(fakeController), "GetQueueInfo", func(_ *jobcontroller, _ string) (*schedulingv1alpha2.Queue, error) {
return &schedulingv1alpha2.Queue{}, nil
})

defer patches.Reset()

jobPlugins := make(map[string][]string)

for _, plugin := range testcase.Plugins {
Expand Down
7 changes: 6 additions & 1 deletion pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func MakePodName(jobName string, taskName string, index int) string {
return fmt.Sprintf(jobhelpers.PodNameFmt, jobName, taskName, index)
}

func createJobPod(job *batch.Job, template *v1.PodTemplateSpec, topologyPolicy batch.NumaPolicy, ix int) *v1.Pod {
func createJobPod(job *batch.Job, template *v1.PodTemplateSpec, topologyPolicy batch.NumaPolicy, ix int, jobForwarding bool) *v1.Pod {
templateCopy := template.DeepCopy()

pod := &v1.Pod{
Expand Down Expand Up @@ -136,6 +136,11 @@ func createJobPod(job *batch.Job, template *v1.PodTemplateSpec, topologyPolicy b
}
}

if jobForwarding {
pod.Annotations[batch.JobForwardingKey] = "true"
pod.Labels[batch.JobForwardingKey] = "true"
}

return pod
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func TestCreateJobPod(t *testing.T) {
for i, testcase := range testcases {

t.Run(testcase.Name, func(t *testing.T) {
pod := createJobPod(testcase.Job, testcase.PodTemplate, v1alpha1.NumaPolicy(""), testcase.Index)
pod := createJobPod(testcase.Job, testcase.PodTemplate, v1alpha1.NumaPolicy(""), testcase.Index, false)

if testcase.ReturnVal != nil && pod != nil && pod.Name != testcase.ReturnVal.Name && pod.Namespace != testcase.ReturnVal.Namespace {
t.Errorf("Expected Return Value to be %v but got %v in case %d", testcase.ReturnVal, pod, i)
Expand Down
Loading

0 comments on commit 14e9343

Please sign in to comment.