diff --git a/ddtrace/ext/tags.go b/ddtrace/ext/tags.go index a18a60833b..51d1cd3645 100644 --- a/ddtrace/ext/tags.go +++ b/ddtrace/ext/tags.go @@ -15,6 +15,7 @@ const ( TargetPort = "out.port" // SamplingPriority is the tag that marks the sampling priority of a span. + // Deprecated in favor of ManualKeep and ManualDrop. SamplingPriority = "sampling.priority" // SQLType sets the sql type tag. diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index 7f61450d36..f74cf8ebda 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -38,6 +38,9 @@ var ( // defaultSocketDSD specifies the socket path to use for connecting to the statsd server. // Replaced in tests defaultSocketDSD = "/var/run/datadog/dsd.socket" + + // defaultMaxTagsHeaderLen specifies the default maximum length of the X-Datadog-Tags header value. + defaultMaxTagsHeaderLen = 512 ) // config holds the tracer configuration. @@ -256,7 +259,9 @@ func newConfig(opts ...StartOption) *config { c.transport = newHTTPTransport(c.agentAddr, c.httpClient) } if c.propagator == nil { - c.propagator = NewPropagator(nil) + c.propagator = NewPropagator(&PropagatorConfig{ + MaxTagsHeaderLen: internal.IntEnv("DD_TRACE_TAGS_PROPAGATION_MAX_LENGTH", defaultMaxTagsHeaderLen), + }) } if c.logger != nil { log.UseLogger(c.logger) diff --git a/ddtrace/tracer/sampler.go b/ddtrace/tracer/sampler.go index e9931686c5..e85218a47f 100644 --- a/ddtrace/tracer/sampler.go +++ b/ddtrace/tracer/sampler.go @@ -20,6 +20,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames" "golang.org/x/time/rate" ) @@ -150,9 +151,9 @@ func (ps *prioritySampler) getRate(spn *span) float64 { func (ps *prioritySampler) apply(spn *span) { rate := ps.getRate(spn) if sampledByRate(spn.TraceID, rate) { - spn.SetTag(ext.SamplingPriority, ext.PriorityAutoKeep) + 
spn.setSamplingPriority(ext.PriorityAutoKeep, samplernames.AgentRate, rate) } else { - spn.SetTag(ext.SamplingPriority, ext.PriorityAutoReject) + spn.setSamplingPriority(ext.PriorityAutoReject, samplernames.AgentRate, rate) } spn.SetTag(keySamplingPriorityRate, rate) } @@ -311,15 +312,15 @@ func (rs *rulesSampler) apply(span *span) bool { func (rs *rulesSampler) applyRate(span *span, rate float64, now time.Time) { span.SetTag(keyRulesSamplerAppliedRate, rate) if !sampledByRate(span.TraceID, rate) { - span.SetTag(ext.SamplingPriority, ext.PriorityUserReject) + span.setSamplingPriority(ext.PriorityUserReject, samplernames.RuleRate, rate) return } sampled, rate := rs.limiter.allowOne(now) if sampled { - span.SetTag(ext.SamplingPriority, ext.PriorityUserKeep) + span.setSamplingPriority(ext.PriorityUserKeep, samplernames.RuleRate, rate) } else { - span.SetTag(ext.SamplingPriority, ext.PriorityUserReject) + span.setSamplingPriority(ext.PriorityUserReject, samplernames.RuleRate, rate) } span.SetTag(keyRulesSamplerLimiterRate, rate) } diff --git a/ddtrace/tracer/span.go b/ddtrace/tracer/span.go index 630271989f..68a18e6b7f 100644 --- a/ddtrace/tracer/span.go +++ b/ddtrace/tracer/span.go @@ -10,6 +10,7 @@ package tracer import ( "context" "fmt" + "math" "os" "reflect" "runtime" @@ -25,6 +26,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames" "github.com/DataDog/datadog-agent/pkg/obfuscate" "github.com/tinylib/msgp/msgp" @@ -148,6 +150,27 @@ func (s *span) SetTag(key string, value interface{}) { s.setMeta(key, fmt.Sprint(value)) } +// setSamplingPriority locks the span, then updates the sampling priority. +// It also updates the trace's sampling priority. 
+func (s *span) setSamplingPriority(priority int, sampler samplernames.SamplerName, rate float64) { + s.Lock() + defer s.Unlock() + s.setSamplingPriorityLocked(priority, sampler, rate) +} + +// setSamplingPriorityLocked updates the sampling priority. +// It also updates the trace's sampling priority. +func (s *span) setSamplingPriorityLocked(priority int, sampler samplernames.SamplerName, rate float64) { + // We don't lock spans when flushing, so we could have a data race when + // modifying a span as it's being flushed. This protects us against that + // race, since spans are marked `finished` before we flush them. + if s.finished { + return + } + s.setMetric(keySamplingPriority, float64(priority)) + s.context.setSamplingPriority(s.Service, priority, sampler, rate) +} + // setTagError sets the error tag. It accounts for various valid scenarios. // This method is not safe for concurrent use. func (s *span) setTagError(value interface{}, cfg errorConfig) { @@ -266,11 +289,11 @@ func (s *span) setTagBool(key string, v bool) { } case ext.ManualDrop: if v { - s.setMetric(ext.SamplingPriority, ext.PriorityUserReject) + s.setSamplingPriorityLocked(ext.PriorityUserReject, samplernames.Manual, math.NaN()) } case ext.ManualKeep: if v { - s.setMetric(ext.SamplingPriority, ext.PriorityUserKeep) + s.setSamplingPriorityLocked(ext.PriorityUserKeep, samplernames.Manual, math.NaN()) } default: if v { @@ -289,10 +312,14 @@ func (s *span) setMetric(key string, v float64) { } delete(s.Meta, key) switch key { + case ext.ManualKeep: + if v == float64(samplernames.AppSec) { + s.setSamplingPriorityLocked(ext.PriorityUserKeep, samplernames.AppSec, math.NaN()) + } case ext.SamplingPriority: - // setting sampling priority per spec - s.Metrics[keySamplingPriority] = v - s.context.setSamplingPriority(int(v)) + // ext.SamplingPriority is deprecated in favor of ext.ManualKeep and ext.ManualDrop. + // We have it here for backward compatibility. 
+ s.setSamplingPriorityLocked(int(v), samplernames.Manual, math.NaN()) default: s.Metrics[key] = v } @@ -525,6 +552,7 @@ func (s *span) Format(f fmt.State, c rune) { const ( keySamplingPriority = "_sampling_priority_v1" keySamplingPriorityRate = "_dd.agent_psr" + keyUpstreamServices = "_dd.p.upstream_services" keyOrigin = "_dd.origin" keyHostname = "_dd.hostname" keyRulesSamplerAppliedRate = "_dd.rule_psr" diff --git a/ddtrace/tracer/span_test.go b/ddtrace/tracer/span_test.go index 7730b4758e..0431bae885 100644 --- a/ddtrace/tracer/span_test.go +++ b/ddtrace/tracer/span_test.go @@ -456,7 +456,9 @@ func TestSpanError(t *testing.T) { span.Finish() span.SetTag(ext.Error, err) assert.Equal(int32(0), span.Error) - assert.Equal(nMeta, len(span.Meta)) + // '+1' is `_dd.p.upstream_services`, + // because we add it into Meta of the first span, when root is finished. + assert.Equal(nMeta+1, len(span.Meta)) assert.Equal("", span.Meta["error.msg"]) assert.Equal("", span.Meta["error.type"]) assert.Equal("", span.Meta["error.stack"]) diff --git a/ddtrace/tracer/spancontext.go b/ddtrace/tracer/spancontext.go index 950b7c7590..7297fa8349 100644 --- a/ddtrace/tracer/spancontext.go +++ b/ddtrace/tracer/spancontext.go @@ -6,6 +6,8 @@ package tracer import ( + "math" + "strconv" "sync" "sync/atomic" @@ -13,6 +15,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames" ) var _ ddtrace.SpanContext = (*spanContext)(nil) @@ -91,11 +94,11 @@ func (c *spanContext) ForeachBaggageItem(handler func(k, v string) bool) { } } -func (c *spanContext) setSamplingPriority(p int) { +func (c *spanContext) setSamplingPriority(service string, p int, sampler samplernames.SamplerName, rate float64) { if c.trace == nil { c.trace = newTrace() } - c.trace.setSamplingPriority(float64(p)) + c.trace.setSamplingPriority(service, p, sampler, rate) } func (c 
*spanContext) samplingPriority() (p int, ok bool) { @@ -144,13 +147,15 @@ const ( // priority, the root reference and a buffer of the spans which are part of the // trace, if these exist. type trace struct { - mu sync.RWMutex // guards below fields - spans []*span // all the spans that are part of this trace - finished int // the number of finished spans - full bool // signifies that the span buffer is full - priority *float64 // sampling priority - locked bool // specifies if the sampling priority can be altered - samplingDecision samplingDecision // samplingDecision indicates whether to send the trace to the agent. + mu sync.RWMutex // guards below fields + spans []*span // all the spans that are part of this trace + tags map[string]string // trace level tags + upstreamServices string // _dd.p.upstream_services value from the upstream service + finished int // the number of finished spans + full bool // signifies that the span buffer is full + priority *float64 // sampling priority + locked bool // specifies if the sampling priority can be altered + samplingDecision samplingDecision // samplingDecision indicates whether to send the trace to the agent. 
// root specifies the root of the trace, if known; it is nil when a span // context is extracted from a carrier, at which point there are no spans in @@ -191,10 +196,10 @@ func (t *trace) samplingPriority() (p int, ok bool) { return t.samplingPriorityLocked() } -func (t *trace) setSamplingPriority(p float64) { +func (t *trace) setSamplingPriority(service string, p int, sampler samplernames.SamplerName, rate float64) { t.mu.Lock() defer t.mu.Unlock() - t.setSamplingPriorityLocked(p) + t.setSamplingPriorityLocked(service, p, sampler, rate) } func (t *trace) keep() { @@ -205,7 +210,14 @@ func (t *trace) drop() { atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionDrop)) } -func (t *trace) setSamplingPriorityLocked(p float64) { +func (t *trace) setTag(key, value string) { + if t.tags == nil { + t.tags = make(map[string]string, 1) + } + t.tags[key] = value +} + +func (t *trace) setSamplingPriorityLocked(service string, p int, sampler samplernames.SamplerName, rate float64) { if t.locked { return } @@ -217,7 +229,14 @@ func (t *trace) setSamplingPriorityLocked(p float64) { if t.priority == nil { t.priority = new(float64) } - *t.priority = p + *t.priority = float64(p) + if sampler != samplernames.Upstream { + if t.upstreamServices != "" { + t.setTag(keyUpstreamServices, t.upstreamServices+";"+compactUpstreamServices(service, p, sampler, rate)) + } else { + t.setTag(keyUpstreamServices, compactUpstreamServices(service, p, sampler, rate)) + } + } } // push pushes a new span into the trace. 
If the buffer is full, it returns @@ -240,7 +259,7 @@ func (t *trace) push(sp *span) { return } if v, ok := sp.Metrics[keySamplingPriority]; ok { - t.setSamplingPriorityLocked(v) + t.setSamplingPriorityLocked(sp.Service, int(v), samplernames.Upstream, math.NaN()) } t.spans = append(t.spans, sp) if haveTracer { @@ -269,6 +288,16 @@ func (t *trace) finishedOne(s *span) { t.root.setMetric(keySamplingPriority, *t.priority) t.locked = true } + if len(t.spans) > 0 && s == t.spans[0] { + // first span in chunk finished, lock down the tags + // + // TODO(barbayar): make sure this doesn't happen in vain when switching to + // the new wire format. We won't need to set the tags on the first span + // in the chunk there. + for k, v := range t.tags { + s.setMeta(k, v) + } + } if len(t.spans) != t.finished { return } @@ -292,3 +321,14 @@ func (t *trace) finishedOne(s *span) { } tr.pushTrace(t.spans) } + +func compactUpstreamServices(service string, priority int, sampler samplernames.SamplerName, rate float64) string { + sb64 := b64Encode(service) + p := strconv.Itoa(priority) + s := strconv.Itoa(int(sampler)) + r := "" + if !math.IsNaN(rate) { + r = strconv.FormatFloat(rate, 'f', 4, 64) + } + return sb64 + "|" + p + "|" + s + "|" + r +} diff --git a/ddtrace/tracer/spancontext_test.go b/ddtrace/tracer/spancontext_test.go index 7a00f2a071..38f91c2772 100644 --- a/ddtrace/tracer/spancontext_test.go +++ b/ddtrace/tracer/spancontext_test.go @@ -7,6 +7,7 @@ package tracer import ( "context" + "math" "strings" "sync" "testing" @@ -16,6 +17,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames" ) func setupteardown(start, max int) func() { @@ -75,6 +77,21 @@ func TestAsyncSpanRace(t *testing.T) { } }() wg.Add(1) + go func() { + defer wg.Done() + select { + case <-done: + root.Finish() + for i := 0; i < 500; i++ { + for range root.(*span).Meta { + // this range simulates 
iterating over the meta map + // as we do when encoding msgpack upon flushing. + } + } + return + } + }() + wg.Add(1) go func() { defer wg.Done() select { @@ -405,6 +422,24 @@ func TestSpanContextIteratorBreak(t *testing.T) { assert.Len(t, got, 0) } +func TestBuildNewUpstreamServices(t *testing.T) { + var testCases = []struct { + service string + priority int + sampler samplernames.SamplerName + rate float64 + expected string + }{ + {"service-account", 1, samplernames.AgentRate, 0.99, "c2VydmljZS1hY2NvdW50|1|1|0.9900"}, + {"service-storage", 2, samplernames.Manual, math.NaN(), "c2VydmljZS1zdG9yYWdl|2|4|"}, + {"service-video", 1, samplernames.RuleRate, 1, "c2VydmljZS12aWRlbw|1|3|1.0000"}, + } + + for _, tt := range testCases { + assert.Equal(t, tt.expected, compactUpstreamServices(tt.service, tt.priority, tt.sampler, tt.rate)) + } +} + // testLogger implements a mock Printer. type testLogger struct { mu sync.RWMutex diff --git a/ddtrace/tracer/textmap.go b/ddtrace/tracer/textmap.go index 505ced0823..3224016fa0 100644 --- a/ddtrace/tracer/textmap.go +++ b/ddtrace/tracer/textmap.go @@ -7,6 +7,7 @@ package tracer import ( "fmt" + "math" "net/http" "os" "strconv" @@ -15,6 +16,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames" ) // HTTPHeadersCarrier wraps an http.Header as a TextMapWriter and TextMapReader, allowing @@ -90,6 +92,9 @@ const ( // It is used with the Synthetics product and usually has the value "synthetics". const originHeader = "x-datadog-origin" +// traceTagsHeader holds the propagated trace tags +const traceTagsHeader = "x-datadog-tags" + // PropagatorConfig defines the configuration for initializing a propagator. 
type PropagatorConfig struct { // BaggagePrefix specifies the prefix that will be used to store baggage @@ -107,6 +112,9 @@ type PropagatorConfig struct { // PriorityHeader specifies the map key that will be used to store the sampling priority. // It deafults to DefaultPriorityHeader. PriorityHeader string + + // MaxTagsHeaderLen specifies the maximum length of trace tags header value. + MaxTagsHeaderLen int } // NewPropagator returns a new propagator which uses TextMap to inject @@ -232,6 +240,35 @@ func (p *propagator) injectTextMap(spanCtx ddtrace.SpanContext, writer TextMapWr for k, v := range ctx.baggage { writer.Set(p.cfg.BaggagePrefix+k, v) } + // propagate trace tags + var sb strings.Builder + if ctx.trace != nil { + ctx.trace.mu.RLock() + for k, v := range ctx.trace.tags { + if !strings.HasPrefix(k, "_dd.p.") { + continue + } + if err := isValidPropagatableTraceTag(k, v); err != nil { + log.Warn("won't propagate tag '%s' (err: %s)", k, err.Error()) + continue + } + if sb.Len()+len(k)+len(v) > p.cfg.MaxTagsHeaderLen { + sb.Reset() + log.Warn("won't propagate trace tags (err: max trace tags header len (%d) reached)", p.cfg.MaxTagsHeaderLen) + break + } + if sb.Len() > 0 { + sb.WriteByte(',') + } + sb.WriteString(k) + sb.WriteByte('=') + sb.WriteString(v) + } + ctx.trace.mu.RUnlock() + if sb.Len() > 0 { + writer.Set(traceTagsHeader, sb.String()) + } + } return nil } @@ -265,9 +302,18 @@ func (p *propagator) extractTextMap(reader TextMapReader) (ddtrace.SpanContext, if err != nil { return ErrSpanContextCorrupted } - ctx.setSamplingPriority(priority) + ctx.setSamplingPriority("", priority, samplernames.Upstream, math.NaN()) case originHeader: ctx.origin = v + case traceTagsHeader: + if ctx.trace == nil { + ctx.trace = newTrace() + } + ctx.trace.tags, err = parsePropagatableTraceTags(v) + ctx.trace.upstreamServices = ctx.trace.tags[keyUpstreamServices] + if err != nil { + log.Warn("did not extract trace tags (err: %s)", err.Error()) + } default: if 
strings.HasPrefix(key, p.cfg.BaggagePrefix) { ctx.setBaggageItem(strings.TrimPrefix(key, p.cfg.BaggagePrefix), v) @@ -353,7 +399,7 @@ func (*propagatorB3) extractTextMap(reader TextMapReader) (ddtrace.SpanContext, if err != nil { return ErrSpanContextCorrupted } - ctx.setSamplingPriority(priority) + ctx.setSamplingPriority("", priority, samplernames.Upstream, math.NaN()) default: } return nil diff --git a/ddtrace/tracer/textmap_test.go b/ddtrace/tracer/textmap_test.go index b011598a7f..1fb0fbd818 100644 --- a/ddtrace/tracer/textmap_test.go +++ b/ddtrace/tracer/textmap_test.go @@ -7,9 +7,11 @@ package tracer import ( "errors" + "fmt" "net/http" "os" "strconv" + "strings" "testing" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" @@ -173,6 +175,121 @@ func TestTextMapPropagatorOrigin(t *testing.T) { } } +func TestTextMapPropagatorTraceTagsWithPriority(t *testing.T) { + src := TextMapCarrier(map[string]string{ + DefaultPriorityHeader: "1", + DefaultTraceIDHeader: "1", + DefaultParentIDHeader: "1", + traceTagsHeader: "hello=world,_dd.p.upstream_services=abc|1|2|3;def|4|5|6", + }) + tracer := newTracer() + ctx, err := tracer.Extract(src) + assert.Nil(t, err) + sctx, ok := ctx.(*spanContext) + assert.True(t, ok) + child := tracer.StartSpan("test", ChildOf(sctx)) + childSpanID := child.Context().(*spanContext).spanID + assert.Equal(t, map[string]string{ + "hello": "world", + "_dd.p.upstream_services": "abc|1|2|3;def|4|5|6", + }, sctx.trace.tags) + dst := map[string]string{} + err = tracer.Inject(child.Context(), TextMapCarrier(dst)) + assert.Nil(t, err) + assert.Len(t, dst, 4) + assert.Equal(t, strconv.Itoa(int(childSpanID)), dst["x-datadog-parent-id"]) + assert.Equal(t, "1", dst["x-datadog-trace-id"]) + assert.Equal(t, "1", dst["x-datadog-sampling-priority"]) + assertTraceTags(t, "_dd.p.upstream_services=abc|1|2|3;def|4|5|6", dst["x-datadog-tags"]) +} + +func TestTextMapPropagatorTraceTagsWithoutPriority(t *testing.T) { + src := TextMapCarrier(map[string]string{ + 
DefaultTraceIDHeader: "1", + DefaultParentIDHeader: "1", + traceTagsHeader: "hello=world,_dd.p.upstream_services=abc|1|2|3;def|4|5|6", + }) + tracer := newTracer() + ctx, err := tracer.Extract(src) + assert.Nil(t, err) + sctx, ok := ctx.(*spanContext) + assert.True(t, ok) + child := tracer.StartSpan("test", ChildOf(sctx)) + childSpanID := child.Context().(*spanContext).spanID + assert.Equal(t, map[string]string{ + "hello": "world", + "_dd.p.upstream_services": "abc|1|2|3;def|4|5|6;dHJhY2VyLnRlc3Q|1|1|1.0000", + }, sctx.trace.tags) + dst := map[string]string{} + err = tracer.Inject(child.Context(), TextMapCarrier(dst)) + assert.Nil(t, err) + assert.Len(t, dst, 4) + assert.Equal(t, strconv.Itoa(int(childSpanID)), dst["x-datadog-parent-id"]) + assert.Equal(t, "1", dst["x-datadog-trace-id"]) + assert.Equal(t, "1", dst["x-datadog-sampling-priority"]) + assertTraceTags(t, "_dd.p.upstream_services=abc|1|2|3;def|4|5|6;dHJhY2VyLnRlc3Q|1|1|1.0000", dst["x-datadog-tags"]) +} + +func TestTextMapPropagatorInvalidTraceTagsHeader(t *testing.T) { + src := TextMapCarrier(map[string]string{ + DefaultTraceIDHeader: "1", + DefaultParentIDHeader: "1", + traceTagsHeader: "hello=world,=", // invalid value + }) + tracer := newTracer() + ctx, err := tracer.Extract(src) + assert.Nil(t, err) + sctx, ok := ctx.(*spanContext) + assert.True(t, ok) + assert.Equal(t, map[string]string(nil), sctx.trace.tags) +} + +func TestTextMapPropagatorTraceTagsTooLong(t *testing.T) { + tags := make([]string, 0) + for i := 0; i < 100; i++ { + tags = append(tags, fmt.Sprintf("_dd.p.tag%d=value%d", i, i)) + } + traceTags := strings.Join(tags, ",") + src := TextMapCarrier(map[string]string{ + DefaultPriorityHeader: "1", + DefaultTraceIDHeader: "1", + DefaultParentIDHeader: "1", + traceTagsHeader: traceTags, + }) + tracer := newTracer() + ctx, err := tracer.Extract(src) + assert.Nil(t, err) + sctx, ok := ctx.(*spanContext) + assert.True(t, ok) + child := tracer.StartSpan("test", ChildOf(sctx)) + childSpanID := 
child.Context().(*spanContext).spanID + assert.Equal(t, 100, len(sctx.trace.tags)) + dst := map[string]string{} + err = tracer.Inject(child.Context(), TextMapCarrier(dst)) + assert.Nil(t, err) + assert.Equal(t, map[string]string{ + "x-datadog-parent-id": strconv.Itoa(int(childSpanID)), + "x-datadog-trace-id": "1", + "x-datadog-sampling-priority": "1", + }, dst) +} + +func TestTextMapPropagatorInvalidTraceTags(t *testing.T) { + tracer := newTracer() + child := tracer.StartSpan("test") + child.Context().(*spanContext).trace.setTag("_dd.p.hello1", "world") // valid value + child.Context().(*spanContext).trace.setTag("_dd.p.hello2", "world,") // invalid value + childSpanID := child.Context().(*spanContext).spanID + dst := map[string]string{} + err := tracer.Inject(child.Context(), TextMapCarrier(dst)) + assert.Nil(t, err) + assert.Len(t, dst, 4) + assert.Equal(t, strconv.Itoa(int(childSpanID)), dst["x-datadog-parent-id"]) + assert.Equal(t, strconv.Itoa(int(childSpanID)), dst["x-datadog-trace-id"]) + assert.Equal(t, "1", dst["x-datadog-sampling-priority"]) + assertTraceTags(t, "_dd.p.upstream_services=dHJhY2VyLnRlc3Q|1|1|1.0000,_dd.p.hello1=world", dst["x-datadog-tags"]) +} + func TestTextMapPropagatorInjectExtract(t *testing.T) { propagator := NewPropagator(&PropagatorConfig{ BaggagePrefix: "bg-", @@ -342,3 +459,7 @@ func TestB3(t *testing.T) { assert.Equal(2, p) }) } + +func assertTraceTags(t *testing.T, expected, actual string) { + assert.ElementsMatch(t, strings.Split(expected, ","), strings.Split(actual, ",")) +} diff --git a/ddtrace/tracer/tracer_test.go b/ddtrace/tracer/tracer_test.go index eb63a2b310..c395da0b6f 100644 --- a/ddtrace/tracer/tracer_test.go +++ b/ddtrace/tracer/tracer_test.go @@ -240,6 +240,7 @@ func TestTracerStartSpan(t *testing.T) { ext.PriorityAutoReject, ext.PriorityAutoKeep, }, span.Metrics[keySamplingPriority]) + assert.Equal("dHJhY2VyLnRlc3Q|1|1|1.0000", span.context.trace.tags[keyUpstreamServices]) // A span is not measured unless made so 
specifically _, ok := span.Meta[keyMeasured] assert.False(ok) @@ -251,6 +252,7 @@ func TestTracerStartSpan(t *testing.T) { tracer := newTracer() span := tracer.StartSpan("web.request", Tag(ext.SamplingPriority, ext.PriorityUserKeep)).(*span) assert.Equal(t, float64(ext.PriorityUserKeep), span.Metrics[keySamplingPriority]) + assert.Equal(t, "dHJhY2VyLnRlc3Q|2|4|", span.context.trace.tags[keyUpstreamServices]) }) t.Run("name", func(t *testing.T) { @@ -287,6 +289,7 @@ func TestSamplingDecision(t *testing.T) { child.Finish() span.Finish() assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority]) + assert.Equal(t, "dGVzdF9zZXJ2aWNl|0|1|0.0000", span.context.trace.tags[keyUpstreamServices]) assert.Equal(t, decisionKeep, span.context.trace.samplingDecision) }) @@ -301,6 +304,7 @@ func TestSamplingDecision(t *testing.T) { child.Finish() span.Finish() assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority]) + assert.Equal(t, "dGVzdF9zZXJ2aWNl|0|1|0.0000", span.context.trace.tags[keyUpstreamServices]) assert.Equal(t, decisionNone, span.context.trace.samplingDecision) }) @@ -316,6 +320,7 @@ func TestSamplingDecision(t *testing.T) { child.Finish() span.Finish() assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority]) + assert.Equal(t, "dGVzdF9zZXJ2aWNl|0|1|0.0000", span.context.trace.tags[keyUpstreamServices]) assert.Equal(t, decisionKeep, span.context.trace.samplingDecision) }) @@ -332,6 +337,9 @@ func TestSamplingDecision(t *testing.T) { child.Finish() span.Finish() assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority]) + // this trace won't be sent to the agent, + // therefore not necessary to populate keyUpstreamServices + assert.Equal(t, "", span.context.trace.tags[keyUpstreamServices]) assert.Equal(t, decisionDrop, span.context.trace.samplingDecision) }) } @@ -524,6 +532,7 @@ func TestTracerSamplingPriorityPropagation(t *testing.T) { root := 
tracer.StartSpan("web.request", Tag(ext.SamplingPriority, 2)).(*span) child := tracer.StartSpan("db.query", ChildOf(root.Context())).(*span) assert.EqualValues(2, root.Metrics[keySamplingPriority]) + assert.Equal("dHJhY2VyLnRlc3Q|2|4|", root.context.trace.tags[keyUpstreamServices]) assert.EqualValues(2, child.Metrics[keySamplingPriority]) assert.EqualValues(2., *root.context.trace.priority) assert.EqualValues(2., *child.context.trace.priority) @@ -536,9 +545,36 @@ func TestTracerSamplingPriorityEmptySpanCtx(t *testing.T) { spanCtx := &spanContext{ traceID: root.context.TraceID(), spanID: root.context.SpanID(), + trace: &trace{ + tags: map[string]string{ + keyUpstreamServices: "previous", + }, + upstreamServices: "previous", + }, } child := tracer.StartSpan("db.query", ChildOf(spanCtx)).(*span) assert.EqualValues(1, child.Metrics[keySamplingPriority]) + assert.Equal("previous;dHJhY2VyLnRlc3Q|1|1|1.0000", child.context.trace.tags[keyUpstreamServices]) +} + +func TestTracerDDUpstreamServicesManualKeep(t *testing.T) { + assert := assert.New(t) + tracer := newTracer() + root := newBasicSpan("web.request") + spanCtx := &spanContext{ + traceID: root.context.TraceID(), + spanID: root.context.SpanID(), + trace: &trace{ + tags: map[string]string{ + keyUpstreamServices: "previous", + }, + upstreamServices: "previous", + }, + } + child := tracer.StartSpan("db.query", ChildOf(spanCtx)).(*span) + grandChild := tracer.StartSpan("db.query", ChildOf(child.Context())).(*span) + grandChild.SetTag(ext.ManualKeep, true) + assert.Equal("previous;dHJhY2VyLnRlc3Q|2|4|", grandChild.context.trace.tags[keyUpstreamServices]) } func TestTracerBaggageImmutability(t *testing.T) { @@ -723,6 +759,7 @@ func TestTracerPrioritySampler(t *testing.T) { s := tr.newEnvSpan("pylons", "") assert.Equal(1., s.Metrics[keySamplingPriorityRate]) assert.Equal(1., s.Metrics[keySamplingPriority]) + assert.Equal("cHlsb25z|1|1|1.0000", s.context.trace.tags[keyUpstreamServices]) p, ok := s.context.samplingPriority() 
assert.True(ok) assert.EqualValues(p, s.Metrics[keySamplingPriority]) @@ -758,6 +795,7 @@ func TestTracerPrioritySampler(t *testing.T) { s := tr.newEnvSpan(tt.service, tt.env) assert.Equal(tt.rate, s.Metrics[keySamplingPriorityRate], strconv.Itoa(i)) prio, ok := s.Metrics[keySamplingPriority] + assert.Equal(b64Encode(tt.service)+"|"+strconv.Itoa(int(prio))+"|1|"+strconv.FormatFloat(tt.rate, 'f', 4, 64), s.context.trace.tags[keyUpstreamServices]) assert.True(ok) assert.Contains([]float64{0, 1}, prio) p, ok := s.context.samplingPriority() diff --git a/ddtrace/tracer/util.go b/ddtrace/tracer/util.go index 84da0a9efd..4a0326584c 100644 --- a/ddtrace/tracer/util.go +++ b/ddtrace/tracer/util.go @@ -6,8 +6,12 @@ package tracer import ( + "encoding/base64" + "fmt" "strconv" "strings" + + "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames" ) // toFloat64 attempts to convert value into a float64. If the value is an integer @@ -26,6 +30,8 @@ func toFloat64(value interface{}) (f float64, ok bool) { return i, true case int: return float64(i), true + case int8: + return float64(i), true case int16: return float64(i), true case int32: @@ -46,6 +52,8 @@ func toFloat64(value interface{}) (f float64, ok bool) { return 0, false } return float64(i), true + case samplernames.SamplerName: + return float64(i), true default: return 0, false } @@ -63,3 +71,59 @@ func parseUint64(str string) (uint64, error) { } return strconv.ParseUint(str, 10, 64) } + +func isValidPropagatableTraceTag(k, v string) error { + if len(k) == 0 { + return fmt.Errorf("key length must be greater than zero") + } + for _, ch := range k { + if ch < 32 || ch > 126 || ch == ' ' || ch == '=' || ch == ',' { + return fmt.Errorf("key contains an invalid character %d", ch) + } + } + if len(v) == 0 { + return fmt.Errorf("value length must be greater than zero") + } + for _, ch := range v { + if ch < 32 || ch > 126 || ch == '=' || ch == ',' { + return fmt.Errorf("value contains an invalid character %d", ch) + } + } + 
return nil +} + +func parsePropagatableTraceTags(s string) (map[string]string, error) { + if len(s) == 0 { + return nil, nil + } + tags := make(map[string]string) + searchingKey, start := true, 0 + var key string + for i, ch := range s { + switch ch { + case '=': + if !searchingKey || i-start == 0 { + return nil, fmt.Errorf("invalid format") + } + key = s[start:i] + searchingKey, start = false, i+1 + case ',': + if searchingKey || i-start == 0 { + return nil, fmt.Errorf("invalid format") + } + tags[key] = s[start:i] + searchingKey, start = true, i+1 + } + } + if searchingKey || len(s)-start == 0 { + return nil, fmt.Errorf("invalid format") + } + tags[key] = s[start:] + return tags, nil +} + +var b64 = base64.StdEncoding.WithPadding(base64.NoPadding) + +func b64Encode(s string) string { + return b64.EncodeToString([]byte(s)) +} diff --git a/ddtrace/tracer/util_test.go b/ddtrace/tracer/util_test.go index 917af9054f..b23daf4b26 100644 --- a/ddtrace/tracer/util_test.go +++ b/ddtrace/tracer/util_test.go @@ -70,3 +70,46 @@ func TestParseUint64(t *testing.T) { assert.Error(t, err) }) } + +func TestIsValidPropagatableTraceTag(t *testing.T) { + for i, tt := range [...]struct { + key string + value string + err error + }{ + {"hello", "world", nil}, + {"hello=", "world", fmt.Errorf("key contains an invalid character 61")}, + {"hello", "world=", fmt.Errorf("value contains an invalid character 61")}, + {"", "world", fmt.Errorf("key length must be greater than zero")}, + {"hello", "", fmt.Errorf("value length must be greater than zero")}, + {"こんにちは", "world", fmt.Errorf("key contains an invalid character 12371")}, + {"hello", "世界", fmt.Errorf("value contains an invalid character 19990")}, + } { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + assert.Equal(t, tt.err, isValidPropagatableTraceTag(tt.key, tt.value)) + }) + } +} + +func TestParsePropagatableTraceTags(t *testing.T) { + for i, tt := range [...]struct { + input string + output map[string]string + err error + }{ + 
{"hello=world", map[string]string{"hello": "world"}, nil}, + {" hello = world ", map[string]string{" hello ": " world "}, nil}, + {"hello=world,service=account", map[string]string{"hello": "world", "service": "account"}, nil}, + {"hello", nil, fmt.Errorf("invalid format")}, + {"hello=world,service=", nil, fmt.Errorf("invalid format")}, + {"hello=world,", nil, fmt.Errorf("invalid format")}, + {"=world", nil, fmt.Errorf("invalid format")}, + {",hello=world", nil, fmt.Errorf("invalid format")}, + } { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + output, err := parsePropagatableTraceTags(tt.input) + assert.Equal(t, tt.output, output) + assert.Equal(t, tt.err, err) + }) + } +} diff --git a/internal/appsec/dyngo/instrumentation/grpcsec/tags.go b/internal/appsec/dyngo/instrumentation/grpcsec/tags.go index 9f36ce2fd1..b23436b146 100644 --- a/internal/appsec/dyngo/instrumentation/grpcsec/tags.go +++ b/internal/appsec/dyngo/instrumentation/grpcsec/tags.go @@ -14,6 +14,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo/instrumentation/httpsec" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames" ) // SetSecurityEventTags sets the AppSec-specific span tags when a security event @@ -53,7 +54,12 @@ func setEventSpanTags(span ddtrace.Span, events []json.RawMessage) error { } span.SetTag("_dd.appsec.json", string(val)) // Keep this span due to the security event - span.SetTag(ext.ManualKeep, true) + // + // This is a workaround to tell the tracer that the trace was kept by AppSec. + // Passing any other value than `samplernames.AppSec` has no effect. + // Customers should use `span.SetTag(ext.ManualKeep, true)` pattern + // to keep the trace, manually. 
+ span.SetTag(ext.ManualKeep, samplernames.AppSec) span.SetTag("_dd.origin", "appsec") // Set the appsec.event tag needed by the appsec backend span.SetTag("appsec.event", true) diff --git a/internal/appsec/dyngo/instrumentation/grpcsec/tags_test.go b/internal/appsec/dyngo/instrumentation/grpcsec/tags_test.go index ce6e79144e..5e0e21db69 100644 --- a/internal/appsec/dyngo/instrumentation/grpcsec/tags_test.go +++ b/internal/appsec/dyngo/instrumentation/grpcsec/tags_test.go @@ -14,6 +14,8 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames" ) func TestSetSecurityEventTags(t *testing.T) { @@ -158,7 +160,13 @@ func (m *MockSpan) SetTag(key string, value interface{}) { if m.tags == nil { m.tags = make(map[string]interface{}) } - m.tags[key] = value + if key == ext.ManualKeep { + if value == samplernames.AppSec { + m.tags[ext.ManualKeep] = true + } + } else { + m.tags[key] = value + } } func (m *MockSpan) SetOperationName(operationName string) { diff --git a/internal/appsec/dyngo/instrumentation/httpsec/tags.go b/internal/appsec/dyngo/instrumentation/httpsec/tags.go index 77ae050586..3261db644c 100644 --- a/internal/appsec/dyngo/instrumentation/httpsec/tags.go +++ b/internal/appsec/dyngo/instrumentation/httpsec/tags.go @@ -13,6 +13,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames" ) // SetAppSecTags sets the AppSec-specific span tags that are expected to be in @@ -37,7 +38,12 @@ func setEventSpanTags(span ddtrace.Span, events json.RawMessage) { } span.SetTag("_dd.appsec.json", string(event)) // Keep this span due to the security event - span.SetTag(ext.ManualKeep, true) + // + // This is a workaround to tell the tracer that the trace was kept by AppSec. 
// IntEnv reads the environment variable named key and returns its value
// parsed as a base-10 int. def is returned instead when the variable is
// unset, empty, or cannot be parsed by strconv.Atoi.
func IntEnv(key string, def int) int {
	raw, ok := os.LookupEnv(key)
	if !ok {
		return def
	}
	if n, err := strconv.Atoi(raw); err == nil {
		return n
	}
	return def
}

// SamplerName identifies the sampler that was responsible for a given
// sampling decision.
type SamplerName int8

const (
	// Upstream means the sampling decision was made in an upstream service,
	// so no sampling needs to be done.
	Upstream = SamplerName(math.MinInt8)
	// Unknown means the span was sampled, but the tracer was unable to
	// identify the sampler.
	Unknown = SamplerName(-1)
	// Default means the span was sampled without any sampler.
	Default = SamplerName(0)
	// AgentRate means the span was sampled with a rate calculated by the
	// trace agent.
	AgentRate = SamplerName(1)
	// RemoteRate means the span was sampled with a dynamically calculated
	// remote rate.
	RemoteRate = SamplerName(2)
	// RuleRate means the span was sampled by the RuleSampler.
	RuleRate = SamplerName(3)
	// Manual means the span was sampled manually by the user.
	Manual = SamplerName(4)
	// AppSec means the span was sampled by AppSec.
	AppSec = SamplerName(5)
	// RemoteUserRate means the span was sampled with a user-specified
	// remote rate.
	RemoteUserRate = SamplerName(6)
)