From dbca3b747208bb2150667ede1a681e2a5f40d7cb Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Tue, 28 Jan 2020 12:44:07 +0800 Subject: [PATCH] Refactor events/actions. Signed-off-by: Klaus Ma --- pkg/apis/batch/v1alpha1/job.go | 60 ++---------------- .../batch/v1alpha1/zz_generated.deepcopy.go | 3 +- pkg/apis/bus/v1alpha1/actions.go | 61 +++++++++++++++++++ .../bus/v1alpha1/{types.go => commands.go} | 0 pkg/apis/bus/v1alpha1/events.go | 48 +++++++++++++++ pkg/apis/scheduling/v1alpha2/types.go | 33 ---------- .../v1alpha2/zz_generated.deepcopy.go | 16 ----- pkg/cli/job/resume.go | 2 +- pkg/cli/job/suspend.go | 2 +- pkg/cli/job/util.go | 3 +- pkg/cli/queue/operate.go | 12 ++-- pkg/cli/queue/util.go | 3 +- pkg/cli/util/util.go | 3 +- pkg/cli/vresume/resume.go | 2 +- pkg/cli/vsuspend/suspend.go | 2 +- pkg/controllers/apis/request.go | 11 ++-- pkg/controllers/job/job_controller.go | 7 +-- pkg/controllers/job/job_controller_handler.go | 25 ++++---- pkg/controllers/job/job_controller_util.go | 19 +++--- pkg/controllers/job/state/aborted.go | 5 +- pkg/controllers/job/state/aborting.go | 5 +- pkg/controllers/job/state/completing.go | 3 +- pkg/controllers/job/state/factory.go | 4 +- pkg/controllers/job/state/finished.go | 4 +- pkg/controllers/job/state/pending.go | 11 ++-- pkg/controllers/job/state/restarting.go | 3 +- pkg/controllers/job/state/running.go | 11 ++-- pkg/controllers/job/state/terminating.go | 3 +- pkg/controllers/queue/queue_controller.go | 32 +++++----- .../queue/queue_controller_action.go | 14 ++--- .../queue/queue_controller_handler.go | 34 +++++------ pkg/controllers/queue/state/closed.go | 7 ++- pkg/controllers/queue/state/closing.go | 7 ++- pkg/controllers/queue/state/factory.go | 3 +- pkg/controllers/queue/state/open.go | 7 ++- pkg/controllers/queue/state/unknown.go | 7 ++- pkg/webhooks/admission/jobs/validate/util.go | 55 +++++++++-------- 37 files changed, 272 insertions(+), 255 deletions(-) create mode 100644 pkg/apis/bus/v1alpha1/actions.go rename pkg/apis/bus/v1alpha1/{types.go => commands.go} (100%) create mode 100644 pkg/apis/bus/v1alpha1/events.go diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index 027d3c8098e..2eecd0c28c0 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -19,6 +19,7 @@ package v1alpha1 import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" ) // +genclient @@ -118,76 +119,23 @@ const ( JobStatusError JobEvent = "JobStatusError" ) -// Event represent the phase of Job, e.g. pod-failed. -type Event string - -const ( - // AnyEvent means all event - AnyEvent Event = "*" - // PodFailedEvent is triggered if Pod was failed - PodFailedEvent Event = "PodFailed" - // PodEvictedEvent is triggered if Pod was deleted - PodEvictedEvent Event = "PodEvicted" - // JobUnknownEvent These below are several events can lead to job 'Unknown' - // 1. Task Unschedulable, this is triggered when part of - // pods can't be scheduled while some are already running in gang-scheduling case. - JobUnknownEvent Event = "Unknown" - // TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed - TaskCompletedEvent Event = "TaskCompleted" - - // Note: events below are used internally, should not be used by users. - - // OutOfSyncEvent is triggered if Pod/Job were updated - OutOfSyncEvent Event = "OutOfSync" - // CommandIssuedEvent is triggered if a command is raised by user - CommandIssuedEvent Event = "CommandIssued" -) - -// Action is the action that Job controller will take according to the event. -type Action string - -const ( - // AbortJobAction if this action is set, the whole job will be aborted: - // all Pod of Job will be evicted, and no Pod will be recreated - AbortJobAction Action = "AbortJob" - // RestartJobAction if this action is set, the whole job will be restarted - RestartJobAction Action = "RestartJob" - // RestartTaskAction if this action is set, only the task will be restarted; default action. - // This action can not work together with job level events, e.g. JobUnschedulable - RestartTaskAction Action = "RestartTask" - // TerminateJobAction if this action is set, the whole job wil be terminated - // and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated. - TerminateJobAction Action = "TerminateJob" - // CompleteJobAction if this action is set, the unfinished pods will be killed, job completed. - CompleteJobAction Action = "CompleteJob" - // ResumeJobAction is the action to resume an aborted job. - ResumeJobAction Action = "ResumeJob" - - // Note: actions below are only used internally, should not be used by users. - - // SyncJobAction is the action to sync Job/Pod status. - SyncJobAction Action = "SyncJob" - // EnqueueAction is the action to sync Job inqueue status. - EnqueueAction Action = "EnqueueJob" -) - // LifecyclePolicy specifies the lifecycle and error handling of task and job. type LifecyclePolicy struct { // The action that will be taken to the PodGroup according to Event. // One of "Restart", "None". // Default to None. // +optional - Action Action `json:"action,omitempty" protobuf:"bytes,1,opt,name=action"` + Action v1alpha1.Action `json:"action,omitempty" protobuf:"bytes,1,opt,name=action"` // The Event recorded by scheduler; the controller takes actions // according to this Event. // +optional - Event Event `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"` + Event v1alpha1.Event `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"` // The Events recorded by scheduler; the controller takes actions // according to this Events. // +optional - Events []Event `json:"events,omitempty" protobuf:"bytes,3,opt,name=events"` + Events []v1alpha1.Event `json:"events,omitempty" protobuf:"bytes,3,opt,name=events"` // The exit code of the pod container, controller will take action // according to this code. diff --git a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go index 5bd49486e38..bb262684778 100644 --- a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" + busv1alpha1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -190,7 +191,7 @@ func (in *LifecyclePolicy) DeepCopyInto(out *LifecyclePolicy) { *out = *in if in.Events != nil { in, out := &in.Events, &out.Events - *out = make([]Event, len(*in)) + *out = make([]busv1alpha1.Event, len(*in)) copy(*out, *in) } if in.ExitCode != nil { diff --git a/pkg/apis/bus/v1alpha1/actions.go b/pkg/apis/bus/v1alpha1/actions.go new file mode 100644 index 00000000000..03167fd7bd1 --- /dev/null +++ b/pkg/apis/bus/v1alpha1/actions.go @@ -0,0 +1,61 @@ +/* +Copyright 2020 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +// Action is the action that Job controller will take according to the event. +type Action string + +const ( + + // AbortJobAction if this action is set, the whole job will be aborted: + // all Pod of Job will be evicted, and no Pod will be recreated + AbortJobAction Action = "AbortJob" + + // RestartJobAction if this action is set, the whole job will be restarted + RestartJobAction Action = "RestartJob" + + // RestartTaskAction if this action is set, only the task will be restarted; default action. + // This action can not work together with job level events, e.g. JobUnschedulable + RestartTaskAction Action = "RestartTask" + + // TerminateJobAction if this action is set, the whole job wil be terminated + // and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated. + TerminateJobAction Action = "TerminateJob" + + // CompleteJobAction if this action is set, the unfinished pods will be killed, job completed. + CompleteJobAction Action = "CompleteJob" + + // ResumeJobAction is the action to resume an aborted job. + ResumeJobAction Action = "ResumeJob" + + // Note: actions below are only used internally, should not be used by users. + + // SyncJobAction is the action to sync Job/Pod status. + SyncJobAction Action = "SyncJob" + + // EnqueueAction is the action to sync Job inqueue status. + EnqueueAction Action = "EnqueueJob" + + // SyncQueueAction is the action to sync queue status. + SyncQueueAction Action = "SyncQueue" + + // OpenQueueAction is the action to open queue + OpenQueueAction Action = "OpenQueue" + + // CloseQueueAction is the action to close queue + CloseQueueAction Action = "CloseQueue" +) diff --git a/pkg/apis/bus/v1alpha1/types.go b/pkg/apis/bus/v1alpha1/commands.go similarity index 100% rename from pkg/apis/bus/v1alpha1/types.go rename to pkg/apis/bus/v1alpha1/commands.go diff --git a/pkg/apis/bus/v1alpha1/events.go b/pkg/apis/bus/v1alpha1/events.go new file mode 100644 index 00000000000..a085110d174 --- /dev/null +++ b/pkg/apis/bus/v1alpha1/events.go @@ -0,0 +1,48 @@ +/* +Copyright 2020 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +// Event represent the phase of Job, e.g. pod-failed. + +type Event string + +const ( + + // AnyEvent means all event + AnyEvent Event = "*" + + // PodFailedEvent is triggered if Pod was failed + PodFailedEvent Event = "PodFailed" + + // PodEvictedEvent is triggered if Pod was deleted + PodEvictedEvent Event = "PodEvicted" + + // JobUnknownEvent These below are several events can lead to job 'Unknown' + // 1. Task Unschedulable, this is triggered when part of + // pods can't be scheduled while some are already running in gang-scheduling case. + JobUnknownEvent Event = "Unknown" + + // TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed + TaskCompletedEvent Event = "TaskCompleted" + + // Note: events below are used internally, should not be used by users. + // OutOfSyncEvent is triggered if Pod/Job were updated + OutOfSyncEvent Event = "OutOfSync" + + // CommandIssuedEvent is triggered if a command is raised by user + CommandIssuedEvent Event = "CommandIssued" +) diff --git a/pkg/apis/scheduling/v1alpha2/types.go b/pkg/apis/scheduling/v1alpha2/types.go index 947c59e8ea7..e78d636153a 100644 --- a/pkg/apis/scheduling/v1alpha2/types.go +++ b/pkg/apis/scheduling/v1alpha2/types.go @@ -113,28 +113,6 @@ const ( NotEnoughPodsReason string = "NotEnoughTasks" ) -// QueueEvent represent the phase of queue -type QueueEvent string - -const ( - // QueueOutOfSyncEvent is triggered if PodGroup/Queue were updated - QueueOutOfSyncEvent QueueEvent = "OutOfSync" - // QueueCommandIssuedEvent is triggered if a command is raised by user - QueueCommandIssuedEvent QueueEvent = "CommandIssued" -) - -// QueueAction is the action that queue controller will take according to the event. -type QueueAction string - -const ( - // SyncQueueAction is the action to sync queue status. - SyncQueueAction QueueAction = "SyncQueue" - // OpenQueueAction is the action to open queue - OpenQueueAction QueueAction = "OpenQueue" - // CloseQueueAction is the action to close queue - CloseQueueAction QueueAction = "CloseQueue" -) - // +genclient // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -279,14 +257,3 @@ type QueueList struct { // items is the list of PodGroup Items []Queue `json:"items" protobuf:"bytes,2,rep,name=items"` } - -// QueueRequest struct -type QueueRequest struct { - // Name is queue name - Name string - - // Event is event of queue - Event QueueEvent - // Action is action to be performed - Action QueueAction -} diff --git a/pkg/apis/scheduling/v1alpha2/zz_generated.deepcopy.go b/pkg/apis/scheduling/v1alpha2/zz_generated.deepcopy.go index ed63d86a295..091fe396bec 100644 --- a/pkg/apis/scheduling/v1alpha2/zz_generated.deepcopy.go +++ b/pkg/apis/scheduling/v1alpha2/zz_generated.deepcopy.go @@ -215,22 +215,6 @@ func (in *QueueList) DeepCopyObject() runtime.Object { return nil } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *QueueRequest) DeepCopyInto(out *QueueRequest) { - *out = *in - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QueueRequest. -func (in *QueueRequest) DeepCopy() *QueueRequest { - if in == nil { - return nil - } - out := new(QueueRequest) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *QueueSpec) DeepCopyInto(out *QueueSpec) { *out = *in diff --git a/pkg/cli/job/resume.go b/pkg/cli/job/resume.go index da573ee1b91..dc9f542ae8e 100644 --- a/pkg/cli/job/resume.go +++ b/pkg/cli/job/resume.go @@ -21,7 +21,7 @@ import ( "github.com/spf13/cobra" - "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" ) type resumeFlags struct { diff --git a/pkg/cli/job/suspend.go b/pkg/cli/job/suspend.go index b9f9030bc79..57652f80ad1 100644 --- a/pkg/cli/job/suspend.go +++ b/pkg/cli/job/suspend.go @@ -21,7 +21,7 @@ import ( "github.com/spf13/cobra" - "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" ) type suspendFlags struct { diff --git a/pkg/cli/job/util.go b/pkg/cli/job/util.go index 1064de71f86..6c99946f67b 100644 --- a/pkg/cli/job/util.go +++ b/pkg/cli/job/util.go @@ -29,7 +29,6 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - vcbatch "volcano.sh/volcano/pkg/apis/batch/v1alpha1" vcbus "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/apis/helpers" "volcano.sh/volcano/pkg/client/clientset/versioned" @@ -71,7 +70,7 @@ func populateResourceListV1(spec string) (v1.ResourceList, error) { return result, nil } -func createJobCommand(config *rest.Config, ns, name string, action vcbatch.Action) error { +func createJobCommand(config *rest.Config, ns, name string, action vcbus.Action) error { jobClient := versioned.NewForConfigOrDie(config) job, err := jobClient.BatchV1alpha1().Jobs(ns).Get(name, metav1.GetOptions{}) if err != nil { diff --git a/pkg/cli/queue/operate.go b/pkg/cli/queue/operate.go index be66adab8fd..a079e301fbb 100644 --- a/pkg/cli/queue/operate.go +++ b/pkg/cli/queue/operate.go @@ -19,12 +19,12 @@ package queue import ( "fmt" - schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" - "volcano.sh/volcano/pkg/client/clientset/versioned" - "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/types" + + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" + "volcano.sh/volcano/pkg/client/clientset/versioned" ) const ( @@ -70,13 +70,13 @@ func OperateQueue() error { return fmt.Errorf("Queue name must be specified") } - var action schedulingv1alpha2.QueueAction + var action v1alpha1.Action switch operateQueueFlags.Action { case ActionOpen: - action = schedulingv1alpha2.OpenQueueAction + action = v1alpha1.OpenQueueAction case ActionClose: - action = schedulingv1alpha2.CloseQueueAction + action = v1alpha1.CloseQueueAction case ActionUpdate: if operateQueueFlags.Weight == 0 { return fmt.Errorf("When %s queue %s, weight must be specified, "+ diff --git a/pkg/cli/queue/util.go b/pkg/cli/queue/util.go index f862ac642df..1d007f9d35e 100644 --- a/pkg/cli/queue/util.go +++ b/pkg/cli/queue/util.go @@ -23,7 +23,6 @@ import ( busv1alpha1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/apis/helpers" - schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" "volcano.sh/volcano/pkg/client/clientset/versioned" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -44,7 +43,7 @@ func buildConfig(master, kubeconfig string) (*rest.Config, error) { return clientcmd.BuildConfigFromFlags(master, kubeconfig) } -func createQueueCommand(config *rest.Config, action schedulingv1alpha2.QueueAction) error { +func createQueueCommand(config *rest.Config, action busv1alpha1.Action) error { queueClient := versioned.NewForConfigOrDie(config) queue, err := queueClient.SchedulingV1alpha2().Queues().Get(operateQueueFlags.Name, metav1.GetOptions{}) if err != nil { diff --git a/pkg/cli/util/util.go b/pkg/cli/util/util.go index 3a73c38afb6..d606e49be3e 100644 --- a/pkg/cli/util/util.go +++ b/pkg/cli/util/util.go @@ -31,7 +31,6 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - vcbatch "volcano.sh/volcano/pkg/apis/batch/v1alpha1" vcbus "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/apis/helpers" "volcano.sh/volcano/pkg/client/clientset/versioned" @@ -94,7 +93,7 @@ func PopulateResourceListV1(spec string) (v1.ResourceList, error) { } // CreateJobCommand executes a command such as resume/suspend -func CreateJobCommand(config *rest.Config, ns, name string, action vcbatch.Action) error { +func CreateJobCommand(config *rest.Config, ns, name string, action vcbus.Action) error { jobClient := versioned.NewForConfigOrDie(config) job, err := jobClient.BatchV1alpha1().Jobs(ns).Get(name, metav1.GetOptions{}) if err != nil { diff --git a/pkg/cli/vresume/resume.go b/pkg/cli/vresume/resume.go index 730d50b56c5..43e8ecc608e 100644 --- a/pkg/cli/vresume/resume.go +++ b/pkg/cli/vresume/resume.go @@ -21,7 +21,7 @@ import ( "github.com/spf13/cobra" - "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/cli/util" ) diff --git a/pkg/cli/vsuspend/suspend.go b/pkg/cli/vsuspend/suspend.go index 1bed84e1262..3f47f788821 100644 --- a/pkg/cli/vsuspend/suspend.go +++ b/pkg/cli/vsuspend/suspend.go @@ -21,7 +21,7 @@ import ( "github.com/spf13/cobra" - "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/cli/util" ) diff --git a/pkg/controllers/apis/request.go b/pkg/controllers/apis/request.go index 03e6f7bc861..7e741d77215 100644 --- a/pkg/controllers/apis/request.go +++ b/pkg/controllers/apis/request.go @@ -18,8 +18,7 @@ package apis import ( "fmt" - - batch "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" ) //Request struct @@ -29,15 +28,15 @@ type Request struct { TaskName string QueueName string - Event batch.Event + Event v1alpha1.Event ExitCode int32 - Action batch.Action + Action v1alpha1.Action JobVersion int32 } //String function returns the request in string format func (r Request) String() string { return fmt.Sprintf( - "Job: %s/%s, Task:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d", - r.Namespace, r.JobName, r.TaskName, r.Event, r.ExitCode, r.Action, r.JobVersion) + "Queue: %s, Job: %s/%s, Task:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d", + r.QueueName, r.Namespace, r.JobName, r.TaskName, r.Event, r.ExitCode, r.Action, r.JobVersion) } diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index 22792c3ee8b..4e7919a0ee2 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -20,9 +20,6 @@ import ( "fmt" "hash" "hash/fnv" - "sync" - "time" - "k8s.io/api/core/v1" "k8s.io/api/scheduling/v1beta1" "k8s.io/apimachinery/pkg/util/wait" @@ -37,6 +34,8 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" + "sync" + "time" batchv1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" busv1alpha1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" @@ -331,7 +330,7 @@ func (cc *Controller) processNextReq(count uint32) bool { klog.V(3).Infof("Execute <%v> on Job <%s/%s> in <%s> by <%T>.", action, req.Namespace, req.JobName, jobInfo.Job.Status.State.Phase, st) - if action != batchv1alpha1.SyncJobAction { + if action != busv1alpha1.SyncJobAction { cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf( "Start to execute action %s ", action)) } diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 335f842a08d..088829d5d1c 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -18,14 +18,13 @@ package job import ( "fmt" - "reflect" - "strconv" - "k8s.io/api/core/v1" "k8s.io/api/scheduling/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/tools/cache" "k8s.io/klog" + "reflect" + "strconv" batch "volcano.sh/volcano/pkg/apis/batch/v1alpha1" bus "volcano.sh/volcano/pkg/apis/bus/v1alpha1" @@ -57,7 +56,7 @@ func (cc *Controller) addJob(obj interface{}) { Namespace: job.Namespace, JobName: job.Name, - Event: batch.OutOfSyncEvent, + Event: bus.OutOfSyncEvent, } // TODO(k82cn): if failed to add job, the cache should be refresh @@ -99,7 +98,7 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) { Namespace: newJob.Namespace, JobName: newJob.Name, - Event: batch.OutOfSyncEvent, + Event: bus.OutOfSyncEvent, } key := jobhelpers.GetJobKeyByReq(&req) @@ -170,7 +169,7 @@ func (cc *Controller) addPod(obj interface{}) { Namespace: pod.Namespace, JobName: jobName, - Event: batch.OutOfSyncEvent, + Event: bus.OutOfSyncEvent, JobVersion: int32(dVersion), } @@ -243,11 +242,11 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { newPod.Namespace, newPod.Name, err) } - event := batch.OutOfSyncEvent + event := bus.OutOfSyncEvent var exitCode int32 if oldPod.Status.Phase != v1.PodFailed && newPod.Status.Phase == v1.PodFailed { - event = batch.PodFailedEvent + event = bus.PodFailedEvent // TODO: currently only one container pod is supported by volcano // Once multi containers pod is supported, update accordingly. if len(newPod.Status.ContainerStatuses) > 0 && newPod.Status.ContainerStatuses[0].State.Terminated != nil { @@ -258,7 +257,7 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { if oldPod.Status.Phase != v1.PodSucceeded && newPod.Status.Phase == v1.PodSucceeded { if cc.cache.TaskCompleted(jobcache.JobKeyByName(newPod.Namespace, jobName), taskName) { - event = batch.TaskCompletedEvent + event = bus.TaskCompletedEvent } } @@ -331,7 +330,7 @@ func (cc *Controller) deletePod(obj interface{}) { JobName: jobName, TaskName: taskName, - Event: batch.PodEvictedEvent, + Event: bus.PodEvictedEvent, JobVersion: int32(dVersion), } @@ -383,8 +382,8 @@ func (cc *Controller) processNextCommand() bool { req := apis.Request{ Namespace: cmd.Namespace, JobName: cmd.TargetObject.Name, - Event: batch.CommandIssuedEvent, - Action: batch.Action(cmd.Action), + Event: bus.CommandIssuedEvent, + Action: bus.Action(cmd.Action), } key := jobhelpers.GetJobKeyByReq(&req) @@ -420,7 +419,7 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) { } switch newPG.Status.Phase { case scheduling.PodGroupUnknown: - req.Event = batch.JobUnknownEvent + req.Event = bus.JobUnknownEvent } key := jobhelpers.GetJobKeyByReq(&req) queue := cc.getWorkerQueue(key) diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index 4cfdb08f6e9..4429d103423 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -18,6 +18,7 @@ package job import ( "fmt" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -111,19 +112,19 @@ func createJobPod(job *batch.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod return pod } -func applyPolicies(job *batch.Job, req *apis.Request) batch.Action { +func applyPolicies(job *batch.Job, req *apis.Request) v1alpha1.Action { if len(req.Action) != 0 { return req.Action } - if req.Event == batch.OutOfSyncEvent { - return batch.SyncJobAction + if req.Event == v1alpha1.OutOfSyncEvent { + return v1alpha1.SyncJobAction } // For all the requests triggered from discarded job resources will perform sync action instead if req.JobVersion < job.Status.Version { klog.Infof("Request %s is outdated, will perform sync instead.", req) - return batch.SyncJobAction + return v1alpha1.SyncJobAction } // Overwrite Job level policies @@ -135,7 +136,7 @@ func applyPolicies(job *batch.Job, req *apis.Request) batch.Action { policyEvents := getEventlist(policy) if len(policyEvents) > 0 && len(req.Event) > 0 { - if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, batch.AnyEvent) { + if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, v1alpha1.AnyEvent) { return policy.Action } } @@ -155,7 +156,7 @@ func applyPolicies(job *batch.Job, req *apis.Request) batch.Action { policyEvents := getEventlist(policy) if len(policyEvents) > 0 && len(req.Event) > 0 { - if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, batch.AnyEvent) { + if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, v1alpha1.AnyEvent) { return policy.Action } } @@ -166,10 +167,10 @@ func applyPolicies(job *batch.Job, req *apis.Request) batch.Action { } } - return batch.SyncJobAction + return v1alpha1.SyncJobAction } -func getEventlist(policy batch.LifecyclePolicy) []batch.Event { +func getEventlist(policy batch.LifecyclePolicy) []v1alpha1.Event { policyEventsList := policy.Events if len(policy.Event) > 0 { policyEventsList = append(policyEventsList, policy.Event) @@ -177,7 +178,7 @@ func getEventlist(policy batch.LifecyclePolicy) []batch.Event { return policyEventsList } -func checkEventExist(policyEvents []batch.Event, reqEvent batch.Event) bool { +func checkEventExist(policyEvents []v1alpha1.Event, reqEvent v1alpha1.Event) bool { for _, event := range policyEvents { if event == reqEvent { return true diff --git a/pkg/controllers/job/state/aborted.go b/pkg/controllers/job/state/aborted.go index 0fd12c8aa7a..d68e8b40954 100644 --- a/pkg/controllers/job/state/aborted.go +++ b/pkg/controllers/job/state/aborted.go @@ -18,6 +18,7 @@ package state import ( vcbatch "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" ) @@ -25,9 +26,9 @@ type abortedState struct { job *apis.JobInfo } -func (as *abortedState) Execute(action vcbatch.Action) error { +func (as *abortedState) Execute(action v1alpha1.Action) error { switch action { - case vcbatch.ResumeJobAction: + case v1alpha1.ResumeJobAction: return KillJob(as.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { status.State.Phase = vcbatch.Restarting status.RetryCount++ diff --git a/pkg/controllers/job/state/aborting.go b/pkg/controllers/job/state/aborting.go index bd5a4feeece..4524477e827 100644 --- a/pkg/controllers/job/state/aborting.go +++ b/pkg/controllers/job/state/aborting.go @@ -18,6 +18,7 @@ package state import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" vcbatch "volcano.sh/volcano/pkg/apis/batch/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" @@ -27,9 +28,9 @@ type abortingState struct { job *apis.JobInfo } -func (ps *abortingState) Execute(action vcbatch.Action) error { +func (ps *abortingState) Execute(action v1alpha1.Action) error { switch action { - case vcbatch.ResumeJobAction: + case v1alpha1.ResumeJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { status.State.Phase = vcbatch.Restarting status.RetryCount++ diff --git a/pkg/controllers/job/state/completing.go b/pkg/controllers/job/state/completing.go index 2614a8530a2..c0f828ba3e4 100644 --- a/pkg/controllers/job/state/completing.go +++ b/pkg/controllers/job/state/completing.go @@ -18,6 +18,7 @@ package state import ( vcbatch "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" ) @@ -25,7 +26,7 @@ type completingState struct { job *apis.JobInfo } -func (ps *completingState) Execute(action vcbatch.Action) error { +func (ps *completingState) Execute(action v1alpha1.Action) error { return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { // If any "alive" pods, still in Completing phase if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { diff --git a/pkg/controllers/job/state/factory.go b/pkg/controllers/job/state/factory.go index cd94d46916c..a1a3d8dbb25 100644 --- a/pkg/controllers/job/state/factory.go +++ b/pkg/controllers/job/state/factory.go @@ -18,8 +18,8 @@ package state import ( "k8s.io/api/core/v1" - vcbatch "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" ) @@ -54,7 +54,7 @@ var ( //State interface type State interface { // Execute executes the actions based on current state. - Execute(act vcbatch.Action) error + Execute(act v1alpha1.Action) error } //NewState gets the state from the volcano job Phase diff --git a/pkg/controllers/job/state/finished.go b/pkg/controllers/job/state/finished.go index 4337e8680da..2abd47bdbca 100644 --- a/pkg/controllers/job/state/finished.go +++ b/pkg/controllers/job/state/finished.go @@ -17,7 +17,7 @@ limitations under the License. package state import ( - vcbatch "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" ) @@ -25,7 +25,7 @@ type finishedState struct { job *apis.JobInfo } -func (ps *finishedState) Execute(action vcbatch.Action) error { +func (ps *finishedState) Execute(action v1alpha1.Action) error { // In finished state, e.g. Completed, always kill the whole job. return KillJob(ps.job, PodRetainPhaseSoft, nil) } diff --git a/pkg/controllers/job/state/pending.go b/pkg/controllers/job/state/pending.go index 69524941c16..550042c5267 100644 --- a/pkg/controllers/job/state/pending.go +++ b/pkg/controllers/job/state/pending.go @@ -18,6 +18,7 @@ package state import ( vcbatch "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" ) @@ -25,26 +26,26 @@ type pendingState struct { job *apis.JobInfo } -func (ps *pendingState) Execute(action vcbatch.Action) error { +func (ps *pendingState) Execute(action v1alpha1.Action) error { switch action { - case vcbatch.RestartJobAction: + case v1alpha1.RestartJobAction: return KillJob(ps.job, PodRetainPhaseNone, func(status *vcbatch.JobStatus) bool { status.RetryCount++ status.State.Phase = vcbatch.Restarting return true }) - case vcbatch.AbortJobAction: + case v1alpha1.AbortJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { status.State.Phase = vcbatch.Aborting return true }) - case vcbatch.CompleteJobAction: + case v1alpha1.CompleteJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { status.State.Phase = vcbatch.Completing return true }) - case vcbatch.TerminateJobAction: + case v1alpha1.TerminateJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { status.State.Phase = vcbatch.Terminating return true diff --git a/pkg/controllers/job/state/restarting.go b/pkg/controllers/job/state/restarting.go index 60d2b21f8dc..fb3bdbf6e08 100644 --- a/pkg/controllers/job/state/restarting.go +++ b/pkg/controllers/job/state/restarting.go @@ -18,6 +18,7 @@ package state import ( vcbatch "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" ) @@ -25,7 +26,7 @@ type restartingState struct { job *apis.JobInfo } -func (ps *restartingState) Execute(action vcbatch.Action) error { +func (ps *restartingState) Execute(action v1alpha1.Action) error { return KillJob(ps.job, PodRetainPhaseNone, func(status *vcbatch.JobStatus) bool { // Get the maximum number of retries. maxRetry := DefaultMaxRetry diff --git a/pkg/controllers/job/state/running.go b/pkg/controllers/job/state/running.go index 38f2ac00694..f87e007c6e7 100644 --- a/pkg/controllers/job/state/running.go +++ b/pkg/controllers/job/state/running.go @@ -18,6 +18,7 @@ package state import ( vcbatch "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" ) @@ -25,25 +26,25 @@ type runningState struct { job *apis.JobInfo } -func (ps *runningState) Execute(action vcbatch.Action) error { +func (ps *runningState) Execute(action v1alpha1.Action) error { switch action { - case vcbatch.RestartJobAction: + case v1alpha1.RestartJobAction: return KillJob(ps.job, PodRetainPhaseNone, func(status *vcbatch.JobStatus) bool { status.State.Phase = vcbatch.Restarting status.RetryCount++ return true }) - case vcbatch.AbortJobAction: + case v1alpha1.AbortJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { status.State.Phase = vcbatch.Aborting return true }) - case vcbatch.TerminateJobAction: + case v1alpha1.TerminateJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { status.State.Phase = vcbatch.Terminating return true }) - case vcbatch.CompleteJobAction: + case v1alpha1.CompleteJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { status.State.Phase = vcbatch.Completing return true diff --git a/pkg/controllers/job/state/terminating.go b/pkg/controllers/job/state/terminating.go index eaed6b3cbbc..2fa01772be8 100644 --- a/pkg/controllers/job/state/terminating.go +++ b/pkg/controllers/job/state/terminating.go @@ -18,6 +18,7 @@ package state import ( vcbatch "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" ) @@ -25,7 +26,7 @@ type terminatingState struct { job *apis.JobInfo } -func (ps *terminatingState) Execute(action vcbatch.Action) error { +func (ps *terminatingState) Execute(action v1alpha1.Action) error { return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { // If any "alive" pods, still in Terminating phase if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { diff --git a/pkg/controllers/queue/queue_controller.go b/pkg/controllers/queue/queue_controller.go index 2a3f23afcf5..93d73eb1546 100644 --- a/pkg/controllers/queue/queue_controller.go +++ b/pkg/controllers/queue/queue_controller.go @@ -33,7 +33,6 @@ import ( "k8s.io/klog" busv1alpha1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" - schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" vcclientset "volcano.sh/volcano/pkg/client/clientset/versioned" versionedscheme "volcano.sh/volcano/pkg/client/clientset/versioned/scheme" informerfactory "volcano.sh/volcano/pkg/client/informers/externalversions" @@ -41,6 +40,7 @@ import ( schedulinginformer "volcano.sh/volcano/pkg/client/informers/externalversions/scheduling/v1alpha2" busv1alpha1lister "volcano.sh/volcano/pkg/client/listers/bus/v1alpha1" schedulinglister "volcano.sh/volcano/pkg/client/listers/scheduling/v1alpha2" + "volcano.sh/volcano/pkg/controllers/apis" queuestate "volcano.sh/volcano/pkg/controllers/queue/state" ) @@ -81,10 +81,10 @@ type Controller struct { // queue name -> podgroup namespace/name podGroups map[string]map[string]struct{} - syncHandler func(req *schedulingv1alpha2.QueueRequest) error + syncHandler func(req *apis.Request) error syncCommandHandler func(cmd *busv1alpha1.Command) error - enqueueQueue func(req *schedulingv1alpha2.QueueRequest) + enqueueQueue func(req *apis.Request) recorder record.EventRecorder } @@ -205,7 +205,7 @@ func (c *Controller) processNextWorkItem() bool { } defer c.queue.Done(obj) - req, ok := obj.(*schedulingv1alpha2.QueueRequest) + req, ok := obj.(*apis.Request) if !ok { klog.Errorf("%v is not a valid queue request struct.", obj) return true @@ -217,20 +217,20 @@ func (c *Controller) processNextWorkItem() bool { return true } -func (c *Controller) handleQueue(req *schedulingv1alpha2.QueueRequest) error { +func (c *Controller) handleQueue(req *apis.Request) error { startTime := time.Now() defer func() { - klog.V(4).Infof("Finished syncing queue %s (%v).", req.Name, time.Since(startTime)) + klog.V(4).Infof("Finished syncing queue %s (%v).", req.QueueName, time.Since(startTime)) }() - queue, err := c.queueLister.Get(req.Name) + queue, err := c.queueLister.Get(req.QueueName) if err != nil { if apierrors.IsNotFound(err) { - klog.V(4).Infof("Queue %s has been deleted.", req.Name) + klog.V(4).Infof("Queue %s has been deleted.", req.QueueName) return nil } - return fmt.Errorf("get queue %s failed for %v", req.Name, err) + return fmt.Errorf("get queue %s failed for %v", req.QueueName, err) } queueState := queuestate.NewState(queue) @@ -240,7 +240,7 @@ func (c *Controller) handleQueue(req *schedulingv1alpha2.QueueRequest) error { if err := queueState.Execute(req.Action); err != nil { return fmt.Errorf("sync queue %s failed for %v, event is %v, action is %s", - req.Name, err, req.Event, req.Action) + req.QueueName, err, req.Event, req.Action) } return nil @@ -258,8 +258,8 @@ func (c *Controller) handleQueueErr(err error, obj interface{}) { return } - req, _ := obj.(*schedulingv1alpha2.QueueRequest) - c.recordEventsForQueue(req.Name, v1.EventTypeWarning, string(req.Action), + req, _ := obj.(*apis.Request) + c.recordEventsForQueue(req.QueueName, v1.EventTypeWarning, string(req.Action), fmt.Sprintf("%v queue failed for %v", req.Action, err)) klog.V(2).Infof("Dropping queue request %v out of the queue for %v.", obj, err) c.queue.Forget(obj) @@ -304,10 +304,10 @@ func (c *Controller) handleCommand(cmd *busv1alpha1.Command) error { return fmt.Errorf("failed to delete command <%s/%s> for %v", cmd.Namespace, cmd.Name, err) } - req := &schedulingv1alpha2.QueueRequest{ - Name: cmd.TargetObject.Name, - Event: schedulingv1alpha2.QueueCommandIssuedEvent, - Action: schedulingv1alpha2.QueueAction(cmd.Action), + req := &apis.Request{ + QueueName: cmd.TargetObject.Name, + Event: busv1alpha1.CommandIssuedEvent, + Action: busv1alpha1.Action(cmd.Action), } c.enqueueQueue(req) diff --git a/pkg/controllers/queue/queue_controller_action.go b/pkg/controllers/queue/queue_controller_action.go index c405f58f482..e211335e7ba 100644 --- a/pkg/controllers/queue/queue_controller_action.go +++ b/pkg/controllers/queue/queue_controller_action.go @@ -19,7 +19,7 @@ package queue import ( "fmt" "reflect" - + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" "volcano.sh/volcano/pkg/controllers/queue/state" @@ -87,12 +87,12 @@ func (c *Controller) openQueue(queue *schedulingv1alpha2.Queue, updateStateFn st if queue.Spec.State != newQueue.Spec.State { if _, err := c.vcClient.SchedulingV1alpha2().Queues().Update(newQueue); err != nil { - c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.OpenQueueAction), + c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.OpenQueueAction), fmt.Sprintf("Open queue failed for %v", err)) return err } - c.recorder.Event(newQueue, v1.EventTypeNormal, string(schedulingv1alpha2.OpenQueueAction), + c.recorder.Event(newQueue, v1.EventTypeNormal, string(v1alpha1.OpenQueueAction), fmt.Sprintf("Open queue succeed")) } else { return nil @@ -112,7 +112,7 @@ func (c *Controller) openQueue(queue *schedulingv1alpha2.Queue, updateStateFn st if queue.Status.State != newQueue.Status.State { if _, err := c.vcClient.SchedulingV1alpha2().Queues().UpdateStatus(newQueue); err != nil { - c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.OpenQueueAction), + c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.OpenQueueAction), fmt.Sprintf("Update queue status from %s to %s failed for %v", queue.Status.State, newQueue.Status.State, err)) return err @@ -130,12 +130,12 @@ func (c *Controller) closeQueue(queue *schedulingv1alpha2.Queue, updateStateFn s if queue.Spec.State != newQueue.Spec.State { if _, err := c.vcClient.SchedulingV1alpha2().Queues().Update(newQueue); err != nil { - c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.CloseQueueAction), + c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.CloseQueueAction), fmt.Sprintf("Close queue failed for %v", err)) return err } - c.recorder.Event(newQueue, v1.EventTypeNormal, string(schedulingv1alpha2.CloseQueueAction), + c.recorder.Event(newQueue, v1.EventTypeNormal, string(v1alpha1.CloseQueueAction), fmt.Sprintf("Close queue succeed")) } else { return nil @@ -156,7 +156,7 @@ func (c *Controller) closeQueue(queue *schedulingv1alpha2.Queue, updateStateFn s if queue.Status.State != newQueue.Status.State { if _, err := c.vcClient.SchedulingV1alpha2().Queues().UpdateStatus(newQueue); err != nil { - c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.CloseQueueAction), + c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.CloseQueueAction), fmt.Sprintf("Update queue status from %s to %s failed for %v", queue.Status.State, newQueue.Status.State, err)) return err diff --git a/pkg/controllers/queue/queue_controller_handler.go b/pkg/controllers/queue/queue_controller_handler.go index 7165f8a2156..009c73cfb88 100644 --- a/pkg/controllers/queue/queue_controller_handler.go +++ b/pkg/controllers/queue/queue_controller_handler.go @@ -17,26 +17,26 @@ limitations under the License. package queue import ( - busv1alpha1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" - schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" - "k8s.io/client-go/tools/cache" - "k8s.io/klog" + + busv1alpha1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" + schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" + "volcano.sh/volcano/pkg/controllers/apis" ) -func (c *Controller) enqueue(req *schedulingv1alpha2.QueueRequest) { +func (c *Controller) enqueue(req *apis.Request) { c.queue.Add(req) } func (c *Controller) addQueue(obj interface{}) { queue := obj.(*schedulingv1alpha2.Queue) - req := &schedulingv1alpha2.QueueRequest{ - Name: queue.Name, + req := &apis.Request{ + QueueName: queue.Name, - Event: schedulingv1alpha2.QueueOutOfSyncEvent, - Action: schedulingv1alpha2.SyncQueueAction, + Event: busv1alpha1.OutOfSyncEvent, + Action: busv1alpha1.SyncQueueAction, } c.enqueue(req) @@ -96,11 +96,11 @@ func (c *Controller) addPodGroup(obj interface{}) { } c.podGroups[pg.Spec.Queue][key] = struct{}{} - req := &schedulingv1alpha2.QueueRequest{ - Name: pg.Spec.Queue, + req := &apis.Request{ + QueueName: pg.Spec.Queue, - Event: schedulingv1alpha2.QueueOutOfSyncEvent, - Action: schedulingv1alpha2.SyncQueueAction, + Event: busv1alpha1.OutOfSyncEvent, + Action: busv1alpha1.SyncQueueAction, } c.enqueue(req) @@ -139,11 +139,11 @@ func (c *Controller) deletePodGroup(obj interface{}) { delete(c.podGroups[pg.Spec.Queue], key) - req := &schedulingv1alpha2.QueueRequest{ - Name: pg.Spec.Queue, + req := &apis.Request{ + QueueName: pg.Spec.Queue, - Event: schedulingv1alpha2.QueueOutOfSyncEvent, - Action: schedulingv1alpha2.SyncQueueAction, + Event: busv1alpha1.OutOfSyncEvent, + Action: busv1alpha1.SyncQueueAction, } c.enqueue(req) diff --git a/pkg/controllers/queue/state/closed.go b/pkg/controllers/queue/state/closed.go index 91e402d87ce..485770471d1 100644 --- a/pkg/controllers/queue/state/closed.go +++ b/pkg/controllers/queue/state/closed.go @@ -17,6 +17,7 @@ limitations under the License. package state import ( + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" ) @@ -24,14 +25,14 @@ type closedState struct { queue *v1alpha2.Queue } -func (cs *closedState) Execute(action v1alpha2.QueueAction) error { +func (cs *closedState) Execute(action v1alpha1.Action) error { switch action { - case v1alpha2.OpenQueueAction: + case v1alpha1.OpenQueueAction: return OpenQueue(cs.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) { status.State = v1alpha2.QueueStateOpen return }) - case v1alpha2.CloseQueueAction: + case v1alpha1.CloseQueueAction: return SyncQueue(cs.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) { status.State = v1alpha2.QueueStateClosed return diff --git a/pkg/controllers/queue/state/closing.go b/pkg/controllers/queue/state/closing.go index afcbdd6b37c..a5ec55100f0 100644 --- a/pkg/controllers/queue/state/closing.go +++ b/pkg/controllers/queue/state/closing.go @@ -17,6 +17,7 @@ limitations under the License. package state import ( + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" ) @@ -24,14 +25,14 @@ type closingState struct { queue *v1alpha2.Queue } -func (cs *closingState) Execute(action v1alpha2.QueueAction) error { +func (cs *closingState) Execute(action v1alpha1.Action) error { switch action { - case v1alpha2.OpenQueueAction: + case v1alpha1.OpenQueueAction: return OpenQueue(cs.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) { status.State = v1alpha2.QueueStateOpen return }) - case v1alpha2.CloseQueueAction: + case v1alpha1.CloseQueueAction: return SyncQueue(cs.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) { if len(podGroupList) == 0 { status.State = v1alpha2.QueueStateClosed diff --git a/pkg/controllers/queue/state/factory.go b/pkg/controllers/queue/state/factory.go index 07212a075e4..236f8a20ffc 100644 --- a/pkg/controllers/queue/state/factory.go +++ b/pkg/controllers/queue/state/factory.go @@ -17,13 +17,14 @@ limitations under the License. package state import ( + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" ) // State interface type State interface { // Execute executes the actions based on current state. - Execute(action v1alpha2.QueueAction) error + Execute(action v1alpha1.Action) error } // UpdateQueueStatusFn updates the queue status diff --git a/pkg/controllers/queue/state/open.go b/pkg/controllers/queue/state/open.go index 4d7ad898f9e..156e541df12 100644 --- a/pkg/controllers/queue/state/open.go +++ b/pkg/controllers/queue/state/open.go @@ -17,6 +17,7 @@ limitations under the License. package state import ( + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" ) @@ -24,14 +25,14 @@ type openState struct { queue *v1alpha2.Queue } -func (os *openState) Execute(action v1alpha2.QueueAction) error { +func (os *openState) Execute(action v1alpha1.Action) error { switch action { - case v1alpha2.OpenQueueAction: + case v1alpha1.OpenQueueAction: return SyncQueue(os.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) { status.State = v1alpha2.QueueStateOpen return }) - case v1alpha2.CloseQueueAction: + case v1alpha1.CloseQueueAction: return CloseQueue(os.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) { if len(podGroupList) == 0 { status.State = v1alpha2.QueueStateClosed diff --git a/pkg/controllers/queue/state/unknown.go b/pkg/controllers/queue/state/unknown.go index 89bf6bebd96..52b3230d6fc 100644 --- a/pkg/controllers/queue/state/unknown.go +++ b/pkg/controllers/queue/state/unknown.go @@ -17,6 +17,7 @@ limitations under the License. package state import ( + "volcano.sh/volcano/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" ) @@ -24,14 +25,14 @@ type unknownState struct { queue *v1alpha2.Queue } -func (us *unknownState) Execute(action v1alpha2.QueueAction) error { +func (us *unknownState) Execute(action v1alpha1.Action) error { switch action { - case v1alpha2.OpenQueueAction: + case v1alpha1.OpenQueueAction: return OpenQueue(us.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) { status.State = v1alpha2.QueueStateOpen return }) - case v1alpha2.CloseQueueAction: + case v1alpha1.CloseQueueAction: return CloseQueue(us.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) { if len(podGroupList) == 0 { status.State = v1alpha2.QueueStateClosed diff --git a/pkg/webhooks/admission/jobs/validate/util.go b/pkg/webhooks/admission/jobs/validate/util.go index 3c986bb555f..8ffc52ba11a 100644 --- a/pkg/webhooks/admission/jobs/validate/util.go +++ b/pkg/webhooks/admission/jobs/validate/util.go @@ -25,34 +25,35 @@ import ( "k8s.io/kubernetes/pkg/apis/core/validation" batchv1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + busv1alpha1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" ) // policyEventMap defines all policy events and whether to allow external use -var policyEventMap = map[batchv1alpha1.Event]bool{ - batchv1alpha1.AnyEvent: true, - batchv1alpha1.PodFailedEvent: true, - batchv1alpha1.PodEvictedEvent: true, - batchv1alpha1.JobUnknownEvent: true, - batchv1alpha1.TaskCompletedEvent: true, - batchv1alpha1.OutOfSyncEvent: false, - batchv1alpha1.CommandIssuedEvent: false, +var policyEventMap = map[busv1alpha1.Event]bool{ + busv1alpha1.AnyEvent: true, + busv1alpha1.PodFailedEvent: true, + busv1alpha1.PodEvictedEvent: true, + busv1alpha1.JobUnknownEvent: true, + busv1alpha1.TaskCompletedEvent: true, + busv1alpha1.OutOfSyncEvent: false, + busv1alpha1.CommandIssuedEvent: false, } // policyActionMap defines all policy actions and whether to allow external use -var policyActionMap = map[batchv1alpha1.Action]bool{ - batchv1alpha1.AbortJobAction: true, - batchv1alpha1.RestartJobAction: true, - batchv1alpha1.RestartTaskAction: true, - batchv1alpha1.TerminateJobAction: true, - batchv1alpha1.CompleteJobAction: true, - batchv1alpha1.ResumeJobAction: true, - batchv1alpha1.SyncJobAction: false, - batchv1alpha1.EnqueueAction: false, +var policyActionMap = map[busv1alpha1.Action]bool{ + busv1alpha1.AbortJobAction: true, + busv1alpha1.RestartJobAction: true, + busv1alpha1.RestartTaskAction: true, + busv1alpha1.TerminateJobAction: true, + busv1alpha1.CompleteJobAction: true, + busv1alpha1.ResumeJobAction: true, + busv1alpha1.SyncJobAction: false, + busv1alpha1.EnqueueAction: false, } func validatePolicies(policies []batchv1alpha1.LifecyclePolicy, fldPath *field.Path) error { var err error - policyEvents := map[batchv1alpha1.Event]struct{}{} + policyEvents := map[busv1alpha1.Event]struct{}{} exitCodes := map[int32]struct{}{} for _, policy := range policies { @@ -107,14 +108,14 @@ func validatePolicies(policies []batchv1alpha1.LifecyclePolicy, fldPath *field.P } } - if _, found := policyEvents[batchv1alpha1.AnyEvent]; found && len(policyEvents) > 1 { + if _, found := policyEvents[busv1alpha1.AnyEvent]; found && len(policyEvents) > 1 { err = multierror.Append(err, fmt.Errorf("if there's * here, no other policy should be here")) } return err } -func getEventList(policy batchv1alpha1.LifecyclePolicy) []batchv1alpha1.Event { +func getEventList(policy batchv1alpha1.LifecyclePolicy) []busv1alpha1.Event { policyEventsList := policy.Events if len(policy.Event) > 0 { policyEventsList = append(policyEventsList, policy.Event) @@ -123,9 +124,9 @@ func getEventList(policy batchv1alpha1.LifecyclePolicy) []batchv1alpha1.Event { return uniquePolicyEventlist } -func removeDuplicates(EventList []batchv1alpha1.Event) []batchv1alpha1.Event { - keys := make(map[batchv1alpha1.Event]bool) - list := []batchv1alpha1.Event{} +func removeDuplicates(EventList []busv1alpha1.Event) []busv1alpha1.Event { + keys := make(map[busv1alpha1.Event]bool) + list := []busv1alpha1.Event{} for _, val := range EventList { if _, value := keys[val]; !value { keys[val] = true @@ -135,8 +136,8 @@ func removeDuplicates(EventList []batchv1alpha1.Event) []batchv1alpha1.Event { return list } -func getValidEvents() []batchv1alpha1.Event { - var events []batchv1alpha1.Event +func getValidEvents() []busv1alpha1.Event { + var events []busv1alpha1.Event for e, allow := range policyEventMap { if allow { events = append(events, e) @@ -146,8 +147,8 @@ func getValidEvents() []batchv1alpha1.Event { return events } -func getValidActions() []batchv1alpha1.Action { - var actions []batchv1alpha1.Action +func getValidActions() []busv1alpha1.Action { + var actions []busv1alpha1.Action for a, allow := range policyActionMap { if allow { actions = append(actions, a)