Skip to content

Commit

Permalink
Added running task state to scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasmitchell committed May 7, 2020
1 parent e48ac3d commit dacdb62
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 33 deletions.
3 changes: 2 additions & 1 deletion client/doomsday/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ func (c *Client) Info() (*InfoResponse, error) {
}

type GetSchedulerResponse struct {
Tasks []GetSchedulerTask `json:"tasks"`
Running []GetSchedulerTask `json:"running"`
Pending []GetSchedulerTask `json:"pending"`
}

type GetSchedulerTask struct {
Expand Down
14 changes: 13 additions & 1 deletion cmd/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,19 @@ func (*schedulerCmd) Run() error {
return err
}

printSchedTaskList(state.Tasks)
header := tablewriter.NewWriter(os.Stdout)
header.SetHeader([]string{"RUNNING"})
header.SetHeaderLine(false)
header.Render()

printSchedTaskList(state.Running)

header = tablewriter.NewWriter(os.Stdout)
header.SetHeader([]string{"PENDING"})
header.SetHeaderLine(false)
header.Render()

printSchedTaskList(state.Pending)
return nil
}

Expand Down
85 changes: 62 additions & 23 deletions server/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@ func (m *managerTask) durationUntil() time.Duration {
return m.runTime.Sub(time.Now())
}

type managerTasks []managerTask

