Skip to content

Commit

Permalink
Update adhoc scheduling logic
Browse files Browse the repository at this point in the history
It no longer removes the existing scheduled task, and it no longer
schedules a new task in its place. Also, the throttling on enqueue was
removed, and tasks will be skipped if there is already one in progress.
  • Loading branch information
thomasmitchell committed May 7, 2020
1 parent dacdb62 commit 5837e02
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 43 deletions.
78 changes: 48 additions & 30 deletions server/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ func (t *taskQueue) next() managerTask {
}

ret := t.dequeueNoLock()
t.running = append(t.running, ret)

t.lock.Unlock()
return ret
}
Expand All @@ -96,14 +94,11 @@ func (t *taskQueue) next() managerTask {
//replaced with this new one atomically.
func (t *taskQueue) enqueue(task managerTask) {
t.lock.Lock()
t.removeExistingNoLock(task.source, task.kind)
task.id = t.nextTaskID
t.nextTaskID++
t.log.WriteF("Enqueuing new %s %s task for backend `%s' with id %d", task.reason, task.kind, task.source.Core.Name, task.id)
t.data = append(t.data, task)
t.data.sort()
t.log.WriteF("task enqueued")
t.log.WriteF("scheduler state:\n%s", t.dumpStateNoLock().String())
t.lock.Unlock()

time.AfterFunc(time.Until(task.runTime), func() {
Expand Down Expand Up @@ -156,6 +151,18 @@ func (t managerTasks) findTaskWithID(id uint) *managerTask {
return ret
}

func (t managerTasks) idxWithSourceAndKind(sourceName string, taskType taskKind) int {
var ret int = -1
for i := range t {
if t[i].source.Core.Name == sourceName && t[i].kind == taskType {
ret = i
break
}
}

return ret
}

func (t *managerTasks) deleteTaskWithID(id uint) {
if idx := t.idxWithID(id); idx >= 0 {
(*t)[idx] = (*t)[len(*t)-1]
Expand All @@ -173,22 +180,6 @@ func (t *taskQueue) dequeueNoLock() managerTask {
return ret
}

func (t *taskQueue) removeExistingNoLock(source *Source, taskType taskKind) {
t.log.WriteF("Searching for %s task for backend `%s' to remove", taskType, source.Core.Name)
t.log.WriteF("scheduler state:\n%s", t.dumpStateNoLock().String())
for i := range t.data {
if t.data[i].source.Core.Name == source.Core.Name && t.data[i].kind == taskType {

t.log.WriteF("Removing %s of `%s' task with id `%d'",
t.data[i].kind, t.data[i].source.Core.Name, t.data[i].id)
t.data[i] = t.data[len(t.data)-1]
t.data = t.data[:len(t.data)-1]
t.data.sort()
return
}
}
}

func (t *taskQueue) empty() bool {
return len(t.data) == 0
}
Expand All @@ -199,20 +190,54 @@ func (t *taskQueue) start() {
next := t.next()
t.log.WriteF("Scheduler running %s %s of `%s'", next.reason, next.kind, next.source.Core.Name)
t.run(next)
t.scheduleNextRunOf(next)
}
}()
}

func (t *taskQueue) run(task managerTask) {

t.lock.Lock()

currentIdx := t.running.idxWithSourceAndKind(task.source.Core.Name, task.kind)
if currentIdx >= 0 {
t.log.WriteF("Skipping %s %s task run of `%s' because same task already in progress", task.reason, task.kind, task.source.Core.Name)

t.lock.Unlock()
return
}
t.running = append(t.running, task)

t.lock.Unlock()

defer func() {
t.lock.Lock()
t.running.deleteTaskWithID(task.id)
t.lock.Unlock()
}()

switch task.kind {
case queueTaskKindAuth:
task.source.Auth(t.log)

case queueTaskKindRefresh:
task.source.Refresh(t.globalCache, t.log)
}
}

func (t *taskQueue) scheduleNextRunOf(task managerTask) {
if task.reason == runReasonAdhoc {
return
}

var nextTime time.Time
var skipSched bool

switch task.kind {
case queueTaskKindAuth:
task.source.Auth(t.log)
nextTime, skipSched = task.source.CalcNextAuth()

case queueTaskKindRefresh:
task.source.Refresh(t.globalCache, t.log)
nextTime = task.source.CalcNextRefresh()
}

Expand All @@ -221,13 +246,6 @@ func (t *taskQueue) run(task managerTask) {
return
}

t.lock.Lock()
t.running.deleteTaskWithID(task.id)
t.lock.Unlock()

task.runTime = nextTime
task.reason = runReasonSchedule

t.enqueue(managerTask{
source: task.source,
runTime: nextTime,
Expand Down
19 changes: 6 additions & 13 deletions server/source_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ const (
ExpiredRetryInterval = 5 * time.Minute
)

const AdHocThrottle = 5 * time.Second

type SourceManager struct {
sources []Source
queue *taskQueue
Expand Down Expand Up @@ -97,18 +95,13 @@ func (s *SourceManager) Data() doomsday.CacheItems {

func (s *SourceManager) RefreshAll() {
now := time.Now()
cutoff := now.Add(-AdHocThrottle)
for i := range s.sources {
if s.sources[i].refreshStatus.LastRun.StartedAt.IsZero() || //it was never run?
(!s.sources[i].refreshStatus.LastRun.FinishedAt.IsZero() && //if FinishedAt is zero, its currently in progress
s.sources[i].refreshStatus.LastRun.FinishedAt.Before(cutoff)) {
s.queue.enqueue(managerTask{
source: &s.sources[i],
kind: queueTaskKindRefresh,
runTime: now,
reason: runReasonAdhoc,
})
}
s.queue.enqueue(managerTask{
source: &s.sources[i],
kind: queueTaskKindRefresh,
runTime: now,
reason: runReasonAdhoc,
})
}
}

Expand Down

0 comments on commit 5837e02

Please sign in to comment.