Skip to content

Commit

Permalink
Refactor events/actions.
Browse files Browse the repository at this point in the history
Signed-off-by: Klaus Ma <klaus1982.cn@gmail.com>
  • Loading branch information
k82cn committed Jan 28, 2020
1 parent eb0521f commit 666e470
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 137 deletions.
61 changes: 5 additions & 56 deletions pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package v1alpha1
import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"volcano.sh/volcano/pkg/apis/common"
)

// +genclient
Expand Down Expand Up @@ -118,76 +120,23 @@ const (
JobStatusError JobEvent = "JobStatusError"
)

// Event represents something that happened to a Job, e.g. a pod failure.
// The Job controller takes an Action according to the Event.
type Event string

const (
// AnyEvent matches all events.
AnyEvent Event = "*"
// PodFailedEvent is triggered if a Pod failed.
PodFailedEvent Event = "PodFailed"
// PodEvictedEvent is triggered if a Pod was deleted.
PodEvictedEvent Event = "PodEvicted"
// JobUnknownEvent covers the events that can lead to job 'Unknown':
// 1. Task unschedulable: triggered when some of the pods can't be
// scheduled while others are already running, in the gang-scheduling case.
JobUnknownEvent Event = "Unknown"
// TaskCompletedEvent is triggered if the 'Replicas' number of pods in one task have succeeded.
TaskCompletedEvent Event = "TaskCompleted"

// Note: the events below are used internally and should not be used by users.

// OutOfSyncEvent is triggered if a Pod/Job was updated.
OutOfSyncEvent Event = "OutOfSync"
// CommandIssuedEvent is triggered if a command is raised by the user.
CommandIssuedEvent Event = "CommandIssued"
)

// Action is the action that the Job controller will take according to the event.
type Action string

const (
// AbortJobAction aborts the whole job when set:
// all Pods of the Job will be evicted, and no Pod will be recreated.
AbortJobAction Action = "AbortJob"
// RestartJobAction restarts the whole job when set.
RestartJobAction Action = "RestartJob"
// RestartTaskAction restarts only the task when set; this is the default action.
// This action cannot work together with job-level events, e.g. JobUnschedulable.
RestartTaskAction Action = "RestartTask"
// TerminateJobAction terminates the whole job when set, and the job
// cannot be resumed: all Pods of the Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
// CompleteJobAction kills the unfinished pods and marks the job completed when set.
CompleteJobAction Action = "CompleteJob"
// ResumeJobAction is the action to resume an aborted job.
ResumeJobAction Action = "ResumeJob"

// Note: the actions below are only used internally and should not be used by users.

// SyncJobAction is the action to sync Job/Pod status.
SyncJobAction Action = "SyncJob"
// EnqueueAction is the action to sync Job inqueue status.
EnqueueAction Action = "EnqueueJob"
)

