Skip to content

Commit

Permalink
ddtrace/tracer: propagate _dd.p.upstream_services tags (#1082)
Browse files Browse the repository at this point in the history
This commit introduces the concept of trace-level tags, propagated
trace-level tags, and a new conventional trace-level tag
_dd.p.upstream_services that holds the services that changed sampling decisions.
  • Loading branch information
Barbayar authored Jan 24, 2022
1 parent 10f0512 commit e52ef96
Show file tree
Hide file tree
Showing 17 changed files with 523 additions and 31 deletions.
1 change: 1 addition & 0 deletions ddtrace/ext/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
TargetPort = "out.port"

// SamplingPriority is the tag that marks the sampling priority of a span.
// Deprecated in favor of ManualKeep and ManualDrop.
SamplingPriority = "sampling.priority"

// SQLType sets the sql type tag.
Expand Down
7 changes: 6 additions & 1 deletion ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ var (
// defaultSocketDSD specifies the socket path to use for connecting to the statsd server.
// Replaced in tests
defaultSocketDSD = "/var/run/datadog/dsd.socket"

// defaultMaxTagsHeaderLen specifies the default maximum length of the X-Datadog-Tags header value.
defaultMaxTagsHeaderLen = 512
)

// config holds the tracer configuration.
Expand Down Expand Up @@ -256,7 +259,9 @@ func newConfig(opts ...StartOption) *config {
c.transport = newHTTPTransport(c.agentAddr, c.httpClient)
}
if c.propagator == nil {
c.propagator = NewPropagator(nil)
c.propagator = NewPropagator(&PropagatorConfig{
MaxTagsHeaderLen: internal.IntEnv("DD_TRACE_TAGS_PROPAGATION_MAX_LENGTH", defaultMaxTagsHeaderLen),
})
}
if c.logger != nil {
log.UseLogger(c.logger)
Expand Down
11 changes: 6 additions & 5 deletions ddtrace/tracer/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"

"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -150,9 +151,9 @@ func (ps *prioritySampler) getRate(spn *span) float64 {
func (ps *prioritySampler) apply(spn *span) {
rate := ps.getRate(spn)
if sampledByRate(spn.TraceID, rate) {
spn.SetTag(ext.SamplingPriority, ext.PriorityAutoKeep)
spn.setSamplingPriority(ext.PriorityAutoKeep, samplernames.AgentRate, rate)
} else {
spn.SetTag(ext.SamplingPriority, ext.PriorityAutoReject)
spn.setSamplingPriority(ext.PriorityAutoReject, samplernames.AgentRate, rate)
}
spn.SetTag(keySamplingPriorityRate, rate)
}
Expand Down Expand Up @@ -311,15 +312,15 @@ func (rs *rulesSampler) apply(span *span) bool {
func (rs *rulesSampler) applyRate(span *span, rate float64, now time.Time) {
span.SetTag(keyRulesSamplerAppliedRate, rate)
if !sampledByRate(span.TraceID, rate) {
span.SetTag(ext.SamplingPriority, ext.PriorityUserReject)
span.setSamplingPriority(ext.PriorityUserReject, samplernames.RuleRate, rate)
return
}

sampled, rate := rs.limiter.allowOne(now)
if sampled {
span.SetTag(ext.SamplingPriority, ext.PriorityUserKeep)
span.setSamplingPriority(ext.PriorityUserKeep, samplernames.RuleRate, rate)
} else {
span.SetTag(ext.SamplingPriority, ext.PriorityUserReject)
span.setSamplingPriority(ext.PriorityUserReject, samplernames.RuleRate, rate)
}
span.SetTag(keyRulesSamplerLimiterRate, rate)
}
Expand Down
38 changes: 33 additions & 5 deletions ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package tracer
import (
"context"
"fmt"
"math"
"os"
"reflect"
"runtime"
Expand All @@ -25,6 +26,7 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"

"github.com/DataDog/datadog-agent/pkg/obfuscate"
"github.com/tinylib/msgp/msgp"
Expand Down Expand Up @@ -148,6 +150,27 @@ func (s *span) SetTag(key string, value interface{}) {
s.setMeta(key, fmt.Sprint(value))
}

// setSamplingPriority locks the span, then updates the sampling priority.
// It also updates the trace's sampling priority.
//
// The priority, sampler and rate arguments are passed unchanged to
// setSamplingPriorityLocked, which performs the update while the span
// lock is held.
func (s *span) setSamplingPriority(priority int, sampler samplernames.SamplerName, rate float64) {
s.Lock()
// Deferred so the span lock is released once the update returns.
defer s.Unlock()
s.setSamplingPriorityLocked(priority, sampler, rate)
}

// setSamplingPriorityLocked records the given sampling priority on the span
// as the keySamplingPriority metric and forwards it, together with the span's
// service name, the sampler and the rate, to the span context so that the
// trace's sampling priority is updated as well.
//
// It does not take the span lock itself; setSamplingPriority acquires the
// lock before delegating here.
func (s *span) setSamplingPriorityLocked(priority int, sampler samplernames.SamplerName, rate float64) {
	// Spans are not locked while they are being flushed, and they are marked
	// `finished` before the flush happens. Only touching unfinished spans
	// therefore protects against a data race with the flush.
	if !s.finished {
		s.setMetric(keySamplingPriority, float64(priority))
		s.context.setSamplingPriority(s.Service, priority, sampler, rate)
	}
}

// setTagError sets the error tag. It accounts for various valid scenarios.
// This method is not safe for concurrent use.
func (s *span) setTagError(value interface{}, cfg errorConfig) {
Expand Down Expand Up @@ -266,11 +289,11 @@ func (s *span) setTagBool(key string, v bool) {
}
case ext.ManualDrop:
if v {
s.setMetric(ext.SamplingPriority, ext.PriorityUserReject)
s.setSamplingPriorityLocked(ext.PriorityUserReject, samplernames.Manual, math.NaN())
}
case ext.ManualKeep:
if v {
s.setMetric(ext.SamplingPriority, ext.PriorityUserKeep)
s.setSamplingPriorityLocked(ext.PriorityUserKeep, samplernames.Manual, math.NaN())
}
default:
if v {
Expand All @@ -289,10 +312,14 @@ func (s *span) setMetric(key string, v float64) {
}
delete(s.Meta, key)
switch key {
case ext.ManualKeep:
if v == float64(samplernames.AppSec) {
s.setSamplingPriorityLocked(ext.PriorityUserKeep, samplernames.AppSec, math.NaN())
}
case ext.SamplingPriority:
// setting sampling priority per spec
s.Metrics[keySamplingPriority] = v
s.context.setSamplingPriority(int(v))
// ext.SamplingPriority is deprecated in favor of ext.ManualKeep and ext.ManualDrop.
// We have it here for backward compatibility.
s.setSamplingPriorityLocked(int(v), samplernames.Manual, math.NaN())
default:
s.Metrics[key] = v
}
Expand Down Expand Up @@ -525,6 +552,7 @@ func (s *span) Format(f fmt.State, c rune) {
const (
keySamplingPriority = "_sampling_priority_v1"
keySamplingPriorityRate = "_dd.agent_psr"
keyUpstreamServices = "_dd.p.upstream_services"
keyOrigin = "_dd.origin"
keyHostname = "_dd.hostname"
keyRulesSamplerAppliedRate = "_dd.rule_psr"
Expand Down
4 changes: 3 additions & 1 deletion ddtrace/tracer/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,9 @@ func TestSpanError(t *testing.T) {
span.Finish()
span.SetTag(ext.Error, err)
assert.Equal(int32(0), span.Error)
assert.Equal(nMeta, len(span.Meta))
// '+1' is `_dd.p.upstream_services`,
// because we add it into Meta of the first span, when root is finished.
assert.Equal(nMeta+1, len(span.Meta))
assert.Equal("", span.Meta["error.msg"])
assert.Equal("", span.Meta["error.type"])
assert.Equal("", span.Meta["error.stack"])
Expand Down
68 changes: 54 additions & 14 deletions ddtrace/tracer/spancontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@
package tracer

import (
"math"
"strconv"
"sync"
"sync/atomic"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"
)

var _ ddtrace.SpanContext = (*spanContext)(nil)
Expand Down Expand Up @@ -91,11 +94,11 @@ func (c *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {
}
}

func (c *spanContext) setSamplingPriority(p int) {
func (c *spanContext) setSamplingPriority(service string, p int, sampler samplernames.SamplerName, rate float64) {
if c.trace == nil {
c.trace = newTrace()
}
c.trace.setSamplingPriority(float64(p))
c.trace.setSamplingPriority(service, p, sampler, rate)
}

func (c *spanContext) samplingPriority() (p int, ok bool) {
Expand Down Expand Up @@ -144,13 +147,15 @@ const (
// priority, the root reference and a buffer of the spans which are part of the
// trace, if these exist.
type trace struct {
mu sync.RWMutex // guards below fields
spans []*span // all the spans that are part of this trace
finished int // the number of finished spans
full bool // signifies that the span buffer is full
priority *float64 // sampling priority
locked bool // specifies if the sampling priority can be altered
samplingDecision samplingDecision // samplingDecision indicates whether to send the trace to the agent.
mu sync.RWMutex // guards below fields
spans []*span // all the spans that are part of this trace
tags map[string]string // trace level tags
upstreamServices string // _dd.p.upstream_services value from the upstream service
finished int // the number of finished spans
full bool // signifies that the span buffer is full
priority *float64 // sampling priority
locked bool // specifies if the sampling priority can be altered
samplingDecision samplingDecision // samplingDecision indicates whether to send the trace to the agent.

// root specifies the root of the trace, if known; it is nil when a span
// context is extracted from a carrier, at which point there are no spans in
Expand Down Expand Up @@ -191,10 +196,10 @@ func (t *trace) samplingPriority() (p int, ok bool) {
return t.samplingPriorityLocked()
}

func (t *trace) setSamplingPriority(p float64) {
func (t *trace) setSamplingPriority(service string, p int, sampler samplernames.SamplerName, rate float64) {
t.mu.Lock()
defer t.mu.Unlock()
t.setSamplingPriorityLocked(p)
t.setSamplingPriorityLocked(service, p, sampler, rate)
}

func (t *trace) keep() {
Expand All @@ -205,7 +210,14 @@ func (t *trace) drop() {
atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionDrop))
}

func (t *trace) setSamplingPriorityLocked(p float64) {
// setTag stores value under key in the trace-level tags, creating the tag
// map the first time a tag is set. It does not acquire t.mu itself.
func (t *trace) setTag(key, value string) {
	if t.tags != nil {
		t.tags[key] = value
		return
	}
	// First tag on this trace: build the map with the entry already in it.
	t.tags = map[string]string{key: value}
}

func (t *trace) setSamplingPriorityLocked(service string, p int, sampler samplernames.SamplerName, rate float64) {
if t.locked {
return
}
Expand All @@ -217,7 +229,14 @@ func (t *trace) setSamplingPriorityLocked(p float64) {
if t.priority == nil {
t.priority = new(float64)
}
*t.priority = p
*t.priority = float64(p)
if sampler != samplernames.Upstream {
if t.upstreamServices != "" {
t.setTag(keyUpstreamServices, t.upstreamServices+";"+compactUpstreamServices(service, p, sampler, rate))
} else {
t.setTag(keyUpstreamServices, compactUpstreamServices(service, p, sampler, rate))
}
}
}

// push pushes a new span into the trace. If the buffer is full, it returns
Expand All @@ -240,7 +259,7 @@ func (t *trace) push(sp *span) {
return
}
if v, ok := sp.Metrics[keySamplingPriority]; ok {
t.setSamplingPriorityLocked(v)
t.setSamplingPriorityLocked(sp.Service, int(v), samplernames.Upstream, math.NaN())
}
t.spans = append(t.spans, sp)
if haveTracer {
Expand Down Expand Up @@ -269,6 +288,16 @@ func (t *trace) finishedOne(s *span) {
t.root.setMetric(keySamplingPriority, *t.priority)
t.locked = true
}
if len(t.spans) > 0 && s == t.spans[0] {
// first span in chunk finished, lock down the tags
//
// TODO(barbayar): make sure this doesn't happen in vain when switching to
// the new wire format. We won't need to set the tags on the first span
// in the chunk there.
for k, v := range t.tags {
s.setMeta(k, v)
}
}
if len(t.spans) != t.finished {
return
}
Expand All @@ -292,3 +321,14 @@ func (t *trace) finishedOne(s *span) {
}
tr.pushTrace(t.spans)
}

// compactUpstreamServices builds a single upstream-services entry of the form
//
//	<b64Encode(service)>|<priority>|<sampler>|<rate>
//
// where priority and the integer value of sampler are written in base 10 and
// rate is formatted with exactly four decimal places. When rate is NaN the
// last field is left empty, so the result ends with a trailing "|".
func compactUpstreamServices(service string, priority int, sampler samplernames.SamplerName, rate float64) string {
	// A NaN rate means "no rate"; it is represented by an empty field.
	var rateField string
	if !math.IsNaN(rate) {
		rateField = strconv.FormatFloat(rate, 'f', 4, 64)
	}
	return b64Encode(service) + "|" +
		strconv.Itoa(priority) + "|" +
		strconv.Itoa(int(sampler)) + "|" +
		rateField
}
35 changes: 35 additions & 0 deletions ddtrace/tracer/spancontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package tracer

import (
"context"
"math"
"strings"
"sync"
"testing"
Expand All @@ -16,6 +17,7 @@ import (

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"
)

func setupteardown(start, max int) func() {
Expand Down Expand Up @@ -75,6 +77,21 @@ func TestAsyncSpanRace(t *testing.T) {
}
}()
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-done:
root.Finish()
for i := 0; i < 500; i++ {
for range root.(*span).Meta {
// this range simulates iterating over the meta map
// as we do when encoding msgpack upon flushing.
}
}
return
}
}()
wg.Add(1)
go func() {
defer wg.Done()
select {
Expand Down Expand Up @@ -405,6 +422,24 @@ func TestSpanContextIteratorBreak(t *testing.T) {
assert.Len(t, got, 0)
}

// TestBuildNewUpstreamServices checks the string produced by
// compactUpstreamServices for several service/priority/sampler/rate
// combinations, including a NaN rate, which yields an empty trailing field.
func TestBuildNewUpstreamServices(t *testing.T) {
	type upstreamCase struct {
		service  string
		priority int
		sampler  samplernames.SamplerName
		rate     float64
		expected string
	}

	cases := []upstreamCase{
		{
			service:  "service-account",
			priority: 1,
			sampler:  samplernames.AgentRate,
			rate:     0.99,
			expected: "c2VydmljZS1hY2NvdW50|1|1|0.9900",
		},
		{
			service:  "service-storage",
			priority: 2,
			sampler:  samplernames.Manual,
			rate:     math.NaN(),
			expected: "c2VydmljZS1zdG9yYWdl|2|4|",
		},
		{
			service:  "service-video",
			priority: 1,
			sampler:  samplernames.RuleRate,
			rate:     1,
			expected: "c2VydmljZS12aWRlbw|1|3|1.0000",
		},
	}

	for _, tc := range cases {
		got := compactUpstreamServices(tc.service, tc.priority, tc.sampler, tc.rate)
		assert.Equal(t, tc.expected, got)
	}
}

// testLogger implements a mock Printer.
type testLogger struct {
mu sync.RWMutex
Expand Down
Loading

0 comments on commit e52ef96

Please sign in to comment.