Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(task): Inject Task's LatestSuccess Timestamp In Flux Extern #19402

Merged
merged 8 commits into from
Aug 25, 2020
Prev Previous commit
chore(task/executor): Add tests for extern injection.
  • Loading branch information
brettbuddin committed Aug 25, 2020
commit c8d420084a9f315de2d68be50a91961915b6a09d
131 changes: 117 additions & 14 deletions task/backend/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ import (

"github.com/golang/mock/gomock"
"github.com/influxdata/flux"
"github.com/influxdata/flux/ast"
"github.com/influxdata/influxdb/v2"
icontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/kit/prom"
"github.com/influxdata/influxdb/v2/kit/prom/promtest"
tracetest "github.com/influxdata/influxdb/v2/kit/tracing/testing"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
influxdbmock "github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/query/fluxlang"
"github.com/influxdata/influxdb/v2/task/backend"
Expand Down Expand Up @@ -121,8 +124,108 @@ func TestTaskExecutor_QuerySuccess(t *testing.T) {
t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt)
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.SucceedQuery(script)
tes.svc.WaitForQueryLive(t, script, nil)
tes.svc.SucceedQuery(script, nil)

<-promise.Done()

if got := promise.Error(); got != nil {
t.Fatal(got)
}

// confirm run is removed from in-mem store
run, err = tes.i.FindRunByID(context.Background(), task.ID, run.ID)
if run != nil || err == nil || !strings.Contains(err.Error(), "run not found") {
t.Fatal("run was returned when it should have been removed from kv")
}

// ensure the run returned by TaskControlService.FinishRun(...)
// has run logs formatted as expected
if run = tes.tcs.run; run == nil {
t.Fatal("expected run returned by FinishRun to not be nil")
}

if len(run.Log) < 3 {
t.Fatalf("expected 3 run logs, found %d", len(run.Log))
}

sctx := span.Context().(jaeger.SpanContext)
expectedMessage := fmt.Sprintf("trace_id=%s is_sampled=true", sctx.TraceID())
if expectedMessage != run.Log[1].Message {
t.Errorf("expected %q, found %q", expectedMessage, run.Log[1].Message)
}
}

func TestTaskExecutor_QuerySuccessWithExternInjection(t *testing.T) {
t.Parallel()

tes := taskExecutorSystem(t)

var (
script = fmt.Sprintf(fmtTestScript, t.Name())
ctx = icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
span = opentracing.GlobalTracer().StartSpan("test-span")
)
ctx = opentracing.ContextWithSpan(ctx, span)

task, err := tes.i.CreateTask(ctx, influxdb.TaskCreate{
OrganizationID: tes.tc.OrgID,
OwnerID: tes.tc.Auth.GetUserID(),
Flux: script,
})
if err != nil {
t.Fatal(err)
}

// Simulate previous run to establish a timestamp
latestSuccess := time.Now().UTC()
task, err = tes.i.UpdateTask(ctx, task.ID, influxdb.TaskUpdate{
LatestSuccess: &latestSuccess,
})
if err != nil {
t.Fatal(err)
}

extern := &ast.File{
Body: []ast.Statement{&ast.OptionStatement{
Assignment: &ast.VariableAssignment{
ID: &ast.Identifier{Name: latestSuccessOption},
Init: &ast.DateTimeLiteral{
Value: latestSuccess,
},
},
},
},
}

ctx, err = feature.Annotate(ctx, influxdbmock.NewFlagger(map[feature.Flag]interface{}{
feature.InjectLatestSuccessTime(): true,
}))
if err != nil {
t.Fatal(err)
}

promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
if err != nil {
t.Fatal(err)
}
promiseID := influxdb.ID(promise.ID())

run, err := tes.i.FindRunByID(context.Background(), task.ID, promiseID)
if err != nil {
t.Fatal(err)
}

if run.ID != promiseID {
t.Fatal("promise and run dont match")
}

if run.RunAt != time.Unix(126, 0).UTC() {
t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt)
}

tes.svc.WaitForQueryLive(t, script, extern)
tes.svc.SucceedQuery(script, extern)

<-promise.Done()

