diff --git a/client/doomsday/client.go b/client/doomsday/client.go index ded687c..eb00bd5 100644 --- a/client/doomsday/client.go +++ b/client/doomsday/client.go @@ -173,3 +173,20 @@ func (c *Client) Info() (*InfoResponse, error) { err := c.doRequest("GET", "/v1/info", nil, &resp) return &resp, err } + +type GetSchedulerResponse struct { + Tasks []GetSchedulerTask `json:"tasks"` +} + +type GetSchedulerTask struct { + At int64 `json:"at"` + Reason string `json:"reason"` + Kind string `json:"kind"` + Ready bool `json:"ready"` +} + +func (c *Client) GetSchedulerState() (*GetSchedulerResponse, error) { + resp := GetSchedulerResponse{} + err := c.doRequest("GET", "/v1/scheduler", nil, &resp) + return &resp, err +} diff --git a/cmd/main.go b/cmd/main.go index 857289c..e9556f3 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -56,6 +56,10 @@ func registerCommands(app *kingpin.Application) { cmdIndex["dashboard"] = &dashboardCmd{} cmdIndex["dash"] = cmdIndex["dashboard"] + _ = app.Command("scheduler", "View the current state of the doomsday scheduler").Alias("sched").Hidden() + cmdIndex["scheduler"] = &schedulerCmd{} + cmdIndex["sched"] = cmdIndex["scheduler"] + _ = app.Command("refresh", "Refresh the servers cache") cmdIndex["refresh"] = &refreshCmd{} diff --git a/cmd/scheduler.go b/cmd/scheduler.go new file mode 100644 index 0000000..3f97377 --- /dev/null +++ b/cmd/scheduler.go @@ -0,0 +1,52 @@ +package main + +import ( + "fmt" + "os" + "time" + + "github.com/doomsday-project/doomsday/client/doomsday" + "github.com/olekukonko/tablewriter" + "github.com/starkandwayne/goutils/ansi" +) + +type schedulerCmd struct{} + +func (*schedulerCmd) Run() error { + state, err := client.GetSchedulerState() + if err != nil { + return err + } + + printSchedTaskList(state.Tasks) + return nil +} + +func printSchedTaskList(tasks []doomsday.GetSchedulerTask) { + fmt.Printf("\n") + table := tablewriter.NewWriter(os.Stdout) + table.SetBorder(false) + table.SetRowLine(true) + table.SetAutoWrapText(false) + table.SetReflowDuringAutoWrap(false) + table.SetHeader([]string{"At", "Reason", "Kind", "Ready"}) + + readyStr := ansi.Sprintf("@G{YES}") + notReadyStr := ansi.Sprintf("@R{NO}") + now := time.Now() + for _, task := range tasks { + timeUntilStr := time.Unix(task.At, 0).Sub(now).String() + readyOutStr := notReadyStr + if task.Ready { + readyOutStr = readyStr + } + table.Append([]string{ + timeUntilStr, + task.Reason, + task.Kind, + readyOutStr, + }) + } + table.Render() + fmt.Printf("\n") +} diff --git a/server/scheduler.go b/server/scheduler.go index a95c0fb..0530ff7 100644 --- a/server/scheduler.go +++ b/server/scheduler.go @@ -133,7 +133,9 @@ func (t *taskQueue) empty() bool { func (t *taskQueue) start() { go func() { for { - t.run(t.next()) + next := t.next() + t.log.WriteF("Scheduler running %s %s of `%s'", next.reason, next.kind, next.source.Core.Name) + t.run(next) } }() } @@ -143,12 +145,10 @@ func (t *taskQueue) run(task managerTask) { var skipSched bool switch task.kind { case queueTaskKindAuth: - log.WriteF("Starting authentication for `%s'", task.source.Core.Name) task.source.Auth(t.log) nextTime, skipSched = task.source.CalcNextAuth() case queueTaskKindRefresh: - t.log.WriteF("Running %s populate of `%s'", task.reason.String(), task.source.Core.Name) task.source.Refresh(t.globalCache, t.log) nextTime = task.source.CalcNextRefresh() } @@ -168,3 +168,34 @@ func (t *taskQueue) run(task managerTask) { kind: task.kind, }) } + +type SchedulerState struct { + Tasks []SchedulerTask +} + +type SchedulerTask struct { + At time.Time + Reason string + Kind string + Ready bool +} + +func (t *taskQueue) dumpState() SchedulerState { + ret := SchedulerState{ + Tasks: []SchedulerTask{}, + } + + t.lock.Lock() + defer t.lock.Unlock() + + for _, task := range t.data { + ret.Tasks = append(ret.Tasks, SchedulerTask{ + At: task.runTime, + Reason: task.reason.String(), + Kind: task.kind.String(), + Ready: task.ready, + }) + } + + return ret +} diff --git a/server/server.go b/server/server.go index 4af40d5..42bfefa 100644 --- a/server/server.go +++ b/server/server.go @@ -68,7 +68,10 @@ func Start(conf Config) error { log.WriteF("Starting background scheduler") - manager.BackgroundScheduler() + err = manager.BackgroundScheduler() + if err != nil { + return fmt.Errorf("Error starting scheduler: %s", err) + } log.WriteF("Began asynchronous cache population") log.WriteF("Configuring frontend authentication") @@ -93,6 +96,7 @@ func Start(conf Config) error { router.HandleFunc("/v1/auth", authorizer.LoginHandler()).Methods("POST") router.HandleFunc("/v1/cache", auth(getCache(manager))).Methods("GET") router.HandleFunc("/v1/cache/refresh", auth(refreshCache(manager))).Methods("POST") + router.HandleFunc("/v1/scheduler", auth(getScheduler(manager))).Methods("GET") if len(conf.Server.Dev.Mappings) > 0 { for file, servePath := range conf.Server.Dev.Mappings { @@ -179,6 +183,34 @@ func refreshCache(manager *SourceManager) func(w http.ResponseWriter, r *http.Re } } +func getScheduler(manager *SourceManager) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + schedData := manager.SchedulerState() + respRaw := doomsday.GetSchedulerResponse{ + Tasks: []doomsday.GetSchedulerTask{}, + } + + for i := range schedData.Tasks { + respRaw.Tasks = append(respRaw.Tasks, doomsday.GetSchedulerTask{ + At: schedData.Tasks[i].At.Unix(), + Reason: schedData.Tasks[i].Reason, + Kind: schedData.Tasks[i].Kind, + Ready: schedData.Tasks[i].Ready, + }) + } + + resp, err := json.Marshal(&respRaw) + if err != nil { + w.WriteHeader(500) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(200) + writeBody(w, resp) + } +} + func serveFile(content []byte, mimeType string) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", mimeType) diff --git a/server/source.go b/server/source.go index 4422542..a3e423c 100644 --- a/server/source.go +++ b/server/source.go @@ -30,8 +30,9 @@ type RunTiming struct { FinishedAt time.Time } -//TODO: clean up the mode argument func (s *Source) Refresh(global *Cache, log *logger.Logger) { + log.WriteF("Running populate of `%s'", s.Core.Name) + old := s.Core.Cache() if old == nil { old = NewCache() @@ -63,6 +64,7 @@ func (s *Source) Refresh(global *Cache, log *logger.Logger) { } func (s *Source) Auth(log *logger.Logger) { + log.WriteF("Starting authentication for `%s'", s.Core.Name) s.lock.Lock() s.authStatus.LastRun = RunTiming{StartedAt: time.Now()} diff --git a/server/source_manager.go b/server/source_manager.go index 6ca289f..9b70079 100644 --- a/server/source_manager.go +++ b/server/source_manager.go @@ -1,8 +1,8 @@ package server import ( + "fmt" "sort" - "sync" "time" "github.com/doomsday-project/doomsday/client/doomsday" @@ -17,13 +17,8 @@ const ( const AdHocThrottle = 30 * time.Second type SourceManager struct { - //sources is a static view of the queue, such that the get calls don't need - // to sync with the populate calls. Otherwise, we would need to read-lock on - // get calls so that the order of the queue doesn't shift from underneath us - // as the async scheduler is running sources []Source queue *taskQueue - lock sync.RWMutex log *logger.Logger global *Cache } @@ -33,19 +28,9 @@ func NewSourceManager(sources []Source, log *logger.Logger) *SourceManager { panic("No logger was given") } - now := time.Now() globalCache := NewCache() queue := newTaskQueue(globalCache) - for i := range sources { - queue.enqueue(managerTask{ - kind: queueTaskKindAuth, - source: &sources[i], - runTime: now, - reason: runReasonSchedule, - }) - } - return &SourceManager{ sources: sources, queue: queue, @@ -54,8 +39,28 @@ func NewSourceManager(sources []Source, log *logger.Logger) *SourceManager { } } -func (s *SourceManager) BackgroundScheduler() { +func (s *SourceManager) BackgroundScheduler() error { + now := time.Now() + for i := range s.sources { + s.sources[i].Auth(s.log) + if s.sources[i].authStatus.LastErr != nil { + return fmt.Errorf("Error performing initial auth for backend `%s': %s", + s.sources[i].Core.Name, + s.sources[i].authStatus.LastErr) + } + } + + for i := range s.sources { + s.queue.enqueue(managerTask{ + kind: queueTaskKindRefresh, + source: &s.sources[i], + runTime: now.Add(1 * time.Second), + reason: runReasonSchedule, + }) + } + s.queue.start() + return nil } func (s *SourceManager) Data() doomsday.CacheItems { @@ -95,3 +100,7 @@ func (s *SourceManager) RefreshAll() { } } } + +func (s *SourceManager) SchedulerState() SchedulerState { + return s.queue.dumpState() +}