Skip to content

Commit

Permalink
addressed feedback around locking logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Barbayar committed Dec 15, 2021
1 parent 38b12f7 commit 3573402
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 84 deletions.
7 changes: 0 additions & 7 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,13 +675,6 @@ func WithLogStartup(enabled bool) StartOption {
}
}

// WithMaxTagsHeaderLen allows specifying the maximum length of trace tags header value.
func WithMaxTagsHeaderLen(len int) StartOption {
return func(c *config) {
c.maxTagsHeaderLen = len
}
}

// StartSpanOption is a configuration option for StartSpan. It is aliased in order
// to help godoc group all the functions returning it together. It is considered
// more correct to refer to it as the type as the origin, ddtrace.StartSpanOption.
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (s *span) setSamplingPriorityLocked(priority int, sampler samplerName, rate
return
}
s.setMetric(keySamplingPriority, float64(priority))
s.context.setSamplingPriority(s, priority, sampler, rate)
s.context.setSamplingPriority(s.Service, priority, sampler, rate)
}

// setTagError sets the error tag. It accounts for various valid scenarios.
Expand Down
72 changes: 38 additions & 34 deletions ddtrace/tracer/spancontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type spanContext struct {
baggage map[string]string
hasBaggage int32 // atomic int for quick checking presence of baggage. 0 indicates no baggage, otherwise baggage exists.
origin string // e.g. "synthetics"
tags map[string]string
}

