diff --git a/README.md b/README.md index 5441690c72..960dbc4f83 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ # Volcano +![travis](https://travis-ci.com/k82cn/volcano.svg?token=Qogbzw9M3Q6qBwA56otg&branch=master) + A Kubernetes-based system for high performance workload. diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index bd8b0a5ee5..92c1e211f1 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -67,7 +67,7 @@ type VolumeSpec struct { v1.VolumeMount `json:",inline"` // VolumeClaim defines the PVC used by the VolumeMount. - VolumeClaim *v1.PersistentVolumeClaim `json:"claim,omitempty" protobuf:"bytes,1,opt,name=claim"` + VolumeClaim *v1.PersistentVolumeClaimSpec `json:"claim,omitempty" protobuf:"bytes,1,opt,name=claim"` } // Event represent the phase of Job, e.g. pod-failed. @@ -75,11 +75,11 @@ type Event string const ( // AllEvent means all event - AllEvents Event = "*" + AllEvents Event = "*" // PodFailedEvent is triggered if Pod was failed - PodFailedEvent Event = "PodFailed" + PodFailedEvent Event = "PodFailed" // PodEvictedEvent is triggered if Pod was deleted - PodEvictedEvent Event = "PodEvicted" + PodEvictedEvent Event = "PodEvicted" // JobUnschedulableEvent is triggered if part of pod can be scheduled // when gang-scheduling enabled JobUnschedulableEvent Event = "Unschedulable" @@ -91,12 +91,12 @@ type Action string const ( // AbortJobAction if this action is set, the whole job will be aborted: // all Pod of Job will be evicted, and no Pod will be recreated - AbortJobAction Action = "AbortJob" + AbortJobAction Action = "AbortJob" // RestartJobAction if this action is set, the whole job will be restarted - RestartJobAction Action = "RestartJob" + RestartJobAction Action = "RestartJob" // RestartTaskAction if this action is set, only the task will be restarted; default action. // This action can not work togther with job level events, e.g. JobUnschedulable - RestartTaskAction Action = "RestartTask" + RestartTaskAction Action = "RestartTask" // TerminateJobAction if this action is set, the whole job wil be terminated // and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated. TerminateJobAction Action = "TerminateJob" @@ -143,13 +143,15 @@ type JobPhase string const ( // Pending is the phase that job is pending in the queue, waiting for scheduling decision - Pending JobPhase = "Pending" + Pending JobPhase = "Pending" // Aborted is the phase that job is aborted by user or error handling - Aborted JobPhase = "Aborted" + Aborted JobPhase = "Aborted" // Running is the phase that minimal available tasks of Job are running - Running JobPhase = "Running" + Running JobPhase = "Running" + // Restarting is the phase that the Job is restarting + Restarting JobPhase = "Restarting" // Completed is the phase that all tasks of Job are completed successfully - Completed JobPhase = "Completed" + Completed JobPhase = "Completed" // Teriminated is the phase that the job is finished unexpected, e.g. events Teriminated JobPhase = "Terminated" ) diff --git a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go index 4ec8bef8f4..aaf8760431 100644 --- a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go @@ -219,7 +219,7 @@ func (in *VolumeSpec) DeepCopyInto(out *VolumeSpec) { in.VolumeMount.DeepCopyInto(&out.VolumeMount) if in.VolumeClaim != nil { in, out := &in.VolumeClaim, &out.VolumeClaim - *out = new(corev1.PersistentVolumeClaim) + *out = new(corev1.PersistentVolumeClaimSpec) (*in).DeepCopyInto(*out) } return diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index 20f55d868e..3418ac2a52 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -17,16 +17,12 @@ limitations under the License. package job import ( - "fmt" - "sync" "time" "github.com/golang/glog" "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" @@ -36,7 +32,6 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned" kbinfoext "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions" kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1" @@ -181,242 +176,3 @@ func (cc *Controller) worker() { cc.eventQueue.Add(job) } } - -func (cc *Controller) syncJob(j *vkapi.Job) error { - job, err := cc.jobLister.Jobs(j.Namespace).Get(j.Name) - if err != nil { - if apierrors.IsNotFound(err) { - glog.V(3).Infof("Job has been deleted: %v", j.Name) - return nil - } - return err - } - - pods, err := cc.getPodsForJob(job) - if err != nil { - return err - } - - return cc.manageJob(job, pods) -} - -func (cc *Controller) getPodsForJob(job *vkapi.Job) (map[string]map[string]*v1.Pod, error) { - pods := map[string]map[string]*v1.Pod{} - - // TODO (k82cn): optimist by cache and index of owner; add 'ControlledBy' extended interface. - ps, err := cc.podListr.Pods(job.Namespace).List(labels.Everything()) - if err != nil { - return nil, err - } - - for _, pod := range ps { - if !metav1.IsControlledBy(pod, job) { - continue - } - if len(pod.Annotations) == 0 { - glog.Errorf("The annotations of pod <%s/%s> is empty", pod.Namespace, pod.Name) - continue - } - tsName, found := pod.Annotations[vkapi.TaskSpecKey] - if found { - // Hash by TaskSpec.Template.Name - if _, exist := pods[tsName]; !exist { - pods[tsName] = make(map[string]*v1.Pod) - } - pods[tsName][pod.Name] = pod - } - } - - return pods, nil -} - -// manageJob is the core method responsible for managing the number of running -// pods according to what is specified in the job.Spec. -func (cc *Controller) manageJob(job *vkapi.Job, podsMap map[string]map[string]*v1.Pod) error { - var err error - - if job.DeletionTimestamp != nil { - glog.Infof("Job <%s/%s> is terminating, skip management process.", - job.Namespace, job.Name) - return nil - } - - glog.V(3).Infof("Start to manage job <%s/%s>", job.Namespace, job.Name) - - // TODO(k82cn): add WebHook to validate job. - if err := validate(job); err != nil { - glog.Errorf("Failed to validate Job <%s/%s>: %v", job.Namespace, job.Name, err) - } - - // If PodGroup does not exist, create one for Job. - if _, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil { - if !apierrors.IsNotFound(err) { - glog.V(3).Infof("Failed to get PodGroup for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return err - } - pg := &kbv1.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: job.Namespace, - Name: job.Name, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(job, helpers.JobKind), - }, - }, - Spec: kbv1.PodGroupSpec{ - MinMember: job.Spec.MinAvailable, - }, - } - - if _, e := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Create(pg); e != nil { - glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - - return e - } - } - - if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil { - if !apierrors.IsNotFound(err) { - glog.V(3).Infof("Failed to get Service for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return err - } - - svc := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: job.Namespace, - Name: job.Name, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(job, helpers.JobKind), - }, - }, - Spec: v1.ServiceSpec{ - ClusterIP: "None", - Selector: map[string]string{ - vkapi.JobNameKey: job.Name, - vkapi.JobNamespaceKey: job.Namespace, - }, - }, - } - - if _, e := cc.kubeClients.CoreV1().Services(job.Namespace).Create(svc); e != nil { - glog.V(3).Infof("Failed to create Service for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - - return e - } - } - - var podToCreate []*v1.Pod - var podToDelete []*v1.Pod - - var running, pending, succeeded, failed int32 - - for _, ts := range job.Spec.Tasks { - name := ts.Template.Name - // TODO(k82cn): the template name should be set in default func. - if len(name) == 0 { - name = vkapi.DefaultTaskSpec - } - - pods, found := podsMap[name] - if !found { - pods = map[string]*v1.Pod{} - } - - for i := 0; i < int(ts.Replicas); i++ { - podName := fmt.Sprintf("%s-%s-%d", job.Name, name, i) - if pod, found := pods[podName]; !found { - newPod := createJobPod(job, &ts.Template, i) - podToCreate = append(podToCreate, newPod) - } else { - switch pod.Status.Phase { - case v1.PodPending: - pending ++ - case v1.PodRunning: - running++ - case v1.PodSucceeded: - succeeded++ - case v1.PodFailed: - failed++ - } - delete(pods, podName) - } - } - - for _, pod := range pods { - podToDelete = append(podToDelete, pod) - } - - var creationErrs []error - waitCreationGroup := sync.WaitGroup{} - waitCreationGroup.Add(len(podToCreate)) - for _, pod := range podToCreate { - go func(pod *v1.Pod) { - defer waitCreationGroup.Done() - _, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Create(pod) - if err != nil { - // Failed to create Pod, waitCreationGroup a moment and then create it again - // This is to ensure all podsMap under the same Job created - // So gang-scheduling could schedule the Job successfully - glog.Errorf("Failed to create pod %s for Job %s, err %#v", - pod.Name, job.Name, err) - creationErrs = append(creationErrs, err) - } else { - glog.V(3).Infof("Created Task <%d> of Job <%s/%s>", - pod.Name, job.Namespace, job.Name) - } - }(pod) - } - waitCreationGroup.Wait() - - if len(creationErrs) != 0 { - return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate)) - } - - // Delete unnecessary pods. - var deletionErrs []error - waitDeletionGroup := sync.WaitGroup{} - waitDeletionGroup.Add(len(podToDelete)) - for _, pod := range podToDelete { - go func(pod *v1.Pod) { - defer waitDeletionGroup.Done() - err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{}) - if err != nil { - // Failed to create Pod, waitCreationGroup a moment and then create it again - // This is to ensure all podsMap under the same Job created - // So gang-scheduling could schedule the Job successfully - glog.Errorf("Failed to delete pod %s for Job %s, err %#v", - pod.Name, job.Name, err) - deletionErrs = append(deletionErrs, err) - } else { - glog.V(3).Infof("Deleted Task <%d> of Job <%s/%s>", - pod.Name, job.Namespace, job.Name) - } - }(pod) - } - waitDeletionGroup.Wait() - - if len(deletionErrs) != 0 { - return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete)) - } - } - - job.Status = vkapi.JobStatus{ - Pending: pending, - Running: running, - Succeeded: succeeded, - Failed: failed, - MinAvailable: int32(job.Spec.MinAvailable), - } - - // TODO(k82cn): replaced it with `UpdateStatus` or `Patch` - if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil { - glog.Errorf("Failed to update status of Job %v/%v: %v", - job.Namespace, job.Name, err) - return err - } - - return err -} diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go new file mode 100644 index 0000000000..30636d7770 --- /dev/null +++ b/pkg/controllers/job/job_controller_actions.go @@ -0,0 +1,273 @@ +/* +Copyright 2019 The Vulcan Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package job + +import ( + "fmt" + "sync" + + "github.com/golang/glog" + + "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + + kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" + + vkapi "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + "hpw.cloud/volcano/pkg/apis/helpers" +) + +func (cc *Controller) syncJob(j *vkapi.Job) error { + job, err := cc.jobLister.Jobs(j.Namespace).Get(j.Name) + if err != nil { + if apierrors.IsNotFound(err) { + glog.V(3).Infof("Job has been deleted: %v", j.Name) + return nil + } + return err + } + + pods, err := cc.getPodsForJob(job) + if err != nil { + return err + } + + return cc.manageJob(job, pods) +} + +func (cc *Controller) getPodsForJob(job *vkapi.Job) (map[string]map[string]*v1.Pod, error) { + pods := map[string]map[string]*v1.Pod{} + + // TODO (k82cn): optimist by cache and index of owner; add 'ControlledBy' extended interface. + ps, err := cc.podListr.Pods(job.Namespace).List(labels.Everything()) + if err != nil { + return nil, err + } + + for _, pod := range ps { + if !metav1.IsControlledBy(pod, job) { + continue + } + if len(pod.Annotations) == 0 { + glog.Errorf("The annotations of pod <%s/%s> is empty", pod.Namespace, pod.Name) + continue + } + tsName, found := pod.Annotations[vkapi.TaskSpecKey] + if found { + // Hash by TaskSpec.Template.Name + if _, exist := pods[tsName]; !exist { + pods[tsName] = make(map[string]*v1.Pod) + } + pods[tsName][pod.Name] = pod + } + } + + return pods, nil +} + +// manageJob is the core method responsible for managing the number of running +// pods according to what is specified in the job.Spec. +func (cc *Controller) manageJob(job *vkapi.Job, podsMap map[string]map[string]*v1.Pod) error { + var err error + + if job.DeletionTimestamp != nil { + glog.Infof("Job <%s/%s> is terminating, skip management process.", + job.Namespace, job.Name) + return nil + } + + glog.V(3).Infof("Start to manage job <%s/%s>", job.Namespace, job.Name) + + // TODO(k82cn): add WebHook to validate job. + if err := validate(job); err != nil { + glog.Errorf("Failed to validate Job <%s/%s>: %v", job.Namespace, job.Name, err) + } + + // If PodGroup does not exist, create one for Job. + if _, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil { + if !apierrors.IsNotFound(err) { + glog.V(3).Infof("Failed to get PodGroup for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + return err + } + pg := &kbv1.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: job.Namespace, + Name: job.Name, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(job, helpers.JobKind), + }, + }, + Spec: kbv1.PodGroupSpec{ + MinMember: job.Spec.MinAvailable, + }, + } + + if _, e := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Create(pg); e != nil { + glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + + return e + } + } + + if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil { + if !apierrors.IsNotFound(err) { + glog.V(3).Infof("Failed to get Service for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + return err + } + + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: job.Namespace, + Name: job.Name, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(job, helpers.JobKind), + }, + }, + Spec: v1.ServiceSpec{ + ClusterIP: "None", + Selector: map[string]string{ + vkapi.JobNameKey: job.Name, + vkapi.JobNamespaceKey: job.Namespace, + }, + }, + } + + if _, e := cc.kubeClients.CoreV1().Services(job.Namespace).Create(svc); e != nil { + glog.V(3).Infof("Failed to create Service for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + + return e + } + } + + var podToCreate []*v1.Pod + var podToDelete []*v1.Pod + + var running, pending, succeeded, failed int32 + + for _, ts := range job.Spec.Tasks { + name := ts.Template.Name + // TODO(k82cn): the template name should be set in default func. + if len(name) == 0 { + name = vkapi.DefaultTaskSpec + } + + pods, found := podsMap[name] + if !found { + pods = map[string]*v1.Pod{} + } + + for i := 0; i < int(ts.Replicas); i++ { + podName := fmt.Sprintf("%s-%s-%d", job.Name, name, i) + if pod, found := pods[podName]; !found { + newPod := createJobPod(job, &ts.Template, i) + podToCreate = append(podToCreate, newPod) + } else { + switch pod.Status.Phase { + case v1.PodPending: + pending++ + case v1.PodRunning: + running++ + case v1.PodSucceeded: + succeeded++ + case v1.PodFailed: + failed++ + } + delete(pods, podName) + } + } + + for _, pod := range pods { + podToDelete = append(podToDelete, pod) + } + + var creationErrs []error + waitCreationGroup := sync.WaitGroup{} + waitCreationGroup.Add(len(podToCreate)) + for _, pod := range podToCreate { + go func(pod *v1.Pod) { + defer waitCreationGroup.Done() + _, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Create(pod) + if err != nil { + // Failed to create Pod, waitCreationGroup a moment and then create it again + // This is to ensure all podsMap under the same Job created + // So gang-scheduling could schedule the Job successfully + glog.Errorf("Failed to create pod %s for Job %s, err %#v", + pod.Name, job.Name, err) + creationErrs = append(creationErrs, err) + } else { + glog.V(3).Infof("Created Task <%d> of Job <%s/%s>", + pod.Name, job.Namespace, job.Name) + } + }(pod) + } + waitCreationGroup.Wait() + + if len(creationErrs) != 0 { + return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate)) + } + + // Delete unnecessary pods. + var deletionErrs []error + waitDeletionGroup := sync.WaitGroup{} + waitDeletionGroup.Add(len(podToDelete)) + for _, pod := range podToDelete { + go func(pod *v1.Pod) { + defer waitDeletionGroup.Done() + err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{}) + if err != nil { + // Failed to create Pod, waitCreationGroup a moment and then create it again + // This is to ensure all podsMap under the same Job created + // So gang-scheduling could schedule the Job successfully + glog.Errorf("Failed to delete pod %s for Job %s, err %#v", + pod.Name, job.Name, err) + deletionErrs = append(deletionErrs, err) + } else { + glog.V(3).Infof("Deleted Task <%d> of Job <%s/%s>", + pod.Name, job.Namespace, job.Name) + } + }(pod) + } + waitDeletionGroup.Wait() + + if len(deletionErrs) != 0 { + return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete)) + } + } + + job.Status = vkapi.JobStatus{ + Pending: pending, + Running: running, + Succeeded: succeeded, + Failed: failed, + MinAvailable: int32(job.Spec.MinAvailable), + } + + // TODO(k82cn): replaced it with `UpdateStatus` or `Patch` + if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil { + glog.Errorf("Failed to update status of Job %v/%v: %v", + job.Namespace, job.Name, err) + return err + } + + return err +} diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 30cd983f15..f9571549d2 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -21,7 +21,7 @@ import ( "github.com/golang/glog" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index a6d6821ca7..6ba4183f26 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -19,10 +19,17 @@ package job import ( "fmt" - vkapi "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + kbapi "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" + + vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + "hpw.cloud/volcano/pkg/apis/helpers" ) -func validate(job *vkapi.Job) error { +func validate(job *vkv1.Job) error { tsNames := map[string]string{} for _, ts := range job.Spec.Tasks { @@ -35,3 +42,59 @@ func validate(job *vkapi.Job) error { return nil } + +func eventKey(obj interface{}) (string, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return "", err + } + + return string(accessor.GetUID()), nil +} + +func createJobPod(job *vkv1.Job, template *corev1.PodTemplateSpec, ix int) *corev1.Pod { + templateCopy := template.DeepCopy() + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s-%d", job.Name, template.Name, ix), + Namespace: job.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(job, helpers.JobKind), + }, + Labels: templateCopy.Labels, + Annotations: templateCopy.Annotations, + }, + Spec: templateCopy.Spec, + } + + if len(pod.Annotations) == 0 { + pod.Annotations = make(map[string]string) + } + + tsKey := templateCopy.Name + if len(tsKey) == 0 { + tsKey = vkv1.DefaultTaskSpec + } + pod.Annotations[vkv1.TaskSpecKey] = tsKey + + if len(pod.Annotations) == 0 { + pod.Annotations = make(map[string]string) + } + + pod.Annotations[kbapi.GroupNameAnnotationKey] = job.Name + + if len(pod.Labels) == 0 { + pod.Labels = make(map[string]string) + } + + // Set pod labels for Service. + pod.Labels[vkv1.JobNameKey] = job.Name + pod.Labels[vkv1.JobNamespaceKey] = job.Namespace + + // we fill the schedulerName in the pod definition with the one specified in the QJ template + if job.Spec.SchedulerName != "" && pod.Spec.SchedulerName == "" { + pod.Spec.SchedulerName = job.Spec.SchedulerName + } + return pod +} diff --git a/pkg/controllers/job/job_utils.go b/pkg/controllers/job/job_utils.go deleted file mode 100644 index 7d287c1dff..0000000000 --- a/pkg/controllers/job/job_utils.go +++ /dev/null @@ -1,97 +0,0 @@ -/* -Copyright 2018 The Volcano Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package job - -import ( - "fmt" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - kbapi "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" - - vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" - "hpw.cloud/volcano/pkg/apis/helpers" -) - -// filterPods returns pods based on their phase. -func filterPods(pods []*corev1.Pod, phase corev1.PodPhase) int { - result := 0 - for i := range pods { - if phase == pods[i].Status.Phase { - result++ - } - } - return result -} - -func eventKey(obj interface{}) (string, error) { - accessor, err := meta.Accessor(obj) - if err != nil { - return "", err - } - - return string(accessor.GetUID()), nil -} - -func createJobPod(job *vkv1.Job, template *corev1.PodTemplateSpec, ix int) *corev1.Pod { - templateCopy := template.DeepCopy() - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%s-%d", job.Name, template.Name, ix), - Namespace: job.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(job, helpers.JobKind), - }, - Labels: templateCopy.Labels, - Annotations: templateCopy.Annotations, - }, - Spec: templateCopy.Spec, - } - - if len(pod.Annotations) == 0 { - pod.Annotations = make(map[string]string) - } - - tsKey := templateCopy.Name - if len(tsKey) == 0 { - tsKey = vkv1.DefaultTaskSpec - } - pod.Annotations[vkv1.TaskSpecKey] = tsKey - - if len(pod.Annotations) == 0 { - pod.Annotations = make(map[string]string) - } - - pod.Annotations[kbapi.GroupNameAnnotationKey] = job.Name - - if len(pod.Labels) == 0 { - pod.Labels = make(map[string]string) - } - - // Set pod labels for Service. - pod.Labels[vkv1.JobNameKey] = job.Name - pod.Labels[vkv1.JobNamespaceKey] = job.Namespace - - // we fill the schedulerName in the pod definition with the one specified in the QJ template - if job.Spec.SchedulerName != "" && pod.Spec.SchedulerName == "" { - pod.Spec.SchedulerName = job.Spec.SchedulerName - } - return pod -}