Skip to content

Commit

Permalink
feat(metrics): add task schedule interval to executor metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
AlirieGray committed Dec 10, 2019
1 parent f64c631 commit ea4da8f
Show file tree
Hide file tree
Showing 15 changed files with 83 additions and 53 deletions.
4 changes: 2 additions & 2 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,11 +647,11 @@ func (m *Launcher) run(ctx context.Context) (err error) {
sch, sm, err := scheduler.NewScheduler(
executor,
taskbackend.NewSchedulableTaskService(m.kvService),
scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledAt time.Time, err error) {
scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledFor time.Time, err error) {
schLogger.Info(
"error in scheduler run",
zap.String("taskID", platform.ID(taskID).String()),
zap.Time("scheduledAt", scheduledAt),
zap.Time("scheduledFor", scheduledFor),
zap.Error(err))
}),
)
Expand Down
7 changes: 4 additions & 3 deletions kv/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,10 +1360,10 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID,
}

// CreateRun creates a run for the given task using the supplied scheduledFor and runAt times.
func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) {
func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) {
var r *influxdb.Run
err := s.kv.Update(ctx, func(tx Tx) error {
run, err := s.createRun(ctx, tx, taskID, scheduledFor)
run, err := s.createRun(ctx, tx, taskID, scheduledFor, runAt)
if err != nil {
return err
}
Expand All @@ -1372,13 +1372,14 @@ func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFo
})
return r, err
}
func (s *Service) createRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) {
func (s *Service) createRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) {
id := s.IDGenerator.ID()

run := influxdb.Run{
ID: id,
TaskID: taskID,
ScheduledFor: scheduledFor,
RunAt: runAt,
Status: backend.RunScheduled.String(),
Log: []influxdb.Log{},
}
Expand Down
6 changes: 3 additions & 3 deletions mock/task_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *TaskService) ForceRun(ctx context.Context, taskID influxdb.ID, schedule
type TaskControlService struct {
CreateNextRunFn func(ctx context.Context, taskID influxdb.ID, now int64) (backend.RunCreation, error)
NextDueRunFn func(ctx context.Context, taskID influxdb.ID) (int64, error)
CreateRunFn func(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error)
CreateRunFn func(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error)
CurrentlyRunningFn func(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
ManualRunsFn func(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
StartManualRunFn func(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
Expand All @@ -87,8 +87,8 @@ func (tcs *TaskControlService) CreateNextRun(ctx context.Context, taskID influxd
func (tcs *TaskControlService) NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error) {
return tcs.NextDueRunFn(ctx, taskID)
}
func (tcs *TaskControlService) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) {
return tcs.CreateRunFn(ctx, taskID, scheduledFor)
func (tcs *TaskControlService) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) {
return tcs.CreateRunFn(ctx, taskID, scheduledFor, runAt)
}
func (tcs *TaskControlService) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) {
return tcs.CurrentlyRunningFn(ctx, taskID)
Expand Down
3 changes: 2 additions & 1 deletion task.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ type Run struct {
ID ID `json:"id,omitempty"`
TaskID ID `json:"taskID"`
Status string `json:"status"`
ScheduledFor time.Time `json:"scheduledFor"` // ScheduledFor is the time the task is scheduled to run at
ScheduledFor time.Time `json:"scheduledFor"` // ScheduledFor is the Now time used in the task's query
RunAt time.Time `json:"runAt"` // RunAt is the time the task is scheduled to be run, which is ScheduledFor + Offset
StartedAt time.Time `json:"startedAt,omitempty"` // StartedAt is the time the executor begins running the task
FinishedAt time.Time `json:"finishedAt,omitempty"` // FinishedAt is the time the executor finishes running the task
RequestedAt time.Time `json:"requestedAt,omitempty"` // RequestedAt is the time the coordinator told the scheduler to schedule the task
Expand Down
2 changes: 1 addition & 1 deletion task/backend/analytical_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (re *runReader) readRuns(cr flux.ColReader) error {
case scheduledForField:
scheduled, err := time.Parse(time.RFC3339, cr.Strings(j).ValueString(i))
if err != nil {
re.log.Info("Failed to parse scheduledAt time", zap.Error(err))
re.log.Info("Failed to parse scheduledFor time", zap.Error(err))
continue
}
r.ScheduledFor = scheduled.UTC()
Expand Down
14 changes: 13 additions & 1 deletion task/backend/executor/executor_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type ExecutorMetrics struct {
manualRunsCounter *prometheus.CounterVec
resumeRunsCounter *prometheus.CounterVec
unrecoverableCounter *prometheus.CounterVec
scheduleInterval *prometheus.HistogramVec
}

type runCollector struct {
Expand Down Expand Up @@ -83,6 +84,13 @@ func NewExecutorMetrics(te *TaskExecutor) *ExecutorMetrics {
Name: "resume_runs_counter",
Help: "Total number of runs resumed by task ID",
}, []string{"taskID"}),

scheduleInterval: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "schedule_interval",
Help: "The interval between the time the run was scheduled for and the time the task's next run is due at, by task type",
}, []string{"task_type"}),
}
}

Expand Down Expand Up @@ -122,13 +130,17 @@ func (em *ExecutorMetrics) PrometheusCollectors() []prometheus.Collector {
em.manualRunsCounter,
em.resumeRunsCounter,
em.unrecoverableCounter,
em.scheduleInterval,
}
}

// StartRun stores the delta time between when a run is due to start and actually starting.
func (em *ExecutorMetrics) StartRun(task *influxdb.Task, queueDelta time.Duration) {
func (em *ExecutorMetrics) StartRun(task *influxdb.Task, queueDelta time.Duration, scheduleDelta time.Duration) {
em.queueDelta.WithLabelValues(task.Type, "all").Observe(queueDelta.Seconds())
em.queueDelta.WithLabelValues("", task.ID.String()).Observe(queueDelta.Seconds())

// schedule interval duration = (time the run actually started) - (time the task was scheduled to run)
em.scheduleInterval.WithLabelValues(task.Type).Observe(scheduleDelta.Seconds())
}

// FinishRun adjusts the metrics to indicate a run is no longer in progress for the given task ID.
Expand Down
6 changes: 3 additions & 3 deletions task/backend/executor/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ var (
func TestTaskConcurrency(t *testing.T) {
tes := taskExecutorSystem(t)
te := tes.ex
r1, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-4*time.Second))
r1, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-4*time.Second), time.Now())
if err != nil {
t.Fatal(err)
}
r2, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-3*time.Second))
r2, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-3*time.Second), time.Now())
if err != nil {
t.Fatal(err)
}
r3, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-2*time.Second))
r3, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-2*time.Second), time.Now())
if err != nil {
t.Fatal(err)
}
Expand Down
18 changes: 9 additions & 9 deletions task/backend/executor/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,20 @@ func (e *TaskExecutor) SetLimitFunc(l LimitFunc) {
}

