Skip to content

Commit

Permalink
Merge pull request #1 from k82cn/update_job_api
Browse files Browse the repository at this point in the history
  • Loading branch information
Klaus Ma authored Jan 14, 2019
2 parents 18defb3 + 03c8f04 commit fdc7770
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 66 deletions.
69 changes: 64 additions & 5 deletions config/crds/batch_v1alpha1_job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,70 @@ spec:
description: Specification of the desired behavior of a cron job, including
the minAvailable
properties:
input:
description: The volume mount for input of Job
properties:
claim:
description: VolumeClaim defines the PVC used by the VolumeMount.
type: object
mountPath:
description: Path within the container at which the volume should
be mounted. Must not contain ':'.
type: string
mountPropagation:
description: mountPropagation determines how mounts are propagated
from the host to container and the other way around. When not
set, MountPropagationNone is used. This field is beta in 1.10.
type: string
name:
description: This must match the Name of a Volume.
type: string
readOnly:
description: Mounted read-only if true, read-write otherwise (false
or unspecified). Defaults to false.
type: boolean
subPath:
description: Path within the volume from which the container's volume
should be mounted. Defaults to "" (volume's root).
type: string
required:
- name
- mountPath
type: object
minAvailable:
description: The minimal available pods to run for this Job
format: int32
type: integer
output:
description: The volume mount for output of Job
properties:
claim:
description: VolumeClaim defines the PVC used by the VolumeMount.
type: object
mountPath:
description: Path within the container at which the volume should
be mounted. Must not contain ':'.
type: string
mountPropagation:
description: mountPropagation determines how mounts are propagated
from the host to container and the other way around. When not
set, MountPropagationNone is used. This field is beta in 1.10.
type: string
name:
description: This must match the Name of a Volume.
type: string
readOnly:
description: Mounted read-only if true, read-write otherwise (false
or unspecified). Defaults to false.
type: boolean
subPath:
description: Path within the volume from which the container's volume
should be mounted. Defaults to "" (volume's root).
type: string
required:
- name
- mountPath
type: object
policies:
description: Specifies the default lifecycle of tasks
items:
Expand All @@ -56,9 +116,12 @@ spec:
description: SchedulerName is the default value of `taskSpecs.template.spec.schedulerName`.
type: string
taskSpecs:
description: TaskSpecs specifies the task specification of Job
description: Tasks specifies the task specification of Job
items:
properties:
name:
description: Name specifies the name of tasks
type: string
policies:
description: Specifies the lifecycle of task
items:
Expand All @@ -84,10 +147,6 @@ spec:
in Job
format: int32
type: integer
selector:
description: A label query over pods that should match the pod
count. Normally, the system sets this field for you.
type: object
template:
description: Specifies the pod that will be created for this TaskSpec
when executing a Job
Expand Down
72 changes: 50 additions & 22 deletions pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package v1alpha1

import (
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -47,31 +47,59 @@ type JobSpec struct {
// +optional
MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,2,opt,name=minAvailable"`

// TaskSpecs specifies the task specification of Job
// The volume mount for input of Job
Input *VolumeSpec `json:"input,omitempty" protobuf:"bytes,3,opt,name=input"`

// The volume mount for output of Job
Output *VolumeSpec `json:"output,omitempty" protobuf:"bytes,4,opt,name=output"`

// Tasks specifies the task specification of Job
// +optional
TaskSpecs []TaskSpec `json:"taskSpecs,omitempty" protobuf:"bytes,3,opt,name=taskSpecs"`
Tasks []TaskSpec `json:"taskSpecs,omitempty" protobuf:"bytes,5,opt,name=taskSpecs"`

// Specifies the default lifecycle of tasks
// +optional
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,4,opt,name=policies"`
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,6,opt,name=policies"`
}

// VolumeSpec defines the specification of Volume, e.g. PVC
type VolumeSpec struct {
v1.VolumeMount `json:",inline"`

// VolumeClaim defines the PVC used by the VolumeMount.
VolumeClaim *v1.PersistentVolumeClaim `json:"claim,omitempty" protobuf:"bytes,1,opt,name=claim"`
}

// Event represent the phase of Job, e.g. pod-failed.
type Event string

const (
PodFailed Event = "PodFailed"
PodEvicted Event = "PodEvicted"
JobUnschedulable Event = "Unschedulable"
// AllEvent means all event
AllEvents Event = "*"
// PodFailedEvent is triggered if Pod was failed
PodFailedEvent Event = "PodFailed"
// PodEvictedEvent is triggered if Pod was deleted
PodEvictedEvent Event = "PodEvicted"
// JobUnschedulableEvent is triggered if part of pod can be scheduled
// when gang-scheduling enabled
JobUnschedulableEvent Event = "Unschedulable"
)

// Action is the action that Job controller will take according to the event.
type Action string

const (
AbortJob Action = "AbortJob"
RestartJob Action = "RestartJob"
RestartTask Action = "RestartTask"
// AbortJobAction if this action is set, the whole job will be aborted:
// all Pod of Job will be evicted, and no Pod will be recreated
AbortJobAction Action = "AbortJob"
// RestartJobAction if this action is set, the whole job will be restarted
RestartJobAction Action = "RestartJob"
// RestartTaskAction if this action is set, only the task will be restarted; default action.
// This action can not work togther with job level events, e.g. JobUnschedulable
RestartTaskAction Action = "RestartTask"
// TerminateJobAction if this action is set, the whole job wil be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
)

// LifecyclePolicy specifies the lifecycle and error handling of task and job.
Expand All @@ -96,10 +124,8 @@ type LifecyclePolicy struct {

// TaskSpec specifies the task specification of Job
type TaskSpec struct {
// A label query over pods that should match the pod count.
// Normally, the system sets this field for you.
// +optional
Selector *metav1.LabelSelector `json:"selector,omitempty" protobuf:"bytes,1,opt,name=selector"`
// Name specifies the name of tasks
Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`

// Replicas specifies the replicas of this TaskSpec in Job
Replicas int32 `json:"replicas,omitempty" protobuf:"bytes,2,opt,name=replicas"`
Expand All @@ -116,14 +142,16 @@ type TaskSpec struct {
type JobPhase string

const (
Pending JobPhase = "Pending"
Aborted JobPhase = "Aborted"
Running JobPhase = "Running"
Restarting JobPhase = "Restarting"
Completed JobPhase = "Completed"
Failed JobPhase = "Failed"
Error JobPhase = "Error"
Unschedulable JobPhase = "Unschedulable"
// Pending is the phase that job is pending in the queue, waiting for scheduling decision
Pending JobPhase = "Pending"
// Aborted is the phase that job is aborted by user or error handling
Aborted JobPhase = "Aborted"
// Running is the phase that minimal available tasks of Job are running
Running JobPhase = "Running"
// Completed is the phase that all tasks of Job are completed successfully
Completed JobPhase = "Completed"
// Teriminated is the phase that the job is finished unexpected, e.g. events
Teriminated JobPhase = "Terminated"
)

// JobConditionType is a valid value for JobCondition.Type
Expand Down
44 changes: 36 additions & 8 deletions pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 2 additions & 18 deletions pkg/apis/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,14 @@ limitations under the License.
package helpers

import (
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"

vulcanv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
)

var JobKind = vulcanv1.SchemeGroupVersion.WithKind("Job")
var JobKind = vkv1.SchemeGroupVersion.WithKind("Job")

func GetController(obj interface{}) types.UID {
accessor, err := meta.Accessor(obj)
Expand All @@ -43,15 +39,3 @@ func GetController(obj interface{}) types.UID {

return ""
}

func GenerateUUID() string {
id := uuid.NewUUID()

return fmt.Sprintf("%s", id)
}

func IsPodActive(p *v1.Pod) bool {
return v1.PodSucceeded != p.Status.Phase &&
v1.PodFailed != p.Status.Phase &&
p.DeletionTimestamp == nil
}
2 changes: 1 addition & 1 deletion pkg/cli/job/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func ListJobs() error {
"Name", "Creation", "Replicas", "Min", "Pending", "Running", "Succeeded", "Failed")
for _, job := range jobs.Items {
replicas := int32(0)
for _, ts := range job.Spec.TaskSpecs {
for _, ts := range job.Spec.Tasks {
replicas += ts.Replicas
}

Expand Down
13 changes: 4 additions & 9 deletions pkg/cli/job/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

vuclanapi "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
vkapi "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
"hpw.cloud/volcano/pkg/client/clientset/versioned"
)

Expand Down Expand Up @@ -63,20 +63,15 @@ func RunJob() error {
return err
}

job := &vuclanapi.Job{
job := &vkapi.Job{
ObjectMeta: metav1.ObjectMeta{
Name: launchJobFlags.Name,
Namespace: launchJobFlags.Namespace,
},
Spec: vuclanapi.JobSpec{
Spec: vkapi.JobSpec{
MinAvailable: int32(launchJobFlags.MinAvailable),
TaskSpecs: []vuclanapi.TaskSpec{
Tasks: []vkapi.TaskSpec{
{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
jobName: launchJobFlags.Name,
},
},
Replicas: int32(launchJobFlags.Replicas),

Template: v1.PodTemplateSpec{
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (cc *Controller) manageJob(job *vkapi.Job, podsMap map[string]map[string]*v

var running, pending, succeeded, failed int32

for _, ts := range job.Spec.TaskSpecs {
for _, ts := range job.Spec.Tasks {
name := ts.Template.Name
// TODO(k82cn): the template name should be set in default func.
if len(name) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
func validate(job *vkapi.Job) error {
tsNames := map[string]string{}

for _, ts := range job.Spec.TaskSpecs {
for _, ts := range job.Spec.Tasks {
if _, found := tsNames[ts.Template.Name]; found {
return fmt.Errorf("duplicated TaskSpec")
}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func createJob(context *context, jobSpec *jobSpec) (*vkv1.Job) {
},
}

job.Spec.TaskSpecs = append(job.Spec.TaskSpecs, ts)
job.Spec.Tasks = append(job.Spec.Tasks, ts)

min += task.min
}
Expand Down

0 comments on commit fdc7770

Please sign in to comment.