Skip to content

Commit

Permalink
Implement multithread scheduler
Browse files Browse the repository at this point in the history
Scheduler now uses a worker system that allows for multiple tasks to be
picked up at once. The state of the workers can be grabbed from the
scheduler endpoint.
  • Loading branch information
thomasmitchell committed Jun 19, 2020
1 parent fd3cc53 commit 478b6f4
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 113 deletions.
20 changes: 14 additions & 6 deletions client/doomsday/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,25 @@ func (c *Client) Info() (*InfoResponse, error) {
}

// GetSchedulerResponse is the deserialized payload returned by the
// server's scheduler state endpoint: the tasks currently being executed,
// the tasks still waiting in the queue, and the scheduler's workers.
type GetSchedulerResponse struct {
	Running []GetSchedulerTask   `json:"running"`
	Pending []GetSchedulerTask   `json:"pending"`
	Workers []GetSchedulerWorker `json:"workers"`
}

type GetSchedulerTask struct {
At int64 `json:"at"`
Backend string `json:"backend"`
Reason string `json:"reason"`
Kind string `json:"kind"`
At int64 `json:"at"`
Backend string `json:"backend"`
Reason string `json:"reason"`
Kind string `json:"kind"`
ID uint `json:"id"`
State string `json:"state"`
WorkerID int `json:"worker_id"`
}

// GetSchedulerWorker is one worker entry in a GetSchedulerResponse.
// StateAt is a Unix timestamp (seconds) marking when the worker entered
// its current State (the server sends StateAt.Unix(), and the CLI reads
// it back with time.Unix(StateAt, 0)).
type GetSchedulerWorker struct {
	ID      uint   `json:"id"`
	State   string `json:"state"`
	StateAt int64  `json:"state_at"`
}

func (c *Client) GetSchedulerState() (*GetSchedulerResponse, error) {
Expand Down
48 changes: 42 additions & 6 deletions cmd/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,78 @@ func (*schedulerCmd) Run() error {
}

header := tablewriter.NewWriter(os.Stdout)
header.SetHeader([]string{"WORKERS"})
header.SetHeaderLine(false)
header.Render()

printWorkerList(state.Workers)

header = tablewriter.NewWriter(os.Stdout)
header.SetHeader([]string{"RUNNING"})
header.SetHeaderLine(false)
header.Render()

printSchedTaskList(state.Running)
printSchedTaskList(state.Running, true)

header = tablewriter.NewWriter(os.Stdout)
header.SetHeader([]string{"PENDING"})
header.SetHeaderLine(false)
header.Render()

printSchedTaskList(state.Pending)
printSchedTaskList(state.Pending, false)
return nil
}