// Execute is an executor to satisfy the needs of tasks
func (e *TaskExecutor) Execute(ctx context.Context, id scheduler.ID, scheduledAt time.Time) error {
_, err := e.PromisedExecute(ctx, id, scheduledAt)
func (e *TaskExecutor) Execute(ctx context.Context, id scheduler.ID, scheduledFor time.Time, runAt time.Time) error {
_, err := e.PromisedExecute(ctx, id, scheduledFor, runAt)
return err
}

// PromisedExecute begins execution for the tasks id with a specific scheduledAt time.
// When we execute we will first build a run for the scheduledAt time,
// PromisedExecute begins execution for the task's id with a specific scheduledFor time.
// When we execute we will first build a run for the scheduledFor time,
// We then want to add to the queue anything that was manually queued to run.
// If the queue is full the call to execute should hang and apply back pressure to the caller
// We then start a worker to work the newly queued jobs.
func (e *TaskExecutor) PromisedExecute(ctx context.Context, id scheduler.ID, scheduledAt time.Time) (Promise, error) {
func (e *TaskExecutor) PromisedExecute(ctx context.Context, id scheduler.ID, scheduledFor time.Time, runAt time.Time) (Promise, error) {
iid := influxdb.ID(id)
// create a run
p, err := e.createRun(ctx, iid, scheduledAt)
p, err := e.createRun(ctx, iid, scheduledFor, runAt)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -154,8 +154,8 @@ func (e *TaskExecutor) ResumeCurrentRun(ctx context.Context, id influxdb.ID, run
return nil, influxdb.ErrRunNotFound
}

func (e *TaskExecutor) createRun(ctx context.Context, id influxdb.ID, scheduledAt time.Time) (*promise, error) {
r, err := e.tcs.CreateRun(ctx, id, scheduledAt.UTC())
func (e *TaskExecutor) createRun(ctx context.Context, id influxdb.ID, scheduledFor time.Time, runAt time.Time) (*promise, error) {
r, err := e.tcs.CreateRun(ctx, id, scheduledFor.UTC(), runAt.UTC())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -310,7 +310,7 @@ func (w *worker) start(p *promise) {
w.te.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now().UTC(), backend.RunStarted)

// add to metrics
w.te.metrics.StartRun(p.task, time.Since(p.createdAt))
w.te.metrics.StartRun(p.task, time.Since(p.createdAt), time.Since(p.run.RunAt))
p.startedAt = time.Now()
}

Expand Down
29 changes: 21 additions & 8 deletions task/backend/executor/task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func testQuerySuccess(t *testing.T) {
t.Fatal(err)
}

promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
if err != nil {
t.Fatal(err)
}
Expand All @@ -86,6 +86,10 @@ func testQuerySuccess(t *testing.T) {
t.Fatal("promise and run dont match")
}

if run.RunAt != time.Unix(126, 0).UTC() {
t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt)
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.SucceedQuery(script)

Expand Down Expand Up @@ -113,7 +117,7 @@ func testQueryFailure(t *testing.T) {
t.Fatal(err)
}

promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -196,7 +200,7 @@ func testResumingRun(t *testing.T) {
t.Fatal(err)
}

stalledRun, err := tes.i.CreateRun(ctx, task.ID, time.Unix(123, 0))
stalledRun, err := tes.i.CreateRun(ctx, task.ID, time.Unix(123, 0), time.Unix(126, 0))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -239,7 +243,7 @@ func testWorkerLimit(t *testing.T) {
t.Fatal(err)
}

promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -281,7 +285,7 @@ func testLimitFunc(t *testing.T) {
return nil
})

promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -317,7 +321,7 @@ func testMetrics(t *testing.T) {
t.Fatal(err)
}

promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -383,6 +387,15 @@ func testMetrics(t *testing.T) {
t.Fatalf("expected 1 manual run, got %v", got)
}

m = promtest.MustFindMetric(t, mg, "task_executor_schedule_interval", map[string]string{"task_type": ""})
if got := *m.Histogram.SampleCount; got != 1 {
t.Fatalf("expected to count 1 schedule interval metric, got %v", got)
}

if got := *m.Histogram.SampleSum; got <= 100 {
t.Fatalf("expected schedule interval metric to be very large, got %v", got)
}

}

func testIteratorFailure(t *testing.T) {
Expand All @@ -403,7 +416,7 @@ func testIteratorFailure(t *testing.T) {
t.Fatal(err)
}

promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -447,7 +460,7 @@ func testErrorHandling(t *testing.T) {
forcedErr := errors.New("could not find bucket")
tes.svc.FailNextQuery(forcedErr)

promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 3 additions & 0 deletions task/backend/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type RunCreation struct {
// Unix timestamp for when the next run is due.
NextDue int64

// time.Time for when the next run is due (includes offset)
RunAt time.Time

// Whether there are any manual runs queued for this task.
// If so, the scheduler should begin executing them after handling real-time tasks.
HasQueue bool
Expand Down
2 changes: 1 addition & 1 deletion task/backend/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Executor interface {
// Errors returned from the execute request imply that this attempt has failed and
// should be put back in the scheduler and re-executed at a later time. We will add scheduler-specific errors
// so the time can be configurable.
Execute(ctx context.Context, id ID, scheduledAt time.Time) error
Execute(ctx context.Context, id ID, scheduledFor time.Time, runAt time.Time) error
}

// Schedulable is the interface that encapsulates work that
Expand Down
Loading

0 comments on commit ea4da8f

Please sign in to comment.