Expand Down Expand Up @@ -179,8 +282,8 @@ func TestTaskExecutor_QueryFailure(t *testing.T) {
t.Fatal("promise and run dont match")
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.FailQuery(script, errors.New("blargyblargblarg"))
tes.svc.WaitForQueryLive(t, script, nil)
tes.svc.FailQuery(script, nil, errors.New("blargyblargblarg"))

<-promise.Done()

Expand Down Expand Up @@ -228,8 +331,8 @@ func TestManualRun(t *testing.T) {
t.Fatal("promise and run and manual run dont match")
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.SucceedQuery(script)
tes.svc.WaitForQueryLive(t, script, nil)
tes.svc.SucceedQuery(script, nil)

if got := promise.Error(); got != nil {
t.Fatal(got)
Expand Down Expand Up @@ -271,8 +374,8 @@ func TestTaskExecutor_ResumingRun(t *testing.T) {
t.Fatal("promise and run and manual run dont match")
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.SucceedQuery(script)
tes.svc.WaitForQueryLive(t, script, nil)
tes.svc.SucceedQuery(script, nil)

if got := promise.Error(); got != nil {
t.Fatal(got)
Expand All @@ -299,8 +402,8 @@ func TestTaskExecutor_WorkerLimit(t *testing.T) {
t.Fatal("expected a worker to be started")
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.FailQuery(script, errors.New("blargyblargblarg"))
tes.svc.WaitForQueryLive(t, script, nil)
tes.svc.FailQuery(script, nil, errors.New("blargyblargblarg"))

<-promise.Done()

Expand Down Expand Up @@ -383,15 +486,15 @@ func TestTaskExecutor_Metrics(t *testing.T) {
t.Fatal("promise and run dont match")
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.WaitForQueryLive(t, script, nil)

mg = promtest.MustGather(t, reg)
m = promtest.MustFindMetric(t, mg, "task_executor_total_runs_active", nil)
if got := *m.Gauge.Value; got != 1 {
t.Fatalf("expected 1 total runs active, got %v", got)
}

tes.svc.SucceedQuery(script)
tes.svc.SucceedQuery(script, nil)
<-promise.Done()

mg = promtest.MustGather(t, reg)
Expand Down Expand Up @@ -483,8 +586,8 @@ func TestTaskExecutor_IteratorFailure(t *testing.T) {
t.Fatal("promise and run dont match")
}

tes.svc.WaitForQueryLive(t, script)
tes.svc.SucceedQuery(script)
tes.svc.WaitForQueryLive(t, script, nil)
tes.svc.SucceedQuery(script, nil)

<-promise.Done()

Expand Down
31 changes: 21 additions & 10 deletions task/backend/executor/support_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/influxdata/flux"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/memory"
Expand All @@ -31,14 +32,24 @@ type fakeQueryService struct {

var _ query.AsyncQueryService = (*fakeQueryService)(nil)

func makeAST(q string) lang.ASTCompiler {
func makeAST(q string, extern *ast.File) lang.ASTCompiler {
pkg, err := runtime.ParseToJSON(q)
if err != nil {
panic(err)
}

var externBytes []byte
if extern != nil && len(extern.Body) > 0 {
var err error
externBytes, err = json.Marshal(extern)
if err != nil {
panic(err)
}
}
return lang.ASTCompiler{
AST: pkg,
Now: time.Unix(123, 0),
AST: pkg,
Now: time.Unix(123, 0),
Extern: externBytes,
}
}

Expand Down Expand Up @@ -85,12 +96,12 @@ func (s *fakeQueryService) Query(ctx context.Context, req *query.Request) (flux.
}

// SucceedQuery allows the running query matching the given script to return on its Ready channel.
func (s *fakeQueryService) SucceedQuery(script string) {
func (s *fakeQueryService) SucceedQuery(script string, extern *ast.File) {
s.mu.Lock()
defer s.mu.Unlock()

// Unblock the flux.
ast := makeAST(script)
ast := makeAST(script, extern)
spec := makeASTString(ast)
fq, ok := s.queries[spec]
if !ok {
Expand All @@ -103,12 +114,12 @@ func (s *fakeQueryService) SucceedQuery(script string) {
}

// FailQuery closes the running query's Ready channel and sets its error to the given value.
func (s *fakeQueryService) FailQuery(script string, forced error) {
func (s *fakeQueryService) FailQuery(script string, extern *ast.File, forced error) {
s.mu.Lock()
defer s.mu.Unlock()

// Unblock the flux.
ast := makeAST(script)
ast := makeAST(script, nil)
spec := makeASTString(ast)
fq, ok := s.queries[spec]
if !ok {
Expand All @@ -129,12 +140,12 @@ func (s *fakeQueryService) FailNextQuery(forced error) {
// WaitForQueryLive ensures that the query has made it into the service.
// This is particularly useful for the synchronous executor,
// because the execution starts on a separate goroutine.
func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string) {
func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string, extern *ast.File) {
t.Helper()

const attempts = 10
ast := makeAST(script)
astUTC := makeAST(script)
ast := makeAST(script, extern)
astUTC := makeAST(script, extern)
astUTC.Now = ast.Now.UTC()
spec := makeASTString(ast)
specUTC := makeASTString(astUTC)
Expand Down