Skip to content

Commit

Permalink
Added schedule introspector for debugging
Browse files Browse the repository at this point in the history
source manager now has function for dumping schedule
scheduler now has function for dumping its own state synchronously
server now has /v1/scheduler endpoint for getting data
client library now has function for calling /v1/scheduler and getting
 data into a struct
cli now has `scheduler` or `sched` command (hidden from help) for
looking at scheduler state easily.

Scheduler is still borked though, and with the help of these tools, I
can see that.
  • Loading branch information
thomasmitchell committed May 6, 2020
1 parent 7c7431c commit 1a01df8
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 22 deletions.
17 changes: 17 additions & 0 deletions client/doomsday/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,20 @@ func (c *Client) Info() (*InfoResponse, error) {
err := c.doRequest("GET", "/v1/info", nil, &resp)
return &resp, err
}

type GetSchedulerResponse struct {
Tasks []GetSchedulerTask `json:"tasks"`
}

type GetSchedulerTask struct {
At int64 `json:"at"`
Reason string `json:"reason"`
Kind string `json:"kind"`
Ready bool `json:"ready"`
}

func (c *Client) GetSchedulerState() (*GetSchedulerResponse, error) {
resp := GetSchedulerResponse{}
err := c.doRequest("GET", "/v1/scheduler", nil, &resp)
return &resp, err
}
4 changes: 4 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func registerCommands(app *kingpin.Application) {
cmdIndex["dashboard"] = &dashboardCmd{}
cmdIndex["dash"] = cmdIndex["dashboard"]

_ = app.Command("scheduler", "View the current state of the doomsday scheduler").Alias("sched").Hidden()
cmdIndex["scheduler"] = &schedulerCmd{}
cmdIndex["sched"] = cmdIndex["scheduler"]

_ = app.Command("refresh", "Refresh the servers cache")
cmdIndex["refresh"] = &refreshCmd{}

Expand Down
52 changes: 52 additions & 0 deletions cmd/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"fmt"
"os"
"time"

"github.com/doomsday-project/doomsday/client/doomsday"
"github.com/olekukonko/tablewriter"
"github.com/starkandwayne/goutils/ansi"
)

type schedulerCmd struct{}

func (*schedulerCmd) Run() error {
state, err := client.GetSchedulerState()
if err != nil {
return err
}

printSchedTaskList(state.Tasks)
return nil
}

func printSchedTaskList(tasks []doomsday.GetSchedulerTask) {
fmt.Printf("\n")
table := tablewriter.NewWriter(os.Stdout)
table.SetBorder(false)
table.SetRowLine(true)
table.SetAutoWrapText(false)
table.SetReflowDuringAutoWrap(false)
table.SetHeader([]string{"At", "Reason", "Kind", "Ready"})

readyStr := ansi.Sprintf("@G{YES}")
notReadyStr := ansi.Sprintf("@R{NO}")
now := time.Now()
for _, task := range tasks {
timeUntilStr := time.Unix(task.At, 0).Sub(now).String()
readyOutStr := notReadyStr
if task.Ready {
readyOutStr = readyStr
}
table.Append([]string{
timeUntilStr,
task.Reason,
task.Kind,
readyOutStr,
})
}
table.Render()
fmt.Printf("\n")
}
37 changes: 34 additions & 3 deletions server/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ func (t *taskQueue) empty() bool {
func (t *taskQueue) start() {
go func() {
for {
t.run(t.next())
next := t.next()
t.log.WriteF("Scheduler running %s %s of `%s'", next.reason, next.kind, next.source.Core.Name)
t.run(next)
}
}()
}
Expand All @@ -143,12 +145,10 @@ func (t *taskQueue) run(task managerTask) {
var skipSched bool
switch task.kind {
case queueTaskKindAuth:
log.WriteF("Starting authentication for `%s'", task.source.Core.Name)
task.source.Auth(t.log)
nextTime, skipSched = task.source.CalcNextAuth()

case queueTaskKindRefresh:
t.log.WriteF("Running %s populate of `%s'", task.reason.String(), task.source.Core.Name)
task.source.Refresh(t.globalCache, t.log)
nextTime = task.source.CalcNextRefresh()
}
Expand All @@ -168,3 +168,34 @@ func (t *taskQueue) run(task managerTask) {
kind: task.kind,
})
}

type SchedulerState struct {
Tasks []SchedulerTask
}

type SchedulerTask struct {
At time.Time
Reason string
Kind string
Ready bool
}