type taskQueue struct {
data []managerTask
data managerTasks
running managerTasks
lock *sync.Mutex
cond *sync.Cond
log *logger.Logger
Expand Down Expand Up @@ -82,6 +85,8 @@ func (t *taskQueue) next() managerTask {
}

ret := t.dequeueNoLock()
t.running = append(t.running, ret)

t.lock.Unlock()
return ret
}
Expand All @@ -96,7 +101,7 @@ func (t *taskQueue) enqueue(task managerTask) {
t.nextTaskID++
t.log.WriteF("Enqueuing new %s %s task for backend `%s' with id %d", task.reason, task.kind, task.source.Core.Name, task.id)
t.data = append(t.data, task)
t.sort()
t.data.sort()
t.log.WriteF("task enqueued")
t.log.WriteF("scheduler state:\n%s", t.dumpStateNoLock().String())
t.lock.Unlock()
Expand All @@ -105,7 +110,7 @@ func (t *taskQueue) enqueue(task managerTask) {
t.lock.Lock()
defer t.lock.Unlock()

foundTask := t.findTaskWithIDNoLock(task.id)
foundTask := t.data.findTaskWithID(task.id)
if foundTask != nil {
t.log.WriteF("Marking %s %s task for backend `%s' as ready (id %d)",
foundTask.reason, foundTask.kind, foundTask.source.Core.Name, task.id)
Expand All @@ -118,39 +123,56 @@ func (t *taskQueue) enqueue(task managerTask) {
})
}

func (t managerTasks) idxWithID(id uint) int {
var ret int = -1
for i := range t {
if t[i].id == id {
ret = i
break
}
}

return ret
}

func (t managerTasks) sort() {
sort.Slice(t, func(i, j int) bool {
if t[i].runTime.Equal(t[j].runTime) {
return t[i].id < t[j].id
}
return t[i].runTime.Before(t[j].runTime)
})
}

//If the queue order is shuffled in any way after a call to this function, the
// returned pointer is invalidated. Therefore, you should only call this and
// manipulate the returned object while you are holding the lock
func (t *taskQueue) findTaskWithIDNoLock(id uint) *managerTask {
func (t managerTasks) findTaskWithID(id uint) *managerTask {
var ret *managerTask
for i := range t.data {
if t.data[i].id == id {
ret = &t.data[i]
break
}
if idx := t.idxWithID(id); idx >= 0 {
ret = &t[idx]
}

return ret
}

func (t *managerTasks) deleteTaskWithID(id uint) {
if idx := t.idxWithID(id); idx >= 0 {
(*t)[idx] = (*t)[len(*t)-1]
*t = (*t)[:len(*t)-1]
(*t).sort()
}
}

func (t *taskQueue) dequeueNoLock() managerTask {
ret := t.data[0]
t.data[0] = t.data[len(t.data)-1]
t.data = t.data[:len(t.data)-1]
t.sort()
t.data.sort()

return ret
}

func (t *taskQueue) sort() {
sort.Slice(t.data, func(i, j int) bool {
if t.data[i].runTime.Equal(t.data[j].runTime) {
return t.data[i].id < t.data[j].id
}
return t.data[i].runTime.Before(t.data[j].runTime)
})
}

func (t *taskQueue) removeExistingNoLock(source *Source, taskType taskKind) {
t.log.WriteF("Searching for %s task for backend `%s' to remove", taskType, source.Core.Name)
t.log.WriteF("scheduler state:\n%s", t.dumpStateNoLock().String())
Expand All @@ -161,7 +183,7 @@ func (t *taskQueue) removeExistingNoLock(source *Source, taskType taskKind) {
t.data[i].kind, t.data[i].source.Core.Name, t.data[i].id)
t.data[i] = t.data[len(t.data)-1]
t.data = t.data[:len(t.data)-1]
t.sort()
t.data.sort()
return
}
}
Expand Down Expand Up @@ -199,6 +221,10 @@ func (t *taskQueue) run(task managerTask) {
return
}

t.lock.Lock()
t.running.deleteTaskWithID(task.id)
t.lock.Unlock()

task.runTime = nextTime
task.reason = runReasonSchedule

Expand All @@ -211,7 +237,8 @@ func (t *taskQueue) run(task managerTask) {
}

type SchedulerState struct {
Tasks []SchedulerTask `json:"tasks"`
Running []SchedulerTask `json:"running"`
Pending []SchedulerTask `json:"pending"`
}

func (s SchedulerState) String() string {
Expand All @@ -238,11 +265,23 @@ func (t *taskQueue) dumpState() SchedulerState {

func (t *taskQueue) dumpStateNoLock() SchedulerState {
ret := SchedulerState{
Tasks: []SchedulerTask{},
Running: []SchedulerTask{},
Pending: []SchedulerTask{},
}

for _, task := range t.running {
ret.Running = append(ret.Running, SchedulerTask{
ID: task.id,
At: task.runTime,
Backend: task.source.Core.Name,
Reason: task.reason.String(),
Kind: task.kind.String(),
Ready: task.ready,
})
}

for _, task := range t.data {
ret.Tasks = append(ret.Tasks, SchedulerTask{
ret.Pending = append(ret.Pending, SchedulerTask{
ID: task.id,
At: task.runTime,
Backend: task.source.Core.Name,
Expand Down
27 changes: 19 additions & 8 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,27 @@ func getScheduler(manager *SourceManager) func(w http.ResponseWriter, r *http.Re
return func(w http.ResponseWriter, r *http.Request) {
schedData := manager.SchedulerState()
respRaw := doomsday.GetSchedulerResponse{
Tasks: []doomsday.GetSchedulerTask{},
Running: []doomsday.GetSchedulerTask{},
Pending: []doomsday.GetSchedulerTask{},
}

for i := range schedData.Tasks {
respRaw.Tasks = append(respRaw.Tasks, doomsday.GetSchedulerTask{
At: schedData.Tasks[i].At.Unix(),
Backend: schedData.Tasks[i].Backend,
Reason: schedData.Tasks[i].Reason,
Kind: schedData.Tasks[i].Kind,
Ready: schedData.Tasks[i].Ready,
for i := range schedData.Running {
respRaw.Running = append(respRaw.Running, doomsday.GetSchedulerTask{
At: schedData.Running[i].At.Unix(),
Backend: schedData.Running[i].Backend,
Reason: schedData.Running[i].Reason,
Kind: schedData.Running[i].Kind,
Ready: schedData.Running[i].Ready,
})
}

for i := range schedData.Pending {
respRaw.Pending = append(respRaw.Pending, doomsday.GetSchedulerTask{
At: schedData.Pending[i].At.Unix(),
Backend: schedData.Pending[i].Backend,
Reason: schedData.Pending[i].Reason,
Kind: schedData.Pending[i].Kind,
Ready: schedData.Pending[i].Ready,
})
}

Expand Down

0 comments on commit dacdb62

Please sign in to comment.