Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tailsamplingprocessor] Support external decision cache implementations #37035

Merged
27 changes: 27 additions & 0 deletions .chloggen/tsp-external-cache.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: tailsamplingprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for external caches when using the Tailsampling Processor in code.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37035]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"
package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/cache"

import (
"encoding/binary"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"
package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/cache"

import "go.opentelemetry.io/collector/pdata/pcommon"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"
package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/cache"

import "go.opentelemetry.io/collector/pdata/pcommon"

Expand Down
2 changes: 2 additions & 0 deletions processor/tailsamplingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,4 +253,6 @@ type Config struct {
PolicyCfgs []PolicyCfg `mapstructure:"policies"`
// DecisionCache holds configuration for the decision cache(s)
DecisionCache DecisionCacheConfig `mapstructure:"decision_cache"`
// Options allows for additional configuration of the tail-based sampling processor in code.
Options []Option `mapstructure:"-"`
}
14 changes: 7 additions & 7 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/timeutils"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/cache"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/idbatcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
Expand Down Expand Up @@ -88,7 +88,7 @@ type Option func(*tailSamplingSpanProcessor)

// newTracesProcessor returns a processor.TracesProcessor that will perform tail sampling according to the given
// configuration.
func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsumer consumer.Traces, cfg Config, opts ...Option) (processor.Traces, error) {
func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsumer consumer.Traces, cfg Config) (processor.Traces, error) {
telemetrySettings := set.TelemetrySettings
telemetry, err := metadata.NewTelemetryBuilder(telemetrySettings)
if err != nil {
Expand Down Expand Up @@ -124,7 +124,7 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume
}
tsp.policyTicker = &timeutils.PolicyTicker{OnTickFunc: tsp.samplingPolicyOnTick}

for _, opt := range opts {
for _, opt := range cfg.Options {
opt(tsp)
}

Expand Down Expand Up @@ -174,15 +174,15 @@ func withTickerFrequency(frequency time.Duration) Option {
}
}

// withSampledDecisionCache sets the cache which the processor uses to store recently sampled trace IDs.
func withSampledDecisionCache(c cache.Cache[bool]) Option {
// WithSampledDecisionCache sets the cache which the processor uses to store recently sampled trace IDs.
func WithSampledDecisionCache(c cache.Cache[bool]) Option {
return func(tsp *tailSamplingSpanProcessor) {
tsp.sampledIDCache = c
}
}

// withSampledDecisionCache sets the cache which the processor uses to store recently sampled trace IDs.
func withNonSampledDecisionCache(c cache.Cache[bool]) Option {
// WithNonSampledDecisionCache sets the cache which the processor uses to store recently non-sampled trace IDs.
func WithNonSampledDecisionCache(c cache.Cache[bool]) Option {
return func(tsp *tailSamplingSpanProcessor) {
tsp.nonSampledIDCache = c
}
Expand Down
118 changes: 77 additions & 41 deletions processor/tailsamplingprocessor/processor_decisions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,12 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/cache"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/metadatatest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
)

func TestSamplingPolicyTypicalPath(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
tel := metadatatest.SetupTelemetry()
idb := newSyncIDBatcher()
Expand All @@ -34,7 +30,15 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
}

p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
},
}
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -65,10 +69,6 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
}

func TestSamplingPolicyInvertSampled(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
tel := metadatatest.SetupTelemetry()
idb := newSyncIDBatcher()
Expand All @@ -79,7 +79,15 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
}

p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
},
}
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -110,10 +118,6 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
}

func TestSamplingMultiplePolicies(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
tel := metadatatest.SetupTelemetry()
idb := newSyncIDBatcher()
Expand All @@ -126,7 +130,15 @@ func TestSamplingMultiplePolicies(t *testing.T) {
{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
}

p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
},
}
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -161,10 +173,6 @@ func TestSamplingMultiplePolicies(t *testing.T) {
}

func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
tel := metadatatest.SetupTelemetry()
idb := newSyncIDBatcher()
Expand All @@ -175,7 +183,15 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
}

p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
},
}
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -207,10 +223,6 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
}

func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
tel := metadatatest.SetupTelemetry()
idb := newSyncIDBatcher()
Expand All @@ -223,7 +235,15 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
}

p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
},
}
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -258,10 +278,6 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
}

func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
tel := metadatatest.SetupTelemetry()
idb := newSyncIDBatcher()
Expand All @@ -274,7 +290,15 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
}

p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
},
}
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -328,10 +352,6 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
}

func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait * 10,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
tel := metadatatest.SetupTelemetry()
idb := newSyncIDBatcher()
Expand All @@ -344,7 +364,17 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
// Use this instead of the default no-op cache
c, err := cache.NewLRUDecisionCache[bool](200)
require.NoError(t, err)
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withSampledDecisionCache(c))

cfg := Config{
DecisionWait: defaultTestDecisionWait * 10,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
WithSampledDecisionCache(c),
},
}
p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -401,10 +431,6 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
}

func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait * 10,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := metadatatest.SetupTelemetry()
ct := s.NewSettings()
Expand All @@ -418,7 +444,17 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) {
// Use this instead of the default no-op cache
c, err := cache.NewLRUDecisionCache[bool](200)
require.NoError(t, err)
p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withNonSampledDecisionCache(c))

cfg := Config{
DecisionWait: defaultTestDecisionWait * 10,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
WithNonSampledDecisionCache(c),
},
}
p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down
Loading
Loading