Skip to content

Commit

Permalink
feat(task): Separate System from Non-System query compiler feature fl…
Browse files Browse the repository at this point in the history
…agging.
  • Loading branch information
brettbuddin committed Jun 8, 2020
1 parent 34b3dc7 commit 9b53315
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 18 deletions.
54 changes: 37 additions & 17 deletions task/backend/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ func MultiLimit(limits ...LimitFunc) LimitFunc {
type LimitFunc func(*influxdb.Task, *influxdb.Run) error

type executorConfig struct {
maxWorkers int
buildCompiler CompilerBuilderFunc
maxWorkers int
systemBuildCompiler CompilerBuilderFunc
nonSystemBuildCompiler CompilerBuilderFunc
}

type executorOption func(*executorConfig)
Expand All @@ -65,19 +66,29 @@ func WithMaxWorkers(n int) executorOption {
// context.Context provided can be assumed to be an authorized context.
type CompilerBuilderFunc func(ctx context.Context, query string, now time.Time) (flux.Compiler, error)

// WithCompilerBuilder is an Executor option that configures a
// CompilerBuilderFunc to be used when compiling queries.
func WithCompilerBuilder(builder CompilerBuilderFunc) executorOption {
// WithSystemCompilerBuilder is an Executor option that configures a
// CompilerBuilderFunc to be used when compiling queries for System Tasks.
func WithSystemCompilerBuilder(builder CompilerBuilderFunc) executorOption {
return func(o *executorConfig) {
o.buildCompiler = builder
o.systemBuildCompiler = builder
}
}

// WithNonSystemCompilerBuilder is an Executor option that configures a
// CompilerBuilderFunc to be used when compiling queries for non-System Tasks
// (Checks and Notifications).
func WithNonSystemCompilerBuilder(builder CompilerBuilderFunc) executorOption {
return func(o *executorConfig) {
o.nonSystemBuildCompiler = builder
}
}

// NewExecutor creates a new task executor
func NewExecutor(log *zap.Logger, qs query.QueryService, as influxdb.AuthorizationService, ts influxdb.TaskService, tcs backend.TaskControlService, opts ...executorOption) (*Executor, *ExecutorMetrics) {
cfg := &executorConfig{
maxWorkers: defaultMaxWorkers,
buildCompiler: NewASTCompiler,
maxWorkers: defaultMaxWorkers,
systemBuildCompiler: NewASTCompiler,
nonSystemBuildCompiler: NewASTCompiler,
}
for _, opt := range opts {
opt(cfg)
Expand All @@ -90,11 +101,12 @@ func NewExecutor(log *zap.Logger, qs query.QueryService, as influxdb.Authorizati
qs: qs,
as: as,

currentPromises: sync.Map{},
promiseQueue: make(chan *promise, maxPromises),
workerLimit: make(chan struct{}, cfg.maxWorkers),
limitFunc: func(*influxdb.Task, *influxdb.Run) error { return nil }, // noop
buildCompiler: cfg.buildCompiler,
currentPromises: sync.Map{},
promiseQueue: make(chan *promise, maxPromises),
workerLimit: make(chan struct{}, cfg.maxWorkers),
limitFunc: func(*influxdb.Task, *influxdb.Run) error { return nil }, // noop
systemBuildCompiler: cfg.systemBuildCompiler,
nonSystemBuildCompiler: cfg.nonSystemBuildCompiler,
}

e.metrics = NewExecutorMetrics(e)
Expand Down Expand Up @@ -130,7 +142,8 @@ type Executor struct {
workerPool sync.Pool
workerLimit chan struct{}

buildCompiler CompilerBuilderFunc
nonSystemBuildCompiler CompilerBuilderFunc
systemBuildCompiler CompilerBuilderFunc
}

// SetLimitFunc sets the limit func for this task executor
Expand Down Expand Up @@ -284,7 +297,8 @@ func (wm *workerMaker) new() interface{} {
return &worker{
e: wm.e,
exhaustResultIterators: exhaustResultIterators,
buildCompiler: wm.e.buildCompiler,
systemBuildCompiler: wm.e.systemBuildCompiler,
nonSystemBuildCompiler: wm.e.nonSystemBuildCompiler,
}
}

Expand All @@ -295,7 +309,8 @@ type worker struct {
// of a flux query
exhaustResultIterators func(res flux.Result) error

buildCompiler CompilerBuilderFunc
systemBuildCompiler CompilerBuilderFunc
nonSystemBuildCompiler CompilerBuilderFunc
}

func (w *worker) work() {
Expand Down Expand Up @@ -416,7 +431,12 @@ func (w *worker) executeQuery(p *promise) {
w.start(p)

ctx = icontext.SetAuthorizer(ctx, p.task.Authorization)
compiler, err := w.buildCompiler(ctx, p.task.Flux, p.run.ScheduledFor)

buildCompiler := w.systemBuildCompiler
if p.task.Type != influxdb.TaskSystemType {
buildCompiler = w.nonSystemBuildCompiler
}
compiler, err := buildCompiler(ctx, p.task.Flux, p.run.ScheduledFor)
if err != nil {
w.finish(p, influxdb.RunFail, influxdb.ErrFluxParseError(err))
return
Expand Down
3 changes: 2 additions & 1 deletion task/backend/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,8 @@ func testIteratorFailure(t *testing.T) {
exhaustResultIterators: func(flux.Result) error {
return errors.New("something went wrong exhausting iterator")
},
buildCompiler: NewASTCompiler,
systemBuildCompiler: NewASTCompiler,
nonSystemBuildCompiler: NewASTCompiler,
}
}}

Expand Down

0 comments on commit 9b53315

Please sign in to comment.