Skip to content

Commit

Permalink
feat(tasks): remove flux stats from run log and replace with trace id (
Browse files Browse the repository at this point in the history
…#16263)

* feat(tasks): trace id in task run logs

* refactor(tracing): move trace info utility into kit/tracing package
  • Loading branch information
GeorgeMac authored Dec 18, 2019
1 parent 2967462 commit ff38420
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 56 deletions.
2 changes: 1 addition & 1 deletion http/query_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (h *FluxHandler) handleQuery(w http.ResponseWriter, r *http.Request) {

ctx := r.Context()
log := h.log.With(logger.TraceFields(ctx)...)
if id, _, found := logger.TraceInfo(ctx); found {
if id, _, found := tracing.InfoFromContext(ctx); found {
w.Header().Set(traceIDHeader, id)
}

Expand Down
4 changes: 2 additions & 2 deletions http/query_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"testing"
"time"

"github.com/influxdata/influxdb/kit/tracing"
tracetesting "github.com/influxdata/influxdb/kit/tracing/testing"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/flux"
Expand Down Expand Up @@ -323,7 +323,7 @@ var _ metric.EventRecorder = noopEventRecorder{}

// Certain error cases must be encoded as influxdb.Error so they can be properly decoded clientside.
func TestFluxHandler_PostQuery_Errors(t *testing.T) {
defer tracing.JaegerTestSetupAndTeardown(t.Name())()
defer tracetesting.SetupInMemoryTracing(t.Name())()

i := inmem.NewService()
b := &FluxBackend{
Expand Down
24 changes: 24 additions & 0 deletions kit/tracing/testing/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package testing

import (
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
)

// SetupInMemoryTracing installs a Jaeger tracer, created with the given name,
// a constant always-on sampler and an in-memory reporter, as the opentracing
// global tracer.
// The returned function closes that tracer and puts back the tracer that was
// global before the call; callers should defer it.
func SetupInMemoryTracing(name string) func() {
	// Remember the current global tracer so teardown can restore it.
	previous := opentracing.GlobalTracer()

	tracer, closer := jaeger.NewTracer(
		name,
		jaeger.NewConstSampler(true),
		jaeger.NewInMemoryReporter(),
	)
	opentracing.SetGlobalTracer(tracer)

	teardown := func() {
		// The error from Close is intentionally ignored.
		_ = closer.Close()
		opentracing.SetGlobalTracer(previous)
	}
	return teardown
}
29 changes: 17 additions & 12 deletions kit/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,22 @@ func StartSpanFromContextWithOperationName(ctx context.Context, operationName st
return span, ctx
}

// JaegerTestSetupAndTeardown sets the global tracer to an in memory Jaeger instance for testing.
// The returned function should be deferred by the caller to tear down this setup after testing is complete.
func JaegerTestSetupAndTeardown(name string) func() {
old := opentracing.GlobalTracer()
tracer, closer := jaeger.NewTracer(name,
jaeger.NewConstSampler(true),
jaeger.NewInMemoryReporter(),
)
opentracing.SetGlobalTracer(tracer)
return func() {
_ = closer.Close()
opentracing.SetGlobalTracer(old)
// InfoFromSpan returns the trace ID of the given span and whether it was sampled.
// found reports whether the span carries a Jaeger span context; it is false,
// and the other results are zero values, when span is nil or is not a Jaeger span.
func InfoFromSpan(span opentracing.Span) (traceID string, sampled bool, found bool) {
	// Calling Context on a nil interface value would panic, so a nil span is
	// reported as "not found" instead.
	if span == nil {
		return "", false, false
	}

	spanContext, ok := span.Context().(jaeger.SpanContext)
	if !ok {
		return "", false, false
	}
	return spanContext.TraceID().String(), spanContext.IsSampled(), true
}

// InfoFromContext looks up the span stored in ctx and returns its trace ID and
// sampled flag as reported by InfoFromSpan.
// found is false when ctx holds no span, or when InfoFromSpan does not
// recognise the span as a Jaeger span.
func InfoFromContext(ctx context.Context) (traceID string, sampled bool, found bool) {
	span := opentracing.SpanFromContext(ctx)
	if span == nil {
		return "", false, false
	}
	return InfoFromSpan(span)
}
16 changes: 2 additions & 14 deletions logger/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/pkg/snowflake"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
Expand Down Expand Up @@ -93,23 +94,10 @@ func Shard(id uint64) zapcore.Field {
return zap.Uint64(DBShardIDKey, id)
}

// TraceInfo reads the span stored in ctx and, when that span has a Jaeger span
// context, returns its trace ID and sampled flag with found set to true.
// When ctx holds no span, or the span is not a Jaeger span, it returns
// ("", false, false).
func TraceInfo(ctx context.Context) (traceID string, sampled bool, found bool) {
	span := opentracing.SpanFromContext(ctx)
	if span == nil {
		return "", false, false
	}

	jaegerCtx, isJaeger := span.Context().(jaeger.SpanContext)
	if !isJaeger {
		return "", false, false
	}

	return jaegerCtx.TraceID().String(), jaegerCtx.IsSampled(), true
}

// TraceFields returns the fields "ot_trace_id" and "ot_trace_sampled", with values pulled from the (Jaeger) trace ID
// found in the given context. Returns nil if the context doesn't have a trace ID.
func TraceFields(ctx context.Context) []zap.Field {
id, sampled, found := TraceInfo(ctx)
id, sampled, found := tracing.InfoFromContext(ctx)
if !found {
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions query/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/influxdata/flux/iocounter"
"github.com/influxdata/influxdb/kit/check"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/logger"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -51,7 +50,7 @@ func (s *LoggingProxyQueryService) Query(ctx context.Context, w io.Writer, req *
entry.Write(zap.Error(err))
}
}
traceID, sampled, _ := logger.TraceInfo(ctx)
traceID, sampled, _ := tracing.InfoFromContext(ctx)
log := Log{
OrganizationID: req.Request.OrganizationID,
TraceID: traceID,
Expand Down
15 changes: 9 additions & 6 deletions task/backend/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,18 @@ func (p *syncRunPromise) Cancel() {

func (p *syncRunPromise) finish(res *runResult, err error) {
p.finishOnce.Do(func() {
defer p.logEnd()
defer func() {
// Always cancel p's context.
// If finish is called before p.qs.Query completes, the query will be interrupted.
// If afterwards, then p.cancel is just a resource cleanup.
p.cancel()

p.logEnd()

// Always cancel p's context.
// If finish is called before p.qs.Query completes, the query will be interrupted.
// If afterwards, then p.cancel is just a resource cleanup.
defer p.cancel()
close(p.ready)
}()

p.res, p.err = res, err
close(p.ready)

if err != nil {
p.log.Debug("Execution failed to get result", zap.Error(err))
Expand Down
11 changes: 4 additions & 7 deletions task/backend/executor/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package executor

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -400,12 +399,10 @@ func (w *worker) executeQuery(p *promise) {

it.Release()

// log the statistics on the run
stats := it.Statistics()

b, err := json.Marshal(stats)
if err == nil {
w.te.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now().UTC(), string(b))
// log the trace id and whether or not it was sampled into the run log
if traceID, isSampled, ok := tracing.InfoFromSpan(span); ok {
msg := fmt.Sprintf("trace_id=%s is_sampled=%t", traceID, isSampled)
w.te.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now().UTC(), msg)
}

if runErr != nil {
Expand Down
67 changes: 55 additions & 12 deletions task/backend/executor/task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os"
"strings"
"sync"
"testing"
Expand All @@ -15,35 +16,52 @@ import (
"github.com/influxdata/influxdb/inmem"
"github.com/influxdata/influxdb/kit/prom"
"github.com/influxdata/influxdb/kit/prom/promtest"
tracetest "github.com/influxdata/influxdb/kit/tracing/testing"
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/scheduler"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
"go.uber.org/zap/zaptest"
)

// TestMain runs the package's tests with an in-memory tracer installed as the
// global tracer, then exits with the code reported by m.Run.
func TestMain(m *testing.M) {
	// The tests run inside a closure so that the deferred tracing teardown
	// executes before os.Exit, which does not run deferred functions.
	run := func() int {
		teardown := tracetest.SetupInMemoryTracing("task_backend_tests")
		defer teardown()

		return m.Run()
	}

	os.Exit(run())
}

// tes bundles the task executor under test with the services that
// taskExecutorSystem builds around it, so tests can reach each piece directly.
type tes struct {
// svc is the fake query service that backs the executor's query service bridge.
svc *fakeQueryService
// ex is the executor returned by NewExecutor.
ex *TaskExecutor
// metrics is the metrics value returned alongside ex by NewExecutor.
metrics *ExecutorMetrics
// i is the kv service created over an in-memory KV store.
i *kv.Service
// tcs is the task control service wrapper handed to the executor; it wraps i.
tcs *taskControlService
// tc holds the credentials created against i by createCreds.
tc testCreds
}

func taskExecutorSystem(t *testing.T) tes {
aqs := newFakeQueryService()
qs := query.QueryServiceBridge{
AsyncQueryService: aqs,
}

i := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore())

ex, metrics := NewExecutor(zaptest.NewLogger(t), qs, i, i, taskControlService{i})
var (
aqs = newFakeQueryService()
qs = query.QueryServiceBridge{
AsyncQueryService: aqs,
}
i = kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore())
tcs = &taskControlService{TaskControlService: i}
ex, metrics = NewExecutor(zaptest.NewLogger(t), qs, i, i, tcs)
)
return tes{
svc: aqs,
ex: ex,
metrics: metrics,
i: i,
tcs: tcs,
tc: createCreds(t, i),
}
}
Expand All @@ -62,10 +80,16 @@ func TestTaskExecutor(t *testing.T) {

func testQuerySuccess(t *testing.T) {
t.Parallel()

tes := taskExecutorSystem(t)

script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
var (
script = fmt.Sprintf(fmtTestScript, t.Name())
ctx = icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
span = opentracing.GlobalTracer().StartSpan("test-span")
)
ctx = opentracing.ContextWithSpan(ctx, span)

task, err := tes.i.CreateTask(ctx, influxdb.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script})
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -98,12 +122,28 @@ func testQuerySuccess(t *testing.T) {
if got := promise.Error(); got != nil {
t.Fatal(got)
}

// confirm run is removed from in-mem store
run, err = tes.i.FindRunByID(context.Background(), task.ID, run.ID)
if run != nil || err == nil || !strings.Contains(err.Error(), "run not found") {
t.Fatal("run was returned when it should have been removed from kv")
}

// ensure the run returned by TaskControlService.FinishRun(...)
// has run logs formatted as expected
if run = tes.tcs.run; run == nil {
t.Fatal("expected run returned by FinishRun to not be nil")
}

if len(run.Log) < 3 {
t.Fatalf("expected 3 run logs, found %d", len(run.Log))
}

sctx := span.Context().(jaeger.SpanContext)
expectedMessage := fmt.Sprintf("trace_id=%s is_sampled=true", sctx.TraceID())
if expectedMessage != run.Log[1].Message {
t.Errorf("expected %q, found %q", expectedMessage, run.Log[1].Message)
}
}

func testQueryFailure(t *testing.T) {
Expand Down Expand Up @@ -490,14 +530,17 @@ func testErrorHandling(t *testing.T) {

// taskControlService wraps a backend.TaskControlService for tests and records
// the run produced by its FinishRun method so tests can inspect it.
type taskControlService struct {
backend.TaskControlService

// run is the run returned by the embedded service's FinishRun during the
// most recent FinishRun call on this wrapper.
run *influxdb.Run
}

func (t taskControlService) FinishRun(ctx context.Context, taskID influxdb.ID, runID influxdb.ID) (*influxdb.Run, error) {
func (t *taskControlService) FinishRun(ctx context.Context, taskID influxdb.ID, runID influxdb.ID) (*influxdb.Run, error) {
// ensure auth set on context
_, err := icontext.GetAuthorizer(ctx)
if err != nil {
panic(err)
}

return t.TaskControlService.FinishRun(ctx, taskID, runID)
t.run, err = t.TaskControlService.FinishRun(ctx, taskID, runID)
return t.run, err
}

0 comments on commit ff38420

Please sign in to comment.