From 4428c37a6ca1d296b0fcf8eaa4b3e28ed432f924 Mon Sep 17 00:00:00 2001 From: william-wang Date: Tue, 8 Jun 2021 14:38:08 +0800 Subject: [PATCH 1/2] Support multi-cluster scheduling in framework Signed-off-by: william-wang --- pkg/controllers/job/job_controller.go | 25 ++++--- pkg/controllers/job/job_controller_actions.go | 34 +++++++++- pkg/controllers/job/job_controller_util.go | 7 +- .../job/job_controller_util_test.go | 2 +- pkg/scheduler/api/silo_cluster_info.go | 40 +++++++++++ pkg/scheduler/cache/cache.go | 66 +++++++++++++++++-- pkg/scheduler/cache/interface.go | 8 +++ pkg/scheduler/conf/scheduler_conf.go | 2 + pkg/scheduler/framework/session.go | 8 +++ pkg/scheduler/framework/session_plugins.go | 29 ++++++++ 10 files changed, 204 insertions(+), 17 deletions(-) create mode 100644 pkg/scheduler/api/silo_cluster_info.go diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index 1914f4fa56..1b7f5af56b 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -62,13 +62,14 @@ type jobcontroller struct { kubeClient kubernetes.Interface vcClient vcclientset.Interface - jobInformer batchinformer.JobInformer - podInformer coreinformers.PodInformer - pvcInformer coreinformers.PersistentVolumeClaimInformer - pgInformer schedulinginformers.PodGroupInformer - svcInformer coreinformers.ServiceInformer - cmdInformer businformer.CommandInformer - pcInformer kubeschedulinginformers.PriorityClassInformer + jobInformer batchinformer.JobInformer + podInformer coreinformers.PodInformer + pvcInformer coreinformers.PersistentVolumeClaimInformer + pgInformer schedulinginformers.PodGroupInformer + svcInformer coreinformers.ServiceInformer + cmdInformer businformer.CommandInformer + pcInformer kubeschedulinginformers.PriorityClassInformer + queueInformer schedulinginformers.QueueInformer // A store of jobs jobLister batchlister.JobLister @@ -95,6 +96,9 @@ type jobcontroller struct { pcLister kubeschedulinglisters.PriorityClassLister pcSynced func() bool + queueLister schedulinglisters.QueueLister + queueSynced func() bool + // queue that need to sync up queueList []workqueue.RateLimitingInterface commandQueue workqueue.RateLimitingInterface @@ -204,6 +208,10 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error { cc.pcLister = cc.pcInformer.Lister() cc.pcSynced = cc.pcInformer.Informer().HasSynced + cc.queueInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Scheduling().V1beta1().Queues() + cc.queueLister = cc.queueInformer.Lister() + cc.queueSynced = cc.queueInformer.Informer().HasSynced + // Register actions state.SyncJob = cc.syncJob state.KillJob = cc.killJob @@ -221,9 +229,10 @@ func (cc *jobcontroller) Run(stopCh <-chan struct{}) { go cc.svcInformer.Informer().Run(stopCh) go cc.cmdInformer.Informer().Run(stopCh) go cc.pcInformer.Informer().Run(stopCh) + go cc.queueInformer.Informer().Run(stopCh) cache.WaitForCacheSync(stopCh, cc.jobSynced, cc.podSynced, cc.pgSynced, - cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced) + cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced, cc.queueSynced) go wait.Until(cc.handleCommands, 0, stopCh) var i uint32 diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index eeef255292..d607a4fa50 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -208,7 +208,27 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat // deep copy job to prevent mutate it job = job.DeepCopy() - var err error + // Find queue that job belongs to, and check if the queue has forwarding metadata + queueInfo, err := cc.queueLister.Get(job.Spec.Queue) + if err != nil { + klog.Errorf("Failed to get queue from queueLister, error: %s", err.Error()) + return err + } + + var jobForwarding bool + if len(queueInfo.Spec.ExtendClusters) != 0 { + jobForwarding = true + if len(job.Annotations) == 0 { + job.Annotations = make(map[string]string) + } + job.Annotations[batch.JobForwardingKey] = "true" + job, err = cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("failed to update job: %s/%s, error: %s", job.Namespace, job.Name, err.Error()) + return err + } + } + // Skip job initiation if job is already initiated if !isInitiated(job) { if job, err = cc.initiateJob(job); err != nil { @@ -221,6 +241,16 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat } } + if len(queueInfo.Spec.ExtendClusters) != 0 { + jobForwarding = true + job.Annotations[batch.JobForwardingKey] = "true" + _, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("failed to update job: %s/%s, error: %s", job.Namespace, job.Name, err.Error()) + return err + } + } + var syncTask bool if pg, _ := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); pg != nil { @@ -284,7 +314,7 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat for i := 0; i < int(ts.Replicas); i++ { podName := fmt.Sprintf(jobhelpers.PodNameFmt, job.Name, name, i) if pod, found := pods[podName]; !found { - newPod := createJobPod(job, tc, ts.TopologyPolicy, i) + newPod := createJobPod(job, tc, ts.TopologyPolicy, i, jobForwarding) if err := cc.pluginOnPodCreate(job, newPod); err != nil { return err } diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index 1f19b3fc2b..cad3eda0ca 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -37,7 +37,7 @@ func MakePodName(jobName string, taskName string, index int) string { return fmt.Sprintf(jobhelpers.PodNameFmt, jobName, taskName, index) } -func createJobPod(job *batch.Job, template *v1.PodTemplateSpec, topologyPolicy batch.NumaPolicy, ix int) *v1.Pod { +func createJobPod(job *batch.Job, template *v1.PodTemplateSpec, topologyPolicy batch.NumaPolicy, ix int, jobForwarding bool) *v1.Pod { templateCopy := template.DeepCopy() pod := &v1.Pod{ @@ -136,6 +136,11 @@ func createJobPod(job *batch.Job, template *v1.PodTemplateSpec, topologyPolicy b } } + if jobForwarding { + pod.Annotations[batch.JobForwardingKey] = "true" + pod.Labels[batch.JobForwardingKey] = "true" + } + return pod } diff --git a/pkg/controllers/job/job_controller_util_test.go b/pkg/controllers/job/job_controller_util_test.go index a0025633de..b918b239d3 100644 --- a/pkg/controllers/job/job_controller_util_test.go +++ b/pkg/controllers/job/job_controller_util_test.go @@ -240,7 +240,7 @@ func TestCreateJobPod(t *testing.T) { for i, testcase := range testcases { t.Run(testcase.Name, func(t *testing.T) { - pod := createJobPod(testcase.Job, testcase.PodTemplate, v1alpha1.NumaPolicy(""), testcase.Index) + pod := createJobPod(testcase.Job, testcase.PodTemplate, v1alpha1.NumaPolicy(""), testcase.Index, false) if testcase.ReturnVal != nil && pod != nil && pod.Name != testcase.ReturnVal.Name && pod.Namespace != testcase.ReturnVal.Namespace { t.Errorf("Expected Return Value to be %v but got %v in case %d", testcase.ReturnVal, pod, i) diff --git a/pkg/scheduler/api/silo_cluster_info.go b/pkg/scheduler/api/silo_cluster_info.go new file mode 100644 index 0000000000..d05414dff5 --- /dev/null +++ b/pkg/scheduler/api/silo_cluster_info.go @@ -0,0 +1,40 @@ +/* +Copyright 2021 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "k8s.io/apimachinery/pkg/types" + + "volcano.sh/apis/pkg/apis/scheduling" +) + +// ClusterID is UID type, serves as unique ID for each queue +type ClusterID types.UID + +// SiloClusterInfo will have all details about queue +type SiloClusterInfo struct { + UID ClusterID + Cluster *scheduling.Cluster +} + +// NewSiloClusterInfo creates new queueInfo object +func NewSiloClusterInfo(cluster *scheduling.Cluster) *SiloClusterInfo { + return &SiloClusterInfo{ + UID: ClusterID(cluster.Name), + Cluster: cluster, + } +} diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 22b5645995..ac79a7fc1a 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -44,6 +44,7 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling" + batch "volcano.sh/apis/pkg/apis/batch/v1alpha1" "volcano.sh/apis/pkg/apis/scheduling" schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme" vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" @@ -97,11 +98,13 @@ type SchedulerCache struct { csiStorageCapacityInformer storagev1alpha1.CSIStorageCapacityInformer cpuInformer cpuinformerv1.NumatopologyInformer - Binder Binder - Evictor Evictor - StatusUpdater StatusUpdater - VolumeBinder VolumeBinder - Recorder record.EventRecorder + Binder Binder + Evictor Evictor + StatusUpdater StatusUpdater + PodGroupBinder BatchBinder + VolumeBinder VolumeBinder + + Recorder record.EventRecorder Jobs map[schedulingapi.JobID]*schedulingapi.JobInfo Nodes map[string]*schedulingapi.NodeInfo @@ -272,6 +275,45 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *schedulingapi.TaskInfo, podVol return dvb.volumeBinder.BindPodVolumes(task.Pod, podVolumes) } +type podgroupBinder struct { + kubeclient *kubernetes.Clientset + vcclient *vcclient.Clientset +} + +// Bind will add silo cluster annotaion on pod and podgroup +func (pgb *podgroupBinder) Bind(job *schedulingapi.JobInfo, cluster string) (*schedulingapi.JobInfo, error) { + if len(job.Tasks) == 0 { + klog.V(4).Infof("Job pods have not been created yet") + return job, nil + } + for _, task := range job.Tasks { + pod := task.Pod + pod.Annotations[batch.ForwardClusterKey] = cluster + pod.ResourceVersion = "" + _, err := pgb.kubeclient.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("Error while update pod annotation with error: %v", err) + return nil, err + } + } + + pg := job.PodGroup + pg.Annotations[batch.ForwardClusterKey] = cluster + podgroup := &vcv1beta1.PodGroup{} + if err := schedulingscheme.Scheme.Convert(&pg.PodGroup, podgroup, nil); err != nil { + klog.Errorf("Error while converting PodGroup to v1alpha1.PodGroup with error: %v", err) + return nil, err + } + newPg, err := pgb.vcclient.SchedulingV1beta1().PodGroups(pg.Namespace).Update(context.TODO(), podgroup, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("Error while update PodGroup annotation with error: %v", err) + return nil, err + } + job.PodGroup.ResourceVersion = newPg.ResourceVersion + klog.V(4).Infof("Bind PodGroup <%s> successfully", job.PodGroup.Name) + return job, nil +} + func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue string) *SchedulerCache { kubeClient, err := kubernetes.NewForConfig(config) if err != nil { @@ -341,6 +383,11 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s vcclient: sc.vcClient, } + sc.PodGroupBinder = &podgroupBinder{ + kubeclient: sc.kubeClient, + vcclient: sc.vcClient, + } + informerFactory := informers.NewSharedInformerFactory(sc.kubeClient, 0) // create informer for node information @@ -627,6 +674,15 @@ func (sc *SchedulerCache) Bind(taskInfo *schedulingapi.TaskInfo, hostname string return nil } +// BindPodGroup binds job to silo cluster +func (sc *SchedulerCache) BindPodGroup(job *schedulingapi.JobInfo, cluster string) error { + if _, err := sc.PodGroupBinder.Bind(job, cluster); err != nil { + klog.Errorf("Bind job <%s> to cluster <%s> failed: %v", job.Name, cluster, err) + return err + } + return nil +} + // GetPodVolumes get pod volume on the host func (sc *SchedulerCache) GetPodVolumes(task *schedulingapi.TaskInfo, node *v1.Node) (*volumescheduling.PodVolumes, error) { return sc.VolumeBinder.GetPodVolumes(task, node) diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go index 07b74e03c3..951d9ff529 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/cache/interface.go @@ -40,6 +40,9 @@ type Cache interface { // TODO(jinzhej): clean up expire Tasks. Bind(task *api.TaskInfo, hostname string) error + // Bind Pod/PodGroup to cluster + BindPodGroup(job *api.JobInfo, cluster string) error + // Evict evicts the task to release resources. Evict(task *api.TaskInfo, reason string) error @@ -86,3 +89,8 @@ type StatusUpdater interface { UpdatePodCondition(pod *v1.Pod, podCondition *v1.PodCondition) (*v1.Pod, error) UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, error) } + +// BatchBinder updates podgroup or job information +type BatchBinder interface { + Bind(job *api.JobInfo, cluster string) (*api.JobInfo, error) +} diff --git a/pkg/scheduler/conf/scheduler_conf.go b/pkg/scheduler/conf/scheduler_conf.go index 0b67433387..c14fe5be50 100644 --- a/pkg/scheduler/conf/scheduler_conf.go +++ b/pkg/scheduler/conf/scheduler_conf.go @@ -62,6 +62,8 @@ type PluginOption struct { // EnabledQueueOrder defines whether queueOrderFn is enabled EnabledQueueOrder *bool `yaml:"enableQueueOrder"` // EnabledPredicate defines whether predicateFn is enabled + EnabledClusterOrder *bool `yaml:"EnabledClusterOrder"` + // EnableClusterOrder defines whether clusterOrderFn is enabled EnabledPredicate *bool `yaml:"enablePredicate"` // EnabledBestNode defines whether bestNodeFn is enabled EnabledBestNode *bool `yaml:"enableBestNode"` diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index b185dd48b6..52bfcd8f20 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -59,6 +59,7 @@ type Session struct { queueOrderFns map[string]api.CompareFn taskOrderFns map[string]api.CompareFn namespaceOrderFns map[string]api.CompareFn + clusterOrderFns map[string]api.CompareFn predicateFns map[string]api.PredicateFn bestNodeFns map[string]api.BestNodeFn nodeOrderFns map[string]api.NodeOrderFn @@ -96,6 +97,7 @@ func openSession(cache cache.Cache) *Session { queueOrderFns: map[string]api.CompareFn{}, taskOrderFns: map[string]api.CompareFn{}, namespaceOrderFns: map[string]api.CompareFn{}, + clusterOrderFns: map[string]api.CompareFn{}, predicateFns: map[string]api.PredicateFn{}, bestNodeFns: map[string]api.BestNodeFn{}, nodeOrderFns: map[string]api.NodeOrderFn{}, @@ -166,6 +168,7 @@ func closeSession(ssn *Session) { ssn.jobOrderFns = nil ssn.namespaceOrderFns = nil ssn.queueOrderFns = nil + ssn.clusterOrderFns = nil klog.V(3).Infof("Close Session %v", ssn.UID) } @@ -393,6 +396,11 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error { return nil } +// BindPodGroup bind PodGroup to specified cluster +func (ssn *Session) BindPodGroup(job *api.JobInfo, cluster string) error { + return ssn.cache.BindPodGroup(job, cluster) +} + // UpdatePodGroupCondition update job condition accordingly. func (ssn *Session) UpdatePodGroupCondition(jobInfo *api.JobInfo, cond *scheduling.PodGroupCondition) error { job, ok := ssn.Jobs[jobInfo.UID] diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index 946f1f0b9a..45159b0cd8 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -19,6 +19,7 @@ package framework import ( k8sframework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" + "volcano.sh/apis/pkg/apis/scheduling" "volcano.sh/volcano/pkg/scheduler/api" ) @@ -32,6 +33,11 @@ func (ssn *Session) AddQueueOrderFn(name string, qf api.CompareFn) { ssn.queueOrderFns[name] = qf } +// AddClusterOrderFn add queue order function +func (ssn *Session) AddClusterOrderFn(name string, qf api.CompareFn) { + ssn.clusterOrderFns[name] = qf +} + // AddTaskOrderFn add task order function func (ssn *Session) AddTaskOrderFn(name string, cf api.CompareFn) { ssn.taskOrderFns[name] = cf @@ -488,6 +494,29 @@ func (ssn *Session) NamespaceOrderFn(l, r interface{}) bool { return lv < rv } +// ClusterOrderFn invoke ClusterOrderFn function of the plugins +func (ssn *Session) ClusterOrderFn(l, r interface{}) bool { + for _, tier := range ssn.Tiers { + for _, plugin := range tier.Plugins { + if !isEnabled(plugin.EnabledClusterOrder) { + continue + } + cof, found := ssn.clusterOrderFns[plugin.Name] + if !found { + continue + } + if j := cof(l, r); j != 0 { + return j < 0 + } + } + } + + // If no cluster order funcs, order cluster by ClusterID + lv := l.(*scheduling.Cluster) + rv := r.(*scheduling.Cluster) + return lv.Name < rv.Name +} + // QueueOrderFn invoke queueorder function of the plugins func (ssn *Session) QueueOrderFn(l, r interface{}) bool { for _, tier := range ssn.Tiers { From d404af9b21ddb4c8221666600a998562de0f729c Mon Sep 17 00:00:00 2001 From: william-wang Date: Tue, 8 Jun 2021 20:48:16 +0800 Subject: [PATCH 2/2] Update the vendor Signed-off-by: william-wang --- .../agiledragon/gomonkey/v2/LICENSE | 25 ++ .../bases/scheduling.volcano.sh_queues.yaml | 23 ++ .../v1beta1/scheduling.volcano.sh_queues.yaml | 23 ++ go.mod | 3 +- go.sum | 6 +- .../bases/scheduling.volcano.sh_queues.yaml | 23 ++ .../v1beta1/scheduling.volcano.sh_queues.yaml | 23 ++ installer/volcano-development.yaml | 23 ++ pkg/controllers/job/job_controller_actions.go | 12 +- .../job/job_controller_actions_test.go | 9 + pkg/controllers/job/job_state_test.go | 17 ++ .../agiledragon/gomonkey/v2/LICENSE | 21 ++ .../agiledragon/gomonkey/v2/README.md | 37 +++ .../github.com/agiledragon/gomonkey/v2/go.mod | 5 + .../github.com/agiledragon/gomonkey/v2/go.sum | 13 + .../agiledragon/gomonkey/v2/jmp_amd64.go | 18 ++ .../gomonkey/v2/modify_binary_darwin.go | 19 ++ .../gomonkey/v2/modify_binary_linux.go | 19 ++ .../gomonkey/v2/modify_binary_windows.go | 25 ++ .../agiledragon/gomonkey/v2/patch.go | 232 ++++++++++++++++++ vendor/modules.txt | 5 +- .../apis/pkg/apis/batch/v1alpha1/labels.go | 8 + .../apis/pkg/apis/scheduling/types.go | 10 + .../apis/pkg/apis/scheduling/v1beta1/types.go | 10 + .../v1beta1/zz_generated.conversion.go | 36 +++ .../v1beta1/zz_generated.deepcopy.go | 30 +++ .../apis/scheduling/zz_generated.deepcopy.go | 30 +++ 27 files changed, 699 insertions(+), 6 deletions(-) create mode 100644 LICENSES/vendor/github.com/agiledragon/gomonkey/v2/LICENSE create mode 100644 vendor/github.com/agiledragon/gomonkey/v2/LICENSE create mode 100644 vendor/github.com/agiledragon/gomonkey/v2/README.md create mode 100644 vendor/github.com/agiledragon/gomonkey/v2/go.mod create mode 100644 vendor/github.com/agiledragon/gomonkey/v2/go.sum create mode 100644 vendor/github.com/agiledragon/gomonkey/v2/jmp_amd64.go create mode 100644 vendor/github.com/agiledragon/gomonkey/v2/modify_binary_darwin.go create mode 100644 vendor/github.com/agiledragon/gomonkey/v2/modify_binary_linux.go create mode 100644 vendor/github.com/agiledragon/gomonkey/v2/modify_binary_windows.go create mode 100644 vendor/github.com/agiledragon/gomonkey/v2/patch.go diff --git a/LICENSES/vendor/github.com/agiledragon/gomonkey/v2/LICENSE b/LICENSES/vendor/github.com/agiledragon/gomonkey/v2/LICENSE new file mode 100644 index 0000000000..7adc86eb25 --- /dev/null +++ b/LICENSES/vendor/github.com/agiledragon/gomonkey/v2/LICENSE @@ -0,0 +1,25 @@ += vendor/github.com/agiledragon/gomonkey/v2 licensed under: = + +MIT License + +Copyright (c) 2018 Zhang Xiaolong + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + += vendor/github.com/agiledragon/gomonkey/v2/LICENSE 9bd88aaa83a25e41d110ebfa6571e8cf diff --git a/config/crd/bases/scheduling.volcano.sh_queues.yaml b/config/crd/bases/scheduling.volcano.sh_queues.yaml index d9605538e4..eace3a3575 100644 --- a/config/crd/bases/scheduling.volcano.sh_queues.yaml +++ b/config/crd/bases/scheduling.volcano.sh_queues.yaml @@ -49,6 +49,29 @@ spec: x-kubernetes-int-or-string: true description: ResourceList is a set of (resource name, quantity) pairs. type: object + extendClusters: + description: extendCluster indicate the jobs in this Queue will be + dispatched to these clusters. + items: + description: CluterSpec represents the template of Cluster + properties: + capacity: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: ResourceList is a set of (resource name, quantity) + pairs. + type: object + name: + type: string + weight: + format: int32 + type: integer + type: object + type: array guarantee: description: Guarantee indicate configuration about resource reservation properties: diff --git a/config/crd/v1beta1/scheduling.volcano.sh_queues.yaml b/config/crd/v1beta1/scheduling.volcano.sh_queues.yaml index 70106f6ea4..7cb77eef5c 100644 --- a/config/crd/v1beta1/scheduling.volcano.sh_queues.yaml +++ b/config/crd/v1beta1/scheduling.volcano.sh_queues.yaml @@ -49,6 +49,29 @@ spec: x-kubernetes-int-or-string: true description: ResourceList is a set of (resource name, quantity) pairs. type: object + extendClusters: + description: extendCluster indicate the jobs in this Queue will be dispatched + to these clusters. + items: + description: CluterSpec represents the template of Cluster + properties: + capacity: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: ResourceList is a set of (resource name, quantity) + pairs. + type: object + name: + type: string + weight: + format: int32 + type: integer + type: object + type: array guarantee: description: Guarantee indicate configuration about resource reservation properties: diff --git a/go.mod b/go.mod index 921397adfb..4b8359e8a9 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module volcano.sh/volcano go 1.14 require ( + github.com/agiledragon/gomonkey/v2 v2.1.0 github.com/fsnotify/fsnotify v1.4.9 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/hashicorp/go-multierror v1.0.0 @@ -22,7 +23,7 @@ require ( k8s.io/klog v1.0.0 k8s.io/kubernetes v1.19.6 sigs.k8s.io/yaml v1.2.0 - volcano.sh/apis v0.0.0-20210528062323-838630df511b + volcano.sh/apis v0.0.0-20210603070204-70005b2d502a ) replace ( diff --git a/go.sum b/go.sum index 62d35e2f4c..b34694ac40 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,8 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/agiledragon/gomonkey/v2 v2.1.0 h1:+5Dbq8a1fn89IgVk35O233R41FH0nBKFPn50wDZpNs0= +github.com/agiledragon/gomonkey/v2 v2.1.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY= github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -855,5 +857,5 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= vbom.ml/util v0.0.0-20160121211510-db5cfe13f5cc/go.mod h1:so/NYdZXCz+E3ZpW0uAoCj6uzU2+8OWDFv/HxUSs7kI= -volcano.sh/apis v0.0.0-20210528062323-838630df511b h1:Tgg9IcLXQRiTli7KH+xteBTIrawUM07MGvN541/XyMo= -volcano.sh/apis v0.0.0-20210528062323-838630df511b/go.mod h1:UaeJ/s5Hyd+ZhFLc+Kw9YlgM8gRZ/5OzXqHa0yKOoXY= +volcano.sh/apis v0.0.0-20210603070204-70005b2d502a h1:h6c+NZglstX8JGuwLrU6/0/FnBcT26XO1S3PvtykRTk= +volcano.sh/apis v0.0.0-20210603070204-70005b2d502a/go.mod h1:UaeJ/s5Hyd+ZhFLc+Kw9YlgM8gRZ/5OzXqHa0yKOoXY= diff --git a/installer/helm/chart/volcano/crd/bases/scheduling.volcano.sh_queues.yaml b/installer/helm/chart/volcano/crd/bases/scheduling.volcano.sh_queues.yaml index 30507fb53d..e540a5d0fb 100644 --- a/installer/helm/chart/volcano/crd/bases/scheduling.volcano.sh_queues.yaml +++ b/installer/helm/chart/volcano/crd/bases/scheduling.volcano.sh_queues.yaml @@ -47,6 +47,29 @@ spec: x-kubernetes-int-or-string: true description: ResourceList is a set of (resource name, quantity) pairs. type: object + extendClusters: + description: extendCluster indicate the jobs in this Queue will be + dispatched to these clusters. + items: + description: CluterSpec represents the template of Cluster + properties: + capacity: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: ResourceList is a set of (resource name, quantity) + pairs. + type: object + name: + type: string + weight: + format: int32 + type: integer + type: object + type: array guarantee: description: Guarantee indicate configuration about resource reservation properties: diff --git a/installer/helm/chart/volcano/crd/v1beta1/scheduling.volcano.sh_queues.yaml b/installer/helm/chart/volcano/crd/v1beta1/scheduling.volcano.sh_queues.yaml index 472a859c45..285d0646c0 100644 --- a/installer/helm/chart/volcano/crd/v1beta1/scheduling.volcano.sh_queues.yaml +++ b/installer/helm/chart/volcano/crd/v1beta1/scheduling.volcano.sh_queues.yaml @@ -47,6 +47,29 @@ spec: x-kubernetes-int-or-string: true description: ResourceList is a set of (resource name, quantity) pairs. type: object + extendClusters: + description: extendCluster indicate the jobs in this Queue will be dispatched + to these clusters. + items: + description: CluterSpec represents the template of Cluster + properties: + capacity: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: ResourceList is a set of (resource name, quantity) + pairs. + type: object + name: + type: string + weight: + format: int32 + type: integer + type: object + type: array guarantee: description: Guarantee indicate configuration about resource reservation properties: diff --git a/installer/volcano-development.yaml b/installer/volcano-development.yaml index 3db1d0498e..9cea04dd93 100644 --- a/installer/volcano-development.yaml +++ b/installer/volcano-development.yaml @@ -7796,6 +7796,29 @@ spec: x-kubernetes-int-or-string: true description: ResourceList is a set of (resource name, quantity) pairs. type: object + extendClusters: + description: extendCluster indicate the jobs in this Queue will be + dispatched to these clusters. + items: + description: CluterSpec represents the template of Cluster + properties: + capacity: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: ResourceList is a set of (resource name, quantity) + pairs. + type: object + name: + type: string + weight: + format: int32 + type: integer + type: object + type: array guarantee: description: Guarantee indicate configuration about resource reservation properties: diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index d607a4fa50..7b008c99b7 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -194,6 +194,15 @@ func (cc *jobcontroller) initOnJobUpdate(job *batch.Job) error { return nil } +func (cc *jobcontroller) GetQueueInfo(queue string) (*scheduling.Queue, error) { + queueInfo, err := cc.queueLister.Get(queue) + if err != nil { + klog.Errorf("Failed to get queue from queueLister, error: %s", err.Error()) + } + + return queueInfo, err +} + func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error { job := jobInfo.Job klog.V(3).Infof("Starting to sync up Job <%s/%s>, current version %d", job.Namespace, job.Name, job.Status.Version) @@ -209,9 +218,8 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat job = job.DeepCopy() // Find queue that job belongs to, and check if the queue has forwarding metadata - queueInfo, err := cc.queueLister.Get(job.Spec.Queue) + queueInfo, err := cc.GetQueueInfo(job.Spec.Queue) if err != nil { - klog.Errorf("Failed to get queue from queueLister, error: %s", err.Error()) return err } diff --git a/pkg/controllers/job/job_controller_actions_test.go b/pkg/controllers/job/job_controller_actions_test.go index 6a761b2e30..ee494d1d0d 100644 --- a/pkg/controllers/job/job_controller_actions_test.go +++ b/pkg/controllers/job/job_controller_actions_test.go @@ -20,8 +20,11 @@ import ( "context" "errors" "fmt" + "reflect" "testing" + gomonkey "github.com/agiledragon/gomonkey/v2" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -262,6 +265,12 @@ func TestSyncJobFunc(t *testing.T) { t.Run(testcase.Name, func(t *testing.T) { fakeController := newFakeController() + patches := gomonkey.ApplyMethod(reflect.TypeOf(fakeController), "GetQueueInfo", func(_ *jobcontroller, _ string) (*schedulingv1alpha2.Queue, error) { + return &schedulingv1alpha2.Queue{}, nil + }) + + defer patches.Reset() + jobPlugins := make(map[string][]string) for _, plugin := range testcase.Plugins { diff --git a/pkg/controllers/job/job_state_test.go b/pkg/controllers/job/job_state_test.go index a82ef77371..03ac106460 100644 --- a/pkg/controllers/job/job_state_test.go +++ b/pkg/controllers/job/job_state_test.go @@ -19,12 +19,17 @@ package job import ( "context" "fmt" + "reflect" "testing" + "github.com/agiledragon/gomonkey/v2" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "volcano.sh/apis/pkg/apis/batch/v1alpha1" + schedulingv1alpha2 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" "volcano.sh/volcano/pkg/controllers/job/state" @@ -678,6 +683,12 @@ func TestPendingState_Execute(t *testing.T) { fakecontroller := newFakeController() state.KillJob = fakecontroller.killJob + patches := gomonkey.ApplyMethod(reflect.TypeOf(fakecontroller), "GetQueueInfo", func(_ *jobcontroller, _ string) (*schedulingv1alpha2.Queue, error) { + return &schedulingv1alpha2.Queue{}, nil + }) + + defer patches.Reset() + _, err := fakecontroller.vcClient.BatchV1alpha1().Jobs(namespace).Create(context.TODO(), testcase.JobInfo.Job, metav1.CreateOptions{}) if err != nil { t.Error("Error while creating Job") @@ -1141,6 +1152,12 @@ func TestRunningState_Execute(t *testing.T) { fakecontroller := newFakeController() state.KillJob = fakecontroller.killJob + patches := gomonkey.ApplyMethod(reflect.TypeOf(fakecontroller), "GetQueueInfo", func(_ *jobcontroller, _ string) (*schedulingv1alpha2.Queue, error) { + return &schedulingv1alpha2.Queue{}, nil + }) + + defer patches.Reset() + _, err := fakecontroller.vcClient.BatchV1alpha1().Jobs(namespace).Create(context.TODO(), testcase.JobInfo.Job, metav1.CreateOptions{}) if err != nil { t.Error("Error while creating Job") diff --git a/vendor/github.com/agiledragon/gomonkey/v2/LICENSE b/vendor/github.com/agiledragon/gomonkey/v2/LICENSE new file mode 100644 index 0000000000..d75dc90e65 --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 Zhang Xiaolong + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/agiledragon/gomonkey/v2/README.md b/vendor/github.com/agiledragon/gomonkey/v2/README.md new file mode 100644 index 0000000000..04d9e73c00 --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/README.md @@ -0,0 +1,37 @@ +# gomonkey + +gomonkey is a library to make monkey patching in unit tests easy. + +## Features + ++ support a patch for a function ++ support a patch for a member method ++ support a patch for a interface ++ support a patch for a function variable ++ support a patch for a global variable ++ support patches of a specified sequence for a function ++ support patches of a specified sequence for a member method ++ support patches of a specified sequence for a interface ++ support patches of a specified sequence for a function variable + +## Notes ++ gomonkey fails to patch a function or a member method if inlining is enabled, please running your tests with inlining disabled by adding the command line argument that is `-gcflags=-l`(below go1.10) or `-gcflags=all=-l`(go1.10 and above). ++ gomonkey should work on any amd64 system. ++ A panic may happen when a goroutine is patching a function or a member method that is visited by another goroutine at the same time. That is to say, gomonkey is not threadsafe. ++ go1.6 version of the reflection mechanism supports the query of private member methods, but go1.7 and above does not support it. However, all versions of the reflection mechanism support the query of private functions, so gomonkey will trigger a `panic` for only patching a private member method when go1.7 and above is used. + + +## Supported Platform: + +- MAC OS X amd64 +- Linux amd64 +- Windows amd64 + +## Installation +```go +$ go get github.com/agiledragon/gomonkey +``` +## Using gomonkey + +Please refer to the test cases as idioms, very complete and detailed. + diff --git a/vendor/github.com/agiledragon/gomonkey/v2/go.mod b/vendor/github.com/agiledragon/gomonkey/v2/go.mod new file mode 100644 index 0000000000..f39f90cd57 --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/go.mod @@ -0,0 +1,5 @@ +module github.com/agiledragon/gomonkey/v2 + +go 1.14 + +require github.com/smartystreets/goconvey v1.6.4 diff --git a/vendor/github.com/agiledragon/gomonkey/v2/go.sum b/vendor/github.com/agiledragon/gomonkey/v2/go.sum new file mode 100644 index 0000000000..a46a039b7a --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/go.sum @@ -0,0 +1,13 @@ +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= +github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= diff --git a/vendor/github.com/agiledragon/gomonkey/v2/jmp_amd64.go b/vendor/github.com/agiledragon/gomonkey/v2/jmp_amd64.go new file mode 100644 index 0000000000..02c1c42c70 --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/jmp_amd64.go @@ -0,0 +1,18 @@ +package gomonkey + +func buildJmpDirective(double uintptr) []byte { + d0 := byte(double) + d1 := byte(double >> 8) + d2 := byte(double >> 16) + d3 := byte(double >> 24) + d4 := byte(double >> 32) + d5 := byte(double >> 40) + d6 := byte(double >> 48) + d7 := byte(double >> 56) + + return []byte{ + 0x48, 0xBA, d0, d1, d2, d3, d4, d5, d6, d7, // MOV rdx, double + 0xFF, 0x22, // JMP [rdx] + } +} + diff --git a/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_darwin.go b/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_darwin.go new file mode 100644 index 0000000000..458aa3381f --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_darwin.go @@ -0,0 +1,19 @@ +package gomonkey + +import "syscall" + +func modifyBinary(target uintptr, bytes []byte) { + function := entryAddress(target, len(bytes)) + + page := entryAddress(pageStart(target), syscall.Getpagesize()) + err := syscall.Mprotect(page, syscall.PROT_READ|syscall.PROT_WRITE|syscall.PROT_EXEC) + if err != nil { + panic(err) + } + copy(function, bytes) + + err = syscall.Mprotect(page, syscall.PROT_READ|syscall.PROT_EXEC) + if err != nil { + panic(err) + } +} diff --git a/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_linux.go b/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_linux.go new file mode 100644 index 0000000000..458aa3381f --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_linux.go @@ -0,0 +1,19 @@ +package gomonkey + +import "syscall" + +func modifyBinary(target uintptr, bytes []byte) { + function := entryAddress(target, len(bytes)) + + page := entryAddress(pageStart(target), syscall.Getpagesize()) + err := syscall.Mprotect(page, syscall.PROT_READ|syscall.PROT_WRITE|syscall.PROT_EXEC) + if err != nil { + panic(err) + } + copy(function, bytes) + + err = syscall.Mprotect(page, syscall.PROT_READ|syscall.PROT_EXEC) + if err != nil { + panic(err) + } +} diff --git a/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_windows.go b/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_windows.go new file mode 100644 index 0000000000..ef0dbc756a --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_windows.go @@ -0,0 +1,25 @@ +package gomonkey + +import ( + "syscall" + "unsafe" +) + +func modifyBinary(target uintptr, bytes []byte) { + function := entryAddress(target, len(bytes)) + + proc := syscall.NewLazyDLL("kernel32.dll").NewProc("VirtualProtect") + const PROT_READ_WRITE = 0x40 + var old uint32 + result, _, _ := proc.Call(target, uintptr(len(bytes)), uintptr(PROT_READ_WRITE), uintptr(unsafe.Pointer(&old))) + if result == 0 { + panic(result) + } + copy(function, bytes) + + var ignore uint32 + result, _, _ = proc.Call(target, uintptr(len(bytes)), uintptr(old), uintptr(unsafe.Pointer(&ignore))) + if result == 0 { + panic(result) + } +} \ No newline at end of file diff --git a/vendor/github.com/agiledragon/gomonkey/v2/patch.go b/vendor/github.com/agiledragon/gomonkey/v2/patch.go new file mode 100644 index 0000000000..99df388515 --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/patch.go @@ -0,0 +1,232 @@ +package gomonkey + +import ( + "fmt" + "reflect" + "syscall" + "unsafe" +) + +type Patches struct { + originals map[reflect.Value][]byte + values map[reflect.Value]reflect.Value + valueHolders map[reflect.Value]reflect.Value +} + +type Params []interface{} +type OutputCell struct { + Values Params + Times int +} + +func ApplyFunc(target, double interface{}) *Patches { + return create().ApplyFunc(target, double) +} + +func ApplyMethod(target reflect.Type, methodName string, double interface{}) *Patches { + return create().ApplyMethod(target, methodName, double) +} + +func ApplyGlobalVar(target, double interface{}) *Patches { + return create().ApplyGlobalVar(target, double) +} + +func ApplyFuncVar(target, double interface{}) *Patches { + return create().ApplyFuncVar(target, double) +} + +func ApplyFuncSeq(target interface{}, outputs []OutputCell) *Patches { + return create().ApplyFuncSeq(target, outputs) +} + +func ApplyMethodSeq(target reflect.Type, methodName string, outputs []OutputCell) *Patches { + return create().ApplyMethodSeq(target, methodName, outputs) +} + +func ApplyFuncVarSeq(target interface{}, outputs []OutputCell) *Patches { + return create().ApplyFuncVarSeq(target, outputs) +} + +func create() *Patches { + return &Patches{originals: make(map[reflect.Value][]byte), values: make(map[reflect.Value]reflect.Value), valueHolders: make(map[reflect.Value]reflect.Value)} +} + +func NewPatches() *Patches { + return create() +} + +func (this *Patches) ApplyFunc(target, double interface{}) *Patches { + t := reflect.ValueOf(target) + d := reflect.ValueOf(double) + return this.ApplyCore(t, d) +} + +func (this *Patches) ApplyMethod(target reflect.Type, methodName string, double interface{}) *Patches { + m, ok := target.MethodByName(methodName) + if !ok { + panic("retrieve method by name failed") + } + d := reflect.ValueOf(double) + return this.ApplyCore(m.Func, d) +} + +func (this *Patches) ApplyGlobalVar(target, double interface{}) *Patches { + t := reflect.ValueOf(target) + if t.Type().Kind() != reflect.Ptr { + panic("target is not a pointer") + } + + this.values[t] = reflect.ValueOf(t.Elem().Interface()) + d := reflect.ValueOf(double) + t.Elem().Set(d) + return this +} + +func (this *Patches) ApplyFuncVar(target, double interface{}) *Patches { + t := reflect.ValueOf(target) + d := reflect.ValueOf(double) + if t.Type().Kind() != reflect.Ptr { + panic("target is not a pointer") + } + this.check(t.Elem(), d) + return this.ApplyGlobalVar(target, double) +} + +func (this *Patches) ApplyFuncSeq(target interface{}, outputs []OutputCell) *Patches { + funcType := reflect.TypeOf(target) + t := reflect.ValueOf(target) + d := getDoubleFunc(funcType, outputs) + return this.ApplyCore(t, d) +} + +func (this *Patches) ApplyMethodSeq(target reflect.Type, methodName string, outputs []OutputCell) *Patches { + m, ok := target.MethodByName(methodName) + if !ok { + panic("retrieve method by name failed") + } + d := getDoubleFunc(m.Type, outputs) + return this.ApplyCore(m.Func, d) +} + +func (this *Patches) ApplyFuncVarSeq(target interface{}, outputs []OutputCell) *Patches { + t := reflect.ValueOf(target) + if t.Type().Kind() != reflect.Ptr { + panic("target is not a pointer") + } + if t.Elem().Kind() != reflect.Func { + panic("target is not a func") + } + + funcType := reflect.TypeOf(target).Elem() + double := getDoubleFunc(funcType, outputs).Interface() + return this.ApplyGlobalVar(target, double) +} + +func (this *Patches) Reset() { + for target, bytes := range this.originals { + modifyBinary(*(*uintptr)(getPointer(target)), bytes) + delete(this.originals, target) + } + + for target, variable := range this.values { + target.Elem().Set(variable) + } +} + +func (this *Patches) ApplyCore(target, double reflect.Value) *Patches { + this.check(target, double) + if _, ok := this.originals[target]; ok { + panic("patch has been existed") + } + + this.valueHolders[double] = double + original := replace(*(*uintptr)(getPointer(target)), uintptr(getPointer(double))) + this.originals[target] = original + return this +} + +func (this *Patches) check(target, double reflect.Value) { + if target.Kind() != reflect.Func { + panic("target is not a func") + } + + if double.Kind() != reflect.Func { + panic("double is not a func") + } + + if target.Type() != double.Type() { + panic(fmt.Sprintf("target type(%s) and double type(%s) are different", target.Type(), double.Type())) + } +} + +func replace(target, double uintptr) []byte { + code := buildJmpDirective(double) + bytes := entryAddress(target, len(code)) + original := make([]byte, len(bytes)) + copy(original, bytes) + modifyBinary(target, code) + return original +} + +func getDoubleFunc(funcType reflect.Type, outputs []OutputCell) reflect.Value { + if funcType.NumOut() != len(outputs[0].Values) { + panic(fmt.Sprintf("func type has %v return values, but only %v values provided as double", + funcType.NumOut(), len(outputs[0].Values))) + } + + slice := make([]Params, 0) + for _, output := range outputs { + t := 0 + if output.Times <= 1 { + t = 1 + } else { + t = output.Times + } + for j := 0; j < t; j++ { + slice = append(slice, output.Values) + } + } + + i := 0 + len := len(slice) + return reflect.MakeFunc(funcType, func(_ []reflect.Value) []reflect.Value { + if i < len { + i++ + return GetResultValues(funcType, slice[i-1]...) + } + panic("double seq is less than call seq") + }) +} + +func GetResultValues(funcType reflect.Type, results ...interface{}) []reflect.Value { + var resultValues []reflect.Value + for i, r := range results { + var resultValue reflect.Value + if r == nil { + resultValue = reflect.Zero(funcType.Out(i)) + } else { + v := reflect.New(funcType.Out(i)) + v.Elem().Set(reflect.ValueOf(r)) + resultValue = v.Elem() + } + resultValues = append(resultValues, resultValue) + } + return resultValues +} + +type funcValue struct { + _ uintptr + p unsafe.Pointer +} + +func getPointer(v reflect.Value) unsafe.Pointer { + return (*funcValue)(unsafe.Pointer(&v)).p +} + +func entryAddress(p uintptr, l int) []byte { + return *(*[]byte)(unsafe.Pointer(&reflect.SliceHeader{Data: p, Len: l, Cap: l})) +} + +func pageStart(ptr uintptr) uintptr { + return ptr & ^(uintptr(syscall.Getpagesize() - 1)) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index af598fa5f1..15a23d119f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -4,6 +4,9 @@ cloud.google.com/go/compute/metadata github.com/PuerkitoBio/purell # github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 github.com/PuerkitoBio/urlesc +# github.com/agiledragon/gomonkey/v2 v2.1.0 +## explicit +github.com/agiledragon/gomonkey/v2 # github.com/beorn7/perks v1.0.1 github.com/beorn7/perks/quantile # github.com/blang/semver v3.5.0+incompatible @@ -835,7 +838,7 @@ sigs.k8s.io/structured-merge-diff/v4/value # sigs.k8s.io/yaml v1.2.0 ## explicit sigs.k8s.io/yaml -# volcano.sh/apis v0.0.0-20210528062323-838630df511b +# volcano.sh/apis v0.0.0-20210603070204-70005b2d502a ## explicit volcano.sh/apis/pkg/apis/batch/v1alpha1 volcano.sh/apis/pkg/apis/bus/v1alpha1 diff --git a/vendor/volcano.sh/apis/pkg/apis/batch/v1alpha1/labels.go b/vendor/volcano.sh/apis/pkg/apis/batch/v1alpha1/labels.go index 77cb11ac33..4ee1d26a36 100644 --- a/vendor/volcano.sh/apis/pkg/apis/batch/v1alpha1/labels.go +++ b/vendor/volcano.sh/apis/pkg/apis/batch/v1alpha1/labels.go @@ -35,4 +35,12 @@ const ( PodgroupNamePrefix = "podgroup-" // PodTemplateKey type specify a equivalence pod class PodTemplateKey = "volcano.sh/template-uid" + // JobForwardingKey job forwarding key used in job annotation + JobForwardingKey = "volcano.sh/job-forwarding" + // ForwardClusterKey cluster key used in pod annotation + ForwardClusterKey = "volcano.sh/forward-cluster" + // OrginalNameKey annotation key for resource name + OrginalNameKey = "volcano.sh/burst-name" + // BurstToSiloClusterAnnotation labels key for resource only in silo cluster + BurstToSiloClusterAnnotation = "volcano.sh/silo-resource" ) diff --git a/vendor/volcano.sh/apis/pkg/apis/scheduling/types.go b/vendor/volcano.sh/apis/pkg/apis/scheduling/types.go index 16e71a13d3..7f0a526426 100644 --- a/vendor/volcano.sh/apis/pkg/apis/scheduling/types.go +++ b/vendor/volcano.sh/apis/pkg/apis/scheduling/types.go @@ -278,6 +278,13 @@ type QueueStatus struct { Reservation Reservation `json:"reservation,omitempty" protobuf:"bytes,6,opt,name=reservation"` } +// CluterSpec represents the template of Cluster +type Cluster struct { + Name string + Weight int32 + Capacity v1.ResourceList +} + // QueueSpec represents the template of Queue. type QueueSpec struct { Weight int32 @@ -288,6 +295,9 @@ type QueueSpec struct { // Reclaimable indicate whether the queue can be reclaimed by other queue Reclaimable *bool + // extendCluster indicate the jobs in this Queue will be dispatched to these clusters. + ExtendClusters []Cluster + // Guarantee indicate configuration about resource reservation Guarantee Guarantee `json:"guarantee,omitempty" protobuf:"bytes,4,opt,name=guarantee"` } diff --git a/vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1/types.go b/vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1/types.go index 9b277b40d0..7f84057b85 100644 --- a/vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1/types.go +++ b/vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1/types.go @@ -294,6 +294,13 @@ type QueueStatus struct { Reservation Reservation `json:"reservation,omitempty" protobuf:"bytes,6,opt,name=reservation"` } +// CluterSpec represents the template of Cluster +type Cluster struct { + Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"` + Weight int32 `json:"weight,omitempty" protobuf:"bytes,2,opt,name=weight"` + Capacity v1.ResourceList `json:"capacity,omitempty" protobuf:"bytes,3,opt,name=capacity"` +} + // QueueSpec represents the template of Queue. type QueueSpec struct { Weight int32 `json:"weight,omitempty" protobuf:"bytes,1,opt,name=weight"` @@ -302,6 +309,9 @@ type QueueSpec struct { // Reclaimable indicate whether the queue can be reclaimed by other queue Reclaimable *bool `json:"reclaimable,omitempty" protobuf:"bytes,3,opt,name=reclaimable"` + // extendCluster indicate the jobs in this Queue will be dispatched to these clusters. + ExtendClusters []Cluster `json:"extendClusters,omitempty" protobuf:"bytes,4,opt,name=extendClusters"` + // Guarantee indicate configuration about resource reservation Guarantee Guarantee `json:"guarantee,omitempty" protobuf:"bytes,4,opt,name=guarantee"` } diff --git a/vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1/zz_generated.conversion.go b/vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1/zz_generated.conversion.go index 7a87d215ac..35dc6ddfc3 100644 --- a/vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1/zz_generated.conversion.go +++ b/vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1/zz_generated.conversion.go @@ -35,6 +35,16 @@ func init() { // RegisterConversions adds conversion functions to the given scheme. // Public to allow building arbitrary schemes. func RegisterConversions(s *runtime.Scheme) error { + if err := s.AddGeneratedConversionFunc((*Cluster)(nil), (*scheduling.Cluster)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta1_Cluster_To_scheduling_Cluster(a.(*Cluster), b.(*scheduling.Cluster), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*scheduling.Cluster)(nil), (*Cluster)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_scheduling_Cluster_To_v1beta1_Cluster(a.(*scheduling.Cluster), b.(*Cluster), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*Guarantee)(nil), (*scheduling.Guarantee)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta1_Guarantee_To_scheduling_Guarantee(a.(*Guarantee), b.(*scheduling.Guarantee), scope) }); err != nil { @@ -148,6 +158,30 @@ func RegisterConversions(s *runtime.Scheme) error { return nil } +func autoConvert_v1beta1_Cluster_To_scheduling_Cluster(in *Cluster, out *scheduling.Cluster, s conversion.Scope) error { + out.Name = in.Name + out.Weight = in.Weight + out.Capacity = *(*v1.ResourceList)(unsafe.Pointer(&in.Capacity)) + return nil +} + +// Convert_v1beta1_Cluster_To_scheduling_Cluster is an autogenerated conversion function. +func Convert_v1beta1_Cluster_To_scheduling_Cluster(in *Cluster, out *scheduling.Cluster, s conversion.Scope) error { + return autoConvert_v1beta1_Cluster_To_scheduling_Cluster(in, out, s) +} + +func autoConvert_scheduling_Cluster_To_v1beta1_Cluster(in *scheduling.Cluster, out *Cluster, s conversion.Scope) error { + out.Name = in.Name + out.Weight = in.Weight + out.Capacity = *(*v1.ResourceList)(unsafe.Pointer(&in.Capacity)) + return nil +} + +// Convert_scheduling_Cluster_To_v1beta1_Cluster is an autogenerated conversion function. +func Convert_scheduling_Cluster_To_v1beta1_Cluster(in *scheduling.Cluster, out *Cluster, s conversion.Scope) error { + return autoConvert_scheduling_Cluster_To_v1beta1_Cluster(in, out, s) +} + func autoConvert_v1beta1_Guarantee_To_scheduling_Guarantee(in *Guarantee, out *scheduling.Guarantee, s conversion.Scope) error { out.Resource = *(*v1.ResourceList)(unsafe.Pointer(&in.Resource)) return nil @@ -386,6 +420,7 @@ func autoConvert_v1beta1_QueueSpec_To_scheduling_QueueSpec(in *QueueSpec, out *s out.Weight = in.Weight out.Capability = *(*v1.ResourceList)(unsafe.Pointer(&in.Capability)) out.Reclaimable = (*bool)(unsafe.Pointer(in.Reclaimable)) + out.ExtendClusters = *(*[]scheduling.Cluster)(unsafe.Pointer(&in.ExtendClusters)) if err := Convert_v1beta1_Guarantee_To_scheduling_Guarantee(&in.Guarantee, &out.Guarantee, s); err != nil { return err } @@ -402,6 +437,7 @@ func autoConvert_scheduling_QueueSpec_To_v1beta1_QueueSpec(in *scheduling.QueueS out.Capability = *(*v1.ResourceList)(unsafe.Pointer(&in.Capability)) // WARNING: in.State requires manual conversion: does not exist in peer-type out.Reclaimable = (*bool)(unsafe.Pointer(in.Reclaimable)) + out.ExtendClusters = *(*[]Cluster)(unsafe.Pointer(&in.ExtendClusters)) if err := Convert_scheduling_Guarantee_To_v1beta1_Guarantee(&in.Guarantee, &out.Guarantee, s); err != nil { return err } diff --git a/vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1/zz_generated.deepcopy.go b/vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1/zz_generated.deepcopy.go index 372a642ea2..5fd4b7e297 100644 --- a/vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1/zz_generated.deepcopy.go +++ b/vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1/zz_generated.deepcopy.go @@ -25,6 +25,29 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Cluster) DeepCopyInto(out *Cluster) { + *out = *in + if in.Capacity != nil { + in, out := &in.Capacity, &out.Capacity + *out = make(v1.ResourceList, len(*in)) + for key, val := range *in { + (*out)[key] = val.DeepCopy() + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Cluster. +func (in *Cluster) DeepCopy() *Cluster { + if in == nil { + return nil + } + out := new(Cluster) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Guarantee) DeepCopyInto(out *Guarantee) { *out = *in @@ -259,6 +282,13 @@ func (in *QueueSpec) DeepCopyInto(out *QueueSpec) { *out = new(bool) **out = **in } + if in.ExtendClusters != nil { + in, out := &in.ExtendClusters, &out.ExtendClusters + *out = make([]Cluster, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } in.Guarantee.DeepCopyInto(&out.Guarantee) return } diff --git a/vendor/volcano.sh/apis/pkg/apis/scheduling/zz_generated.deepcopy.go b/vendor/volcano.sh/apis/pkg/apis/scheduling/zz_generated.deepcopy.go index 09208556ac..06580860e5 100644 --- a/vendor/volcano.sh/apis/pkg/apis/scheduling/zz_generated.deepcopy.go +++ b/vendor/volcano.sh/apis/pkg/apis/scheduling/zz_generated.deepcopy.go @@ -25,6 +25,29 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Cluster) DeepCopyInto(out *Cluster) { + *out = *in + if in.Capacity != nil { + in, out := &in.Capacity, &out.Capacity + *out = make(v1.ResourceList, len(*in)) + for key, val := range *in { + (*out)[key] = val.DeepCopy() + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Cluster. +func (in *Cluster) DeepCopy() *Cluster { + if in == nil { + return nil + } + out := new(Cluster) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Guarantee) DeepCopyInto(out *Guarantee) { *out = *in @@ -259,6 +282,13 @@ func (in *QueueSpec) DeepCopyInto(out *QueueSpec) { *out = new(bool) **out = **in } + if in.ExtendClusters != nil { + in, out := &in.ExtendClusters, &out.ExtendClusters + *out = make([]Cluster, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } in.Guarantee.DeepCopyInto(&out.Guarantee) return }