Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats/internal: OpenTelemetry tracing GRPCTraceBinPropagator #7677

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address style, clarification review comments
  • Loading branch information
purnesh42H committed Oct 9, 2024
commit ec54eedcdcecf62977717ca9ac8d5ebc9d7605fa
106 changes: 56 additions & 50 deletions stats/opentelemetry/internal/grpc_trace_bin_propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,111 +22,117 @@ import (
"context"
"encoding/base64"

"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
otelpropagation "go.opentelemetry.io/otel/propagation"
oteltrace "go.opentelemetry.io/otel/trace"
internaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
)

// TODO: Move out of internal as part of open telemetry API

// GRPCTraceBinPropagator is TextMapPropagator to propagate cross-cutting
// concerns as both text and binary key-value pairs within a carrier that
// travels in-band across process boundaries.
// GRPCTraceBinPropagator is an OpenTelemetry TextMapPropagator which is used
// to extract and inject trace context data from and into messages exchanged by
// gRPC applications. It propagates trace data in binary format using the
// 'grpc-trace-bin' header.
type GRPCTraceBinPropagator struct{}

// Inject set cross-cutting concerns from the Context into the carrier.
// Inject sets OpenTelemetry trace context information from the Context into
// the carrier.
//
// If carrier is carrier.CustomMapCarrier then SetBinary (fast path) is used,
// otherwise Set (slow path) with encoding is used.
func (p GRPCTraceBinPropagator) Inject(ctx context.Context, carrier propagation.TextMapCarrier) {
span := trace.SpanFromContext(ctx)
// If the carrier is a CustomCarrier, trace data is directly injected in a
// binary format using the 'grpc-trace-bin' header (fast path). Otherwise,
// the trace data is base64 encoded and injected using the same header in
// text format (slow path).
func (p GRPCTraceBinPropagator) Inject(ctx context.Context, carrier otelpropagation.TextMapCarrier) {
span := oteltrace.SpanFromContext(ctx)
if !span.SpanContext().IsValid() {
return
}

binaryData := Binary(span.SpanContext())
if binaryData == nil {
bd := Binary(span.SpanContext())
if bd == nil {
return
}

if customCarrier, ok := carrier.(otelinternaltracing.CustomCarrier); ok {
customCarrier.SetBinary(binaryData) // fast path: set the binary data without encoding
} else {
carrier.Set(otelinternaltracing.GRPCTraceBinHeaderKey, base64.StdEncoding.EncodeToString(binaryData)) // slow path: set the binary data with encoding
if cc, ok := carrier.(internaltracing.CustomCarrier); ok {
cc.SetBinary(bd)
return
}
carrier.Set(internaltracing.GRPCTraceBinHeaderKey, base64.StdEncoding.EncodeToString(bd))
}

// Extract reads cross-cutting concerns from the carrier into a Context.
// Extract reads OpenTelemetry trace context information from the carrier into a
// Context.
//
// If carrier is carrier.CustomCarrier then GetBinary (fast path) is used,
// otherwise Get (slow path) with decoding is used.
func (p GRPCTraceBinPropagator) Extract(ctx context.Context, carrier propagation.TextMapCarrier) context.Context {
var binaryData []byte

if customCarrier, ok := carrier.(otelinternaltracing.CustomCarrier); ok {
binaryData, _ = customCarrier.GetBinary()
// If the carrier is a CustomCarrier, trace data is read directly in a binary
// format from the 'grpc-trace-bin' header (fast path). Otherwise, the trace
// data is base64 decoded from the same header in text format (slow path).
func (p GRPCTraceBinPropagator) Extract(ctx context.Context, carrier otelpropagation.TextMapCarrier) context.Context {
var bd []byte

if cc, ok := carrier.(internaltracing.CustomCarrier); ok {
bd = cc.GetBinary()
} else {
binaryData, _ = base64.StdEncoding.DecodeString(carrier.Get(otelinternaltracing.GRPCTraceBinHeaderKey))
bd, _ = base64.StdEncoding.DecodeString(carrier.Get(internaltracing.GRPCTraceBinHeaderKey))
}
if binaryData == nil {
if bd == nil {
return ctx
}

spanContext, ok := FromBinary([]byte(binaryData))
spanContext, ok := FromBinary([]byte(bd))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switch spanContext to sc please.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if !ok {
return ctx
}

return trace.ContextWithRemoteSpanContext(ctx, spanContext)
return oteltrace.ContextWithRemoteSpanContext(ctx, spanContext)
}

// Fields returns the keys whose values are set with Inject.
//
// GRPCTraceBinPropagator will only have `grpc-trace-bin` field.
// Fields always returns a slice containing only `grpc-trace-bin` header key
// because the GRPCTraceBinPropagator only uses the 'grpc-trace-bin' header for
// propagating trace context.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rewrite this comment to say what Fields() returns from a conceptual point of view. What does Fields mean? Fields of what? Why is it returning a key? Or rename the function to be semantically closer to the intended idea of what this represents.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Basically the propagation.TextMapCarrier interface which GRPCTraceBinPropagator implements has Keys()" your reply to another comment - should we call this Keys() instead?

Copy link
Contributor Author

@purnesh42H purnesh42H Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GRPCTraceBinPropagator is implementing https://github.com/open-telemetry/opentelemetry-go/blob/main/propagation/propagation.go#L82 which has method Fields which is what i had my previous comment.

// Fields returns the keys whose values are set with Inject.
Fields() []string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Included // Fields returns the keys whose values are set with Inject. as well in docstring

func (p GRPCTraceBinPropagator) Fields() []string {
return []string{otelinternaltracing.GRPCTraceBinHeaderKey}
return []string{internaltracing.GRPCTraceBinHeaderKey}
}

// Binary returns the binary format representation of a SpanContext.
//
// If sc is the zero value, Binary returns nil.
func Binary(sc trace.SpanContext) []byte {
if sc.Equal(trace.SpanContext{}) {
// If sc is the zero value, returns nil.
func Binary(sc oteltrace.SpanContext) []byte {
if sc.Equal(oteltrace.SpanContext{}) {
return nil
}
var b [29]byte
traceID := trace.TraceID(sc.TraceID())
traceID := oteltrace.TraceID(sc.TraceID())
copy(b[2:18], traceID[:])
b[18] = 1
spanID := trace.SpanID(sc.SpanID())
spanID := oteltrace.SpanID(sc.SpanID())
copy(b[19:27], spanID[:])
b[27] = 2
b[28] = uint8(trace.TraceFlags(sc.TraceFlags()))
b[28] = uint8(oteltrace.TraceFlags(sc.TraceFlags()))
return b[:]
}

// FromBinary returns the SpanContext represented by b.
//
// If b has an unsupported version ID or contains no TraceID, FromBinary
// returns with ok==false.
func FromBinary(b []byte) (sc trace.SpanContext, ok bool) {
// returns with zero value SpanContext and false.
func FromBinary(b []byte) (oteltrace.SpanContext, bool) {
if len(b) == 0 || b[0] != 0 {
return trace.SpanContext{}, false
return oteltrace.SpanContext{}, false
}
b = b[1:]

if len(b) >= 17 && b[0] == 0 {
sc = sc.WithTraceID(trace.TraceID(b[1:17]))
b = b[17:]
} else {
return trace.SpanContext{}, false
if len(b) < 17 || b[0] != 0 {
return oteltrace.SpanContext{}, false
}

sc := oteltrace.SpanContext{}
sc = sc.WithTraceID(oteltrace.TraceID(b[1:17]))
b = b[17:]
if len(b) >= 9 && b[0] == 1 {
sc = sc.WithSpanID(trace.SpanID(b[1:9]))
sc = sc.WithSpanID(oteltrace.SpanID(b[1:9]))
b = b[9:]
}
if len(b) >= 2 && b[0] == 2 {
sc = sc.WithTraceFlags(trace.TraceFlags(b[1]))
sc = sc.WithTraceFlags(oteltrace.TraceFlags(b[1]))
}
return sc, true
}
34 changes: 17 additions & 17 deletions stats/opentelemetry/internal/grpc_trace_bin_propagator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (
"encoding/base64"
"testing"

"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
otelpropagation "go.opentelemetry.io/otel/propagation"
oteltrace "go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
internaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
)

// TODO: Move out of internal as part of open telemetry API
Expand All @@ -43,20 +43,20 @@ func Test(t *testing.T) {

func (s) TestInject(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Top level comment please explaining what this test does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

propagator := GRPCTraceBinPropagator{}
spanContext := trace.NewSpanContext(trace.SpanContextConfig{
spanContext := oteltrace.NewSpanContext(oteltrace.SpanContextConfig{
TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
SpanID: [8]byte{17, 18, 19, 20, 21, 22, 23, 24},
TraceFlags: trace.FlagsSampled,
TraceFlags: oteltrace.FlagsSampled,
})
traceCtx, traceCancel := context.WithCancel(context.Background())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defer cancel() here is preferred over calling it at the operation at the end. Defer is preferred because what happens if your test fails before it hits the last line/operation. It should still run the cleanup in that case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

traceCtx = trace.ContextWithSpanContext(traceCtx, spanContext)
traceCtx = oteltrace.ContextWithSpanContext(traceCtx, spanContext)

t.Run("Fast path with CustomCarrier", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
carrier := otelinternaltracing.NewCustomCarrier(metadata.NewOutgoingContext(ctx, metadata.MD{}))
carrier := internaltracing.NewCustomCarrier(metadata.NewOutgoingContext(ctx, metadata.MD{}))
propagator.Inject(traceCtx, carrier)

got := stats.OutgoingTrace(*carrier.Ctx)
got := stats.OutgoingTrace(*carrier.Context())
want := Binary(spanContext)
if string(got) != string(want) {
t.Fatalf("got = %v, want %v", got, want)
Expand All @@ -65,10 +65,10 @@ func (s) TestInject(t *testing.T) {
})

t.Run("Slow path with TextMapCarrier", func(t *testing.T) {
carrier := propagation.MapCarrier{}
carrier := otelpropagation.MapCarrier{}
propagator.Inject(traceCtx, carrier)

got := carrier.Get(otelinternaltracing.GRPCTraceBinHeaderKey)
got := carrier.Get(internaltracing.GRPCTraceBinHeaderKey)
want := base64.StdEncoding.EncodeToString(Binary(spanContext))
if got != want {
t.Fatalf("got = %v, want %v", got, want)
Expand All @@ -80,19 +80,19 @@ func (s) TestInject(t *testing.T) {

func (s) TestExtract(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Top level comment here as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

propagator := GRPCTraceBinPropagator{}
spanContext := trace.NewSpanContext(trace.SpanContextConfig{
spanContext := oteltrace.NewSpanContext(oteltrace.SpanContextConfig{
TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
SpanID: [8]byte{17, 18, 19, 20, 21, 22, 23, 24},
TraceFlags: trace.FlagsSampled,
TraceFlags: oteltrace.FlagsSampled,
Remote: true,
})
binaryData := Binary(spanContext)

t.Run("Fast path with CustomCarrier", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
carrier := otelinternaltracing.NewCustomCarrier(stats.SetIncomingTrace(ctx, binaryData))
carrier := internaltracing.NewCustomCarrier(stats.SetIncomingTrace(ctx, binaryData))
traceCtx := propagator.Extract(ctx, carrier)
got := trace.SpanContextFromContext(traceCtx)
got := oteltrace.SpanContextFromContext(traceCtx)

if !got.Equal(spanContext) {
t.Fatalf("got = %v, want %v", got, spanContext)
Expand All @@ -101,12 +101,12 @@ func (s) TestExtract(t *testing.T) {
})

t.Run("Slow path with TextMapCarrier", func(t *testing.T) {
carrier := propagation.MapCarrier{
otelinternaltracing.GRPCTraceBinHeaderKey: base64.StdEncoding.EncodeToString(binaryData),
carrier := otelpropagation.MapCarrier{
internaltracing.GRPCTraceBinHeaderKey: base64.StdEncoding.EncodeToString(binaryData),
}
ctx, cancel := context.WithCancel(context.Background())
traceCtx := propagator.Extract(ctx, carrier)
got := trace.SpanContextFromContext(traceCtx)
got := oteltrace.SpanContextFromContext(traceCtx)

if !got.Equal(spanContext) {
t.Fatalf("got = %v, want %v", got, spanContext)
Expand Down
38 changes: 22 additions & 16 deletions stats/opentelemetry/internal/tracing/custom_carrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ package tracing

import (
"context"
"errors"

"go.opentelemetry.io/otel/propagation"
otelpropagation "go.opentelemetry.io/otel/propagation"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)
Expand All @@ -37,23 +36,24 @@ const GRPCTraceBinHeaderKey = "grpc-trace-bin"
// retrieve any propagated key-value pairs in text format along with binary
// format for `grpc-trace-bin` header
type CustomCarrier struct {
propagation.TextMapCarrier
otelpropagation.TextMapCarrier

Ctx *context.Context
ctx *context.Context
}

// NewCustomCarrier creates a new CustomMapCarrier with
// the given context.
func NewCustomCarrier(ctx context.Context) CustomCarrier {
return CustomCarrier{
Ctx: &ctx,
ctx: &ctx,
}
}

// Get returns the string value associated with the passed key from the gRPC
// context. It returns an empty string if the key is not present in the
// context.
func (c CustomCarrier) Get(key string) string {
md, ok := metadata.FromIncomingContext(*c.Ctx)
md, ok := metadata.FromIncomingContext(*c.ctx)
if !ok {
return ""
}
Expand All @@ -67,37 +67,43 @@ func (c CustomCarrier) Get(key string) string {
// Set stores the key-value pair in string format in the gRPC context.
// If the key already exists, its value will be overwritten.
func (c CustomCarrier) Set(key, value string) {
md, ok := metadata.FromOutgoingContext(*c.Ctx)
md, ok := metadata.FromOutgoingContext(*c.ctx)
if !ok {
md = metadata.MD{}
}
md.Set(key, value)
*c.Ctx = metadata.NewOutgoingContext(*c.Ctx, md)
*c.ctx = metadata.NewOutgoingContext(*c.ctx, md)
}

// GetBinary returns the binary value from the gRPC context in the incoming RPC,
// associated with the header `grpc-trace-bin`.
func (c CustomCarrier) GetBinary() ([]byte, error) {
values := stats.Trace(*c.Ctx)
func (c CustomCarrier) GetBinary() []byte {
values := stats.Trace(*c.ctx)
if len(values) == 0 {
return nil, errors.New("`grpc-trace-bin` header not found")
return nil
}

return values, nil
return values
}

// SetBinary sets the binary value to the gRPC context, which will be sent in
// the outgoing RPC with the header grpc-trace-bin.
// the outgoing RPC with the header `grpc-trace-bin`.
func (c CustomCarrier) SetBinary(value []byte) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get the point of this API. It seems to do things by deferring to operations on a context either to the stats package or the metadata package? What is the function of this API?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah this requires a bit more background on migration path from opencensus to opentelemetry (see A72).

The design proposes that while gRPC OpenCensus directly interacts with metadata API, gRPC Open Telemetry will use standardized https://pkg.go.dev/go.opentelemetry.io/otel/propagation package for context propagation by encoding them in metadata for the following benefits:

  • Full integration with OpenTelemetry APIs that is easier for users to reason about.
  • Make it possible to plugin other propagators that the community supports.
  • Flexible API that allows clean and simple migration paths to a different propagator.

As of today, OpenTelemetry propagator API only supports https://pkg.go.dev/go.opentelemetry.io/otel/propagation#TextMapPropagator, that is to send string key/value pairs between the client and server, which is different from the binary header that gRPC currently uses. The future roadmap to support binary propagators at OpenTelemetry is unclear. So, gRPC will use propagator API in TextMap format with optimization path to work around the binary propagator API. Once the Open Telemetry binary propagator API is available in the future, we can continuously integrate with those API with little effort.

Therefore, we need a custom implementation for the Carrier that supports both binary and text values. For binary header grpc-trace-bin we are reusing the existing stats package to conform with existing grpc design.

*c.Ctx = stats.SetTrace(*c.Ctx, value)
*c.ctx = stats.SetTrace(*c.ctx, value)
}

// Keys lists the keys stored in the gRPC context for the outgoing RPC.
// Keys returns the keys stored in the gRPC context for the outgoing RPC.
func (c CustomCarrier) Keys() []string {
md, _ := metadata.FromOutgoingContext(*c.Ctx)
md, _ := metadata.FromOutgoingContext(*c.ctx)
keys := make([]string, 0, len(md))
for k := range md {
keys = append(keys, k)
}
return keys
}

// Context returns the underlying *context.Context associated with the
// CustomCarrier.
func (c CustomCarrier) Context() *context.Context {
return c.ctx
}
Loading