Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(task): Inject Task's LatestSuccess Timestamp In Flux Extern #19402

Merged
merged 8 commits into from
Aug 25, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
1. [19334](https://github.com/influxdata/influxdb/pull/19334): Add --active-config flag to influx to set config for single command
1. [19219](https://github.com/influxdata/influxdb/pull/19219): List buckets via the API now supports after (ID) parameter as an alternative to offset.
1. [19390](https://github.com/influxdata/influxdb/pull/19390): Record last success and failure run times in the Task
1. [19402](https://github.com/influxdata/influxdb/pull/19402): Inject Task's LatestSuccess Timestamp In Flux Extern

### Bug Fixes

Expand Down
6 changes: 6 additions & 0 deletions flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,9 @@
key: enforceOrgDashboardLimits
default: false
contact: Compute Team

- name: Inject Latest Success Time
description: Inject the latest successful task run timestamp into a Task query extern when executing.
key: injectLatestSuccessTime
default: false
contact: Compute Team
16 changes: 16 additions & 0 deletions kit/feature/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

80 changes: 70 additions & 10 deletions task/backend/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package executor

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/influxdata/flux"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/runtime"
"github.com/influxdata/influxdb/v2"
Expand All @@ -22,6 +24,9 @@ import (
const (
maxPromises = 1000
defaultMaxWorkers = 100

latestSuccessOption = "tasks.latestSuccessTime"
latestFailureOption = "tasks.latestFailureTime"
)

var _ scheduler.Executor = (*Executor)(nil)
Expand Down Expand Up @@ -70,7 +75,31 @@ func WithMaxWorkers(n int) executorOption {

// CompilerBuilderFunc is a function that yields a new flux.Compiler. The
// context.Context provided can be assumed to be an authorized context.
type CompilerBuilderFunc func(ctx context.Context, query string, now time.Time) (flux.Compiler, error)
type CompilerBuilderFunc func(ctx context.Context, query string, ts CompilerBuilderTimestamps) (flux.Compiler, error)

// CompilerBuilderTimestamps contains timestamps which should be provided along
// with a Task query.
type CompilerBuilderTimestamps struct {
Now time.Time
LatestSuccess time.Time
}

func (ts CompilerBuilderTimestamps) Extern() *ast.File {
var body []ast.Statement

if !ts.LatestSuccess.IsZero() {
body = append(body, &ast.OptionStatement{
Assignment: &ast.VariableAssignment{
ID: &ast.Identifier{Name: latestSuccessOption},
Init: &ast.DateTimeLiteral{
Value: ts.LatestSuccess,
},
},
})
}

return &ast.File{Body: body}
}

// WithSystemCompilerBuilder is an Executor option that configures a
// CompilerBuilderFunc to be used when compiling queries for System Tasks.
Expand Down Expand Up @@ -416,8 +445,6 @@ func (w *worker) start(p *promise) {
}

func (w *worker) finish(p *promise, rs influxdb.RunStatus, err error) {

// trace
span, ctx := tracing.StartSpanFromContext(p.ctx)
defer span.Finish()

Expand Down Expand Up @@ -471,7 +498,10 @@ func (w *worker) executeQuery(p *promise) {
if p.task.Type != influxdb.TaskSystemType {
buildCompiler = w.nonSystemBuildCompiler
}
compiler, err := buildCompiler(ctx, p.task.Flux, p.run.ScheduledFor)
compiler, err := buildCompiler(ctx, p.task.Flux, CompilerBuilderTimestamps{
Now: p.run.ScheduledFor,
LatestSuccess: p.task.LatestSuccess,
})
if err != nil {
w.finish(p, influxdb.RunFail, influxdb.ErrFluxParseError(err))
return
Expand Down Expand Up @@ -592,21 +622,45 @@ func exhaustResultIterators(res flux.Result) error {
}

// NewASTCompiler parses a Flux query string into an AST representatation.
func NewASTCompiler(_ context.Context, query string, now time.Time) (flux.Compiler, error) {
func NewASTCompiler(ctx context.Context, query string, ts CompilerBuilderTimestamps) (flux.Compiler, error) {
pkg, err := runtime.ParseToJSON(query)
if err != nil {
return nil, err
}
var externBytes []byte
if feature.InjectLatestSuccessTime().Enabled(ctx) {
extern := ts.Extern()
if len(extern.Body) > 0 {
var err error
externBytes, err = json.Marshal(extern)
if err != nil {
return nil, err
}
}
}
return lang.ASTCompiler{
AST: pkg,
Now: now,
AST: pkg,
Now: ts.Now,
Extern: externBytes,
}, nil
}

// NewFluxCompiler wraps a Flux query string in a raw-query representation.
func NewFluxCompiler(_ context.Context, query string, _ time.Time) (flux.Compiler, error) {
func NewFluxCompiler(ctx context.Context, query string, ts CompilerBuilderTimestamps) (flux.Compiler, error) {
var externBytes []byte
if feature.InjectLatestSuccessTime().Enabled(ctx) {
extern := ts.Extern()
if len(extern.Body) > 0 {
var err error
externBytes, err = json.Marshal(extern)
if err != nil {
return nil, err
}
}
}
return lang.FluxCompiler{
Query: query,
Query: query,
Extern: externBytes,
// TODO(brett): This mitigates an immediate problem where
// Checks/Notifications breaks when sending Now, and system Tasks do not
// break when sending Now. We are currently sending C+N through using
Expand All @@ -617,7 +671,13 @@ func NewFluxCompiler(_ context.Context, query string, _ time.Time) (flux.Compile
// we are able to locate the root cause and use Flux Compiler for all
// Task types.
//
// This should be removed once we diagnose the problem.
// It turns out this is due to the exclusive nature of the stop time in
// Flux "from" and that we weren't including the left-hand boundary of
// the range check for notifications. We're shipping a fix soon in
//
// https://github.com/influxdata/influxdb/pull/19392
//
// Once this has merged, we can send Now again.
//
// Now: now,
}, nil
Expand Down
Loading