From 39b1f5c3fe9f15c2c3bd19938c9eecd632781121 Mon Sep 17 00:00:00 2001 From: TommyLike Date: Sun, 28 Apr 2019 17:03:59 +0800 Subject: [PATCH] Update kube batch source --- Gopkg.lock | 18 +- Gopkg.toml | 3 +- .../cmd/kube-batch/app/options/options.go | 7 +- .../kube-batch/cmd/kube-batch/app/server.go | 3 + .../pkg/apis/scheduling/v1alpha1/types.go | 18 +- .../v1alpha1/zz_generated.deepcopy.go | 19 +- .../clientset/versioned/scheme/register.go | 22 +- .../typed/scheduling/v1alpha1/podgroup.go | 17 ++ .../typed/scheduling/v1alpha1/queue.go | 33 +++ .../internalinterfaces/factory_interfaces.go | 2 + .../scheduling/v1alpha1/podgroup.go | 6 +- .../scheduling/v1alpha1/queue.go | 6 +- .../scheduler/actions/allocate/allocate.go | 92 ++++---- .../pkg/scheduler/actions/preempt/preempt.go | 35 +-- .../pkg/scheduler/api/cluster_info.go | 1 + .../kube-batch/pkg/scheduler/api/helpers.go | 10 +- .../pkg/scheduler/api/helpers/helpers.go | 14 +- .../kube-batch/pkg/scheduler/api/job_info.go | 83 +++++++- .../kube-batch/pkg/scheduler/api/node_info.go | 10 +- .../kube-batch/pkg/scheduler/api/pod_info.go | 4 +- .../pkg/scheduler/api/queue_info.go | 4 + .../pkg/scheduler/api/resource_info.go | 201 +++++++++++++++--- .../pkg/scheduler/api/test_utils.go | 2 +- .../kube-batch/pkg/scheduler/api/types.go | 5 +- .../kube-batch/pkg/scheduler/cache/cache.go | 30 ++- .../pkg/scheduler/cache/event_handlers.go | 20 +- .../pkg/scheduler/cache/interface.go | 6 +- .../kube-batch/pkg/scheduler/cache/util.go | 4 +- .../pkg/scheduler/conf/scheduler_conf.go | 34 +-- .../{util/sort.go => framework/arguments.go} | 35 +-- .../pkg/scheduler/framework/event.go | 2 + .../pkg/scheduler/framework/framework.go | 2 + .../pkg/scheduler/framework/interface.go | 1 + .../pkg/scheduler/framework/plugins.go | 10 +- .../pkg/scheduler/framework/session.go | 57 ++--- .../scheduler/framework/session_plugins.go | 87 ++++++-- .../pkg/scheduler/framework/statement.go | 5 + .../pkg/scheduler/metrics/metrics.go | 24 +-- .../plugins/conformance/conformance.go | 7 +- .../pkg/scheduler/plugins/defaults.go | 52 +++++ .../pkg/scheduler/plugins/drf/drf.go | 7 +- .../pkg/scheduler/plugins/gang/gang.go | 67 ++---- .../scheduler/plugins/nodeorder/nodeorder.go | 134 ++---------- .../plugins/predicates/predicates.go | 161 +++++++------- .../scheduler/plugins/priority/priority.go | 5 +- .../plugins/proportion/proportion.go | 7 +- .../pkg/scheduler/plugins/util/util.go | 114 ++++++++++ .../kube-batch/pkg/scheduler/scheduler.go | 6 +- .../kube-batch/pkg/scheduler/util.go | 9 + .../pkg/scheduler/util/priority_queue.go | 6 + .../pkg/scheduler/util/scheduler_helper.go | 114 ++++++++++ .../pkg/scheduler/util/test_utils.go | 163 ++++++++++++++ .../kube-batch/pkg/version/version.go | 4 +- 53 files changed, 1275 insertions(+), 513 deletions(-) rename vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/{util/sort.go => framework/arguments.go} (57%) create mode 100644 vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/defaults.go create mode 100644 vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/util/util.go create mode 100644 vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/scheduler_helper.go create mode 100644 vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/test_utils.go diff --git a/Gopkg.lock b/Gopkg.lock index 3803e56fa4..d4fe1488e4 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -203,8 +203,8 @@ version = "1.1.4" [[projects]] - branch = "release-0.4" - digest = "1:e9ad5f6b6e8e2b261432d0e8bdb7a7edef7526b7428ccdf5d5040b96e58bae66" + branch = "volcano-master" + digest = "1:52294fbd6648d468a03845d1d908a8d6e5a86d5cd9fe7f32236d8275721a92df" name = "github.com/kubernetes-sigs/kube-batch" packages = [ "cmd/kube-batch/app", @@ -239,11 +239,13 @@ "pkg/scheduler/plugins/predicates", "pkg/scheduler/plugins/priority", "pkg/scheduler/plugins/proportion", + "pkg/scheduler/plugins/util", "pkg/scheduler/util", "pkg/version", ] pruneopts = "UT" - revision = "4ec1cce0e5f8a4309bed32f9feff0fc1dadffe9f" + revision = "5e9977d68a13938fbb6d7dda564da0dca1b1c98c" + source = "https://github.com/volcano-sh/kube-batch" [[projects]] digest = "1:ff5ebae34cfbf047d505ee150de27e60570e8c394b3b8fdbb720ff6ac71985fc" @@ -1049,11 +1051,6 @@ "github.com/kubernetes-sigs/kube-batch/pkg/client/listers/scheduling/v1alpha1", "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions", "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api", - "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/helpers", - "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache", - "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf", - "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework", - "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics", "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins", "github.com/onsi/ginkgo", "github.com/onsi/gomega", @@ -1107,12 +1104,7 @@ "k8s.io/code-generator/cmd/lister-gen", "k8s.io/gengo/args", "k8s.io/gengo/examples/deepcopy-gen/generators", - "k8s.io/kubernetes/pkg/apis/scheduling", - "k8s.io/kubernetes/pkg/scheduler/algorithm", - "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates", - "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities", "k8s.io/kubernetes/pkg/scheduler/api", - "k8s.io/kubernetes/pkg/scheduler/cache", ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 7f10a5160c..9b313dfac1 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -38,7 +38,8 @@ required = [ [[constraint]] name = "github.com/kubernetes-sigs/kube-batch" - branch = "release-0.4" + branch = "volcano-master" + source = "https://github.com/volcano-sh/kube-batch" [[constraint]] name = "github.com/onsi/ginkgo" diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options/options.go b/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options/options.go index 21e62b28c8..d88e890a8c 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options/options.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options/options.go @@ -45,6 +45,7 @@ type ServerOption struct { EnablePriorityClass bool } +// ServerOpts server options var ServerOpts *ServerOption // NewServerOption creates a new CMServer with a default config. @@ -58,7 +59,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)") fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information") // kube-batch will ignore pods with scheduler names other than specified with the option - fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "kube-batch will handle pods with the scheduler-name") + fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "kube-batch will handle pods whose .spec.SchedulerName is same as scheduler-name") fs.StringVar(&s.SchedulerConf, "scheduler-conf", "", "The absolute path of scheduler configuration file") fs.DurationVar(&s.SchedulePeriod, "schedule-period", defaultSchedulerPeriod, "The period between each scheduling cycle") fs.StringVar(&s.DefaultQueue, "default-queue", defaultQueue, "The default queue name of the job") @@ -66,12 +67,13 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { "Start a leader election client and gain leadership before "+ "executing the main loop. Enable this when running replicated kube-batch for high availability") fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit") - fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object") + fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object that is used for leader election") fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.") fs.BoolVar(&s.EnablePriorityClass, "priority-class", true, "Enable PriorityClass to provide the capacity of preemption at pod group level; to disable it, set it false") } +// CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled func (s *ServerOption) CheckOptionOrDie() error { if s.EnableLeaderElection && s.LockObjectNamespace == "" { return fmt.Errorf("lock-object-namespace must not be nil when LeaderElection is enabled") @@ -80,6 +82,7 @@ func (s *ServerOption) CheckOptionOrDie() error { return nil } +// RegisterOptions registers options func (s *ServerOption) RegisterOptions() { ServerOpts = s } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/server.go b/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/server.go index 50f41ddb4d..f0e1491d72 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/server.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/server.go @@ -34,6 +34,8 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + + // Register gcp auth _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest" @@ -57,6 +59,7 @@ func buildConfig(master, kubeconfig string) (*rest.Config, error) { return rest.InClusterConfig() } +// Run the kubeBatch scheduler func Run(opt *options.ServerOption) error { if opt.PrintVersion { version.PrintVersionAndExit(apiVersion) diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1/types.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1/types.go index c8ff2c7db7..e4d0004c08 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1/types.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1/types.go @@ -17,7 +17,7 @@ limitations under the License. package v1alpha1 import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -173,10 +173,24 @@ type Queue struct { // +optional metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` - // Specification of the desired behavior of the pod group. + // Specification of the desired behavior of the queue. // More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status // +optional Spec QueueSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"` + + // The status of queue. + // +optional + Status QueueStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"` +} + +// QueueStatus represents the status of Queue. +type QueueStatus struct { + // The number of 'Unknonw' PodGroup in this queue. + Unknown int32 `json:"unknown,omitempty" protobuf:"bytes,1,opt,name=unknown"` + // The number of 'Pending' PodGroup in this queue. + Pending int32 `json:"pending,omitempty" protobuf:"bytes,2,opt,name=pending"` + // The number of 'Running' PodGroup in this queue. + Running int32 `json:"running,omitempty" protobuf:"bytes,3,opt,name=running"` } // QueueSpec represents the template of Queue. diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go index aaaa4e2c21..3bc3811730 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go @@ -1,7 +1,7 @@ // +build !ignore_autogenerated /* -Copyright 2018 The Kubernetes Authors. +Copyright 2019 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -147,6 +147,7 @@ func (in *Queue) DeepCopyInto(out *Queue) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) out.Spec = in.Spec + out.Status = in.Status return } @@ -216,3 +217,19 @@ func (in *QueueSpec) DeepCopy() *QueueSpec { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *QueueStatus) DeepCopyInto(out *QueueStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QueueStatus. +func (in *QueueStatus) DeepCopy() *QueueStatus { + if in == nil { + return nil + } + out := new(QueueStatus) + in.DeepCopyInto(out) + return out +} diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/scheme/register.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/scheme/register.go index e08eecc63e..17b3d9ccb5 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/scheme/register.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/scheme/register.go @@ -24,19 +24,14 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" schema "k8s.io/apimachinery/pkg/runtime/schema" serializer "k8s.io/apimachinery/pkg/runtime/serializer" - - corev1 "k8s.io/api/core/v1" - //PDB defintions - policyv1beta1 "k8s.io/api/policy/v1beta1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" ) var Scheme = runtime.NewScheme() var Codecs = serializer.NewCodecFactory(Scheme) var ParameterCodec = runtime.NewParameterCodec(Scheme) - -func init() { - v1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"}) - AddToScheme(Scheme) +var localSchemeBuilder = runtime.SchemeBuilder{ + schedulingv1alpha1.AddToScheme, } // AddToScheme adds all types of this clientset into the given scheme. This allows composition @@ -49,12 +44,13 @@ func init() { // ) // // kclientset, _ := kubernetes.NewForConfig(c) -// aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) +// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) // // After this, RawExtensions in Kubernetes types will serialize kube-aggregator types // correctly. -func AddToScheme(scheme *runtime.Scheme) { - schedulingv1alpha1.AddToScheme(scheme) - corev1.AddToScheme(scheme) - policyv1beta1.AddToScheme(scheme) +var AddToScheme = localSchemeBuilder.AddToScheme + +func init() { + v1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"}) + utilruntime.Must(AddToScheme(Scheme)) } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/typed/scheduling/v1alpha1/podgroup.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/typed/scheduling/v1alpha1/podgroup.go index ea0cb3628c..0bfd5a6dab 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/typed/scheduling/v1alpha1/podgroup.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/typed/scheduling/v1alpha1/podgroup.go @@ -19,6 +19,8 @@ limitations under the License. package v1alpha1 import ( + "time" + v1alpha1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" scheme "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/scheme" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -76,11 +78,16 @@ func (c *podGroups) Get(name string, options v1.GetOptions) (result *v1alpha1.Po // List takes label and field selectors, and returns the list of PodGroups that match those selectors. func (c *podGroups) List(opts v1.ListOptions) (result *v1alpha1.PodGroupList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } result = &v1alpha1.PodGroupList{} err = c.client.Get(). Namespace(c.ns). Resource("podgroups"). VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). Do(). Into(result) return @@ -88,11 +95,16 @@ func (c *podGroups) List(opts v1.ListOptions) (result *v1alpha1.PodGroupList, er // Watch returns a watch.Interface that watches the requested podGroups. func (c *podGroups) Watch(opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } opts.Watch = true return c.client.Get(). Namespace(c.ns). Resource("podgroups"). VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). Watch() } @@ -150,10 +162,15 @@ func (c *podGroups) Delete(name string, options *v1.DeleteOptions) error { // DeleteCollection deletes a collection of objects. func (c *podGroups) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + var timeout time.Duration + if listOptions.TimeoutSeconds != nil { + timeout = time.Duration(*listOptions.TimeoutSeconds) * time.Second + } return c.client.Delete(). Namespace(c.ns). Resource("podgroups"). VersionedParams(&listOptions, scheme.ParameterCodec). + Timeout(timeout). Body(options). Do(). Error() diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/typed/scheduling/v1alpha1/queue.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/typed/scheduling/v1alpha1/queue.go index 77b1b40aeb..9956f34bf0 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/typed/scheduling/v1alpha1/queue.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/typed/scheduling/v1alpha1/queue.go @@ -19,6 +19,8 @@ limitations under the License. package v1alpha1 import ( + "time" + v1alpha1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" scheme "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/scheme" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -37,6 +39,7 @@ type QueuesGetter interface { type QueueInterface interface { Create(*v1alpha1.Queue) (*v1alpha1.Queue, error) Update(*v1alpha1.Queue) (*v1alpha1.Queue, error) + UpdateStatus(*v1alpha1.Queue) (*v1alpha1.Queue, error) Delete(name string, options *v1.DeleteOptions) error DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error Get(name string, options v1.GetOptions) (*v1alpha1.Queue, error) @@ -72,10 +75,15 @@ func (c *queues) Get(name string, options v1.GetOptions) (result *v1alpha1.Queue // List takes label and field selectors, and returns the list of Queues that match those selectors. func (c *queues) List(opts v1.ListOptions) (result *v1alpha1.QueueList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } result = &v1alpha1.QueueList{} err = c.client.Get(). Resource("queues"). VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). Do(). Into(result) return @@ -83,10 +91,15 @@ func (c *queues) List(opts v1.ListOptions) (result *v1alpha1.QueueList, err erro // Watch returns a watch.Interface that watches the requested queues. func (c *queues) Watch(opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } opts.Watch = true return c.client.Get(). Resource("queues"). VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). Watch() } @@ -113,6 +126,21 @@ func (c *queues) Update(queue *v1alpha1.Queue) (result *v1alpha1.Queue, err erro return } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). + +func (c *queues) UpdateStatus(queue *v1alpha1.Queue) (result *v1alpha1.Queue, err error) { + result = &v1alpha1.Queue{} + err = c.client.Put(). + Resource("queues"). + Name(queue.Name). + SubResource("status"). + Body(queue). + Do(). + Into(result) + return +} + // Delete takes name of the queue and deletes it. Returns an error if one occurs. func (c *queues) Delete(name string, options *v1.DeleteOptions) error { return c.client.Delete(). @@ -125,9 +153,14 @@ func (c *queues) Delete(name string, options *v1.DeleteOptions) error { // DeleteCollection deletes a collection of objects. func (c *queues) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + var timeout time.Duration + if listOptions.TimeoutSeconds != nil { + timeout = time.Duration(*listOptions.TimeoutSeconds) * time.Second + } return c.client.Delete(). Resource("queues"). VersionedParams(&listOptions, scheme.ParameterCodec). + Timeout(timeout). Body(options). Do(). Error() diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/internalinterfaces/factory_interfaces.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/internalinterfaces/factory_interfaces.go index 85b532cafc..35707ad013 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/internalinterfaces/factory_interfaces.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/internalinterfaces/factory_interfaces.go @@ -27,6 +27,7 @@ import ( cache "k8s.io/client-go/tools/cache" ) +// NewInformerFunc takes versioned.Interface and time.Duration to return a SharedIndexInformer. type NewInformerFunc func(versioned.Interface, time.Duration) cache.SharedIndexInformer // SharedInformerFactory a small interface to allow for adding an informer without an import cycle @@ -35,4 +36,5 @@ type SharedInformerFactory interface { InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer } +// TweakListOptionsFunc is a function that transforms a v1.ListOptions. type TweakListOptionsFunc func(*v1.ListOptions) diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1/podgroup.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1/podgroup.go index 5b967af45b..c0c14f5d83 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1/podgroup.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1/podgroup.go @@ -21,7 +21,7 @@ package v1alpha1 import ( time "time" - scheduling_v1alpha1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" + schedulingv1alpha1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" versioned "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned" internalinterfaces "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/internalinterfaces" v1alpha1 "github.com/kubernetes-sigs/kube-batch/pkg/client/listers/scheduling/v1alpha1" @@ -70,7 +70,7 @@ func NewFilteredPodGroupInformer(client versioned.Interface, namespace string, r return client.SchedulingV1alpha1().PodGroups(namespace).Watch(options) }, }, - &scheduling_v1alpha1.PodGroup{}, + &schedulingv1alpha1.PodGroup{}, resyncPeriod, indexers, ) @@ -81,7 +81,7 @@ func (f *podGroupInformer) defaultInformer(client versioned.Interface, resyncPer } func (f *podGroupInformer) Informer() cache.SharedIndexInformer { - return f.factory.InformerFor(&scheduling_v1alpha1.PodGroup{}, f.defaultInformer) + return f.factory.InformerFor(&schedulingv1alpha1.PodGroup{}, f.defaultInformer) } func (f *podGroupInformer) Lister() v1alpha1.PodGroupLister { diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1/queue.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1/queue.go index d23433f52a..fb1d656484 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1/queue.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1/queue.go @@ -21,7 +21,7 @@ package v1alpha1 import ( time "time" - scheduling_v1alpha1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" + schedulingv1alpha1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" versioned "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned" internalinterfaces "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/internalinterfaces" v1alpha1 "github.com/kubernetes-sigs/kube-batch/pkg/client/listers/scheduling/v1alpha1" @@ -69,7 +69,7 @@ func NewFilteredQueueInformer(client versioned.Interface, resyncPeriod time.Dura return client.SchedulingV1alpha1().Queues().Watch(options) }, }, - &scheduling_v1alpha1.Queue{}, + &schedulingv1alpha1.Queue{}, resyncPeriod, indexers, ) @@ -80,7 +80,7 @@ func (f *queueInformer) defaultInformer(client versioned.Interface, resyncPeriod } func (f *queueInformer) Informer() cache.SharedIndexInformer { - return f.factory.InformerFor(&scheduling_v1alpha1.Queue{}, f.defaultInformer) + return f.factory.InformerFor(&schedulingv1alpha1.Queue{}, f.defaultInformer) } func (f *queueInformer) Lister() v1alpha1.QueueLister { diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/allocate/allocate.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/allocate/allocate.go index 912d20bb72..df28d6aaca 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/allocate/allocate.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/allocate/allocate.go @@ -17,6 +17,8 @@ limitations under the License. package allocate import ( + "fmt" + "github.com/golang/glog" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" @@ -66,6 +68,24 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { pendingTasks := map[api.JobID]*util.PriorityQueue{} + allNodes := util.GetNodeList(ssn.Nodes) + + predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error { + // Check for Resource Predicate + // TODO: We could not allocate resource to task from both node.Idle and node.Releasing now, + // after it is done, we could change the following compare to: + // clonedNode := node.Idle.Clone() + // if !task.InitResreq.LessEqual(clonedNode.Add(node.Releasing)) { + // ... + // } + if !task.InitResreq.LessEqual(node.Idle) && !task.InitResreq.LessEqual(node.Releasing) { + return fmt.Errorf("task <%s/%s> ResourceFit failed on node <%s>", + task.Namespace, task.Name, node.Name) + } + + return ssn.PredicateFn(task, node) + } + for { if queues.Empty() { break @@ -107,11 +127,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { tasks.Len(), job.Namespace, job.Name) for !tasks.Empty() { - predicateNodes := []*api.NodeInfo{} - nodeScores := map[int][]*api.NodeInfo{} - task := tasks.Pop().(*api.TaskInfo) - assigned := false glog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(ssn.Nodes), job.Namespace, job.Name) @@ -123,47 +139,29 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { if len(job.NodesFitDelta) > 0 { job.NodesFitDelta = make(api.NodeResourceMap) } - for _, node := range ssn.Nodes { - glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>", - task.Namespace, task.Name, node.Name, task.Resreq, node.Idle) - - // TODO (k82cn): Enable eCache for performance improvement. - if err := ssn.PredicateFn(task, node); err != nil { - glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, node.Name, err) - continue - } else { - predicateNodes = append(predicateNodes, node) - } - } - for _, node := range predicateNodes { - score, err := ssn.NodeOrderFn(task, node) - if err != nil { - glog.V(3).Infof("Error in Calculating Priority for the node:%v", err) - } else { - nodeScores[score] = append(nodeScores[score], node) - } + + predicateNodes := util.PredicateNodes(task, allNodes, predicateFn) + if len(predicateNodes) == 0 { + break } - selectedNodes := util.SelectBestNode(nodeScores) - for _, node := range selectedNodes { - // Allocate idle resource to the task. - if task.InitResreq.LessEqual(node.Idle) { - glog.V(3).Infof("Binding Task <%v/%v> to node <%v>", - task.Namespace, task.Name, node.Name) - if err := ssn.Allocate(task, node.Name); err != nil { - glog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v", - task.UID, node.Name, ssn.UID, err) - continue - } - assigned = true - break - } else { - //store information about missing resources - job.NodesFitDelta[node.Name] = node.Idle.Clone() - job.NodesFitDelta[node.Name].FitDelta(task.Resreq) - glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s> with limited resources", - task.Namespace, task.Name, node.Name) + + nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.NodeOrderFn) + + node := util.SelectBestNode(nodeScores) + // Allocate idle resource to the task. + if task.InitResreq.LessEqual(node.Idle) { + glog.V(3).Infof("Binding Task <%v/%v> to node <%v>", + task.Namespace, task.Name, node.Name) + if err := ssn.Allocate(task, node.Name); err != nil { + glog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v", + task.UID, node.Name, ssn.UID, err) } + } else { + //store information about missing resources + job.NodesFitDelta[node.Name] = node.Idle.Clone() + job.NodesFitDelta[node.Name].FitDelta(task.InitResreq) + glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s> with limited resources", + task.Namespace, task.Name, node.Name) // Allocate releasing resource to the task if any. if task.InitResreq.LessEqual(node.Releasing) { @@ -172,18 +170,10 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { if err := ssn.Pipeline(task, node.Name); err != nil { glog.Errorf("Failed to pipeline Task %v on %v in Session %v", task.UID, node.Name, ssn.UID) - continue } - - assigned = true - break } } - if !assigned { - break - } - if ssn.JobReady(job) { jobs.Push(job) break diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/preempt/preempt.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/preempt/preempt.go index 57e0dd4173..f76571dcd4 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/preempt/preempt.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/preempt/preempt.go @@ -114,15 +114,15 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) { assigned = true } - // If job not ready, keep preempting - if ssn.JobReady(preemptorJob) { + // If job is not pipelined, keep preempting + if ssn.JobPipelined(preemptorJob) { stmt.Commit() break } } - // If job not ready after try all tasks, next job. - if !ssn.JobReady(preemptorJob) { + // If job is not pipelined after try all tasks, next job. + if !ssn.JobPipelined(preemptorJob) { stmt.Discard() continue } @@ -175,28 +175,15 @@ func preempt( nodes map[string]*api.NodeInfo, filter func(*api.TaskInfo) bool, ) (bool, error) { - predicateNodes := []*api.NodeInfo{} - nodeScores := map[int][]*api.NodeInfo{} assigned := false - for _, node := range nodes { - if err := ssn.PredicateFn(preemptor, node); err != nil { - glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v", - preemptor.Namespace, preemptor.Name, node.Name, err) - continue - } else { - predicateNodes = append(predicateNodes, node) - } - } - for _, node := range predicateNodes { - score, err := ssn.NodeOrderFn(preemptor, node) - if err != nil { - glog.V(3).Infof("Error in Calculating Priority for the node:%v", err) - } else { - nodeScores[score] = append(nodeScores[score], node) - } - } - selectedNodes := util.SelectBestNode(nodeScores) + allNodes := util.GetNodeList(nodes) + + predicateNodes := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn) + + nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.NodeOrderFn) + + selectedNodes := util.SortNodes(nodeScores) for _, node := range selectedNodes { glog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", preemptor.Namespace, preemptor.Name, node.Name) diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/cluster_info.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/cluster_info.go index 8264f7a7cc..40f9b9f6bc 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/cluster_info.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/cluster_info.go @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package api import "fmt" diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/helpers.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/helpers.go index 194c208a56..e5f0804680 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/helpers.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/helpers.go @@ -19,17 +19,17 @@ package api import ( "fmt" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" clientcache "k8s.io/client-go/tools/cache" ) // PodKey returns the string key of a pod. func PodKey(pod *v1.Pod) TaskID { - if key, err := clientcache.MetaNamespaceKeyFunc(pod); err != nil { + key, err := clientcache.MetaNamespaceKeyFunc(pod) + if err != nil { return TaskID(fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)) - } else { - return TaskID(key) } + return TaskID(key) } func getTaskStatus(pod *v1.Pod) TaskStatus { @@ -60,6 +60,7 @@ func getTaskStatus(pod *v1.Pod) TaskStatus { return Unknown } +// AllocatedStatus checks whether the tasks has AllocatedStatus func AllocatedStatus(status TaskStatus) bool { switch status { case Bound, Binding, Running, Allocated: @@ -69,6 +70,7 @@ func AllocatedStatus(status TaskStatus) bool { } } +// MergeErrors is used to merge multiple errors into single error func MergeErrors(errs ...error) error { msg := "errors: " diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/helpers/helpers.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/helpers/helpers.go index 680a5f8efd..b11fbe641e 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/helpers/helpers.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/helpers/helpers.go @@ -19,19 +19,31 @@ package helpers import ( "math" + v1 "k8s.io/api/core/v1" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" ) +// Min is used to find the min of two resource types func Min(l, r *api.Resource) *api.Resource { res := &api.Resource{} res.MilliCPU = math.Min(l.MilliCPU, r.MilliCPU) - res.MilliGPU = math.Min(l.MilliGPU, r.MilliGPU) res.Memory = math.Min(l.Memory, r.Memory) + if l.ScalarResources == nil || r.ScalarResources == nil { + return res + } + + res.ScalarResources = map[v1.ResourceName]float64{} + for lName, lQuant := range l.ScalarResources { + res.ScalarResources[lName] = math.Min(lQuant, r.ScalarResources[lName]) + } + return res } +// Share is used to determine the share func Share(l, r float64) float64 { var share float64 if r == 0 { diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/job_info.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/job_info.go index 4ae0134e9e..2a9a6bfcf5 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/job_info.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/job_info.go @@ -21,7 +21,7 @@ import ( "sort" "strings" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -29,8 +29,10 @@ import ( "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" ) +// TaskID is UID type for Task type TaskID types.UID +// TaskInfo will have all infos about the task type TaskInfo struct { UID TaskID Job JobID @@ -63,6 +65,7 @@ func getJobID(pod *v1.Pod) JobID { return "" } +// NewTaskInfo creates new taskInfo object for a Pod func NewTaskInfo(pod *v1.Pod) *TaskInfo { req := GetPodResourceWithoutInitContainers(pod) initResreq := GetPodResourceRequest(pod) @@ -89,6 +92,7 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo { return ti } +// Clone is used for cloning a task func (ti *TaskInfo) Clone() *TaskInfo { return &TaskInfo{ UID: ti.UID, @@ -105,6 +109,7 @@ func (ti *TaskInfo) Clone() *TaskInfo { } } +// String returns the taskInfo details in a string func (ti TaskInfo) String() string { return fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, pri %v, resreq %v", ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.Priority, ti.Resreq) @@ -115,8 +120,10 @@ type JobID types.UID type tasksMap map[TaskID]*TaskInfo +// NodeResourceMap stores resource in a node type NodeResourceMap map[string]*Resource +// JobInfo will have all info of a Job type JobInfo struct { UID JobID @@ -146,6 +153,7 @@ type JobInfo struct { PDB *policyv1.PodDisruptionBudget } +// NewJobInfo creates a new jobInfo for set of tasks func NewJobInfo(uid JobID, tasks ...*TaskInfo) *JobInfo { job := &JobInfo{ UID: uid, @@ -167,10 +175,12 @@ func NewJobInfo(uid JobID, tasks ...*TaskInfo) *JobInfo { return job } +// UnsetPodGroup removes podGroup details from a job func (ji *JobInfo) UnsetPodGroup() { ji.PodGroup = nil } +// SetPodGroup sets podGroup details to a job func (ji *JobInfo) SetPodGroup(pg *v1alpha1.PodGroup) { ji.Name = pg.Name ji.Namespace = pg.Namespace @@ -181,6 +191,7 @@ func (ji *JobInfo) SetPodGroup(pg *v1alpha1.PodGroup) { ji.PodGroup = pg } +// SetPDB sets PDB to a job func (ji *JobInfo) SetPDB(pdb *policyv1.PodDisruptionBudget) { ji.Name = pdb.Name ji.MinAvailable = pdb.Spec.MinAvailable.IntVal @@ -190,10 +201,12 @@ func (ji *JobInfo) SetPDB(pdb *policyv1.PodDisruptionBudget) { ji.PDB = pdb } +// UnsetPDB removes PDB info of a job func (ji *JobInfo) UnsetPDB() { ji.PDB = nil } +// GetTasks gets all tasks with the taskStatus func (ji *JobInfo) GetTasks(statuses ...TaskStatus) []*TaskInfo { var res []*TaskInfo @@ -216,6 +229,7 @@ func (ji *JobInfo) addTaskIndex(ti *TaskInfo) { ji.TaskStatusIndex[ti.Status][ti.UID] = ti } +// AddTaskInfo is used to add a task to a job func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) { ji.Tasks[ti.UID] = ti ji.addTaskIndex(ti) @@ -227,6 +241,7 @@ func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) { } } +// UpdateTaskStatus is used to update task's status in a job func (ji *JobInfo) UpdateTaskStatus(task *TaskInfo, status TaskStatus) error { if err := validateStatusUpdate(task.Status, status); err != nil { return err @@ -252,6 +267,7 @@ func (ji *JobInfo) deleteTaskIndex(ti *TaskInfo) { } } +// DeleteTaskInfo is used to delete a task from a job func (ji *JobInfo) DeleteTaskInfo(ti *TaskInfo) error { if task, found := ji.Tasks[ti.UID]; found { ji.TotalRequest.Sub(task.Resreq) @@ -270,6 +286,7 @@ func (ji *JobInfo) DeleteTaskInfo(ti *TaskInfo) error { ti.Namespace, ti.Name, ji.Namespace, ji.Name) } +// Clone is used to clone a jobInfo object func (ji *JobInfo) Clone() *JobInfo { info := &JobInfo{ UID: ji.UID, @@ -304,6 +321,7 @@ func (ji *JobInfo) Clone() *JobInfo { return info } +// String returns a jobInfo object in string format func (ji JobInfo) String() string { res := "" @@ -317,7 +335,7 @@ func (ji JobInfo) String() string { ji.UID, ji.Namespace, ji.Queue, ji.Name, ji.MinAvailable, ji.PodGroup) + res } -// Error returns detailed information on why a job's task failed to fit on +// FitError returns detailed information on why a job's task failed to fit on // each available node func (ji *JobInfo) FitError() string { if len(ji.NodesFitDelta) == 0 { @@ -333,8 +351,11 @@ func (ji *JobInfo) FitError() string { if v.Get(v1.ResourceMemory) < 0 { reasons["memory"]++ } - if v.Get(GPUResourceName) < 0 { - reasons["GPU"]++ + + for rName, rQuant := range v.ScalarResources { + if rQuant < 0 { + reasons[string(rName)]++ + } } } @@ -349,3 +370,57 @@ func (ji *JobInfo) FitError() string { reasonMsg := fmt.Sprintf("0/%v nodes are available, %v.", len(ji.NodesFitDelta), strings.Join(sortReasonsHistogram(), ", ")) return reasonMsg } + +// ReadyTaskNum returns the number of tasks that are ready. +func (ji *JobInfo) ReadyTaskNum() int32 { + occupid := 0 + for status, tasks := range ji.TaskStatusIndex { + if AllocatedStatus(status) || + status == Succeeded { + occupid = occupid + len(tasks) + } + } + + return int32(occupid) +} + +// WaitingTaskNum returns the number of tasks that are pipelined. +func (ji *JobInfo) WaitingTaskNum() int32 { + occupid := 0 + for status, tasks := range ji.TaskStatusIndex { + if status == Pipelined { + occupid = occupid + len(tasks) + } + } + + return int32(occupid) +} + +// ValidTaskNum returns the number of tasks that are valid. +func (ji *JobInfo) ValidTaskNum() int32 { + occupied := 0 + for status, tasks := range ji.TaskStatusIndex { + if AllocatedStatus(status) || + status == Succeeded || + status == Pipelined || + status == Pending { + occupied = occupied + len(tasks) + } + } + + return int32(occupied) +} + +// Ready returns whether job is ready for run +func (ji *JobInfo) Ready() bool { + occupied := ji.ReadyTaskNum() + + return occupied >= ji.MinAvailable +} + +// Pipelined returns whether the number of ready and pipelined task is enough +func (ji *JobInfo) Pipelined() bool { + occupied := ji.WaitingTaskNum() + ji.ReadyTaskNum() + + return occupied >= ji.MinAvailable +} diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/node_info.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/node_info.go index 6565bde8ae..0e94a51f8f 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/node_info.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/node_info.go @@ -19,7 +19,7 @@ package api import ( "fmt" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" ) // NodeInfo is node level aggregated information. @@ -41,6 +41,7 @@ type NodeInfo struct { Tasks map[TaskID]*TaskInfo } +// NewNodeInfo is used to create new nodeInfo object func NewNodeInfo(node *v1.Node) *NodeInfo { if node == nil { return &NodeInfo{ @@ -70,6 +71,7 @@ func NewNodeInfo(node *v1.Node) *NodeInfo { } } +// Clone used to clone nodeInfo Object func (ni *NodeInfo) Clone() *NodeInfo { res := NewNodeInfo(ni.Node) @@ -80,6 +82,7 @@ func (ni *NodeInfo) Clone() *NodeInfo { return res } +// SetNode sets kubernetes node object to nodeInfo object func (ni *NodeInfo) SetNode(node *v1.Node) { ni.Name = node.Name ni.Node = node @@ -98,6 +101,7 @@ func (ni *NodeInfo) SetNode(node *v1.Node) { } } +// AddTask is used to add a task in nodeInfo object func (ni *NodeInfo) AddTask(task *TaskInfo) error { key := PodKey(task.Pod) if _, found := ni.Tasks[key]; found { @@ -128,6 +132,7 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) error { return nil } +// RemoveTask used to remove a task from nodeInfo object func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error { key := PodKey(ti.Pod) @@ -156,6 +161,7 @@ func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error { return nil } +// UpdateTask is used to update a task in nodeInfo object func (ni *NodeInfo) UpdateTask(ti *TaskInfo) error { if err := ni.RemoveTask(ti); err != nil { return err @@ -164,6 +170,7 @@ func (ni *NodeInfo) UpdateTask(ti *TaskInfo) error { return ni.AddTask(ti) } +// String returns nodeInfo details in string format func (ni NodeInfo) String() string { res := "" @@ -178,6 +185,7 @@ func (ni NodeInfo) String() string { } +// Pods returns all pods running in that node func (ni *NodeInfo) Pods() (pods []*v1.Pod) { for _, t := range ni.Tasks { pods = append(pods, t.Pod) diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/pod_info.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/pod_info.go index bae26a751f..56a94034bb 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/pod_info.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/pod_info.go @@ -17,7 +17,7 @@ limitations under the License. package api import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" ) // Refer k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go#GetResourceRequest. @@ -48,6 +48,8 @@ import ( // Memory: 1G // // Result: CPU: 3, Memory: 3G + +// GetPodResourceRequest returns all the resource required for that pod func GetPodResourceRequest(pod *v1.Pod) *Resource { result := GetPodResourceWithoutInitContainers(pod) diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/queue_info.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/queue_info.go index 869a84f204..e0014c30a3 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/queue_info.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/queue_info.go @@ -22,8 +22,10 @@ import ( arbcorev1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" ) +// QueueID is UID type, serves as unique ID for each queue type QueueID types.UID +// QueueInfo will have all details about queue type QueueInfo struct { UID QueueID Name string @@ -33,6 +35,7 @@ type QueueInfo struct { Queue *arbcorev1.Queue } +// NewQueueInfo creates new queueInfo object func NewQueueInfo(queue *arbcorev1.Queue) *QueueInfo { return &QueueInfo{ UID: QueueID(queue.Name), @@ -44,6 +47,7 @@ func NewQueueInfo(queue *arbcorev1.Queue) *QueueInfo { } } +// Clone is used to clone queueInfo object func (q *QueueInfo) Clone() *QueueInfo { return &QueueInfo{ UID: q.UID, diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/resource_info.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/resource_info.go index 2941848388..6c3caab742 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/resource_info.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/resource_info.go @@ -20,41 +20,56 @@ import ( "fmt" "math" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" ) +// Resource struct defines all the resource type type Resource struct { MilliCPU float64 Memory float64 - MilliGPU float64 + + // ScalarResources + ScalarResources map[v1.ResourceName]float64 + // MaxTaskNum is only used by predicates; it should NOT // be accounted in other operators, e.g. Add. MaxTaskNum int } const ( - // need to follow https://github.com/NVIDIA/k8s-device-plugin/blob/66a35b71ac4b5cbfb04714678b548bd77e5ba719/server.go#L20 + // GPUResourceName need to follow https://github.com/NVIDIA/k8s-device-plugin/blob/66a35b71ac4b5cbfb04714678b548bd77e5ba719/server.go#L20 GPUResourceName = "nvidia.com/gpu" ) +// EmptyResource creates a empty resource object and returns func EmptyResource() *Resource { return &Resource{} } +// Clone is used to clone a resource type func (r *Resource) Clone() *Resource { clone := &Resource{ MilliCPU: r.MilliCPU, Memory: r.Memory, - MilliGPU: r.MilliGPU, MaxTaskNum: r.MaxTaskNum, } + + if r.ScalarResources != nil { + clone.ScalarResources = make(map[v1.ResourceName]float64) + for k, v := range r.ScalarResources { + clone.ScalarResources[k] = v + } + } + return clone } var minMilliCPU float64 = 10 -var minMilliGPU float64 = 10 +var minMilliScalarResources float64 = 10 var minMemory float64 = 10 * 1024 * 1024 +// NewResource create a new resource object from resource list func NewResource(rl v1.ResourceList) *Resource { r := EmptyResource() for rName, rQuant := range rl { @@ -65,34 +80,62 @@ func NewResource(rl v1.ResourceList) *Resource { r.Memory += float64(rQuant.Value()) case v1.ResourcePods: r.MaxTaskNum += int(rQuant.Value()) - case GPUResourceName: - r.MilliGPU += float64(rQuant.MilliValue()) + default: + if v1helper.IsScalarResourceName(rName) { + r.AddScalar(rName, float64(rQuant.MilliValue())) + } } } return r } +// IsEmpty returns bool after checking any of resource is less than min possible value func (r *Resource) IsEmpty() bool { - return r.MilliCPU < minMilliCPU && r.Memory < minMemory && r.MilliGPU < minMilliGPU + if !(r.MilliCPU < minMilliCPU && r.Memory < minMemory) { + return false + } + + for _, rQuant := range r.ScalarResources { + if rQuant >= minMilliScalarResources { + return false + } + } + + return true } +// IsZero checks whether that resource is less than min possible value func (r *Resource) IsZero(rn v1.ResourceName) bool { switch rn { case v1.ResourceCPU: return r.MilliCPU < minMilliCPU case v1.ResourceMemory: return r.Memory < minMemory - case GPUResourceName: - return r.MilliGPU < minMilliGPU default: - panic("unknown resource") + if r.ScalarResources == nil { + return true + } + + if _, ok := r.ScalarResources[rn]; !ok { + panic("unknown resource") + } + + return r.ScalarResources[rn] < minMilliScalarResources } } +// Add is used to add the two resources func (r *Resource) Add(rr *Resource) *Resource { r.MilliCPU += rr.MilliCPU r.Memory += rr.Memory - r.MilliGPU += rr.MilliGPU + + for rName, rQuant := range rr.ScalarResources { + if r.ScalarResources == nil { + r.ScalarResources = map[v1.ResourceName]float64{} + } + r.ScalarResources[rName] += rQuant + } + return r } @@ -101,7 +144,14 @@ func (r *Resource) Sub(rr *Resource) *Resource { if rr.LessEqual(r) { r.MilliCPU -= rr.MilliCPU r.Memory -= rr.Memory - r.MilliGPU -= rr.MilliGPU + + for rrName, rrQuant := range rr.ScalarResources { + if r.ScalarResources == nil { + return r + } + r.ScalarResources[rrName] -= rrQuant + } + return r } @@ -121,12 +171,23 @@ func (r *Resource) SetMaxResource(rr *Resource) { if rr.Memory > r.Memory { r.Memory = rr.Memory } - if rr.MilliGPU > r.MilliGPU { - r.MilliGPU = rr.MilliGPU + + for rrName, rrQuant := range rr.ScalarResources { + if r.ScalarResources == nil { + r.ScalarResources = make(map[v1.ResourceName]float64) + for k, v := range rr.ScalarResources { + r.ScalarResources[k] = v + } + return + } + + if rrQuant > r.ScalarResources[rrName] { + r.ScalarResources[rrName] = rrQuant + } } } -//Computes the delta between a resource oject representing available +//FitDelta Computes the delta between a resource oject representing available //resources an operand representing resources being requested. Any //field that is less than 0 after the operation represents an //insufficient resource. @@ -139,47 +200,127 @@ func (r *Resource) FitDelta(rr *Resource) *Resource { r.Memory -= rr.Memory + minMemory } - if rr.MilliGPU > 0 { - r.MilliGPU -= rr.MilliGPU + minMilliGPU + for rrName, rrQuant := range rr.ScalarResources { + if r.ScalarResources == nil { + r.ScalarResources = map[v1.ResourceName]float64{} + } + + if rrQuant > 0 { + r.ScalarResources[rrName] -= rrQuant + minMilliScalarResources + } } + return r } +// Multi multiples the resource with ratio provided func (r *Resource) Multi(ratio float64) *Resource { r.MilliCPU = r.MilliCPU * ratio r.Memory = r.Memory * ratio - r.MilliGPU = r.MilliGPU * ratio + for rName, rQuant := range r.ScalarResources { + r.ScalarResources[rName] = rQuant * ratio + } return r } +// Less checks whether a resource is less than other func (r *Resource) Less(rr *Resource) bool { - return r.MilliCPU < rr.MilliCPU && r.Memory < rr.Memory && r.MilliGPU < rr.MilliGPU + if !(r.MilliCPU < rr.MilliCPU && r.Memory < rr.Memory) { + return false + } + + if r.ScalarResources == nil { + if rr.ScalarResources == nil { + return false + } + return true + } + + for rName, rQuant := range r.ScalarResources { + if rr.ScalarResources == nil { + return false + } + + rrQuant := rr.ScalarResources[rName] + if rQuant >= rrQuant { + return false + } + } + + return true } +// LessEqual checks whether a resource is less than other resource func (r *Resource) LessEqual(rr *Resource) bool { - return (r.MilliCPU < rr.MilliCPU || math.Abs(rr.MilliCPU-r.MilliCPU) < minMilliCPU) && - (r.Memory < rr.Memory || math.Abs(rr.Memory-r.Memory) < minMemory) && - (r.MilliGPU < rr.MilliGPU || math.Abs(rr.MilliGPU-r.MilliGPU) < minMilliGPU) + isLess := (r.MilliCPU < rr.MilliCPU || math.Abs(rr.MilliCPU-r.MilliCPU) < minMilliCPU) && + (r.Memory < rr.Memory || math.Abs(rr.Memory-r.Memory) < minMemory) + if !isLess { + return false + } + + if r.ScalarResources == nil { + return true + } + + for rName, rQuant := range r.ScalarResources { + if rr.ScalarResources == nil { + return false + } + + rrQuant := rr.ScalarResources[rName] + if !(rQuant < rrQuant || math.Abs(rrQuant-rQuant) < minMilliScalarResources) { + return false + } + } + + return true } +// String returns resource details in string format func (r *Resource) String() string { - return fmt.Sprintf("cpu %0.2f, memory %0.2f, GPU %0.2f", - r.MilliCPU, r.Memory, r.MilliGPU) + str := fmt.Sprintf("cpu %0.2f, memory %0.2f", r.MilliCPU, r.Memory) + for rName, rQuant := range r.ScalarResources { + str = fmt.Sprintf("%s, %s %0.2f", str, rName, rQuant) + } + return str } +// Get returns the resource value for that particular resource type func (r *Resource) Get(rn v1.ResourceName) float64 { switch rn { case v1.ResourceCPU: return r.MilliCPU case v1.ResourceMemory: return r.Memory - case GPUResourceName: - return r.MilliGPU default: - panic("not support resource.") + if r.ScalarResources == nil { + return 0 + } + return r.ScalarResources[rn] } } -func ResourceNames() []v1.ResourceName { - return []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, GPUResourceName} +// ResourceNames returns all resource types +func (r *Resource) ResourceNames() []v1.ResourceName { + resNames := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory} + + for rName := range r.ScalarResources { + resNames = append(resNames, rName) + } + + return resNames +} + +// AddScalar adds a resource by a scalar value of this resource. +func (r *Resource) AddScalar(name v1.ResourceName, quantity float64) { + r.SetScalar(name, r.ScalarResources[name]+quantity) +} + +// SetScalar sets a resource by a scalar value of this resource. +func (r *Resource) SetScalar(name v1.ResourceName, quantity float64) { + // Lazily allocate scalar resource map. + if r.ScalarResources == nil { + r.ScalarResources = map[v1.ResourceName]float64{} + } + r.ScalarResources[name] = quantity } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/test_utils.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/test_utils.go index b2aa7d59c6..389b3caf0d 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/test_utils.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/test_utils.go @@ -20,7 +20,7 @@ import ( "fmt" "reflect" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/types.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/types.go index 9fbd58c6c0..4d9b67f586 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/types.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api/types.go @@ -88,13 +88,14 @@ type CompareFn func(interface{}, interface{}) int // ValidateFn is the func declaration used to check object's status. type ValidateFn func(interface{}) bool -// +// ValidateResult is struct to which can used to determine the result type ValidateResult struct { Pass bool Reason string Message string } +// ValidateExFn is the func declaration used to validate the result type ValidateExFn func(interface{}) *ValidateResult // PredicateFn is the func declaration used to predicate node for task. @@ -104,4 +105,4 @@ type PredicateFn func(*TaskInfo, *NodeInfo) error type EvictableFn func(*TaskInfo, []*TaskInfo) []*TaskInfo // NodeOrderFn is the func declaration used to get priority score for a node for a particular task. -type NodeOrderFn func(*TaskInfo, *NodeInfo) (int, error) +type NodeOrderFn func(*TaskInfo, *NodeInfo) (float64, error) diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/cache.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/cache.go index 442e4dfb54..13041c4c64 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/cache.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/cache.go @@ -24,7 +24,7 @@ import ( "github.com/golang/glog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/api/scheduling/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -68,6 +68,7 @@ func New(config *rest.Config, schedulerName string, defaultQueue string) Cache { return newSchedulerCache(config, schedulerName, defaultQueue) } +//SchedulerCache cache for the kube batch type SchedulerCache struct { sync.Mutex @@ -75,7 +76,7 @@ type SchedulerCache struct { kbclient *kbver.Clientset defaultQueue string - // schedulerName is the name for kube-batch scheduler + // schedulerName is the name for kube batch scheduler schedulerName string podInformer infov1.PodInformer @@ -111,6 +112,7 @@ type defaultBinder struct { kubeclient *kubernetes.Clientset } +//Bind will send bind request to api server func (db *defaultBinder) Bind(p *v1.Pod, hostname string) error { if err := db.kubeclient.CoreV1().Pods(p.Namespace).Bind(&v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID}, @@ -129,6 +131,7 @@ type defaultEvictor struct { kubeclient *kubernetes.Clientset } +//Evict will send delete pod request to api server func (de *defaultEvictor) Evict(p *v1.Pod) error { glog.V(3).Infof("Evicting pod %v/%v", p.Namespace, p.Name) @@ -145,7 +148,7 @@ type defaultStatusUpdater struct { kbclient *kbver.Clientset } -// Update pod with podCondition +// UpdatePodCondition will Update pod with podCondition func (su *defaultStatusUpdater) UpdatePodCondition(pod *v1.Pod, condition *v1.PodCondition) (*v1.Pod, error) { glog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status) if podutil.UpdatePodCondition(&pod.Status, condition) { @@ -154,7 +157,7 @@ func (su *defaultStatusUpdater) UpdatePodCondition(pod *v1.Pod, condition *v1.Po return pod, nil } -// Update pod with podCondition +// UpdatePodGroup will Update pod with podCondition func (su *defaultStatusUpdater) UpdatePodGroup(pg *v1alpha1.PodGroup) (*v1alpha1.PodGroup, error) { return su.kbclient.SchedulingV1alpha1().PodGroups(pg.Namespace).Update(pg) } @@ -163,7 +166,7 @@ type defaultVolumeBinder struct { volumeBinder *volumebinder.VolumeBinder } -// AllocateVolume allocates volume on the host to the task +// AllocateVolumes allocates volume on the host to the task func (dvb *defaultVolumeBinder) AllocateVolumes(task *api.TaskInfo, hostname string) error { allBound, err := dvb.volumeBinder.Binder.AssumePodVolumes(task.Pod, hostname) task.VolumeReady = allBound @@ -171,7 +174,7 @@ func (dvb *defaultVolumeBinder) AllocateVolumes(task *api.TaskInfo, hostname str return err } -// BindVolume binds volumes to the task +// BindVolumes binds volumes to the task func (dvb *defaultVolumeBinder) BindVolumes(task *api.TaskInfo) error { // If task's volumes are ready, did not bind them again. if task.VolumeReady { @@ -296,6 +299,7 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s return sc } +// Run starts the schedulerCache func (sc *SchedulerCache) Run(stopCh <-chan struct{}) { go sc.pdbInformer.Informer().Run(stopCh) go sc.podInformer.Informer().Run(stopCh) @@ -317,6 +321,7 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) { go wait.Until(sc.processCleanupJob, 0, stopCh) } +// WaitForCacheSync sync the cache with the api server func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool { return cache.WaitForCacheSync(stopCh, @@ -339,6 +344,7 @@ func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool { ) } +// findJobAndTask returns job and the task info func (sc *SchedulerCache) findJobAndTask(taskInfo *kbapi.TaskInfo) (*kbapi.JobInfo, *kbapi.TaskInfo, error) { job, found := sc.Jobs[taskInfo.Job] if !found { @@ -355,6 +361,7 @@ func (sc *SchedulerCache) findJobAndTask(taskInfo *kbapi.TaskInfo) (*kbapi.JobIn return job, task, nil } +// Evict will evict the pod func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error { sc.Mutex.Lock() defer sc.Mutex.Unlock() @@ -440,12 +447,12 @@ func (sc *SchedulerCache) Bind(taskInfo *kbapi.TaskInfo, hostname string) error return nil } -// AllocateVolume allocates volume on the host to the task +// AllocateVolumes allocates volume on the host to the task func (sc *SchedulerCache) AllocateVolumes(task *api.TaskInfo, hostname string) error { return sc.VolumeBinder.AllocateVolumes(task, hostname) } -// BindVolume binds volumes to the task +// BindVolumes binds volumes to the task func (sc *SchedulerCache) BindVolumes(task *api.TaskInfo) error { return sc.VolumeBinder.BindVolumes(task) } @@ -482,6 +489,8 @@ func (sc *SchedulerCache) processCleanupJob() { return } + defer sc.deletedJobs.Done(obj) + job, found := obj.(*kbapi.JobInfo) if !found { glog.Errorf("Failed to convert <%v> to *JobInfo", obj) @@ -509,6 +518,9 @@ func (sc *SchedulerCache) processResyncTask() { if shutdown { return } + + defer sc.errTasks.Done(obj) + task, ok := obj.(*kbapi.TaskInfo) if !ok { glog.Errorf("failed to convert %v to *v1.Pod", obj) @@ -521,6 +533,7 @@ func (sc *SchedulerCache) processResyncTask() { } } +// Snapshot returns the complete snapshot of the cluster from cache func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo { sc.Mutex.Lock() defer sc.Mutex.Unlock() @@ -575,6 +588,7 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo { return snapshot } +// String returns information about the cache in a string format func (sc *SchedulerCache) String() string { sc.Mutex.Lock() defer sc.Mutex.Unlock() diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/event_handlers.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/event_handlers.go index 7522be9b32..4260521fdc 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/event_handlers.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/event_handlers.go @@ -22,7 +22,7 @@ import ( "github.com/golang/glog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1beta1" "k8s.io/api/scheduling/v1beta1" "k8s.io/apimachinery/pkg/api/errors" @@ -179,6 +179,7 @@ func (sc *SchedulerCache) deletePod(pod *v1.Pod) error { return nil } +// AddPod add pod to scheduler cache func (sc *SchedulerCache) AddPod(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { @@ -199,6 +200,7 @@ func (sc *SchedulerCache) AddPod(obj interface{}) { return } +// UpdatePod update pod to scheduler cache func (sc *SchedulerCache) UpdatePod(oldObj, newObj interface{}) { oldPod, ok := oldObj.(*v1.Pod) if !ok { @@ -225,6 +227,7 @@ func (sc *SchedulerCache) UpdatePod(oldObj, newObj interface{}) { return } +// DeletePod delete pod from scheduler cache func (sc *SchedulerCache) DeletePod(obj interface{}) { var pod *v1.Pod switch t := obj.(type) { @@ -294,6 +297,7 @@ func (sc *SchedulerCache) deleteNode(node *v1.Node) error { return nil } +// AddNode add node to scheduler cache func (sc *SchedulerCache) AddNode(obj interface{}) { node, ok := obj.(*v1.Node) if !ok { @@ -312,6 +316,7 @@ func (sc *SchedulerCache) AddNode(obj interface{}) { return } +// UpdateNode update node to scheduler cache func (sc *SchedulerCache) UpdateNode(oldObj, newObj interface{}) { oldNode, ok := oldObj.(*v1.Node) if !ok { @@ -335,6 +340,7 @@ func (sc *SchedulerCache) UpdateNode(oldObj, newObj interface{}) { return } +// DeleteNode delete node from scheduler cache func (sc *SchedulerCache) DeleteNode(obj interface{}) { var node *v1.Node switch t := obj.(type) { @@ -411,6 +417,7 @@ func (sc *SchedulerCache) deletePodGroup(ss *kbv1.PodGroup) error { return nil } +// AddPodGroup add podgroup to scheduler cache func (sc *SchedulerCache) AddPodGroup(obj interface{}) { ss, ok := obj.(*kbv1.PodGroup) if !ok { @@ -430,6 +437,7 @@ func (sc *SchedulerCache) AddPodGroup(obj interface{}) { return } +// UpdatePodGroup add podgroup to scheduler cache func (sc *SchedulerCache) UpdatePodGroup(oldObj, newObj interface{}) { oldSS, ok := oldObj.(*kbv1.PodGroup) if !ok { @@ -453,6 +461,7 @@ func (sc *SchedulerCache) UpdatePodGroup(oldObj, newObj interface{}) { return } +// DeletePodGroup delete podgroup from scheduler cache func (sc *SchedulerCache) DeletePodGroup(obj interface{}) { var ss *kbv1.PodGroup switch t := obj.(type) { @@ -522,6 +531,7 @@ func (sc *SchedulerCache) deletePDB(pdb *policyv1.PodDisruptionBudget) error { return nil } +// AddPDB add pdb to scheduler cache func (sc *SchedulerCache) AddPDB(obj interface{}) { pdb, ok := obj.(*policyv1.PodDisruptionBudget) if !ok { @@ -540,6 +550,7 @@ func (sc *SchedulerCache) AddPDB(obj interface{}) { return } +//UpdatePDB update pdb to scheduler cache func (sc *SchedulerCache) UpdatePDB(oldObj, newObj interface{}) { oldPDB, ok := oldObj.(*policyv1.PodDisruptionBudget) if !ok { @@ -563,6 +574,7 @@ func (sc *SchedulerCache) UpdatePDB(oldObj, newObj interface{}) { return } +//DeletePDB delete pdb from scheduler cache func (sc *SchedulerCache) DeletePDB(obj interface{}) { var pdb *policyv1.PodDisruptionBudget switch t := obj.(type) { @@ -591,6 +603,7 @@ func (sc *SchedulerCache) DeletePDB(obj interface{}) { return } +//AddQueue add queue to scheduler cache func (sc *SchedulerCache) AddQueue(obj interface{}) { ss, ok := obj.(*kbv1.Queue) if !ok { @@ -610,6 +623,7 @@ func (sc *SchedulerCache) AddQueue(obj interface{}) { return } +//UpdateQueue update queue to scheduler cache func (sc *SchedulerCache) UpdateQueue(oldObj, newObj interface{}) { oldSS, ok := oldObj.(*kbv1.Queue) if !ok { @@ -633,6 +647,7 @@ func (sc *SchedulerCache) UpdateQueue(oldObj, newObj interface{}) { return } +//DeleteQueue delete queue from the scheduler cache func (sc *SchedulerCache) DeleteQueue(obj interface{}) { var ss *kbv1.Queue switch t := obj.(type) { @@ -682,6 +697,7 @@ func (sc *SchedulerCache) deleteQueue(queue *kbv1.Queue) error { return nil } +//DeletePriorityClass delete priorityclass from the scheduler cache func (sc *SchedulerCache) DeletePriorityClass(obj interface{}) { var ss *v1beta1.PriorityClass switch t := obj.(type) { @@ -705,6 +721,7 @@ func (sc *SchedulerCache) DeletePriorityClass(obj interface{}) { sc.deletePriorityClass(ss) } +//UpdatePriorityClass update priorityclass to scheduler cache func (sc *SchedulerCache) UpdatePriorityClass(oldObj, newObj interface{}) { oldSS, ok := oldObj.(*v1beta1.PriorityClass) if !ok { @@ -729,6 +746,7 @@ func (sc *SchedulerCache) UpdatePriorityClass(oldObj, newObj interface{}) { sc.addPriorityClass(newSS) } +//AddPriorityClass add priorityclass to scheduler cache func (sc *SchedulerCache) AddPriorityClass(obj interface{}) { var ss *v1beta1.PriorityClass switch t := obj.(type) { diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/interface.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/interface.go index 38b991974a..b664ea20a6 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/interface.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/interface.go @@ -17,10 +17,9 @@ limitations under the License. package cache import ( - "k8s.io/api/core/v1" - "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" + v1 "k8s.io/api/core/v1" ) // Cache collects pods/nodes/queues information @@ -56,15 +55,18 @@ type Cache interface { BindVolumes(task *api.TaskInfo) error } +// VolumeBinder interface for allocate and bind volumes type VolumeBinder interface { AllocateVolumes(task *api.TaskInfo, hostname string) error BindVolumes(task *api.TaskInfo) error } +//Binder interface for binding task and hostname type Binder interface { Bind(task *v1.Pod, hostname string) error } +// Evictor interface for evict pods type Evictor interface { Evict(pod *v1.Pod) error } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/util.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/util.go index 411b47552d..7fdad4f2f2 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/util.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache/util.go @@ -17,7 +17,7 @@ limitations under the License. package cache import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" @@ -26,7 +26,7 @@ import ( ) const ( - shadowPodGroupKey = "kube-batch/shadow-pod-group" + shadowPodGroupKey = "volcano/shadow-pod-group" ) func shadowPodGroup(pg *v1alpha1.PodGroup) bool { diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf/scheduler_conf.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf/scheduler_conf.go index 84124c7533..516e07a9a5 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf/scheduler_conf.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf/scheduler_conf.go @@ -33,22 +33,24 @@ type Tier struct { type PluginOption struct { // The name of Plugin Name string `yaml:"name"` - // JobOrderDisabled defines whether jobOrderFn is disabled - JobOrderDisabled bool `yaml:"disableJobOrder"` - // JobReadyDisabled defines whether jobReadyFn is disabled - JobReadyDisabled bool `yaml:"disableJobReady"` - // TaskOrderDisabled defines whether taskOrderFn is disabled - TaskOrderDisabled bool `yaml:"disableTaskOrder"` - // PreemptableDisabled defines whether preemptableFn is disabled - PreemptableDisabled bool `yaml:"disablePreemptable"` - // ReclaimableDisabled defines whether reclaimableFn is disabled - ReclaimableDisabled bool `yaml:"disableReclaimable"` - // QueueOrderDisabled defines whether queueOrderFn is disabled - QueueOrderDisabled bool `yaml:"disableQueueOrder"` - // PredicateDisabled defines whether predicateFn is disabled - PredicateDisabled bool `yaml:"disablePredicate"` - // NodeOrderDisabled defines whether NodeOrderFn is disabled - NodeOrderDisabled bool `yaml:"disableNodeOrder"` + // EnabledJobOrder defines whether jobOrderFn is enabled + EnabledJobOrder *bool `yaml:"enableJobOrder"` + // EnabledJobReady defines whether jobReadyFn is enabled + EnabledJobReady *bool `yaml:"enableJobReady"` + // EnabledJobPipelined defines whether jobPipelinedFn is enabled + EnabledJobPipelined *bool `yaml:"enableJobPipelined"` + // EnabledTaskOrder defines whether taskOrderFn is enabled + EnabledTaskOrder *bool `yaml:"enableTaskOrder"` + // EnabledPreemptable defines whether preemptableFn is enabled + EnabledPreemptable *bool `yaml:"enablePreemptable"` + // EnabledReclaimable defines whether reclaimableFn is enabled + EnabledReclaimable *bool `yaml:"enableReclaimable"` + // EnabledQueueOrder defines whether queueOrderFn is enabled + EnabledQueueOrder *bool `yaml:"enableQueueOrder"` + // EnabledPredicate defines whether predicateFn is enabled + EnabledPredicate *bool `yaml:"enablePredicate"` + // EnabledNodeOrder defines whether NodeOrderFn is enabled + EnabledNodeOrder *bool `yaml:"enableNodeOrder"` // Arguments defines the different arguments that can be given to different plugins Arguments map[string]string `yaml:"arguments"` } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/sort.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/arguments.go similarity index 57% rename from vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/sort.go rename to vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/arguments.go index d4d808338c..d968e7ae32 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/sort.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/arguments.go @@ -14,24 +14,33 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package framework import ( - "sort" + "strconv" - "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" + "github.com/golang/glog" ) -func SelectBestNode(nodeScores map[int][]*api.NodeInfo) []*api.NodeInfo { - var nodesInorder []*api.NodeInfo - var keys []int - for key := range nodeScores { - keys = append(keys, key) +// Arguments map +type Arguments map[string]string + +//GetInt get the integer value from string +func (a Arguments) GetInt(ptr *int, key string) { + if ptr == nil { + return } - sort.Sort(sort.Reverse(sort.IntSlice(keys))) - for _, key := range keys { - nodes := nodeScores[key] - nodesInorder = append(nodesInorder, nodes...) + + argv, ok := a[key] + if !ok || argv == "" { + return } - return nodesInorder + + value, err := strconv.Atoi(argv) + if err != nil { + glog.Warningf("Could not parse argument: %s for key %s, with err %v", argv, key, err) + return + } + + *ptr = value } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/event.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/event.go index b2f12729d3..428f3ab4da 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/event.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/event.go @@ -20,10 +20,12 @@ import ( "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" ) +// Event structure type Event struct { Task *api.TaskInfo } +// EventHandler structure type EventHandler struct { AllocateFunc func(event *Event) DeallocateFunc func(event *Event) diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/framework.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/framework.go index f0affdf610..79520ad0f7 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/framework.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/framework.go @@ -26,6 +26,7 @@ import ( "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics" ) +// OpenSession start the session func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session { ssn := openSession(cache) ssn.Tiers = tiers @@ -50,6 +51,7 @@ func OpenSession(cache cache.Cache, tiers []conf.Tier) *Session { return ssn } +// CloseSession close the session func CloseSession(ssn *Session) { for _, plugin := range ssn.plugins { onSessionCloseStart := time.Now() diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/interface.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/interface.go index d7abc7d6d1..78878878f8 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/interface.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/interface.go @@ -31,6 +31,7 @@ type Action interface { UnInitialize() } +// Plugin is the interface of scheduler plugin type Plugin interface { // The unique name of Plugin. Name() string diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/plugins.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/plugins.go index ee0ecf0571..7d5c736fc0 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/plugins.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/plugins.go @@ -20,18 +20,21 @@ import "sync" var pluginMutex sync.Mutex -type PluginBuilder func(map[string]string) Plugin +// PluginBuilder plugin management +type PluginBuilder func(Arguments) Plugin // Plugin management var pluginBuilders = map[string]PluginBuilder{} -func RegisterPluginBuilder(name string, pc func(map[string]string) Plugin) { +// RegisterPluginBuilder register the plugin +func RegisterPluginBuilder(name string, pc PluginBuilder) { pluginMutex.Lock() defer pluginMutex.Unlock() pluginBuilders[name] = pc } +// CleanupPluginBuilders cleans up all the plugin func CleanupPluginBuilders() { pluginMutex.Lock() defer pluginMutex.Unlock() @@ -39,6 +42,7 @@ func CleanupPluginBuilders() { pluginBuilders = map[string]PluginBuilder{} } +// GetPluginBuilder get the pluginbuilder by name func GetPluginBuilder(name string) (PluginBuilder, bool) { pluginMutex.Lock() defer pluginMutex.Unlock() @@ -50,6 +54,7 @@ func GetPluginBuilder(name string) (PluginBuilder, bool) { // Action management var actionMap = map[string]Action{} +// RegisterAction register action func RegisterAction(act Action) { pluginMutex.Lock() defer pluginMutex.Unlock() @@ -57,6 +62,7 @@ func RegisterAction(act Action) { actionMap[act.Name()] = act } +// GetAction get the action by name func GetAction(name string) (Action, bool) { pluginMutex.Lock() defer pluginMutex.Unlock() diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session.go index 399f7c4177..c1b4d049b3 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session.go @@ -33,6 +33,7 @@ import ( "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics" ) +// Session information for the current session type Session struct { UID types.UID @@ -44,18 +45,19 @@ type Session struct { Backlog []*api.JobInfo Tiers []conf.Tier - plugins map[string]Plugin - eventHandlers []*EventHandler - jobOrderFns map[string]api.CompareFn - queueOrderFns map[string]api.CompareFn - taskOrderFns map[string]api.CompareFn - predicateFns map[string]api.PredicateFn - nodeOrderFns map[string]api.NodeOrderFn - preemptableFns map[string]api.EvictableFn - reclaimableFns map[string]api.EvictableFn - overusedFns map[string]api.ValidateFn - jobReadyFns map[string]api.ValidateFn - jobValidFns map[string]api.ValidateExFn + plugins map[string]Plugin + eventHandlers []*EventHandler + jobOrderFns map[string]api.CompareFn + queueOrderFns map[string]api.CompareFn + taskOrderFns map[string]api.CompareFn + predicateFns map[string]api.PredicateFn + nodeOrderFns map[string]api.NodeOrderFn + preemptableFns map[string]api.EvictableFn + reclaimableFns map[string]api.EvictableFn + overusedFns map[string]api.ValidateFn + jobReadyFns map[string]api.ValidateFn + jobPipelinedFns map[string]api.ValidateFn + jobValidFns map[string]api.ValidateExFn } func openSession(cache cache.Cache) *Session { @@ -67,17 +69,18 @@ func openSession(cache cache.Cache) *Session { Nodes: map[string]*api.NodeInfo{}, Queues: map[api.QueueID]*api.QueueInfo{}, - plugins: map[string]Plugin{}, - jobOrderFns: map[string]api.CompareFn{}, - queueOrderFns: map[string]api.CompareFn{}, - taskOrderFns: map[string]api.CompareFn{}, - predicateFns: map[string]api.PredicateFn{}, - nodeOrderFns: map[string]api.NodeOrderFn{}, - preemptableFns: map[string]api.EvictableFn{}, - reclaimableFns: map[string]api.EvictableFn{}, - overusedFns: map[string]api.ValidateFn{}, - jobReadyFns: map[string]api.ValidateFn{}, - jobValidFns: map[string]api.ValidateExFn{}, + plugins: map[string]Plugin{}, + jobOrderFns: map[string]api.CompareFn{}, + queueOrderFns: map[string]api.CompareFn{}, + taskOrderFns: map[string]api.CompareFn{}, + predicateFns: map[string]api.PredicateFn{}, + nodeOrderFns: map[string]api.NodeOrderFn{}, + preemptableFns: map[string]api.EvictableFn{}, + reclaimableFns: map[string]api.EvictableFn{}, + overusedFns: map[string]api.ValidateFn{}, + jobReadyFns: map[string]api.ValidateFn{}, + jobPipelinedFns: map[string]api.ValidateFn{}, + jobValidFns: map[string]api.ValidateExFn{}, } snapshot := cache.Snapshot() @@ -180,12 +183,14 @@ func jobStatus(ssn *Session, jobInfo *api.JobInfo) v1alpha1.PodGroupStatus { return status } +// Statement returns new statement object func (ssn *Session) Statement() *Statement { return &Statement{ ssn: ssn, } } +// Pipeline the task to the node in the session func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error { // Only update status in session job, found := ssn.Jobs[task.Job] @@ -228,6 +233,7 @@ func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error { return nil } +//Allocate the task to the node in the session func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error { if err := ssn.cache.AllocateVolumes(task, hostname); err != nil { return err @@ -311,6 +317,7 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error { return nil } +//Evict the task in the session func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error { if err := ssn.cache.Evict(reclaimee, reason); err != nil { return err @@ -350,7 +357,7 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error { return nil } -// UpdateJobStatus update job condition accordingly. +// UpdateJobCondition update job condition accordingly. func (ssn *Session) UpdateJobCondition(jobInfo *api.JobInfo, cond *v1alpha1.PodGroupCondition) error { job, ok := ssn.Jobs[jobInfo.UID] if !ok { @@ -375,10 +382,12 @@ func (ssn *Session) UpdateJobCondition(jobInfo *api.JobInfo, cond *v1alpha1.PodG return nil } +// AddEventHandler add event handlers func (ssn *Session) AddEventHandler(eh *EventHandler) { ssn.eventHandlers = append(ssn.eventHandlers, eh) } +//String return nodes and jobs information in the session func (ssn Session) String() string { msg := fmt.Sprintf("Session %v: \n", ssn.UID) diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session_plugins.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session_plugins.go index 3520a2c2f3..230844017d 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session_plugins.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/session_plugins.go @@ -20,53 +20,69 @@ import ( "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" ) +// AddJobOrderFn add job order function func (ssn *Session) AddJobOrderFn(name string, cf api.CompareFn) { ssn.jobOrderFns[name] = cf } +// AddQueueOrderFn add queue order function func (ssn *Session) AddQueueOrderFn(name string, qf api.CompareFn) { ssn.queueOrderFns[name] = qf } +// AddTaskOrderFn add task order function func (ssn *Session) AddTaskOrderFn(name string, cf api.CompareFn) { ssn.taskOrderFns[name] = cf } +// AddPreemptableFn add preemptable function func (ssn *Session) AddPreemptableFn(name string, cf api.EvictableFn) { ssn.preemptableFns[name] = cf } +// AddReclaimableFn add Reclaimable function func (ssn *Session) AddReclaimableFn(name string, rf api.EvictableFn) { ssn.reclaimableFns[name] = rf } +// AddJobReadyFn add JobReady function func (ssn *Session) AddJobReadyFn(name string, vf api.ValidateFn) { ssn.jobReadyFns[name] = vf } +// AddJobPipelinedFn add pipelined function +func (ssn *Session) AddJobPipelinedFn(name string, vf api.ValidateFn) { + ssn.jobPipelinedFns[name] = vf +} + +// AddPredicateFn add Predicate function func (ssn *Session) AddPredicateFn(name string, pf api.PredicateFn) { ssn.predicateFns[name] = pf } +// AddNodeOrderFn add Node order function func (ssn *Session) AddNodeOrderFn(name string, pf api.NodeOrderFn) { ssn.nodeOrderFns[name] = pf } +// AddOverusedFn add overused function func (ssn *Session) AddOverusedFn(name string, fn api.ValidateFn) { ssn.overusedFns[name] = fn } +// AddJobValidFn add jobvalid function func (ssn *Session) AddJobValidFn(name string, fn api.ValidateExFn) { ssn.jobValidFns[name] = fn } +// Reclaimable invoke reclaimable function of the plugins func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo { var victims []*api.TaskInfo var init bool for _, tier := range ssn.Tiers { for _, plugin := range tier.Plugins { - if plugin.ReclaimableDisabled { + if !isEnabled(plugin.EnabledReclaimable) { continue } rf, found := ssn.reclaimableFns[plugin.Name] @@ -101,13 +117,14 @@ func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskI return victims } +// Preemptable invoke preemptable function of the plugins func (ssn *Session) Preemptable(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) []*api.TaskInfo { var victims []*api.TaskInfo var init bool for _, tier := range ssn.Tiers { for _, plugin := range tier.Plugins { - if plugin.PreemptableDisabled { + if !isEnabled(plugin.EnabledPreemptable) { continue } @@ -143,6 +160,7 @@ func (ssn *Session) Preemptable(preemptor *api.TaskInfo, preemptees []*api.TaskI return victims } +// Overused invoke overused function of the plugins func (ssn *Session) Overused(queue *api.QueueInfo) bool { for _, tier := range ssn.Tiers { for _, plugin := range tier.Plugins { @@ -159,10 +177,11 @@ func (ssn *Session) Overused(queue *api.QueueInfo) bool { return false } +// JobReady invoke jobready function of the plugins func (ssn *Session) JobReady(obj interface{}) bool { for _, tier := range ssn.Tiers { for _, plugin := range tier.Plugins { - if plugin.JobReadyDisabled { + if !isEnabled(plugin.EnabledJobReady) { continue } jrf, found := ssn.jobReadyFns[plugin.Name] @@ -179,6 +198,28 @@ func (ssn *Session) JobReady(obj interface{}) bool { return true } +// JobPipelined invoke pipelined function of the plugins +func (ssn *Session) JobPipelined(obj interface{}) bool { + for _, tier := range ssn.Tiers { + for _, plugin := range tier.Plugins { + if !isEnabled(plugin.EnabledJobPipelined) { + continue + } + jrf, found := ssn.jobPipelinedFns[plugin.Name] + if !found { + continue + } + + if !jrf(obj) { + return false + } + } + } + + return true +} + +// JobValid invoke jobvalid function of the plugins func (ssn *Session) JobValid(obj interface{}) *api.ValidateResult { for _, tier := range ssn.Tiers { for _, plugin := range tier.Plugins { @@ -197,10 +238,11 @@ func (ssn *Session) JobValid(obj interface{}) *api.ValidateResult { return nil } +// JobOrderFn invoke joborder function of the plugins func (ssn *Session) JobOrderFn(l, r interface{}) bool { for _, tier := range ssn.Tiers { for _, plugin := range tier.Plugins { - if plugin.JobOrderDisabled { + if !isEnabled(plugin.EnabledJobOrder) { continue } jof, found := ssn.jobOrderFns[plugin.Name] @@ -218,15 +260,16 @@ func (ssn *Session) JobOrderFn(l, r interface{}) bool { rv := r.(*api.JobInfo) if lv.CreationTimestamp.Equal(&rv.CreationTimestamp) { return lv.UID < rv.UID - } else { - return lv.CreationTimestamp.Before(&rv.CreationTimestamp) } + return lv.CreationTimestamp.Before(&rv.CreationTimestamp) + } +// QueueOrderFn invoke queueorder function of the plugins func (ssn *Session) QueueOrderFn(l, r interface{}) bool { for _, tier := range ssn.Tiers { for _, plugin := range tier.Plugins { - if plugin.QueueOrderDisabled { + if !isEnabled(plugin.EnabledQueueOrder) { continue } qof, found := ssn.queueOrderFns[plugin.Name] @@ -245,15 +288,16 @@ func (ssn *Session) QueueOrderFn(l, r interface{}) bool { rv := r.(*api.QueueInfo) if lv.Queue.CreationTimestamp.Equal(&rv.Queue.CreationTimestamp) { return lv.UID < rv.UID - } else { - return lv.Queue.CreationTimestamp.Before(&rv.Queue.CreationTimestamp) } + return lv.Queue.CreationTimestamp.Before(&rv.Queue.CreationTimestamp) + } +// TaskCompareFns invoke taskorder function of the plugins func (ssn *Session) TaskCompareFns(l, r interface{}) int { for _, tier := range ssn.Tiers { for _, plugin := range tier.Plugins { - if plugin.TaskOrderDisabled { + if !isEnabled(plugin.EnabledTaskOrder) { continue } tof, found := ssn.taskOrderFns[plugin.Name] @@ -269,6 +313,7 @@ func (ssn *Session) TaskCompareFns(l, r interface{}) int { return 0 } +// TaskOrderFn invoke taskorder function of the plugins func (ssn *Session) TaskOrderFn(l, r interface{}) bool { if res := ssn.TaskCompareFns(l, r); res != 0 { return res < 0 @@ -279,15 +324,16 @@ func (ssn *Session) TaskOrderFn(l, r interface{}) bool { rv := r.(*api.TaskInfo) if lv.Pod.CreationTimestamp.Equal(&rv.Pod.CreationTimestamp) { return lv.UID < rv.UID - } else { - return lv.Pod.CreationTimestamp.Before(&rv.Pod.CreationTimestamp) } + return lv.Pod.CreationTimestamp.Before(&rv.Pod.CreationTimestamp) + } +// PredicateFn invoke predicate function of the plugins func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error { for _, tier := range ssn.Tiers { for _, plugin := range tier.Plugins { - if plugin.PredicateDisabled { + if !isEnabled(plugin.EnabledPredicate) { continue } pfn, found := ssn.predicateFns[plugin.Name] @@ -303,11 +349,12 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error { return nil } -func (ssn *Session) NodeOrderFn(task *api.TaskInfo, node *api.NodeInfo) (int, error) { - priorityScore := 0 +// NodeOrderFn invoke node order function of the plugins +func (ssn *Session) NodeOrderFn(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { + priorityScore := 0.0 for _, tier := range ssn.Tiers { for _, plugin := range tier.Plugins { - if plugin.NodeOrderDisabled { + if !isEnabled(plugin.EnabledNodeOrder) { continue } pfn, found := ssn.nodeOrderFns[plugin.Name] @@ -317,10 +364,14 @@ func (ssn *Session) NodeOrderFn(task *api.TaskInfo, node *api.NodeInfo) (int, er score, err := pfn(task, node) if err != nil { return 0, err - } else { - priorityScore = priorityScore + score } + priorityScore = priorityScore + score + } } return priorityScore, nil } + +func isEnabled(enabled *bool) bool { + return enabled != nil && *enabled +} diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/statement.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/statement.go index 9ed6ea1fdd..8909081a0b 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/statement.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework/statement.go @@ -22,6 +22,7 @@ import ( "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" ) +// Statement structure type Statement struct { operations []operation ssn *Session @@ -32,6 +33,7 @@ type operation struct { args []interface{} } +//Evict the pod func (s *Statement) Evict(reclaimee *api.TaskInfo, reason string) error { // Update status in session job, found := s.ssn.Jobs[reclaimee.Job] @@ -107,6 +109,7 @@ func (s *Statement) unevict(reclaimee *api.TaskInfo, reason string) error { return nil } +// Pipeline the task for the node func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error { // Only update status in session job, found := s.ssn.Jobs[task.Job] @@ -191,6 +194,7 @@ func (s *Statement) unpipeline(task *api.TaskInfo) error { return nil } +// Discard operation for evict and pipeline func (s *Statement) Discard() { glog.V(3).Info("Discarding operations ...") for i := len(s.operations) - 1; i >= 0; i-- { @@ -204,6 +208,7 @@ func (s *Statement) Discard() { } } +// Commit operation for evict and pipeline func (s *Statement) Commit() { glog.V(3).Info("Committing operations ...") for _, op := range s.operations { diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics/metrics.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics/metrics.go index 4e634c65f6..db61c49f20 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics/metrics.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics/metrics.go @@ -24,8 +24,8 @@ import ( ) const ( - // KubeBatchNamespace - namespace in prometheus used by kube-batch - KubeBatchNamespace = "kube_batch" + // VolcanoNamespace - namespace in prometheus used by volcano + VolcanoNamespace = "volcano" // OnSessionOpen label OnSessionOpen = "OnSessionOpen" @@ -37,7 +37,7 @@ const ( var ( e2eSchedulingLatency = promauto.NewHistogram( prometheus.HistogramOpts{ - Subsystem: KubeBatchNamespace, + Subsystem: VolcanoNamespace, Name: "e2e_scheduling_latency_milliseconds", Help: "E2e scheduling latency in milliseconds (scheduling algorithm + binding)", Buckets: prometheus.ExponentialBuckets(5, 2, 10), @@ -46,7 +46,7 @@ var ( pluginSchedulingLatency = promauto.NewHistogramVec( prometheus.HistogramOpts{ - Subsystem: KubeBatchNamespace, + Subsystem: VolcanoNamespace, Name: "plugin_scheduling_latency_microseconds", Help: "Plugin scheduling latency in microseconds", Buckets: prometheus.ExponentialBuckets(5, 2, 10), @@ -55,7 +55,7 @@ var ( actionSchedulingLatency = promauto.NewHistogramVec( prometheus.HistogramOpts{ - Subsystem: KubeBatchNamespace, + Subsystem: VolcanoNamespace, Name: "action_scheduling_latency_microseconds", Help: "Action scheduling latency in microseconds", Buckets: prometheus.ExponentialBuckets(5, 2, 10), @@ -64,7 +64,7 @@ var ( taskSchedulingLatency = promauto.NewHistogram( prometheus.HistogramOpts{ - Subsystem: KubeBatchNamespace, + Subsystem: VolcanoNamespace, Name: "task_scheduling_latency_microseconds", Help: "Task scheduling latency in microseconds", Buckets: prometheus.ExponentialBuckets(5, 2, 10), @@ -73,7 +73,7 @@ var ( scheduleAttempts = promauto.NewCounterVec( prometheus.CounterOpts{ - Subsystem: KubeBatchNamespace, + Subsystem: VolcanoNamespace, Name: "schedule_attempts_total", Help: "Number of attempts to schedule pods, by the result. 'unschedulable' means a pod could not be scheduled, while 'error' means an internal scheduler problem.", }, []string{"result"}, @@ -81,7 +81,7 @@ var ( preemptionVictims = promauto.NewGauge( prometheus.GaugeOpts{ - Subsystem: KubeBatchNamespace, + Subsystem: VolcanoNamespace, Name: "pod_preemption_victims", Help: "Number of selected preemption victims", }, @@ -89,7 +89,7 @@ var ( preemptionAttempts = promauto.NewCounter( prometheus.CounterOpts{ - Subsystem: KubeBatchNamespace, + Subsystem: VolcanoNamespace, Name: "total_preemption_attempts", Help: "Total preemption attempts in the cluster till now", }, @@ -97,7 +97,7 @@ var ( unscheduleTaskCount = promauto.NewGaugeVec( prometheus.GaugeOpts{ - Subsystem: KubeBatchNamespace, + Subsystem: VolcanoNamespace, Name: "unschedule_task_count", Help: "Number of tasks could not be scheduled", }, []string{"job_id"}, @@ -105,7 +105,7 @@ var ( unscheduleJobCount = promauto.NewGauge( prometheus.GaugeOpts{ - Subsystem: KubeBatchNamespace, + Subsystem: VolcanoNamespace, Name: "unschedule_job_count", Help: "Number of jobs could not be scheduled", }, @@ -113,7 +113,7 @@ var ( jobRetryCount = promauto.NewCounterVec( prometheus.CounterOpts{ - Subsystem: KubeBatchNamespace, + Subsystem: VolcanoNamespace, Name: "job_retry_counts", Help: "Number of retry counts for one job", }, []string{"job_id"}, diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/conformance/conformance.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/conformance/conformance.go index b35ebe903c..2a0e1e89e0 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/conformance/conformance.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/conformance/conformance.go @@ -17,7 +17,7 @@ limitations under the License. package conformance import ( - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/apis/scheduling" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" @@ -26,10 +26,11 @@ import ( type conformancePlugin struct { // Arguments given for the plugin - pluginArguments map[string]string + pluginArguments framework.Arguments } -func New(arguments map[string]string) framework.Plugin { +// New return conformance plugin +func New(arguments framework.Arguments) framework.Plugin { return &conformancePlugin{pluginArguments: arguments} } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/defaults.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/defaults.go new file mode 100644 index 0000000000..0eeab1afc1 --- /dev/null +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/defaults.go @@ -0,0 +1,52 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugins + +import "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf" + +// ApplyPluginConfDefaults sets option's filed to its default value if not set +func ApplyPluginConfDefaults(option *conf.PluginOption) { + t := true + + if option.EnabledJobOrder == nil { + option.EnabledJobOrder = &t + } + if option.EnabledJobReady == nil { + option.EnabledJobReady = &t + } + if option.EnabledJobPipelined == nil { + option.EnabledJobPipelined = &t + } + if option.EnabledTaskOrder == nil { + option.EnabledTaskOrder = &t + } + if option.EnabledPreemptable == nil { + option.EnabledPreemptable = &t + } + if option.EnabledReclaimable == nil { + option.EnabledReclaimable = &t + } + if option.EnabledQueueOrder == nil { + option.EnabledQueueOrder = &t + } + if option.EnabledPredicate == nil { + option.EnabledPredicate = &t + } + if option.EnabledNodeOrder == nil { + option.EnabledNodeOrder = &t + } +} diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/drf/drf.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/drf/drf.go index 7ce2072dd7..9c02b2a28e 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/drf/drf.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/drf/drf.go @@ -41,10 +41,11 @@ type drfPlugin struct { jobOpts map[api.JobID]*drfAttr // Arguments given for the plugin - pluginArguments map[string]string + pluginArguments framework.Arguments } -func New(arguments map[string]string) framework.Plugin { +// New return drf plugin +func New(arguments framework.Arguments) framework.Plugin { return &drfPlugin{ totalResource: api.EmptyResource(), jobOpts: map[api.JobID]*drfAttr{}, @@ -159,7 +160,7 @@ func (drf *drfPlugin) updateShare(attr *drfAttr) { func (drf *drfPlugin) calculateShare(allocated, totalResource *api.Resource) float64 { res := float64(0) - for _, rn := range api.ResourceNames() { + for _, rn := range totalResource.ResourceNames() { share := helpers.Share(allocated.Get(rn), totalResource.Get(rn)) if share > res { res = share diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/gang/gang.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/gang/gang.go index 239c5155d9..f1c707b09f 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/gang/gang.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/gang/gang.go @@ -32,10 +32,11 @@ import ( type gangPlugin struct { // Arguments given for the plugin - pluginArguments map[string]string + pluginArguments framework.Arguments } -func New(arguments map[string]string) framework.Plugin { +// New return gang plugin +func New(arguments framework.Arguments) framework.Plugin { return &gangPlugin{pluginArguments: arguments} } @@ -43,43 +44,6 @@ func (gp *gangPlugin) Name() string { return "gang" } -// readyTaskNum return the number of tasks that are ready to run. -func readyTaskNum(job *api.JobInfo) int32 { - occupid := 0 - for status, tasks := range job.TaskStatusIndex { - if api.AllocatedStatus(status) || - status == api.Succeeded || - status == api.Pipelined { - occupid = occupid + len(tasks) - } - } - - return int32(occupid) -} - -// validTaskNum return the number of tasks that are valid. -func validTaskNum(job *api.JobInfo) int32 { - occupied := 0 - for status, tasks := range job.TaskStatusIndex { - if api.AllocatedStatus(status) || - status == api.Succeeded || - status == api.Pipelined || - status == api.Pending { - occupied = occupied + len(tasks) - } - } - - return int32(occupied) -} - -func jobReady(obj interface{}) bool { - job := obj.(*api.JobInfo) - - occupied := readyTaskNum(job) - - return occupied >= job.MinAvailable -} - func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) { validJobFn := func(obj interface{}) *api.ValidateResult { job, ok := obj.(*api.JobInfo) @@ -90,7 +54,7 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) { } } - vtn := validTaskNum(job) + vtn := job.ValidTaskNum() if vtn < job.MinAvailable { return &api.ValidateResult{ Pass: false, @@ -109,7 +73,7 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) { for _, preemptee := range preemptees { job := ssn.Jobs[preemptee.Job] - occupid := readyTaskNum(job) + occupid := job.ReadyTaskNum() preemptable := job.MinAvailable <= occupid-1 || job.MinAvailable == 1 if !preemptable { @@ -133,8 +97,8 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) { lv := l.(*api.JobInfo) rv := r.(*api.JobInfo) - lReady := jobReady(lv) - rReady := jobReady(rv) + lReady := lv.Ready() + rReady := rv.Ready() glog.V(4).Infof("Gang JobOrderFn: <%v/%v> is ready: %t, <%v/%v> is ready: %t", lv.Namespace, lv.Name, lReady, rv.Namespace, rv.Name, rReady) @@ -155,19 +119,26 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) { } ssn.AddJobOrderFn(gp.Name(), jobOrderFn) - ssn.AddJobReadyFn(gp.Name(), jobReady) + ssn.AddJobReadyFn(gp.Name(), func(obj interface{}) bool { + ji := obj.(*api.JobInfo) + return ji.Ready() + }) + ssn.AddJobPipelinedFn(gp.Name(), func(obj interface{}) bool { + ji := obj.(*api.JobInfo) + return ji.Pipelined() + }) } func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) { var unreadyTaskCount int32 var unScheduleJobCount int for _, job := range ssn.Jobs { - if !jobReady(job) { - unreadyTaskCount = job.MinAvailable - readyTaskNum(job) + if !job.Ready() { + unreadyTaskCount = job.MinAvailable - job.ReadyTaskNum() msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", - job.MinAvailable-readyTaskNum(job), len(job.Tasks), job.FitError()) + job.MinAvailable-job.ReadyTaskNum(), len(job.Tasks), job.FitError()) - unScheduleJobCount += 1 + unScheduleJobCount++ metrics.UpdateUnscheduleTaskCount(job.Name, int(unreadyTaskCount)) metrics.RegisterJobRetries(job.Name) diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/nodeorder/nodeorder.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/nodeorder/nodeorder.go index 9539556426..4441d33119 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/nodeorder/nodeorder.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/nodeorder/nodeorder.go @@ -18,19 +18,17 @@ package nodeorder import ( "fmt" - "strconv" "github.com/golang/glog" - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/kubernetes/pkg/scheduler/algorithm" + v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/cache" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/util" ) const ( @@ -46,7 +44,7 @@ const ( type nodeOrderPlugin struct { // Arguments given for the plugin - pluginArguments map[string]string + pluginArguments framework.Arguments } func getInterPodAffinityScore(name string, interPodAffinityScore schedulerapi.HostPriorityList) int { @@ -92,74 +90,8 @@ func (c *cachedNodeInfo) GetNodeInfo(name string) (*v1.Node, error) { return node.Node, nil } -type podLister struct { - session *framework.Session -} - -func (pl *podLister) List(selector labels.Selector) ([]*v1.Pod, error) { - var pods []*v1.Pod - for _, job := range pl.session.Jobs { - for status, tasks := range job.TaskStatusIndex { - if !api.AllocatedStatus(status) { - continue - } - - for _, task := range tasks { - if selector.Matches(labels.Set(task.Pod.Labels)) { - if task.NodeName != task.Pod.Spec.NodeName { - pod := task.Pod.DeepCopy() - pod.Spec.NodeName = task.NodeName - pods = append(pods, pod) - } else { - pods = append(pods, task.Pod) - } - } - } - } - } - - return pods, nil -} - -func (pl *podLister) FilteredList(podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { - var pods []*v1.Pod - for _, job := range pl.session.Jobs { - for status, tasks := range job.TaskStatusIndex { - if !api.AllocatedStatus(status) { - continue - } - - for _, task := range tasks { - if podFilter(task.Pod) && selector.Matches(labels.Set(task.Pod.Labels)) { - if task.NodeName != task.Pod.Spec.NodeName { - pod := task.Pod.DeepCopy() - pod.Spec.NodeName = task.NodeName - pods = append(pods, pod) - } else { - pods = append(pods, task.Pod) - } - } - } - } - } - - return pods, nil -} - -type nodeLister struct { - session *framework.Session -} - -func (nl *nodeLister) List() ([]*v1.Node, error) { - var nodes []*v1.Node - for _, node := range nl.session.Nodes { - nodes = append(nodes, node.Node) - } - return nodes, nil -} - //New function returns prioritizePlugin object -func New(aruguments map[string]string) framework.Plugin { +func New(aruguments framework.Arguments) framework.Plugin { return &nodeOrderPlugin{pluginArguments: aruguments} } @@ -174,7 +106,7 @@ type priorityWeight struct { balancedRescourceWeight int } -func calculateWeight(args map[string]string) priorityWeight { +func calculateWeight(args framework.Arguments) priorityWeight { /* User Should give priorityWeight in this format(nodeaffinity.weight, podaffinity.weight, leastrequested.weight, balancedresource.weight). Currently supported only for nodeaffinity, podaffinity, leastrequested, balancedresouce priorities. @@ -206,59 +138,31 @@ func calculateWeight(args map[string]string) priorityWeight { } // Checks whether nodeaffinity.weight is provided or not, if given, modifies the value in weight struct. - if args[NodeAffinityWeight] != "" { - val, err := strconv.Atoi(args[NodeAffinityWeight]) - if err != nil { - glog.Warningf("Not able to Parse Weight for %v because of error: %v", args[NodeAffinityWeight], err) - } else { - weight.nodeAffinityWeight = val - } - } + args.GetInt(&weight.nodeAffinityWeight, NodeAffinityWeight) // Checks whether podaffinity.weight is provided or not, if given, modifies the value in weight struct. - if args[PodAffinityWeight] != "" { - val, err := strconv.Atoi(args[PodAffinityWeight]) - if err != nil { - glog.Warningf("Not able to Parse Weight for %v because of error: %v", args[PodAffinityWeight], err) - } else { - weight.podAffinityWeight = val - } - } + args.GetInt(&weight.podAffinityWeight, PodAffinityWeight) // Checks whether leastrequested.weight is provided or not, if given, modifies the value in weight struct. - if args[LeastRequestedWeight] != "" { - val, err := strconv.Atoi(args[LeastRequestedWeight]) - if err != nil { - glog.Warningf("Not able to Parse Weight for %v because of error: %v", args[LeastRequestedWeight], err) - } else { - weight.leastReqWeight = val - } - } + args.GetInt(&weight.leastReqWeight, LeastRequestedWeight) // Checks whether balancedresource.weight is provided or not, if given, modifies the value in weight struct. - if args[BalancedResourceWeight] != "" { - val, err := strconv.Atoi(args[BalancedResourceWeight]) - if err != nil { - glog.Warningf("Not able to Parse Weight for %v because of error: %v", args[BalancedResourceWeight], err) - } else { - weight.balancedRescourceWeight = val - } - } + args.GetInt(&weight.balancedRescourceWeight, BalancedResourceWeight) return weight } func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { - nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (int, error) { + nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { weight := calculateWeight(pp.pluginArguments) - pl := &podLister{ - session: ssn, + pl := &util.PodLister{ + Session: ssn, } - nl := &nodeLister{ - session: ssn, + nl := &util.NodeLister{ + Session: ssn, } cn := &cachedNodeInfo{ @@ -273,7 +177,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { nodeInfo := cache.NewNodeInfo(node.Pods()...) nodeInfo.SetNode(node.Node) - var score = 0 + var score = 0.0 //TODO: Add ImageLocalityPriority Function once priorityMetadata is published //Issue: #74132 in kubernetes ( https://github.com/kubernetes/kubernetes/issues/74132 ) @@ -284,7 +188,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { return 0, err } // If leastReqWeight in provided, host.Score is multiplied with weight, if not, host.Score is added to total score. - score = score + (host.Score * weight.leastReqWeight) + score = score + float64(host.Score*weight.leastReqWeight) host, err = priorities.BalancedResourceAllocationMap(task.Pod, nil, nodeInfo) if err != nil { @@ -292,7 +196,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { return 0, err } // If balancedRescourceWeight in provided, host.Score is multiplied with weight, if not, host.Score is added to total score. - score = score + (host.Score * weight.balancedRescourceWeight) + score = score + float64(host.Score*weight.balancedRescourceWeight) host, err = priorities.CalculateNodeAffinityPriorityMap(task.Pod, nil, nodeInfo) if err != nil { @@ -300,7 +204,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { return 0, err } // If nodeAffinityWeight in provided, host.Score is multiplied with weight, if not, host.Score is added to total score. - score = score + (host.Score * weight.nodeAffinityWeight) + score = score + float64(host.Score*weight.nodeAffinityWeight) mapFn := priorities.NewInterPodAffinityPriority(cn, nl, pl, v1.DefaultHardPodAffinitySymmetricWeight) interPodAffinityScore, err = mapFn(task.Pod, nodeMap, nodeSlice) @@ -310,7 +214,7 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) { } hostScore := getInterPodAffinityScore(node.Name, interPodAffinityScore) // If podAffinityWeight in provided, host.Score is multiplied with weight, if not, host.Score is added to total score. - score = score + (hostScore * weight.podAffinityWeight) + score = score + float64(hostScore*weight.podAffinityWeight) glog.V(4).Infof("Total Score for that node is: %d", score) return score, nil diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/predicates/predicates.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/predicates/predicates.go index d71d4cdc05..57d92f128b 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/predicates/predicates.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/predicates/predicates.go @@ -18,25 +18,26 @@ package predicates import ( "fmt" + "strings" "github.com/golang/glog" - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/cache" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/util" ) type predicatesPlugin struct { // Arguments given for the plugin - pluginArguments map[string]string + pluginArguments framework.Arguments } -func New(arguments map[string]string) framework.Plugin { +// New return predicate plugin +func New(arguments framework.Arguments) framework.Plugin { return &predicatesPlugin{pluginArguments: arguments} } @@ -44,80 +45,22 @@ func (pp *predicatesPlugin) Name() string { return "predicates" } -type podLister struct { - session *framework.Session -} - -func (pl *podLister) List(selector labels.Selector) ([]*v1.Pod, error) { - var pods []*v1.Pod - for _, job := range pl.session.Jobs { - for status, tasks := range job.TaskStatusIndex { - if !api.AllocatedStatus(status) { - continue - } - - for _, task := range tasks { - if selector.Matches(labels.Set(task.Pod.Labels)) { - pod := task.Pod.DeepCopy() - pod.Spec.NodeName = task.NodeName - pods = append(pods, pod) - } - } - } +func formatReason(reasons []algorithm.PredicateFailureReason) string { + reasonStrings := []string{} + for _, v := range reasons { + reasonStrings = append(reasonStrings, fmt.Sprintf("%v", v.GetReason())) } - return pods, nil -} - -func (pl *podLister) FilteredList(podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { - var pods []*v1.Pod - for _, job := range pl.session.Jobs { - for status, tasks := range job.TaskStatusIndex { - if !api.AllocatedStatus(status) { - continue - } - - for _, task := range tasks { - if podFilter(task.Pod) && selector.Matches(labels.Set(task.Pod.Labels)) { - pod := task.Pod.DeepCopy() - pod.Spec.NodeName = task.NodeName - pods = append(pods, pod) - } - } - } - } - - return pods, nil -} - -type cachedNodeInfo struct { - session *framework.Session -} - -func (c *cachedNodeInfo) GetNodeInfo(name string) (*v1.Node, error) { - node, found := c.session.Nodes[name] - if !found { - return nil, fmt.Errorf("failed to find node <%s>", name) - } - - return node.Node, nil -} - -// Check to see if node spec is set to Schedulable or not -func CheckNodeUnschedulable(pod *v1.Pod, nodeInfo *cache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { - if nodeInfo.Node().Spec.Unschedulable { - return false, []algorithm.PredicateFailureReason{predicates.ErrNodeUnschedulable}, nil - } - return true, nil, nil + return strings.Join(reasonStrings, ", ") } func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { - pl := &podLister{ - session: ssn, + pl := &util.PodLister{ + Session: ssn, } - ni := &cachedNodeInfo{ - session: ssn, + ni := &util.CachedNodeInfo{ + Session: ssn, } ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error { @@ -128,8 +71,36 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { return fmt.Errorf("node <%s> can not allow more task running on it", node.Name) } + // CheckNodeCondition Predicate + fit, reasons, err := predicates.CheckNodeConditionPredicate(task.Pod, nil, nodeInfo) + if err != nil { + return err + } + + glog.V(4).Infof("CheckNodeCondition predicates Task <%s/%s> on Node <%s>: fit %t, err %v", + task.Namespace, task.Name, node.Name, fit, err) + + if !fit { + return fmt.Errorf("node <%s> are not available to schedule task <%s/%s>: %s", + node.Name, task.Namespace, task.Name, formatReason(reasons)) + } + + // CheckNodeUnschedulable Predicate + fit, _, err = predicates.CheckNodeUnschedulablePredicate(task.Pod, nil, nodeInfo) + if err != nil { + return err + } + + glog.V(4).Infof("CheckNodeUnschedulable Predicate Task <%s/%s> on Node <%s>: fit %t, err %v", + task.Namespace, task.Name, node.Name, fit, err) + + if !fit { + return fmt.Errorf("task <%s/%s> node <%s> set to unschedulable", + task.Namespace, task.Name, node.Name) + } + // NodeSelector Predicate - fit, _, err := predicates.PodMatchNodeSelector(task.Pod, nil, nodeInfo) + fit, _, err = predicates.PodMatchNodeSelector(task.Pod, nil, nodeInfo) if err != nil { return err } @@ -156,32 +127,60 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { node.Name, task.Namespace, task.Name) } - // Check to see if node.Spec.Unschedulable is set - fit, _, err = CheckNodeUnschedulable(task.Pod, nodeInfo) + // Toleration/Taint Predicate + fit, _, err = predicates.PodToleratesNodeTaints(task.Pod, nil, nodeInfo) if err != nil { return err } - glog.V(4).Infof("Check Unschedulable Task <%s/%s> on Node <%s>: fit %t, err %v", + glog.V(4).Infof("Toleration/Taint predicates Task <%s/%s> on Node <%s>: fit %t, err %v", task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("task <%s/%s> node <%s> set to unschedulable", + return fmt.Errorf("task <%s/%s> does not tolerate node <%s> taints", task.Namespace, task.Name, node.Name) } - // Toleration/Taint Predicate - fit, _, err = predicates.PodToleratesNodeTaints(task.Pod, nil, nodeInfo) + // CheckNodeMemoryPressurePredicate + fit, _, err = predicates.CheckNodeMemoryPressurePredicate(task.Pod, nil, nodeInfo) if err != nil { return err } - glog.V(4).Infof("Toleration/Taint predicates Task <%s/%s> on Node <%s>: fit %t, err %v", + glog.V(4).Infof("CheckNodeMemoryPressure predicates Task <%s/%s> on Node <%s>: fit %t, err %v", task.Namespace, task.Name, node.Name, fit, err) if !fit { - return fmt.Errorf("task <%s/%s> does not tolerate node <%s> taints", - task.Namespace, task.Name, node.Name) + return fmt.Errorf("node <%s> are not available to schedule task <%s/%s> due to Memory Pressure", + node.Name, task.Namespace, task.Name) + } + + // CheckNodeDiskPressurePredicate + fit, _, err = predicates.CheckNodeDiskPressurePredicate(task.Pod, nil, nodeInfo) + if err != nil { + return err + } + + glog.V(4).Infof("CheckNodeDiskPressure predicates Task <%s/%s> on Node <%s>: fit %t, err %v", + task.Namespace, task.Name, node.Name, fit, err) + + if !fit { + return fmt.Errorf("node <%s> are not available to schedule task <%s/%s> due to Disk Pressure", + node.Name, task.Namespace, task.Name) + } + + // CheckNodePIDPressurePredicate + fit, _, err = predicates.CheckNodePIDPressurePredicate(task.Pod, nil, nodeInfo) + if err != nil { + return err + } + + glog.V(4).Infof("CheckNodePIDPressurePredicate predicates Task <%s/%s> on Node <%s>: fit %t, err %v", + task.Namespace, task.Name, node.Name, fit, err) + + if !fit { + return fmt.Errorf("node <%s> are not available to schedule task <%s/%s> due to PID Pressure", + node.Name, task.Namespace, task.Name) } // Pod Affinity/Anti-Affinity Predicate diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/priority/priority.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/priority/priority.go index b8972d5b98..8c14bcaf0a 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/priority/priority.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/priority/priority.go @@ -24,10 +24,11 @@ import ( type priorityPlugin struct { // Arguments given for the plugin - pluginArguments map[string]string + pluginArguments framework.Arguments } -func New(arguments map[string]string) framework.Plugin { +// New return priority plugin +func New(arguments framework.Arguments) framework.Plugin { return &priorityPlugin{pluginArguments: arguments} } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/proportion/proportion.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/proportion/proportion.go index 692808575a..2059f78de7 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/proportion/proportion.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/proportion/proportion.go @@ -28,7 +28,7 @@ type proportionPlugin struct { totalResource *api.Resource queueOpts map[api.QueueID]*queueAttr // Arguments given for the plugin - pluginArguments map[string]string + pluginArguments framework.Arguments } type queueAttr struct { @@ -42,7 +42,8 @@ type queueAttr struct { request *api.Resource } -func New(arguments map[string]string) framework.Plugin { +// New return proportion action +func New(arguments framework.Arguments) framework.Plugin { return &proportionPlugin{ totalResource: api.EmptyResource(), queueOpts: map[api.QueueID]*queueAttr{}, @@ -231,7 +232,7 @@ func (pp *proportionPlugin) updateShare(attr *queueAttr) { res := float64(0) // TODO(k82cn): how to handle fragment issues? - for _, rn := range api.ResourceNames() { + for _, rn := range attr.deserved.ResourceNames() { share := helpers.Share(attr.allocated.Get(rn), attr.deserved.Get(rn)) if share > res { res = share diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/util/util.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/util/util.go new file mode 100644 index 0000000000..a8935afa23 --- /dev/null +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins/util/util.go @@ -0,0 +1,114 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/kubernetes/pkg/scheduler/algorithm" + + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework" +) + +// PodLister is used in predicate and nodeorder plugin +type PodLister struct { + Session *framework.Session +} + +// List method is used to list all the pods +func (pl *PodLister) List(selector labels.Selector) ([]*v1.Pod, error) { + var pods []*v1.Pod + for _, job := range pl.Session.Jobs { + for status, tasks := range job.TaskStatusIndex { + if !api.AllocatedStatus(status) { + continue + } + + for _, task := range tasks { + if selector.Matches(labels.Set(task.Pod.Labels)) { + if task.NodeName != task.Pod.Spec.NodeName { + pod := task.Pod.DeepCopy() + pod.Spec.NodeName = task.NodeName + pods = append(pods, pod) + } else { + pods = append(pods, task.Pod) + } + } + } + } + } + + return pods, nil +} + +// FilteredList is used to list all the pods under filter condition +func (pl *PodLister) FilteredList(podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { + var pods []*v1.Pod + for _, job := range pl.Session.Jobs { + for status, tasks := range job.TaskStatusIndex { + if !api.AllocatedStatus(status) { + continue + } + + for _, task := range tasks { + if podFilter(task.Pod) && selector.Matches(labels.Set(task.Pod.Labels)) { + if task.NodeName != task.Pod.Spec.NodeName { + pod := task.Pod.DeepCopy() + pod.Spec.NodeName = task.NodeName + pods = append(pods, pod) + } else { + pods = append(pods, task.Pod) + } + } + } + } + } + + return pods, nil +} + +// CachedNodeInfo is used in nodeorder and predicate plugin +type CachedNodeInfo struct { + Session *framework.Session +} + +// GetNodeInfo is used to get info of a particular node +func (c *CachedNodeInfo) GetNodeInfo(name string) (*v1.Node, error) { + node, found := c.Session.Nodes[name] + if !found { + return nil, fmt.Errorf("failed to find node <%s>", name) + } + + return node.Node, nil +} + +// NodeLister is used in nodeorder plugin +type NodeLister struct { + Session *framework.Session +} + +// List is used to list all the nodes +func (nl *NodeLister) List() ([]*v1.Node, error) { + var nodes []*v1.Node + for _, node := range nl.Session.Nodes { + nodes = append(nodes, node.Node) + } + return nodes, nil +} diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/scheduler.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/scheduler.go index 52bbc4f157..308d6e7817 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/scheduler.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/scheduler.go @@ -17,7 +17,6 @@ limitations under the License. package scheduler import ( - "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf" "time" "github.com/golang/glog" @@ -26,10 +25,13 @@ import ( "k8s.io/client-go/rest" schedcache "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/cache" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics" ) +// Scheduler watches for new unscheduled pods for kubebatch. It attempts to find +// nodes that they fit on and writes bindings back to the api server. type Scheduler struct { cache schedcache.Cache config *rest.Config @@ -39,6 +41,7 @@ type Scheduler struct { schedulePeriod time.Duration } +// NewScheduler returns a scheduler func NewScheduler( config *rest.Config, schedulerName string, @@ -56,6 +59,7 @@ func NewScheduler( return scheduler, nil } +// Run runs the Scheduler func (pc *Scheduler) Run(stopCh <-chan struct{}) { var err error diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util.go index 4b55f9b9f6..bcfc84753a 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util.go @@ -25,6 +25,7 @@ import ( "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/conf" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/plugins" ) var defaultSchedulerConf = ` @@ -51,6 +52,14 @@ func loadSchedulerConf(confStr string) ([]framework.Action, []conf.Tier, error) if err := yaml.Unmarshal(buf, schedulerConf); err != nil { return nil, nil, err } + + // Set default settings for each plugin if not set + for i, tier := range schedulerConf.Tiers { + for j := range tier.Plugins { + plugins.ApplyPluginConfDefaults(&schedulerConf.Tiers[i].Plugins[j]) + } + } + actionNames := strings.Split(schedulerConf.Actions, ",") for _, actionName := range actionNames { if action, found := framework.GetAction(strings.TrimSpace(actionName)); found { diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/priority_queue.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/priority_queue.go index 91b3d72003..a1a12a1de4 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/priority_queue.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/priority_queue.go @@ -22,6 +22,7 @@ import ( "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" ) +//PriorityQueue implements a scheduling queue. type PriorityQueue struct { queue priorityQueue } @@ -31,6 +32,7 @@ type priorityQueue struct { lessFn api.LessFn } +// NewPriorityQueue returns a PriorityQueue func NewPriorityQueue(lessFn api.LessFn) *PriorityQueue { return &PriorityQueue{ queue: priorityQueue{ @@ -40,10 +42,12 @@ func NewPriorityQueue(lessFn api.LessFn) *PriorityQueue { } } +// Push pushes element in the priority Queue func (q *PriorityQueue) Push(it interface{}) { heap.Push(&q.queue, it) } +// Pop pops element in the priority Queue func (q *PriorityQueue) Pop() interface{} { if q.Len() == 0 { return nil @@ -52,10 +56,12 @@ func (q *PriorityQueue) Pop() interface{} { return heap.Pop(&q.queue) } +// Empty check if queue is empty func (q *PriorityQueue) Empty() bool { return q.queue.Len() == 0 } +// Len returns Len of the priority queue func (q *PriorityQueue) Len() int { return q.queue.Len() } diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/scheduler_helper.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/scheduler_helper.go new file mode 100644 index 0000000000..2641ba6cc4 --- /dev/null +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/scheduler_helper.go @@ -0,0 +1,114 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "context" + "math/rand" + "sort" + "sync" + + "github.com/golang/glog" + "k8s.io/client-go/util/workqueue" + + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" +) + +// PredicateNodes returns nodes that fit task +func PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) []*api.NodeInfo { + var predicateNodes []*api.NodeInfo + + var workerLock sync.Mutex + checkNode := func(index int) { + node := nodes[index] + glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>", + task.Namespace, task.Name, node.Name, task.Resreq, node.Idle) + + // TODO (k82cn): Enable eCache for performance improvement. + if err := fn(task, node); err != nil { + glog.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v", + task.Namespace, task.Name, node.Name, err) + return + } + + workerLock.Lock() + predicateNodes = append(predicateNodes, node) + workerLock.Unlock() + } + + workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), checkNode) + return predicateNodes +} + +// PrioritizeNodes returns a map whose key is node's score and value are corresponding nodes +func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.NodeOrderFn) map[float64][]*api.NodeInfo { + nodeScores := map[float64][]*api.NodeInfo{} + + var workerLock sync.Mutex + scoreNode := func(index int) { + node := nodes[index] + score, err := fn(task, node) + if err != nil { + glog.Errorf("Error in Calculating Priority for the node:%v", err) + return + } + + workerLock.Lock() + nodeScores[score] = append(nodeScores[score], node) + workerLock.Unlock() + } + workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), scoreNode) + return nodeScores +} + +// SortNodes returns nodes by order of score +func SortNodes(nodeScores map[float64][]*api.NodeInfo) []*api.NodeInfo { + var nodesInorder []*api.NodeInfo + var keys []float64 + for key := range nodeScores { + keys = append(keys, key) + } + sort.Sort(sort.Reverse(sort.Float64Slice(keys))) + for _, key := range keys { + nodes := nodeScores[key] + nodesInorder = append(nodesInorder, nodes...) + } + return nodesInorder +} + +// SelectBestNode returns best node whose score is highest, pick one randomly if there are many nodes with same score. +func SelectBestNode(nodeScores map[float64][]*api.NodeInfo) *api.NodeInfo { + var bestNodes []*api.NodeInfo + maxScore := -1.0 + for score, nodes := range nodeScores { + if score > maxScore { + maxScore = score + bestNodes = nodes + } + } + + return bestNodes[rand.Intn(len(bestNodes))] +} + +// GetNodeList returns values of the map 'nodes' +func GetNodeList(nodes map[string]*api.NodeInfo) []*api.NodeInfo { + result := make([]*api.NodeInfo, 0, len(nodes)) + for _, v := range nodes { + result = append(result, v) + } + return result +} diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/test_utils.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/test_utils.go new file mode 100644 index 0000000000..ea2f412edd --- /dev/null +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util/test_utils.go @@ -0,0 +1,163 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + + "sync" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" + "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" +) + +// BuildResourceList builts resource list object +func BuildResourceList(cpu string, memory string) v1.ResourceList { + return v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(cpu), + v1.ResourceMemory: resource.MustParse(memory), + api.GPUResourceName: resource.MustParse("0"), + } +} + +// BuildResourceListWithGPU builts resource list with GPU +func BuildResourceListWithGPU(cpu string, memory string, GPU string) v1.ResourceList { + return v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(cpu), + v1.ResourceMemory: resource.MustParse(memory), + api.GPUResourceName: resource.MustParse(GPU), + } +} + +// BuildNode builts node object +func BuildNode(name string, alloc v1.ResourceList, labels map[string]string) *v1.Node { + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: labels, + }, + Status: v1.NodeStatus{ + Capacity: alloc, + Allocatable: alloc, + }, + } +} + +// BuildPod builts Pod object +func BuildPod(namespace, name, nodename string, p v1.PodPhase, req v1.ResourceList, groupName string, labels map[string]string, selector map[string]string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(fmt.Sprintf("%v-%v", namespace, name)), + Name: name, + Namespace: namespace, + Labels: labels, + Annotations: map[string]string{ + kbv1.GroupNameAnnotationKey: groupName, + }, + }, + Status: v1.PodStatus{ + Phase: p, + }, + Spec: v1.PodSpec{ + NodeName: nodename, + NodeSelector: selector, + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: req, + }, + }, + }, + }, + } +} + +// FakeBinder is used as fake binder +type FakeBinder struct { + sync.Mutex + Binds map[string]string + Channel chan string +} + +// Bind used by fake binder struct to bind pods +func (fb *FakeBinder) Bind(p *v1.Pod, hostname string) error { + fb.Lock() + defer fb.Unlock() + + key := fmt.Sprintf("%v/%v", p.Namespace, p.Name) + fb.Binds[key] = hostname + + fb.Channel <- key + + return nil +} + +// FakeEvictor is used as fake evictor +type FakeEvictor struct { + sync.Mutex + Evicts []string + Channel chan string +} + +// Evict is used by fake evictor to evict pods +func (fe *FakeEvictor) Evict(p *v1.Pod) error { + fe.Lock() + defer fe.Unlock() + + fmt.Println("PodName: ", p.Name) + key := fmt.Sprintf("%v/%v", p.Namespace, p.Name) + fe.Evicts = append(fe.Evicts, key) + + fe.Channel <- key + + return nil +} + +// FakeStatusUpdater is used for fake status update +type FakeStatusUpdater struct { +} + +// UpdatePodCondition is a empty function +func (ftsu *FakeStatusUpdater) UpdatePodCondition(pod *v1.Pod, podCondition *v1.PodCondition) (*v1.Pod, error) { + // do nothing here + return nil, nil +} + +// UpdatePodGroup is a empty function +func (ftsu *FakeStatusUpdater) UpdatePodGroup(pg *kbv1.PodGroup) (*kbv1.PodGroup, error) { + // do nothing here + return nil, nil +} + +// FakeVolumeBinder is used as fake volume binder +type FakeVolumeBinder struct { +} + +// AllocateVolumes is a empty function +func (fvb *FakeVolumeBinder) AllocateVolumes(task *api.TaskInfo, hostname string) error { + return nil +} + +// BindVolumes is a empty function +func (fvb *FakeVolumeBinder) BindVolumes(task *api.TaskInfo) error { + return nil +} diff --git a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/version/version.go b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/version/version.go index 6db1560c25..52b83d6f8d 100644 --- a/vendor/github.com/kubernetes-sigs/kube-batch/pkg/version/version.go +++ b/vendor/github.com/kubernetes-sigs/kube-batch/pkg/version/version.go @@ -23,9 +23,9 @@ import ( ) var ( - // Version shows the version of kube-batch. + // Version shows the version of kube batch. Version = "Not provided." - // GitSHA shoows the git commit id of kube-batch. + // GitSHA shoows the git commit id of kube batch. GitSHA = "Not provided." // Built shows the built time of the binary. Built = "Not provided."