Skip to content

Commit

Permalink
Adds duration sampler distinct from latency in supplying two bounds
Browse files Browse the repository at this point in the history
  • Loading branch information
garry-cairns committed Aug 26, 2023
1 parent 642e7f7 commit 835e985
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 0 deletions.
54 changes: 54 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/duration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sampling // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)

type duration struct {
logger *zap.Logger
lowerThresholdMs int64
upperThresholdMs int64
}

var _ PolicyEvaluator = (*duration)(nil)

// NewDuration creates a policy evaluator sampling traces with a duration higher than a configured threshold
func NewDuration(settings component.TelemetrySettings, lowerThresholdMs int64, upperThresholdMs int64) PolicyEvaluator {
return &duration{
logger: settings.Logger,
lowerThresholdMs: lowerThresholdMs,
upperThresholdMs: upperThresholdMs,
}
}

// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
func (l *duration) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *TraceData) (Decision, error) {
l.logger.Debug("Evaluating spans in duration filter")

traceData.Lock()
defer traceData.Unlock()
batches := traceData.ReceivedBatches

var minTime pcommon.Timestamp
var maxTime pcommon.Timestamp

return hasSpanWithCondition(batches, func(span ptrace.Span) bool {
if minTime == 0 || span.StartTimestamp() < minTime {
minTime = span.StartTimestamp()
}
if maxTime == 0 || span.EndTimestamp() > maxTime {
maxTime = span.EndTimestamp()
}

duration := maxTime.AsTime().Sub(minTime.AsTime())
return (l.lowerThresholdMs < duration.Milliseconds() && duration.Milliseconds() <= l.upperThresholdMs)
}), nil
}
125 changes: 125 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/duration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sampling

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestEvaluate_Duration(t *testing.T) {
filter := NewDuration(componenttest.NewNopTelemetrySettings(), 5000, 10000)

traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
now := time.Now()

cases := []struct {
Desc string
Spans []spanWithTimeAndBoundedDuration
Decision Decision
}{
{
"trace duration shorter than lower bound",
[]spanWithTimeAndBoundedDuration{
{
StartTime: now,
Duration: 4500 * time.Millisecond,
},
},
NotSampled,
},
{
"trace duration is equal to lower bound",
[]spanWithTimeAndBoundedDuration{
{
StartTime: now,
Duration: 5000 * time.Millisecond,
},
},
NotSampled,
},
{
"trace duration is within lower and upper bounds",
[]spanWithTimeAndBoundedDuration{
{
StartTime: now,
Duration: 5001 * time.Millisecond,
},
},
Sampled,
},
{
"trace duration is above upper bound",
[]spanWithTimeAndBoundedDuration{
{
StartTime: now,
Duration: 10001 * time.Millisecond,
},
},
NotSampled,
},
{
"trace duration equals upper bound",
[]spanWithTimeAndBoundedDuration{
{
StartTime: now,
Duration: 10000 * time.Millisecond,
},
},
Sampled,
},
{
"total trace duration is longer than threshold but every single span is shorter",
[]spanWithTimeAndBoundedDuration{
{
StartTime: now,
Duration: 3000 * time.Millisecond,
},
{
StartTime: now.Add(2500 * time.Millisecond),
Duration: 3000 * time.Millisecond,
},
},
Sampled,
},
}

for _, c := range cases {
t.Run(c.Desc, func(t *testing.T) {
decision, err := filter.Evaluate(context.Background(), traceID, newTraceWithBoundedSpans(c.Spans))

assert.NoError(t, err)
assert.Equal(t, decision, c.Decision)
})
}
}

type spanWithTimeAndBoundedDuration struct {
StartTime time.Time
Duration time.Duration
}

func newTraceWithBoundedSpans(spans []spanWithTimeAndBoundedDuration) *TraceData {
traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
ils := rs.ScopeSpans().AppendEmpty()

for _, s := range spans {
span := ils.Spans().AppendEmpty()
span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})
span.SetStartTimestamp(pcommon.NewTimestampFromTime(s.StartTime))
span.SetEndTimestamp(pcommon.NewTimestampFromTime(s.StartTime.Add(s.Duration)))
}

return &TraceData{
ReceivedBatches: traces,
}
}

0 comments on commit 835e985

Please sign in to comment.