Merge pull request volcano-sh#2 from k82cn/update_job_api
Klaus Ma authored Jan 14, 2019
2 parents fdc7770 + d3ef54e commit cbe6397
Showing 8 changed files with 355 additions and 356 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -1,3 +1,5 @@
# Volcano

![travis](https://travis-ci.com/k82cn/volcano.svg?token=Qogbzw9M3Q6qBwA56otg&branch=master)

A Kubernetes-based system for high-performance workloads.
24 changes: 13 additions & 11 deletions pkg/apis/batch/v1alpha1/job.go
@@ -67,19 +67,19 @@ type VolumeSpec struct {
v1.VolumeMount `json:",inline"`

// VolumeClaim defines the PVC used by the VolumeMount.
-VolumeClaim *v1.PersistentVolumeClaim `json:"claim,omitempty" protobuf:"bytes,1,opt,name=claim"`
+VolumeClaim *v1.PersistentVolumeClaimSpec `json:"claim,omitempty" protobuf:"bytes,1,opt,name=claim"`
}
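// Illustration (not part of this commit): with the claim now carried as a
// PersistentVolumeClaimSpec, a job declares the storage it wants and the
// controller is expected to create the PVC itself. The names and size are
// hypothetical; assumes k8s.io/apimachinery/pkg/api/resource is imported.
//
//	volume := VolumeSpec{
//		VolumeMount: v1.VolumeMount{Name: "data", MountPath: "/mnt/data"},
//		VolumeClaim: &v1.PersistentVolumeClaimSpec{
//			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
//			Resources: v1.ResourceRequirements{
//				Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Gi")},
//			},
//		},
//	}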

// Event represents an event of the Job, e.g. pod-failed.
type Event string

const (
// AllEvents means all events
AllEvents Event = "*"
// PodFailedEvent is triggered if a Pod failed
PodFailedEvent Event = "PodFailed"
// PodEvictedEvent is triggered if a Pod was deleted
PodEvictedEvent Event = "PodEvicted"
// JobUnschedulableEvent is triggered if only part of the Pods can be scheduled
// when gang-scheduling is enabled
JobUnschedulableEvent Event = "Unschedulable"
@@ -91,12 +91,12 @@ type Action string
const (
// AbortJobAction if this action is set, the whole job will be aborted:
// all Pods of the Job will be evicted, and no Pod will be recreated
AbortJobAction Action = "AbortJob"
// RestartJobAction if this action is set, the whole job will be restarted
RestartJobAction Action = "RestartJob"
// RestartTaskAction if this action is set, only the task will be restarted; default action.
// This action can not work together with job level events, e.g. JobUnschedulable
RestartTaskAction Action = "RestartTask"
// TerminateJobAction if this action is set, the whole job will be terminated
// and can not be resumed: all Pods of the Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
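// Illustration (not part of this commit) of how an Event could map to a
// default Action; the actual policy wiring lives outside this file:
//
//	func defaultActionFor(event Event) Action {
//		switch event {
//		case PodFailedEvent, PodEvictedEvent:
//			return RestartTaskAction // the default, task-level action
//		case JobUnschedulableEvent:
//			return RestartJobAction // job-level events need a job-level action
//		default:
//			return AbortJobAction
//		}
//	}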
@@ -143,13 +143,15 @@ type JobPhase string

const (
// Pending is the phase that the job is pending in the queue, waiting for a scheduling decision
Pending JobPhase = "Pending"
// Aborted is the phase that the job is aborted by the user or by error handling
Aborted JobPhase = "Aborted"
// Running is the phase that the minimal available tasks of the Job are running
Running JobPhase = "Running"
// Restarting is the phase that the Job is restarting
Restarting JobPhase = "Restarting"
// Completed is the phase that all tasks of the Job are completed successfully
Completed JobPhase = "Completed"
// Terminated is the phase that the job is finished unexpectedly, e.g. due to events
Terminated JobPhase = "Terminated"
)
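// isFinished is a sketch (not in this commit) of how the phases above might
// be consumed; it assumes Completed and Terminated are the terminal phases.
func isFinished(phase JobPhase) bool {
return phase == Completed || phase == Terminated
}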
2 changes: 1 addition & 1 deletion pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

244 changes: 0 additions & 244 deletions pkg/controllers/job/job_controller.go
@@ -17,16 +17,12 @@ limitations under the License.
package job

import (
"fmt"
"sync"
"time"

"github.com/golang/glog"

"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
@@ -36,7 +32,6 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
kbinfoext "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions"
kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1"
@@ -181,242 +176,3 @@ func (cc *Controller) worker() {
cc.eventQueue.Add(job)
}
}

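// syncJob re-reads the Job from the lister (the queued copy may be stale),
// collects the Pods it controls, and hands both to manageJob for reconciliation.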
func (cc *Controller) syncJob(j *vkapi.Job) error {
job, err := cc.jobLister.Jobs(j.Namespace).Get(j.Name)
if err != nil {
if apierrors.IsNotFound(err) {
glog.V(3).Infof("Job has been deleted: %v", j.Name)
return nil
}
return err
}

pods, err := cc.getPodsForJob(job)
if err != nil {
return err
}

return cc.manageJob(job, pods)
}

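// getPodsForJob groups the Job's Pods by task: the outer map is keyed by the
// TaskSpecKey annotation, the inner map by Pod name. Pods not controlled by
// this Job, or carrying no annotations, are skipped.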
func (cc *Controller) getPodsForJob(job *vkapi.Job) (map[string]map[string]*v1.Pod, error) {
pods := map[string]map[string]*v1.Pod{}

// TODO (k82cn): optimize by cache and index of owner; add 'ControlledBy' extended interface.
ps, err := cc.podLister.Pods(job.Namespace).List(labels.Everything())
if err != nil {
return nil, err
}

for _, pod := range ps {
if !metav1.IsControlledBy(pod, job) {
continue
}
if len(pod.Annotations) == 0 {
glog.Errorf("The annotations of pod <%s/%s> is empty", pod.Namespace, pod.Name)
continue
}
tsName, found := pod.Annotations[vkapi.TaskSpecKey]
if found {
// Hash by TaskSpec.Template.Name
if _, exist := pods[tsName]; !exist {
pods[tsName] = make(map[string]*v1.Pod)
}
pods[tsName][pod.Name] = pod
}
}

return pods, nil
}

// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
func (cc *Controller) manageJob(job *vkapi.Job, podsMap map[string]map[string]*v1.Pod) error {
var err error

if job.DeletionTimestamp != nil {
glog.Infof("Job <%s/%s> is terminating, skip management process.",
job.Namespace, job.Name)
return nil
}

glog.V(3).Infof("Start to manage job <%s/%s>", job.Namespace, job.Name)

// TODO(k82cn): add WebHook to validate job.
if err := validate(job); err != nil {
glog.Errorf("Failed to validate Job <%s/%s>: %v", job.Namespace, job.Name, err)
}

// If PodGroup does not exist, create one for Job.
if _, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
pg := &kbv1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: job.Name,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: kbv1.PodGroupSpec{
MinMember: job.Spec.MinAvailable,
},
}

if _, e := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Create(pg); e != nil {
glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)

return e
}
}

if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get Service for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}

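// The Service is headless (ClusterIP "None"); it exists to give the Job's
// Pods stable DNS names for peer discovery, selected by the job name and
// namespace labels, not to load-balance traffic.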
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: job.Name,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: v1.ServiceSpec{
ClusterIP: "None",
Selector: map[string]string{
vkapi.JobNameKey: job.Name,
vkapi.JobNamespaceKey: job.Namespace,
},
},
}

if _, e := cc.kubeClients.CoreV1().Services(job.Namespace).Create(svc); e != nil {
glog.V(3).Infof("Failed to create Service for Job <%s/%s>: %v",
job.Namespace, job.Name, err)

return e
}
}

var running, pending, succeeded, failed int32

for _, ts := range job.Spec.Tasks {
// Reset per task: Pods created for a previous task must not be
// re-submitted (or re-deleted) on the next iteration.
var podToCreate []*v1.Pod
var podToDelete []*v1.Pod

name := ts.Template.Name
// TODO(k82cn): the template name should be set in default func.
if len(name) == 0 {
name = vkapi.DefaultTaskSpec
}

pods, found := podsMap[name]
if !found {
pods = map[string]*v1.Pod{}
}

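// Pod names are deterministic (<job>-<task>-<index>), so the controller can
// tell which replicas already exist, which to create, and which to delete.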
for i := 0; i < int(ts.Replicas); i++ {
podName := fmt.Sprintf("%s-%s-%d", job.Name, name, i)
if pod, found := pods[podName]; !found {
newPod := createJobPod(job, &ts.Template, i)
podToCreate = append(podToCreate, newPod)
} else {
switch pod.Status.Phase {
case v1.PodPending:
pending++
case v1.PodRunning:
running++
case v1.PodSucceeded:
succeeded++
case v1.PodFailed:
failed++
}
delete(pods, podName)
}
}

for _, pod := range pods {
podToDelete = append(podToDelete, pod)
}

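// Create the missing Pods concurrently so that all members of the gang are
// submitted close together; individual errors are collected and surfaced as one.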
var creationErrs []error
waitCreationGroup := sync.WaitGroup{}
waitCreationGroup.Add(len(podToCreate))
for _, pod := range podToCreate {
go func(pod *v1.Pod) {
defer waitCreationGroup.Done()
_, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Create(pod)
if err != nil {
// Failed to create Pod, wait a moment and then create it again
// This is to ensure all pods under the same Job are created,
// so gang-scheduling could schedule the Job successfully
glog.Errorf("Failed to create pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
creationErrs = append(creationErrs, err)
} else {
glog.V(3).Infof("Created Task <%d> of Job <%s/%s>",
pod.Name, job.Namespace, job.Name)
}
}(pod)
}
waitCreationGroup.Wait()

if len(creationErrs) != 0 {
return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
}

// Delete unnecessary pods.
var deletionErrs []error
waitDeletionGroup := sync.WaitGroup{}
waitDeletionGroup.Add(len(podToDelete))
for _, pod := range podToDelete {
go func(pod *v1.Pod) {
defer waitDeletionGroup.Done()
err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
if err != nil {
// Failed to delete Pod; record the error so the Job is requeued
// and the deletion is retried, keeping the Job's pods in line
// with its spec
glog.Errorf("Failed to delete pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
deletionErrs = append(deletionErrs, err)
} else {
glog.V(3).Infof("Deleted Task <%d> of Job <%s/%s>",
pod.Name, job.Namespace, job.Name)
}
}(pod)
}
waitDeletionGroup.Wait()

if len(deletionErrs) != 0 {
return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))
}
}

job.Status = vkapi.JobStatus{
Pending: pending,
Running: running,
Succeeded: succeeded,
Failed: failed,
MinAvailable: int32(job.Spec.MinAvailable),
}

// TODO(k82cn): replace it with `UpdateStatus` or `Patch`
if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}

return err
}