// LifecyclePolicy specifies the lifecycle and error handling of task and job.
type LifecyclePolicy struct {
// The action that will be taken to the PodGroup according to Event.
// One of "Restart", "None".
// Default to None.
// +optional
Action Action `json:"action,omitempty" protobuf:"bytes,1,opt,name=action"`
Action common.Action `json:"action,omitempty" protobuf:"bytes,1,opt,name=action"`

// The Event recorded by scheduler; the controller takes actions
// according to this Event.
// +optional
Event Event `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"`
Event common.Event `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"`

// The Events recorded by scheduler; the controller takes actions
// according to these Events.
// +optional
Events []Event `json:"events,omitempty" protobuf:"bytes,3,opt,name=events"`
Events []common.Event `json:"events,omitempty" protobuf:"bytes,3,opt,name=events"`

// The exit code of the pod container, controller will take action
// according to this code.
Expand Down
32 changes: 0 additions & 32 deletions pkg/apis/scheduling/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,28 +113,6 @@ const (
NotEnoughPodsReason string = "NotEnoughTasks"
)

// QueueEvent represents an event that happened to a queue.
// The queue controller takes a QueueAction according to the event.
type QueueEvent string

const (
// QueueOutOfSyncEvent is triggered if a PodGroup/Queue was updated.
QueueOutOfSyncEvent QueueEvent = "OutOfSync"
// QueueCommandIssuedEvent is triggered if a command is raised by the user.
QueueCommandIssuedEvent QueueEvent = "CommandIssued"
)

// QueueAction is the action that the queue controller will take according to the event.
type QueueAction string

const (
// SyncQueueAction is the action to sync queue status.
SyncQueueAction QueueAction = "SyncQueue"
// OpenQueueAction is the action to open a queue.
OpenQueueAction QueueAction = "OpenQueue"
// CloseQueueAction is the action to close a queue.
CloseQueueAction QueueAction = "CloseQueue"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

Expand Down Expand Up @@ -280,13 +258,3 @@ type QueueList struct {
Items []Queue `json:"items" protobuf:"bytes,2,rep,name=items"`
}

// QueueRequest identifies a queue together with the event that was raised
// for it and the action to be performed on it.
type QueueRequest struct {
// Name is the queue name.
Name string

// Event is the event of the queue.
Event QueueEvent
// Action is the action to be performed.
Action QueueAction
}
10 changes: 5 additions & 5 deletions pkg/controllers/apis/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package apis
import (
"fmt"

batch "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/apis/common"
)

// Request holds the event and action to be handled for a queue, job, or task.
Expand All @@ -29,15 +29,15 @@ type Request struct {
TaskName string
QueueName string

Event batch.Event
Event common.Event
ExitCode int32
Action batch.Action
Action common.Action
JobVersion int32
}

// String returns the request in string format.
func (r Request) String() string {
return fmt.Sprintf(
"Job: %s/%s, Task:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d",
r.Namespace, r.JobName, r.TaskName, r.Event, r.ExitCode, r.Action, r.JobVersion)
"Queue: %s, Job: %s/%s, Task:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d",
r.QueueName, r.Namespace, r.JobName, r.TaskName, r.Event, r.ExitCode, r.Action, r.JobVersion)
}
19 changes: 10 additions & 9 deletions pkg/controllers/queue/queue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"sync"
"time"
"volcano.sh/volcano/pkg/controllers/apis"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -81,10 +82,10 @@ type Controller struct {
// queue name -> podgroup namespace/name
podGroups map[string]map[string]struct{}

syncHandler func(req *schedulingv1alpha2.QueueRequest) error
syncHandler func(req *apis.Request) error
syncCommandHandler func(cmd *busv1alpha1.Command) error

enqueueQueue func(req *schedulingv1alpha2.QueueRequest)
enqueueQueue func(req *apis.Request)

recorder record.EventRecorder
}
Expand Down Expand Up @@ -205,7 +206,7 @@ func (c *Controller) processNextWorkItem() bool {
}
defer c.queue.Done(obj)

req, ok := obj.(*schedulingv1alpha2.QueueRequest)
req, ok := obj.(*apis.Request)
if !ok {
klog.Errorf("%v is not a valid queue request struct.", obj)
return true
Expand All @@ -217,20 +218,20 @@ func (c *Controller) processNextWorkItem() bool {
return true
}

func (c *Controller) handleQueue(req *schedulingv1alpha2.QueueRequest) error {
func (c *Controller) handleQueue(req *apis.Request) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing queue %s (%v).", req.Name, time.Since(startTime))
klog.V(4).Infof("Finished syncing queue %s (%v).", req.QueueName, time.Since(startTime))
}()

queue, err := c.queueLister.Get(req.Name)
queue, err := c.queueLister.Get(req.QueueName)
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(4).Infof("Queue %s has been deleted.", req.Name)
klog.V(4).Infof("Queue %s has been deleted.", req.QueueName)
return nil
}

return fmt.Errorf("get queue %s failed for %v", req.Name, err)
return fmt.Errorf("get queue %s failed for %v", req.QueueName, err)
}

queueState := queuestate.NewState(queue)
Expand All @@ -240,7 +241,7 @@ func (c *Controller) handleQueue(req *schedulingv1alpha2.QueueRequest) error {

if err := queueState.Execute(req.Action); err != nil {
return fmt.Errorf("sync queue %s failed for %v, event is %v, action is %s",
req.Name, err, req.Event, req.Action)
req.QueueName, err, req.Event, req.Action)
}

