Skip to content

Commit

Permalink
Migrate from OpenTracing to OpenTelemetry instrumentation
Browse files Browse the repository at this point in the history
Signed-off-by: Andreas Gerstmayr <agerstmayr@redhat.com>
  • Loading branch information
andreasgerstmayr committed May 3, 2024
1 parent 854caca commit f8ecdd1
Show file tree
Hide file tree
Showing 158 changed files with 769 additions and 19,530 deletions.
6 changes: 2 additions & 4 deletions cmd/tempo-query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/spf13/viper"
jaeger_config "github.com/uber/jaeger-client-go/config"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
google_grpc "google.golang.org/grpc"

"github.com/grafana/tempo/cmd/tempo-query/tempo"
Expand Down Expand Up @@ -62,8 +61,7 @@ func main() {
Store: plugin,
}, func(options []google_grpc.ServerOption) *google_grpc.Server {
return hcplugin.DefaultGRPCServer([]google_grpc.ServerOption{
google_grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(opentracing.GlobalTracer())),
google_grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(opentracing.GlobalTracer())),
google_grpc.StatsHandler(otelgrpc.NewServerHandler()),
})
})
}
Expand Down
58 changes: 27 additions & 31 deletions cmd/tempo-query/tempo/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
tlsCfg "github.com/grafana/dskit/crypto/tls"
"github.com/grafana/dskit/user"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"google.golang.org/grpc/metadata"

jaeger "github.com/jaegertracing/jaeger/model"
Expand Down Expand Up @@ -52,6 +52,8 @@ var tlsVersions = map[string]uint16{
"VersionTLS13": tls.VersionTLS13,
}

var tracer = otel.Tracer("tempo-query")