func printSchedTaskList(tasks []doomsday.GetSchedulerTask) {
// printWorkerList renders the scheduler's workers as a right-aligned,
// borderless table on stdout, one row per worker: its ID, its current
// state, and how long it has been in that state (truncated to tenths of
// a second).
func printWorkerList(workers []doomsday.GetSchedulerWorker) {
	fmt.Printf("\n")

	tbl := tablewriter.NewWriter(os.Stdout)
	tbl.SetBorder(false)
	tbl.SetRowLine(true)
	tbl.SetAutoWrapText(false)
	tbl.SetReflowDuringAutoWrap(false)
	tbl.SetHeader([]string{"ID", "State", "For"})
	tbl.SetAlignment(tablewriter.ALIGN_RIGHT)

	now := time.Now()
	for i := range workers {
		w := &workers[i]
		// Duration since the worker entered its current state.
		inState := now.Sub(time.Unix(w.StateAt, 0)).Truncate(100 * time.Millisecond)
		tbl.Append([]string{
			strconv.FormatUint(uint64(w.ID), 10),
			w.State,
			inState.String(),
		})
	}
	tbl.Render()
	fmt.Printf("\n")
}

func printSchedTaskList(tasks []doomsday.GetSchedulerTask, showWorker bool) {
fmt.Printf("\n")
table := tablewriter.NewWriter(os.Stdout)
table.SetBorder(false)
table.SetRowLine(true)
table.SetAutoWrapText(false)
table.SetReflowDuringAutoWrap(false)
table.SetHeader([]string{"ID", "At", "Backend", "Kind", "Reason", "State"})
headers := []string{"ID", "At", "Backend", "Kind", "Reason", "State"}
if showWorker {
headers = append(headers, "Worker")
}
table.SetHeader(headers)
table.SetAlignment(tablewriter.ALIGN_RIGHT)

now := time.Now()
for _, task := range tasks {
timeUntilStr := time.Unix(task.At, 0).Sub(now).Truncate(100 * time.Millisecond).String()
table.Append([]string{
values := []string{
strconv.FormatUint(uint64(task.ID), 10),
timeUntilStr,
task.Backend,
task.Kind,
task.Reason,
task.State,
})
}

if showWorker {
values = append(values, strconv.FormatInt(int64(task.WorkerID), 10))
}

table.Append(values)
}
table.Render()
fmt.Printf("\n")
Expand Down
138 changes: 50 additions & 88 deletions server/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ func (s taskState) String() string {
}

// managerTask is a single unit of schedulable work: one kind of run
// (auth or refresh) against one source backend, due at runTime.
type managerTask struct {
	id      uint      // unique task ID assigned by the queue
	kind    taskKind  // what to do (e.g. auth or refresh)
	source  *Source   // the backend this task operates on
	runTime time.Time // when the task is due to run
	reason  runReason // why it was enqueued (scheduled vs ad-hoc)
	state   taskState
	// assignedWorker is the worker currently executing this task;
	// presumably nil while the task is still pending (the state dump
	// reports -1 for pending tasks' worker IDs) — confirm in worker code.
	assignedWorker *taskWorker
}

func (m *managerTask) durationUntil() time.Duration {
Expand All @@ -103,50 +104,22 @@ type taskQueue struct {
cond *sync.Cond
log *logger.Logger
globalCache *Cache
numWorkers uint
workers []*taskWorker
nextTaskID uint
}

func newTaskQueue(cache *Cache, log *logger.Logger) *taskQueue {
func newTaskQueue(cache *Cache, numWorkers uint, log *logger.Logger) *taskQueue {
lock := &sync.Mutex{}
return &taskQueue{
lock: lock,
log: log,
cond: sync.NewCond(lock),
globalCache: cache,
numWorkers: numWorkers,
}
}

// runNext blocks until there is a task for this thread to handle. It
// then dequeues that task, runs it (unless it has been marked to be
// skipped), and returns it.
func (t *taskQueue) runNext() managerTask {
	t.lock.Lock()

	// Sleep until the head of the queue exists and is ready to run.
	// Only the head task is inspected, so a runnable task deeper in the
	// queue will not wake this loop — presumably the queue is kept
	// ordered by run time elsewhere in this file; confirm in enqueue.
	for t.empty() || t.data[0].state == queueTaskStatePending {
		t.cond.Wait()
	}

	ret := t.dequeueNoLock()

	// Tasks flagged as skip are dequeued and returned without running.
	if ret.state == queueTaskStateSkip {
		t.lock.Unlock()
		t.log.WriteF("Scheduler skipping %s %s of `%s'", ret.reason, ret.kind, ret.source.Core.Name)
		return ret
	}

	// Track the task in the running list while it executes; the lock is
	// released so other threads can use the queue during the run.
	t.running = append(t.running, ret)
	t.lock.Unlock()

	t.log.WriteF("Scheduler running %s %s of `%s'", ret.reason, ret.kind, ret.source.Core.Name)

	ret.run(t.globalCache, t.log)

	// Run finished — remove it from the running list under the lock.
	t.lock.Lock()
	t.running.deleteTaskWithID(ret.id)
	t.lock.Unlock()

	return ret
}

//enqueue puts a task into the queue, unique by the tuple source, taskType. If
//there already exists a task for this source/taskType, it will be removed and
//replaced with this new one atomically.
Expand Down Expand Up @@ -270,46 +243,17 @@ func (t *taskQueue) empty() bool {
}

func (t *taskQueue) start() {
go func() {
for {
next := t.runNext()
t.scheduleNextRunOf(next)
}
}()
}

func (t *taskQueue) scheduleNextRunOf(task managerTask) {
if task.reason == runReasonAdhoc {
return
workerFactory := newTaskWorkerFactory(t, t.globalCache, t.log)
for i := uint(0); i < t.numWorkers; i++ {
t.workers = append(t.workers, workerFactory.newWorker())
t.workers[i].consumeScheduler()
}

var nextTime time.Time
var skipSched bool

switch task.kind {
case queueTaskKindAuth:
nextTime, skipSched = task.source.CalcNextAuth()

case queueTaskKindRefresh:
nextTime = task.source.CalcNextRefresh()
}

if skipSched {
t.log.WriteF("Skipping further scheduling of `%s' for `%s'", task.kind.String(), task.source.Core.Name)
return
}

t.enqueue(managerTask{
source: task.source,
runTime: nextTime,
reason: runReasonSchedule,
kind: task.kind,
})
}

// SchedulerState is a point-in-time dump of the scheduler: the tasks
// currently being executed, the tasks still waiting in the queue, and
// the state of each worker.
type SchedulerState struct {
	Running []SchedulerTask `json:"running"`
	Pending []SchedulerTask `json:"pending"`
	Workers []WorkerDump    `json:"workers"`
}

func (s SchedulerState) String() string {
Expand All @@ -320,12 +264,19 @@ func (s SchedulerState) String() string {
}

// SchedulerTask describes one scheduled task in a SchedulerState dump.
// WorkerID is the ID of the worker executing the task; the state dump
// sets it to -1 for pending tasks, which have no assigned worker.
// NOTE(review): the JSON tag here is "worker" while the client-facing
// doomsday.GetSchedulerTask uses "worker_id". The HTTP handler copies
// field-by-field between the two, so this only matters if SchedulerState
// is marshalled directly (e.g. via its String method) — confirm intended.
type SchedulerTask struct {
	ID       uint      `json:"id"`
	At       time.Time `json:"at"`
	Backend  string    `json:"backend"`
	Reason   string    `json:"reason"`
	Kind     string    `json:"kind"`
	State    string    `json:"state"`
	WorkerID int       `json:"worker"`
}

type WorkerDump struct {
ID uint `json:"id"`
At time.Time `json:"at"`
Backend string `json:"backend"`
Reason string `json:"reason"`
Kind string `json:"kind"`
State string `json:"state"`
StateAt time.Time `json:"state_at"`
}

func (t *taskQueue) dumpState() SchedulerState {
Expand All @@ -342,23 +293,34 @@ func (t *taskQueue) dumpStateNoLock() SchedulerState {

for _, task := range t.running {
ret.Running = append(ret.Running, SchedulerTask{
ID: task.id,
At: task.runTime,
Backend: task.source.Core.Name,
Reason: task.reason.String(),
Kind: task.kind.String(),
State: task.state.String(),
ID: task.id,
At: task.runTime,
Backend: task.source.Core.Name,
Reason: task.reason.String(),
Kind: task.kind.String(),
State: task.state.String(),
WorkerID: int(task.assignedWorker.id),
})
}

for _, task := range t.data {
ret.Pending = append(ret.Pending, SchedulerTask{
ID: task.id,
At: task.runTime,
Backend: task.source.Core.Name,
Reason: task.reason.String(),
Kind: task.kind.String(),
State: task.state.String(),
ID: task.id,
At: task.runTime,
Backend: task.source.Core.Name,
Reason: task.reason.String(),
Kind: task.kind.String(),
State: task.state.String(),
WorkerID: -1,
})
}

for _, worker := range t.workers {
state, stateAt := worker.State()
ret.Workers = append(ret.Workers, WorkerDump{
ID: worker.id,
State: state.String(),
StateAt: stateAt,
})
}

Expand Down
34 changes: 22 additions & 12 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,25 +191,35 @@ func getScheduler(manager *SourceManager) func(w http.ResponseWriter, r *http.Re
Pending: []doomsday.GetSchedulerTask{},
}

for i := range schedData.Workers {
respRaw.Workers = append(respRaw.Workers, doomsday.GetSchedulerWorker{
ID: schedData.Workers[i].ID,
State: schedData.Workers[i].State,
StateAt: schedData.Workers[i].StateAt.Unix(),
})
}

for i := range schedData.Running {
respRaw.Running = append(respRaw.Running, doomsday.GetSchedulerTask{
At: schedData.Running[i].At.Unix(),
Backend: schedData.Running[i].Backend,
Reason: schedData.Running[i].Reason,
Kind: schedData.Running[i].Kind,
ID: schedData.Running[i].ID,
State: schedData.Running[i].State,
At: schedData.Running[i].At.Unix(),
Backend: schedData.Running[i].Backend,
Reason: schedData.Running[i].Reason,
Kind: schedData.Running[i].Kind,
ID: schedData.Running[i].ID,
State: schedData.Running[i].State,
WorkerID: schedData.Running[i].WorkerID,
})
}

for i := range schedData.Pending {
respRaw.Pending = append(respRaw.Pending, doomsday.GetSchedulerTask{
At: schedData.Pending[i].At.Unix(),
Backend: schedData.Pending[i].Backend,
Reason: schedData.Pending[i].Reason,
Kind: schedData.Pending[i].Kind,
ID: schedData.Pending[i].ID,
State: schedData.Pending[i].State,
At: schedData.Pending[i].At.Unix(),
Backend: schedData.Pending[i].Backend,
Reason: schedData.Pending[i].Reason,
Kind: schedData.Pending[i].Kind,
ID: schedData.Pending[i].ID,
State: schedData.Pending[i].State,
WorkerID: -1,
})
}

Expand Down
3 changes: 2 additions & 1 deletion server/source_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func NewSourceManager(sources []Source, log *logger.Logger) *SourceManager {
}

globalCache := NewCache()
queue := newTaskQueue(globalCache, log)
//TODO: Make the number of workers configurable
queue := newTaskQueue(globalCache, 4, log)

return &SourceManager{
sources: sources,
Expand Down
Loading

0 comments on commit 478b6f4

Please sign in to comment.