return nil
Expand Down
13 changes: 7 additions & 6 deletions pkg/controllers/queue/queue_controller_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package queue
import (
"fmt"
"reflect"
"volcano.sh/volcano/pkg/apis/common"

schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
"volcano.sh/volcano/pkg/controllers/queue/state"
Expand Down Expand Up @@ -87,12 +88,12 @@ func (c *Controller) openQueue(queue *schedulingv1alpha2.Queue, updateStateFn st

if queue.Spec.State != newQueue.Spec.State {
if _, err := c.vcClient.SchedulingV1alpha2().Queues().Update(newQueue); err != nil {
c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.OpenQueueAction),
c.recorder.Event(newQueue, v1.EventTypeWarning, string(common.OpenQueueAction),
fmt.Sprintf("Open queue failed for %v", err))
return err
}

c.recorder.Event(newQueue, v1.EventTypeNormal, string(schedulingv1alpha2.OpenQueueAction),
c.recorder.Event(newQueue, v1.EventTypeNormal, string(common.OpenQueueAction),
fmt.Sprintf("Open queue succeed"))
} else {
return nil
Expand All @@ -112,7 +113,7 @@ func (c *Controller) openQueue(queue *schedulingv1alpha2.Queue, updateStateFn st

if queue.Status.State != newQueue.Status.State {
if _, err := c.vcClient.SchedulingV1alpha2().Queues().UpdateStatus(newQueue); err != nil {
c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.OpenQueueAction),
c.recorder.Event(newQueue, v1.EventTypeWarning, string(common.OpenQueueAction),
fmt.Sprintf("Update queue status from %s to %s failed for %v",
queue.Status.State, newQueue.Status.State, err))
return err
Expand All @@ -130,12 +131,12 @@ func (c *Controller) closeQueue(queue *schedulingv1alpha2.Queue, updateStateFn s

if queue.Spec.State != newQueue.Spec.State {
if _, err := c.vcClient.SchedulingV1alpha2().Queues().Update(newQueue); err != nil {
c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.CloseQueueAction),
c.recorder.Event(newQueue, v1.EventTypeWarning, string(common.CloseQueueAction),
fmt.Sprintf("Close queue failed for %v", err))
return err
}

c.recorder.Event(newQueue, v1.EventTypeNormal, string(schedulingv1alpha2.CloseQueueAction),
c.recorder.Event(newQueue, v1.EventTypeNormal, string(common.CloseQueueAction),
fmt.Sprintf("Close queue succeed"))
} else {
return nil
Expand All @@ -156,7 +157,7 @@ func (c *Controller) closeQueue(queue *schedulingv1alpha2.Queue, updateStateFn s

if queue.Status.State != newQueue.Status.State {
if _, err := c.vcClient.SchedulingV1alpha2().Queues().UpdateStatus(newQueue); err != nil {
c.recorder.Event(newQueue, v1.EventTypeWarning, string(schedulingv1alpha2.CloseQueueAction),
c.recorder.Event(newQueue, v1.EventTypeWarning, string(common.CloseQueueAction),
fmt.Sprintf("Update queue status from %s to %s failed for %v",
queue.Status.State, newQueue.Status.State, err))
return err
Expand Down
34 changes: 18 additions & 16 deletions pkg/controllers/queue/queue_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,28 @@ limitations under the License.
package queue

import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

busv1alpha1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1"
"volcano.sh/volcano/pkg/apis/common"
schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
"volcano.sh/volcano/pkg/controllers/apis"

"k8s.io/client-go/tools/cache"

"k8s.io/klog"
)

func (c *Controller) enqueue(req *schedulingv1alpha2.QueueRequest) {
func (c *Controller) enqueue(req *apis.Request) {
c.queue.Add(req)
}

func (c *Controller) addQueue(obj interface{}) {
queue := obj.(*schedulingv1alpha2.Queue)

req := &schedulingv1alpha2.QueueRequest{
Name: queue.Name,
req := &apis.Request{
QueueName: queue.Name,

Event: schedulingv1alpha2.QueueOutOfSyncEvent,
Action: schedulingv1alpha2.SyncQueueAction,
Event: common.OutOfSyncEvent,
Action: common.SyncQueueAction,
}

c.enqueue(req)
Expand Down Expand Up @@ -96,11 +98,11 @@ func (c *Controller) addPodGroup(obj interface{}) {
}
c.podGroups[pg.Spec.Queue][key] = struct{}{}

req := &schedulingv1alpha2.QueueRequest{
Name: pg.Spec.Queue,
req := &apis.Request{
QueueName: pg.Spec.Queue,

Event: schedulingv1alpha2.QueueOutOfSyncEvent,
Action: schedulingv1alpha2.SyncQueueAction,
Event: common.OutOfSyncEvent,
Action: common.SyncQueueAction,
}

c.enqueue(req)
Expand Down Expand Up @@ -139,11 +141,11 @@ func (c *Controller) deletePodGroup(obj interface{}) {

delete(c.podGroups[pg.Spec.Queue], key)

req := &schedulingv1alpha2.QueueRequest{
Name: pg.Spec.Queue,
req := &apis.Request{
QueueName: pg.Spec.Queue,

Event: schedulingv1alpha2.QueueOutOfSyncEvent,
Action: schedulingv1alpha2.SyncQueueAction,
Event: common.OutOfSyncEvent,
Action: common.SyncQueueAction,
}

c.enqueue(req)
Expand Down
7 changes: 4 additions & 3 deletions pkg/controllers/queue/state/closed.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@ limitations under the License.
package state

import (
"volcano.sh/volcano/pkg/apis/common"
"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
)

type closedState struct {
queue *v1alpha2.Queue
}

func (cs *closedState) Execute(action v1alpha2.QueueAction) error {
func (cs *closedState) Execute(action common.Action) error {
switch action {
case v1alpha2.OpenQueueAction:
case common.OpenQueueAction:
return OpenQueue(cs.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) {
status.State = v1alpha2.QueueStateOpen
return
})
case v1alpha2.CloseQueueAction:
case common.CloseQueueAction:
return SyncQueue(cs.queue, func(status *v1alpha2.QueueStatus, podGroupList []string) {
status.State = v1alpha2.QueueStateClosed
return
Expand Down
Loading

0 comments on commit 666e470

Please sign in to comment.