type Backend struct {
tempoBackend string
tlsEnabled bool
Expand Down Expand Up @@ -181,10 +183,10 @@ func (b *Backend) apiSchema() string {
func (b *Backend) GetTrace(ctx context.Context, traceID jaeger.TraceID) (*jaeger.Trace, error) {
url := fmt.Sprintf("%s://%s/api/traces/%s", b.apiSchema(), b.tempoBackend, traceID)

span, ctx := opentracing.StartSpanFromContext(ctx, "tempo-query.GetTrace")
defer span.Finish()
ctx, span := tracer.Start(ctx, "tempo-query.GetTrace")
defer span.End()

req, err := b.newGetRequest(ctx, url, span)
req, err := b.newGetRequest(ctx, url)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -226,7 +228,7 @@ func (b *Backend) GetTrace(ctx context.Context, traceID jaeger.TraceID) (*jaeger
ProcessMap: []jaeger.Trace_ProcessMapping{},
}

span.LogFields(ot_log.String("msg", "build process map"))
span.AddEvent("build process map")
// otel proto conversion doesn't set jaeger processes
for _, batch := range jaegerBatches {
for _, s := range batch.Spans {
Expand All @@ -250,17 +252,17 @@ func (b *Backend) calculateTimeRange() (int64, int64) {
}

func (b *Backend) GetServices(ctx context.Context) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "tempo-query.GetOperations")
defer span.Finish()
ctx, span := tracer.Start(ctx, "tempo-query.GetOperations")
defer span.End()

return b.lookupTagValues(ctx, span, serviceSearchTag)
return b.lookupTagValues(ctx, serviceSearchTag)
}

func (b *Backend) GetOperations(ctx context.Context, _ jaeger_spanstore.OperationQueryParameters) ([]jaeger_spanstore.Operation, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "tempo-query.GetOperations")
defer span.Finish()
ctx, span := tracer.Start(ctx, "tempo-query.GetOperations")
defer span.End()

tagValues, err := b.lookupTagValues(ctx, span, operationSearchTag)
tagValues, err := b.lookupTagValues(ctx, operationSearchTag)
if err != nil {
return nil, err
}
Expand All @@ -277,37 +279,38 @@ func (b *Backend) GetOperations(ctx context.Context, _ jaeger_spanstore.Operatio
}

func (b *Backend) FindTraces(ctx context.Context, query *jaeger_spanstore.TraceQueryParameters) ([]*jaeger.Trace, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "tempo-query.FindTraces")
defer span.Finish()
ctx, span := tracer.Start(ctx, "tempo-query.FindTraces")
defer span.End()

traceIDs, err := b.FindTraceIDs(ctx, query)
if err != nil {
return nil, err
}

span.LogFields(ot_log.String("msg", fmt.Sprintf("Found %d trace IDs", len(traceIDs))))
span.AddEvent(fmt.Sprintf("Found %d trace IDs", len(traceIDs)))

// for every traceID, get the full trace
var jaegerTraces []*jaeger.Trace
for _, traceID := range traceIDs {
trace, err := b.GetTrace(ctx, traceID)
if err != nil {
// TODO this seems to be an internal inconsistency error, ignore so we can still show the rest
span.LogFields(ot_log.Error(fmt.Errorf("could not get trace for traceID %v: %w", traceID, err)))
span.AddEvent(fmt.Sprintf("could not get trace for traceID %v", traceID))
span.RecordError(err)
continue
}

jaegerTraces = append(jaegerTraces, trace)
}

span.LogFields(ot_log.String("msg", fmt.Sprintf("Returning %d traces", len(jaegerTraces))))
span.AddEvent(fmt.Sprintf("Returning %d traces", len(jaegerTraces)))

return jaegerTraces, nil
}

func (b *Backend) FindTraceIDs(ctx context.Context, query *jaeger_spanstore.TraceQueryParameters) ([]jaeger.TraceID, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "tempo-query.FindTraceIDs")
defer span.Finish()
ctx, span := tracer.Start(ctx, "tempo-query.FindTraceIDs")
defer span.End()

url := url.URL{
Scheme: b.apiSchema(),
Expand All @@ -333,7 +336,7 @@ func (b *Backend) FindTraceIDs(ctx context.Context, query *jaeger_spanstore.Trac

url.RawQuery = urlQuery.Encode()

req, err := b.newGetRequest(ctx, url.String(), span)
req, err := b.newGetRequest(ctx, url.String())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -398,7 +401,7 @@ func createTagsQueryParam(service string, operation string, tags map[string]stri
return tagsBuilder.String(), nil
}

func (b *Backend) lookupTagValues(ctx context.Context, span opentracing.Span, tagName string) ([]string, error) {
func (b *Backend) lookupTagValues(ctx context.Context, tagName string) ([]string, error) {
var url string

if b.QueryServicesDuration == nil {
Expand All @@ -408,7 +411,7 @@ func (b *Backend) lookupTagValues(ctx context.Context, span opentracing.Span, ta
url = fmt.Sprintf("%s://%s/api/search/tag/%s/values?start=%d&end=%d", b.apiSchema(), b.tempoBackend, tagName, startTime, endTime)
}

req, err := b.newGetRequest(ctx, url, span)
req, err := b.newGetRequest(ctx, url)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -445,20 +448,13 @@ func (b *Backend) WriteSpan(context.Context, *jaeger.Span) error {
return nil
}

func (b *Backend) newGetRequest(ctx context.Context, url string, span opentracing.Span) (*http.Request, error) {
func (b *Backend) newGetRequest(ctx context.Context, url string) (*http.Request, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}

if tracer := opentracing.GlobalTracer(); tracer != nil {
carrier := make(opentracing.TextMapCarrier, len(req.Header))
for k, v := range req.Header {
carrier.Set(k, v[0])
}
// this is not really loggable or anything we can react to. just ignoring this error
_ = tracer.Inject(span.Context(), opentracing.TextMap, carrier)
}
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))

// currently Jaeger Query will only propagate bearer token to the grpc backend and no other headers
// so we are going to extract the tenant id from the header, if it exists and use it
Expand Down
32 changes: 4 additions & 28 deletions cmd/tempo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ import (
"github.com/grafana/dskit/flagext"
dslog "github.com/grafana/dskit/log"
"github.com/grafana/dskit/spanprofiler"
"github.com/grafana/dskit/tracing"
ot "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
"go.opentelemetry.io/otel"
oc_bridge "go.opentelemetry.io/otel/bridge/opencensus"
ot_bridge "go.opentelemetry.io/otel/bridge/opentracing"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
Expand Down Expand Up @@ -89,12 +88,7 @@ func main() {
}

// Init tracer
var shutdownTracer func()
if config.UseOTelTracer {
shutdownTracer, err = installOpenTelemetryTracer(config)
} else {
shutdownTracer, err = installOpenTracingTracer(config)
}
shutdownTracer, err := installOpenTelemetryTracer(config)
if err != nil {
level.Error(log.Logger).Log("msg", "error initialising tracer", "err", err)
os.Exit(1)
Expand Down Expand Up @@ -221,33 +215,15 @@ func loadConfig() (*app.Config, bool, error) {
return config, configVerify, nil
}

func installOpenTracingTracer(config *app.Config) (func(), error) {
level.Info(log.Logger).Log("msg", "initialising OpenTracing tracer")

// Setting the environment variable JAEGER_AGENT_HOST enables tracing
trace, err := tracing.NewFromEnv(fmt.Sprintf("%s-%s", appName, config.Target))
if err != nil {
return nil, fmt.Errorf("error initialising tracer: %w", err)
}
ot.SetGlobalTracer(spanprofiler.NewTracer(ot.GlobalTracer()))

return func() {
if err := trace.Close(); err != nil {
level.Error(log.Logger).Log("msg", "error closing tracing", "err", err)
os.Exit(1)
}
}, nil
}

func installOpenTelemetryTracer(config *app.Config) (func(), error) {
level.Info(log.Logger).Log("msg", "initialising OpenTelemetry tracer")

// for now, migrate OpenTracing Jaeger environment variables
migrateJaegerEnvironmentVariables()

exp, err := jaeger.New(jaeger.WithCollectorEndpoint())
exp, err := otlptracehttp.New(context.Background(), otlptracehttp.WithInsecure())
if err != nil {
return nil, fmt.Errorf("failed to create Jaeger exporter: %w", err)
return nil, fmt.Errorf("failed to create OTLP exporter: %w", err)
}

resources, err := resource.New(context.Background(),
Expand Down
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ require (
github.com/gorilla/mux v1.8.1
github.com/grafana/dskit v0.0.0-20240311184239-73feada6c0d7
github.com/grafana/e2e v0.1.1
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/go-hclog v1.6.2
github.com/hashicorp/go-plugin v1.6.0
github.com/jaegertracing/jaeger v1.55.0
Expand All @@ -45,7 +44,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.97.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver v0.97.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.97.0
github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e
github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4/v4 v4.1.21
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -74,7 +73,6 @@ require (
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/bridge/opencensus v1.23.1
go.opentelemetry.io/otel/bridge/opentracing v1.21.0
go.opentelemetry.io/otel/exporters/jaeger v1.17.0
go.opentelemetry.io/otel/metric v1.24.0
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
Expand Down Expand Up @@ -118,6 +116,7 @@ require (
go.opentelemetry.io/collector/otelcol v0.95.0
go.opentelemetry.io/collector/processor v0.97.0
go.opentelemetry.io/collector/receiver v0.97.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0
go.opentelemetry.io/proto/otlp v1.1.0
Expand Down Expand Up @@ -317,7 +316,6 @@ require (
go.opentelemetry.io/collector/featuregate v1.4.0 // indirect
go.opentelemetry.io/collector/service v0.95.0 // indirect
go.opentelemetry.io/contrib/config v0.4.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.23.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.23.1 // indirect
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,6 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 h1:MJG/KsmcqMwFAkh8mTnAwhyKoB+sTAnY4CACC110tbU=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
github.com/hashicorp/consul/api v1.13.0/go.mod h1:ZlVrynguJKcYr54zGaDbaL3fOvKC9m72FhPvA8T35KQ=
github.com/hashicorp/consul/api v1.25.1 h1:CqrdhYzc8XZuPnhIYZWH45toM0LB9ZeYr/gvpLVI3PE=
Expand Down Expand Up @@ -1151,8 +1150,6 @@ go.opentelemetry.io/otel/bridge/opencensus v1.23.1 h1:QmGawK5vW6UdHXypZwWUuag27d
go.opentelemetry.io/otel/bridge/opencensus v1.23.1/go.mod h1:TNxwRvdhakpilWQImJM/a4yd/8mgqDhRVC9Bph9wI/k=
go.opentelemetry.io/otel/bridge/opentracing v1.21.0 h1:7AfuSFhyvBmt/0YskcdxDyTdHPjQfrHcZQo6Zu5srF4=
go.opentelemetry.io/otel/bridge/opentracing v1.21.0/go.mod h1:giUOMajCV30LvlPHnzRDNBvDV3/NmrGVrqCp/1suDok=
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 h1:D7UpUy2Xc2wsi1Ras6V40q806WM07rqoCWzXu7Sqy+4=
go.opentelemetry.io/otel/exporters/jaeger v1.17.0/go.mod h1:nPCqOnEH9rNLKqH/+rrUjiMzHJdV1BlpKcTwRTyKkKI=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.23.1 h1:ZqRWZJGHXV/1yCcEEVJ6/Uz2JtM79DNS8OZYa3vVY/A=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.23.1/go.mod h1:D7ynngPWlGJrqyGSDOdscuv7uqttfCE3jcBvffDv9y4=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.23.1 h1:q/Nj5/2TZRIt6PderQ9oU0M00fzoe8UZuINGw6ETGTw=
Expand Down
8 changes: 5 additions & 3 deletions modules/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/util/strutil"
"github.com/segmentio/fasthash/fnv1a"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"

Expand Down Expand Up @@ -108,6 +108,8 @@ var (
})
)

var tracer = otel.Tracer("distributor")

// rebatchedTrace is used to more cleanly pass the set of data
type rebatchedTrace struct {
id []byte
Expand Down Expand Up @@ -307,8 +309,8 @@ func (d *Distributor) extractBasicInfo(ctx context.Context, traces ptrace.Traces

// PushTraces pushes a batch of traces
func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*tempopb.PushResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "distributor.PushTraces")
defer span.Finish()
ctx, span := tracer.Start(ctx, "distributor.PushTraces")
defer span.End()

userID, spanCount, size, err := d.extractBasicInfo(ctx, traces)
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions modules/distributor/forwarder/otlpgrpc/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (
"github.com/go-kit/log"
"github.com/grafana/dskit/middleware"
grpcmw "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/opentracing/opentracing-go"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/multierr"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -122,6 +121,7 @@ func (f *Forwarder) newTraceOTLPGRPCClientAndConn(ctx context.Context, endpoint
grpc.WithTransportCredentials(creds),
grpc.WithUnaryInterceptor(grpcmw.ChainUnaryClient(unaryClientInterceptor...)),
grpc.WithStreamInterceptor(grpcmw.ChainStreamClient(streamClientInterceptor...)),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
)

grpcClientConn, err := grpc.DialContext(ctx, endpoint, opts...)
Expand All @@ -134,10 +134,8 @@ func (f *Forwarder) newTraceOTLPGRPCClientAndConn(ctx context.Context, endpoint

func instrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
return []grpc.UnaryClientInterceptor{
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
}, []grpc.StreamClientInterceptor{
otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()),
middleware.StreamClientUserHeaderInterceptor,
}
}
Loading

0 comments on commit f8ecdd1

Please sign in to comment.