Skip to content

Commit

Permalink
Implement task skip logic
Browse files Browse the repository at this point in the history
A task will be marked as to-skip if, when it would be marked as ready,
there is already another of the same task which is ready or running. A
task which is marked as to-skip is still picked up by the scheduler, but
it is not run. This is so that it runs through the same schedule-next
path as a task that should run, such that a scheduled task doesn't proc
a future scheduled task (that would be bad).
  • Loading branch information
thomasmitchell committed May 14, 2020
1 parent bb7e119 commit fd3cc53
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 83 deletions.
2 changes: 1 addition & 1 deletion client/doomsday/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ type GetSchedulerTask struct {
Reason string `json:"reason"`
Kind string `json:"kind"`
ID uint `json:"id"`
Ready bool `json:"ready"`
State string `json:"state"`
}

func (c *Client) GetSchedulerState() (*GetSchedulerResponse, error) {
Expand Down
11 changes: 2 additions & 9 deletions cmd/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/doomsday-project/doomsday/client/doomsday"
"github.com/olekukonko/tablewriter"
"github.com/starkandwayne/goutils/ansi"
)

type schedulerCmd struct{}
Expand Down Expand Up @@ -42,25 +41,19 @@ func printSchedTaskList(tasks []doomsday.GetSchedulerTask) {
table.SetRowLine(true)
table.SetAutoWrapText(false)
table.SetReflowDuringAutoWrap(false)
table.SetHeader([]string{"ID", "At", "Backend", "Kind", "Reason", "Ready"})
table.SetHeader([]string{"ID", "At", "Backend", "Kind", "Reason", "State"})
table.SetAlignment(tablewriter.ALIGN_RIGHT)

readyStr := ansi.Sprintf("@G{YES}")
notReadyStr := ansi.Sprintf("@R{NO}")
now := time.Now()
for _, task := range tasks {
timeUntilStr := time.Unix(task.At, 0).Sub(now).Truncate(100 * time.Millisecond).String()
readyOutStr := notReadyStr
if task.Ready {
readyOutStr = readyStr
}
table.Append([]string{
strconv.FormatUint(uint64(task.ID), 10),
timeUntilStr,
task.Backend,
task.Kind,
task.Reason,
readyOutStr,
task.State,
})
}
table.Render()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ require (
github.com/robfig/cron v1.2.0
github.com/starkandwayne/goutils v0.0.0-20190115202530-896b8a6904be
github.com/thomasmitchell/go-shout v0.0.0-20200117221442-ad6c1a8d1669
github.com/thomasmmitchell/go-shout v0.0.0-20180822165752-cac99ad33707
golang.org/x/crypto v0.0.0-20180319061731-c3a3ad6d03f7
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.2.7
)
9 changes: 7 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ github.com/cloudfoundry/socks5-proxy v0.2.0 h1:ZRXcJxUqOyKmah+ytXh52K7m7S7SyuBac
github.com/cloudfoundry/socks5-proxy v0.2.0/go.mod h1:0a+Ghg38uB86Dx+de84dFSkILTnBHzCpFMRnjHgSzi4=
github.com/cppforlife/go-patch v0.1.0 h1:I0fT+gFTSW4xWwvaTaUUVjr9xxjNXJ4naGc01BeQjwY=
github.com/cppforlife/go-patch v0.1.0/go.mod h1:67a7aIi94FHDZdoeGSJRRFDp66l9MhaAG1yGxpUoFD8=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/color v1.6.0 h1:66qjqZk8kalYAvDRtM1AdAJQI0tj4Wrue3Eq3B3pmFU=
github.com/fatih/color v1.6.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/golang/protobuf v1.0.0 h1:lsek0oXi8iFE9L+EXARyHIjU5rlWIhhTkjDz3vHhWWQ=
github.com/golang/protobuf v1.0.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand All @@ -49,27 +51,30 @@ github.com/onsi/gomega v1.3.0 h1:yPHEatyQC4jN3vdfvqJXG7O9vfC6LhaAV1NEdYpP+h0=
github.com/onsi/gomega v1.3.0/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/starkandwayne/goutils v0.0.0-20190115202530-896b8a6904be h1:vV6o1C8iPioC0Ahi3e9Bs9vVPW9/YN3uwgA6EFahAws=
github.com/starkandwayne/goutils v0.0.0-20190115202530-896b8a6904be/go.mod h1:Py4V645l0xZXsyvSR6WIcsGhNQEiIFDlmJ4Xwd6UCws=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/thomasmitchell/go-shout v0.0.0-20200117221442-ad6c1a8d1669 h1:0yNQACaupniwG7arYMieo4CGe8+LTV1goOH2xAsTOfM=
github.com/thomasmitchell/go-shout v0.0.0-20200117221442-ad6c1a8d1669/go.mod h1:TU91eTbR12yCLqWKTy3ULmVZGIkneRJX1cIbJZZFCW4=
github.com/thomasmmitchell/go-shout v0.0.0-20180822165752-cac99ad33707 h1:+5yXj0YeB4QMIr4oHJTkj5J8dSn2UUlW7IVAvAa/rUk=
github.com/thomasmmitchell/go-shout v0.0.0-20180822165752-cac99ad33707/go.mod h1:gevMa2dWHqBkWGpMtALcTd/+SwaETg5ZWD2QGCX2F2M=
golang.org/x/crypto v0.0.0-20180319061731-c3a3ad6d03f7 h1:ryKu9k3oWWgQUTahNaa+lDY2fruNO/7fqQVKQfl0Vmc=
golang.org/x/crypto v0.0.0-20180319061731-c3a3ad6d03f7/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180319151425-92b859f39abd h1:2sKGOpMUjIt+fJ/nFpDa8IAMMryxtVhHGq0ezUTczhU=
golang.org/x/net v0.0.0-20180319151425-92b859f39abd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180319153314-d8e400bc7db4 h1:anxQt+SXhc1OlQkU08HQ+Tw+uhVacLM7oL6RZsUgbCw=
golang.org/x/sys v0.0.0-20180319153314-d8e400bc7db4/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.0.0-20190319135612-7b8349ac747c h1:nsJYChHWxeFA6+48RmvBXEvQNanNyh933kZYWiu2HBE=
gopkg.in/yaml.v2 v2.0.0-20190319135612-7b8349ac747c/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
165 changes: 97 additions & 68 deletions server/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@ import (
type taskKind uint

const (
//right now, the order of these actually matters, because it is used for
//the order of these actually matters, because it is used for
// prioritization when sorting the queue
queueTaskKindAuth taskKind = iota
queueTaskKindRefresh
)

func (t taskKind) String() string {
if t == queueTaskKindAuth {
switch t {
case queueTaskKindAuth:
return "auth"
case queueTaskKindRefresh:
return "refresh"
default:
return "unknown"
}

return "refresh"
}

type runReason uint
Expand All @@ -35,11 +38,37 @@ const (
)

func (r runReason) String() string {
if r == runReasonSchedule {
switch r {
case runReasonSchedule:
return "scheduled"
case runReasonAdhoc:
return "adhoc"
default:
return "unknown"
}
}

return "adhoc"
type taskState uint

const (
//the order of these actually matters, because it is used for
// prioritization when sorting the queue
queueTaskStatePending = iota
queueTaskStateReady
queueTaskStateSkip
)

func (s taskState) String() string {
switch s {
case queueTaskStatePending:
return "pending"
case queueTaskStateReady:
return "ready"
case queueTaskStateSkip:
return "skipping"
default:
return "unknown"
}
}

type managerTask struct {
Expand All @@ -48,13 +77,23 @@ type managerTask struct {
source *Source
runTime time.Time
reason runReason
ready bool
state taskState
}

func (m *managerTask) durationUntil() time.Duration {
return m.runTime.Sub(time.Now())
}

func (m *managerTask) run(cache *Cache, log *logger.Logger) {
switch m.kind {
case queueTaskKindAuth:
m.source.Auth(log)

case queueTaskKindRefresh:
m.source.Refresh(cache, log)
}
}

type managerTasks []managerTask

type taskQueue struct {
Expand All @@ -79,15 +118,32 @@ func newTaskQueue(cache *Cache, log *logger.Logger) *taskQueue {

//next blocks until there is a task for this thread to handle. it then dequeues
// and returns that task.
func (t *taskQueue) next() managerTask {
func (t *taskQueue) runNext() managerTask {
t.lock.Lock()

for t.empty() || !t.data[0].ready {
for t.empty() || t.data[0].state == queueTaskStatePending {
t.cond.Wait()
}

ret := t.dequeueNoLock()

if ret.state == queueTaskStateSkip {
t.lock.Unlock()
t.log.WriteF("Scheduler skipping %s %s of `%s'", ret.reason, ret.kind, ret.source.Core.Name)
return ret
}

t.running = append(t.running, ret)
t.lock.Unlock()

t.log.WriteF("Scheduler running %s %s of `%s'", ret.reason, ret.kind, ret.source.Core.Name)

ret.run(t.globalCache, t.log)

t.lock.Lock()
t.running.deleteTaskWithID(ret.id)
t.lock.Unlock()

return ret
}

Expand All @@ -114,11 +170,17 @@ func (t *taskQueue) enqueue(task managerTask) {
return
}

t.log.WriteF("Marking %s %s task for backend `%s' as ready (id %d)",
foundTask.reason, foundTask.kind, foundTask.source.Core.Name, task.id)
foundTask.ready = true
t.data.sort()
if t.data.sameTaskExistsAndReady(foundTask) || t.running.sameTaskExistsAndReady(foundTask) {
t.log.WriteF("Marking %s %s task for backend `%s' as to skip (id %d)",
foundTask.reason, foundTask.kind, foundTask.source.Core.Name, task.id)
foundTask.state = queueTaskStateSkip
} else {
t.log.WriteF("Marking %s %s task for backend `%s' as ready (id %d)",
foundTask.reason, foundTask.kind, foundTask.source.Core.Name, task.id)
foundTask.state = queueTaskStateReady
}

t.data.sort()
t.cond.Signal()
})
}
Expand All @@ -136,30 +198,25 @@ func (t managerTasks) idxWithID(id uint) int {
}

//sort priority:
// 1. readiness
// 1. skip<ready<pending
// 2. auth before refresh if both are ready
// 3. scheduled time
// 4. id
func (t managerTasks) sort() {
sort.Slice(t, func(i, j int) bool {
if t[i].ready && !t[j].ready {
return true
if t[i].state != t[j].state {
return t[i].state > t[j].state
}

if t[j].ready && !t[i].ready {
return false
if t[i].kind != t[j].kind {
return t[i].kind < t[j].kind
}

if t[i].ready && t[j].ready {
if t[i].kind != t[j].kind {
return t[i].kind < t[j].kind
}
if !t[i].runTime.Equal(t[j].runTime) {
return t[i].runTime.Before(t[j].runTime)
}

if t[i].runTime.Equal(t[j].runTime) {
return t[i].id < t[j].id
}
return t[i].runTime.Before(t[j].runTime)
return t[i].id < t[j].id
})
}

Expand All @@ -175,16 +232,20 @@ func (t managerTasks) findTaskWithID(id uint) *managerTask {
return ret
}

func (t managerTasks) idxWithSourceAndKind(sourceName string, taskType taskKind) int {
var ret int = -1
//no lock
//considered the same task if the associated source core has the same name, and
// the kind of task is the same. Considered ready if the ready member of the task
// is true.
func (t managerTasks) sameTaskExistsAndReady(task *managerTask) bool {
for i := range t {
if t[i].source.Core.Name == sourceName && t[i].kind == taskType {
ret = i
break
if t[i].source.Core.Name == task.source.Core.Name &&
t[i].kind == task.kind &&
t[i].state == queueTaskStateReady {
return true
}
}

return ret
return false
}

func (t *managerTasks) deleteTaskWithID(id uint) {
Expand All @@ -211,44 +272,12 @@ func (t *taskQueue) empty() bool {
func (t *taskQueue) start() {
go func() {
for {
next := t.next()
t.log.WriteF("Scheduler running %s %s of `%s'", next.reason, next.kind, next.source.Core.Name)
t.run(next)
next := t.runNext()
t.scheduleNextRunOf(next)
}
}()
}

func (t *taskQueue) run(task managerTask) {

t.lock.Lock()

currentIdx := t.running.idxWithSourceAndKind(task.source.Core.Name, task.kind)
if currentIdx >= 0 {
t.log.WriteF("Skipping %s %s task run of `%s' because same task already in progress", task.reason, task.kind, task.source.Core.Name)

t.lock.Unlock()
return
}
t.running = append(t.running, task)

t.lock.Unlock()

defer func() {
t.lock.Lock()
t.running.deleteTaskWithID(task.id)
t.lock.Unlock()
}()

switch task.kind {
case queueTaskKindAuth:
task.source.Auth(t.log)

case queueTaskKindRefresh:
task.source.Refresh(t.globalCache, t.log)
}
}

func (t *taskQueue) scheduleNextRunOf(task managerTask) {
if task.reason == runReasonAdhoc {
return
Expand Down Expand Up @@ -296,7 +325,7 @@ type SchedulerTask struct {
Backend string `json:"backend"`
Reason string `json:"reason"`
Kind string `json:"kind"`
Ready bool `json:"ready"`
State string `json:"state"`
}

func (t *taskQueue) dumpState() SchedulerState {
Expand All @@ -318,7 +347,7 @@ func (t *taskQueue) dumpStateNoLock() SchedulerState {
Backend: task.source.Core.Name,
Reason: task.reason.String(),
Kind: task.kind.String(),
Ready: task.ready,
State: task.state.String(),
})
}

Expand All @@ -329,7 +358,7 @@ func (t *taskQueue) dumpStateNoLock() SchedulerState {
Backend: task.source.Core.Name,
Reason: task.reason.String(),
Kind: task.kind.String(),
Ready: task.ready,
State: task.state.String(),
})
}

Expand Down
Loading

0 comments on commit fd3cc53

Please sign in to comment.