// newSpanContext creates a new SpanContext to serve as context for the given
Expand Down Expand Up @@ -67,14 +66,6 @@ func newSpanContext(span *span, parent *spanContext) *spanContext {
if context.trace.root == nil {
// first span in the trace can safely be assumed to be the root
context.trace.root = span
if parent != nil && parent.span == nil {
// the parent is remote, if tags received from the parent
// they will be put in the first span (the root) of the chunk
for k, v := range parent.tags {
span.setMeta(k, v)
}
context.trace.upstreamServices = span.Meta[keyUpstreamServices]
}
}
// put span in context's trace
context.trace.push(span)
Expand All @@ -101,11 +92,11 @@ func (c *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {
}
}

func (c *spanContext) setSamplingPriority(spn *span, p int, sampler samplerName, rate float64) {
func (c *spanContext) setSamplingPriority(service string, p int, sampler samplerName, rate float64) {
if c.trace == nil {
c.trace = newTrace()
}
c.trace.setSamplingPriority(spn, p, sampler, rate)
c.trace.setSamplingPriority(service, p, sampler, rate)
}

func (c *spanContext) samplingPriority() (p int, ok bool) {
Expand Down Expand Up @@ -154,14 +145,15 @@ const (
// priority, the root reference and a buffer of the spans which are part of the
// trace, if these exist.
type trace struct {
mu sync.RWMutex // guards below fields
spans []*span // all the spans that are part of this trace
upstreamServices string // _dd.p.upstream_services value from the upstream service
finished int // the number of finished spans
full bool // signifies that the span buffer is full
priority *float64 // sampling priority
locked bool // specifies if the sampling priority can be altered
samplingDecision samplingDecision // samplingDecision indicates whether to send the trace to the agent.
mu sync.RWMutex // guards below fields
spans []*span // all the spans that are part of this trace
tags map[string]string // trace level tags
upstreamServices string // _dd.p.upstream_services value from the upstream service
finished int // the number of finished spans
full bool // signifies that the span buffer is full
priority *float64 // sampling priority
locked bool // specifies if the sampling priority can be altered
samplingDecision samplingDecision // samplingDecision indicates whether to send the trace to the agent.

// root specifies the root of the trace, if known; it is nil when a span
// context is extracted from a carrier, at which point there are no spans in
Expand Down Expand Up @@ -202,16 +194,10 @@ func (t *trace) samplingPriority() (p int, ok bool) {
return t.samplingPriorityLocked()
}

func (t *trace) setSamplingPriority(spn *span, p int, sampler samplerName, rate float64) {
if len(t.spans) > 0 && t.spans[0] != spn {
// `t.setTag` sets tags in the first span until we adapt the new payload format
// ref: https://github.com/DataDog/datadog-agent/blob/ca5556d69ab720c9078fed0ed63e784c970fd732/pkg/trace/pb/tracer_payload.proto#L17
t.spans[0].Lock()
defer t.spans[0].Unlock()
}
func (t *trace) setSamplingPriority(service string, p int, sampler samplerName, rate float64) {
t.mu.Lock()
defer t.mu.Unlock()
t.setSamplingPriorityLocked(spn, p, sampler, rate)
t.setSamplingPriorityLocked(service, p, sampler, rate)
}

func (t *trace) keep() {
Expand All @@ -223,14 +209,13 @@ func (t *trace) drop() {
}

func (t *trace) setTag(key, value string) {
if len(t.spans) == 0 {
return
if t.tags == nil {
t.tags = make(map[string]string, 1)
}
// TODO: t.spans[0] needs to be locked, but if setSamplingPriority is coming from t.spans[0] then it shouldn't
t.spans[0].setMeta(key, value)
t.tags[key] = value
}

func (t *trace) setSamplingPriorityLocked(spn *span, p int, sampler samplerName, rate float64) {
func (t *trace) setSamplingPriorityLocked(service string, p int, sampler samplerName, rate float64) {
if t.locked {
return
}
Expand All @@ -244,7 +229,7 @@ func (t *trace) setSamplingPriorityLocked(spn *span, p int, sampler samplerName,
}
*t.priority = float64(p)
if sampler != samplerNone {
encodedService := b64Encode(spn.Service)
encodedService := b64Encode(service)
if len(t.upstreamServices) > 0 {
t.setTag(keyUpstreamServices, t.upstreamServices+";"+encodedService+"|"+strconv.Itoa(p)+"|"+strconv.Itoa(int(sampler))+"|"+strconv.FormatFloat(rate, 'f', 4, 64))
} else {
Expand Down Expand Up @@ -274,7 +259,7 @@ func (t *trace) push(sp *span) {
}
if v, ok := sp.Metrics[keySamplingPriority]; ok {
// TODO: this can be removed, it looks it's noop.
t.setSamplingPriorityLocked(sp, int(v), samplerNone, 0)
t.setSamplingPriorityLocked(sp.Service, int(v), samplerNone, 0)
}
t.spans = append(t.spans, sp)
if haveTracer {
Expand Down Expand Up @@ -324,5 +309,24 @@ func (t *trace) finishedOne(s *span) {
}
return
}
t.populateTraceTags(s)
tr.pushTrace(t.spans)
}

// populateTraceTags puts trace tags in Meta of the first span.
func (t *trace) populateTraceTags(s *span) {
if len(t.tags) == 0 {
return
}
if t.spans[0] != s {
t.spans[0].Lock()
defer t.spans[0].Unlock()
}
if t.spans[0].Meta == nil {
t.spans[0].Meta = t.tags
return
}
for k, v := range t.tags {
t.spans[0].Meta[k] = v
}
}
18 changes: 11 additions & 7 deletions ddtrace/tracer/textmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ func (p *propagator) injectTextMap(spanCtx ddtrace.SpanContext, writer TextMapWr
}
// propagate trace tags
var sb strings.Builder
if ctx.trace != nil && len(ctx.trace.spans) > 0 {
ctx.trace.spans[0].RLock()
for k, v := range ctx.trace.spans[0].Meta {
if ctx.trace != nil {
ctx.trace.mu.RLock()
for k, v := range ctx.trace.tags {
if !strings.HasPrefix(k, "_dd.p.") {
continue
}
Expand All @@ -262,7 +262,7 @@ func (p *propagator) injectTextMap(spanCtx ddtrace.SpanContext, writer TextMapWr
sb.WriteByte('=')
sb.WriteString(v)
}
ctx.trace.spans[0].RUnlock()
ctx.trace.mu.RUnlock()
if sb.Len() > 0 {
writer.Set(traceTagsHeader, sb.String())
}
Expand Down Expand Up @@ -300,11 +300,15 @@ func (p *propagator) extractTextMap(reader TextMapReader) (ddtrace.SpanContext,
if err != nil {
return ErrSpanContextCorrupted
}
ctx.setSamplingPriority(nil, priority, samplerNone, 0)
ctx.setSamplingPriority("", priority, samplerNone, 0)
case originHeader:
ctx.origin = v
case traceTagsHeader:
ctx.tags, err = parsePropagatableTraceTags(v)
if ctx.trace == nil {
ctx.trace = newTrace()
}
ctx.trace.tags, err = parsePropagatableTraceTags(v)
ctx.trace.upstreamServices = ctx.trace.tags[keyUpstreamServices]
if err != nil {
log.Warn("did not extract trace tags (err: %s)", err.Error())
}
Expand Down Expand Up @@ -393,7 +397,7 @@ func (*propagatorB3) extractTextMap(reader TextMapReader) (ddtrace.SpanContext,
if err != nil {
return ErrSpanContextCorrupted
}
ctx.setSamplingPriority(nil, priority, samplerNone, 0)
ctx.setSamplingPriority("", priority, samplerNone, 0)
default:
}
return nil
Expand Down
38 changes: 34 additions & 4 deletions ddtrace/tracer/textmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,9 @@ func TestTextMapPropagatorOrigin(t *testing.T) {
}
}

func TestTextMapPropagatorTraceTags(t *testing.T) {
func TestTextMapPropagatorTraceTagsWithPriority(t *testing.T) {
src := TextMapCarrier(map[string]string{
DefaultPriorityHeader: "1",
DefaultTraceIDHeader: "1",
DefaultParentIDHeader: "1",
traceTagsHeader: "hello=world,_dd.p.upstream_services=abc|1|2|3;def|4|5|6",
Expand All @@ -191,7 +192,35 @@ func TestTextMapPropagatorTraceTags(t *testing.T) {
assert.Equal(t, map[string]string{
"hello": "world",
"_dd.p.upstream_services": "abc|1|2|3;def|4|5|6",
}, sctx.tags)
}, sctx.trace.tags)
dst := map[string]string{}
err = tracer.Inject(child.Context(), TextMapCarrier(dst))
assert.Nil(t, err)
assert.Equal(t, map[string]string{
"x-datadog-parent-id": strconv.Itoa(int(childSpanID)),
"x-datadog-trace-id": "1",
"x-datadog-sampling-priority": "1",
"x-datadog-tags": "_dd.p.upstream_services=abc|1|2|3;def|4|5|6",
}, dst)
}

func TestTextMapPropagatorTraceTagsWithoutPriority(t *testing.T) {
src := TextMapCarrier(map[string]string{
DefaultTraceIDHeader: "1",
DefaultParentIDHeader: "1",
traceTagsHeader: "hello=world,_dd.p.upstream_services=abc|1|2|3;def|4|5|6",
})
tracer := newTracer()
ctx, err := tracer.Extract(src)
assert.Nil(t, err)
sctx, ok := ctx.(*spanContext)
assert.True(t, ok)
child := tracer.StartSpan("test", ChildOf(sctx))
childSpanID := child.Context().(*spanContext).spanID
assert.Equal(t, map[string]string{
"hello": "world",
"_dd.p.upstream_services": "abc|1|2|3;def|4|5|6;dHJhY2VyLnRlc3Q|1|1|1.0000",
}, sctx.trace.tags)
dst := map[string]string{}
err = tracer.Inject(child.Context(), TextMapCarrier(dst))
assert.Nil(t, err)
Expand All @@ -214,7 +243,7 @@ func TestTextMapPropagatorInvalidTraceTagsHeader(t *testing.T) {
assert.Nil(t, err)
sctx, ok := ctx.(*spanContext)
assert.True(t, ok)
assert.Equal(t, map[string]string(nil), sctx.tags)
assert.Equal(t, map[string]string(nil), sctx.trace.tags)
}

func TestTextMapPropagatorTraceTagsTooLong(t *testing.T) {
Expand All @@ -224,6 +253,7 @@ func TestTextMapPropagatorTraceTagsTooLong(t *testing.T) {
}
traceTags := strings.Join(tags, ",")
src := TextMapCarrier(map[string]string{
DefaultPriorityHeader: "1",
DefaultTraceIDHeader: "1",
DefaultParentIDHeader: "1",
traceTagsHeader: traceTags,
Expand All @@ -235,7 +265,7 @@ func TestTextMapPropagatorTraceTagsTooLong(t *testing.T) {
assert.True(t, ok)
child := tracer.StartSpan("test", ChildOf(sctx))
childSpanID := child.Context().(*spanContext).spanID
assert.Equal(t, 100, len(sctx.tags))
assert.Equal(t, 100, len(sctx.trace.tags))
dst := map[string]string{}
err = tracer.Inject(child.Context(), TextMapCarrier(dst))
assert.Nil(t, err)
Expand Down
Loading

0 comments on commit 3573402

Please sign in to comment.