func (t *taskQueue) dumpState() SchedulerState {
ret := SchedulerState{
Tasks: []SchedulerTask{},
}

t.lock.Lock()
defer t.lock.Unlock()

for _, task := range t.data {
ret.Tasks = append(ret.Tasks, SchedulerTask{
At: task.runTime,
Reason: task.reason.String(),
Kind: task.kind.String(),
Ready: task.ready,
})
}

return ret
}
34 changes: 33 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ func Start(conf Config) error {

log.WriteF("Starting background scheduler")

manager.BackgroundScheduler()
err = manager.BackgroundScheduler()
if err != nil {
return fmt.Errorf("Error starting scheduler: %s", err)
}

log.WriteF("Began asynchronous cache population")
log.WriteF("Configuring frontend authentication")
Expand All @@ -93,6 +96,7 @@ func Start(conf Config) error {
router.HandleFunc("/v1/auth", authorizer.LoginHandler()).Methods("POST")
router.HandleFunc("/v1/cache", auth(getCache(manager))).Methods("GET")
router.HandleFunc("/v1/cache/refresh", auth(refreshCache(manager))).Methods("POST")
router.HandleFunc("/v1/scheduler", auth(getScheduler(manager))).Methods("GET")

if len(conf.Server.Dev.Mappings) > 0 {
for file, servePath := range conf.Server.Dev.Mappings {
Expand Down Expand Up @@ -179,6 +183,34 @@ func refreshCache(manager *SourceManager) func(w http.ResponseWriter, r *http.Re
}
}

func getScheduler(manager *SourceManager) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
schedData := manager.SchedulerState()
respRaw := doomsday.GetSchedulerResponse{
Tasks: []doomsday.GetSchedulerTask{},
}

for i := range schedData.Tasks {
respRaw.Tasks = append(respRaw.Tasks, doomsday.GetSchedulerTask{
At: schedData.Tasks[i].At.Unix(),
Reason: schedData.Tasks[i].Reason,
Kind: schedData.Tasks[i].Kind,
Ready: schedData.Tasks[i].Ready,
})
}

resp, err := json.Marshal(&respRaw)
if err != nil {
w.WriteHeader(500)
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
writeBody(w, resp)
}
}

func serveFile(content []byte, mimeType string) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", mimeType)
Expand Down
4 changes: 3 additions & 1 deletion server/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ type RunTiming struct {
FinishedAt time.Time
}

//TODO: clean up the mode argument
func (s *Source) Refresh(global *Cache, log *logger.Logger) {
log.WriteF("Running populate of `%s'", s.Core.Name)

old := s.Core.Cache()
if old == nil {
old = NewCache()
Expand Down Expand Up @@ -63,6 +64,7 @@ func (s *Source) Refresh(global *Cache, log *logger.Logger) {
}

func (s *Source) Auth(log *logger.Logger) {
log.WriteF("Starting authentication for `%s'", s.Core.Name)

s.lock.Lock()
s.authStatus.LastRun = RunTiming{StartedAt: time.Now()}
Expand Down
43 changes: 26 additions & 17 deletions server/source_manager.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package server

import (
"fmt"
"sort"
"sync"
"time"

"github.com/doomsday-project/doomsday/client/doomsday"
Expand All @@ -17,13 +17,8 @@ const (
const AdHocThrottle = 30 * time.Second

type SourceManager struct {
//sources is a static view of the queue, such that the get calls don't need
// to sync with the populate calls. Otherwise, we would need to read-lock on
// get calls so that the order of the queue doesn't shift from underneath us
// as the async scheduler is running
sources []Source
queue *taskQueue
lock sync.RWMutex
log *logger.Logger
global *Cache
}
Expand All @@ -33,19 +28,9 @@ func NewSourceManager(sources []Source, log *logger.Logger) *SourceManager {
panic("No logger was given")
}

now := time.Now()
globalCache := NewCache()
queue := newTaskQueue(globalCache)

for i := range sources {
queue.enqueue(managerTask{
kind: queueTaskKindAuth,
source: &sources[i],
runTime: now,
reason: runReasonSchedule,
})
}

return &SourceManager{
sources: sources,
queue: queue,
Expand All @@ -54,8 +39,28 @@ func NewSourceManager(sources []Source, log *logger.Logger) *SourceManager {
}
}

func (s *SourceManager) BackgroundScheduler() {
func (s *SourceManager) BackgroundScheduler() error {
now := time.Now()
for i := range s.sources {
s.sources[i].Auth(s.log)
if s.sources[i].authStatus.LastErr != nil {
return fmt.Errorf("Error performing initial auth for backend `%s': %s",
s.sources[i].Core.Name,
s.sources[i].authStatus.LastErr)
}
}

for i := range s.sources {
s.queue.enqueue(managerTask{
kind: queueTaskKindRefresh,
source: &s.sources[i],
runTime: now.Add(1 * time.Second),
reason: runReasonSchedule,
})
}

s.queue.start()
return nil
}

func (s *SourceManager) Data() doomsday.CacheItems {
Expand Down Expand Up @@ -95,3 +100,7 @@ func (s *SourceManager) RefreshAll() {
}
}
}

func (s *SourceManager) SchedulerState() SchedulerState {
return s.queue.dumpState()
}

0 comments on commit 1a01df8

Please sign in to comment.