diff --git a/processor/samplingprocessor/probabilisticsamplerprocessor/README.md b/processor/samplingprocessor/probabilisticsamplerprocessor/README.md index 31e593a1fb7..7ea2f37be06 100644 --- a/processor/samplingprocessor/probabilisticsamplerprocessor/README.md +++ b/processor/samplingprocessor/probabilisticsamplerprocessor/README.md @@ -20,6 +20,10 @@ The following configuration options can be modified: - `hash_seed` (no default): An integer used to compute the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same hash_seed. - `sampling_percentage` (default = 0): Percentage at which traces are sampled; >= 100 samples all traces +The sampled spans have [`sampling.probability`](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/sdk.md#sampling) +attribute added, which includes the value in range of `(0, 1.0]` representing the probability with which the record +was sampled. If the span was already sampled before and the attribute is present, the existing value is multiplied. + Examples: ```yaml diff --git a/processor/samplingprocessor/probabilisticsamplerprocessor/probabilisticsampler.go b/processor/samplingprocessor/probabilisticsamplerprocessor/probabilisticsampler.go index 9dd551b85b7..8459b3691f9 100644 --- a/processor/samplingprocessor/probabilisticsamplerprocessor/probabilisticsampler.go +++ b/processor/samplingprocessor/probabilisticsamplerprocessor/probabilisticsampler.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/translator/conventions" ) // samplingPriority has the semantic result of parsing the "sampling.priority" @@ -49,9 +50,10 @@ const ( ) type tracesamplerprocessor struct { - nextConsumer consumer.TracesConsumer - scaledSamplingRate uint32 - hashSeed uint32 + nextConsumer consumer.TracesConsumer + scaledSamplingRate uint32 + samplingProbability float64 + hashSeed uint32 } // newTraceProcessor returns a processor.TracesProcessor that will perform head sampling according to the given @@ -64,8 +66,9 @@ func newTraceProcessor(nextConsumer consumer.TracesConsumer, cfg Config) (compon return &tracesamplerprocessor{ nextConsumer: nextConsumer, // Adjust sampling percentage on private so recalculations are avoided. - scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor), - hashSeed: cfg.HashSeed, + scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor), + samplingProbability: float64(cfg.SamplingPercentage) * 0.01, + hashSeed: cfg.HashSeed, }, nil } @@ -82,6 +85,16 @@ func (tsp *tracesamplerprocessor) ConsumeTraces(ctx context.Context, td pdata.Tr return tsp.nextConsumer.ConsumeTraces(ctx, sampledTraceData) } +func (tsp *tracesamplerprocessor) updateSamplingProbability(sampledSpanAttributes pdata.AttributeMap) { + samplingProbability := tsp.samplingProbability + attr, found := sampledSpanAttributes.Get(conventions.AttributeSamplingProbability) + if found && attr.Type() == pdata.AttributeValueDOUBLE { + samplingProbability *= attr.DoubleVal() + } + + sampledSpanAttributes.UpsertDouble(conventions.AttributeSamplingProbability, samplingProbability) +} + func (tsp *tracesamplerprocessor) processTraces(resourceSpans pdata.ResourceSpans, sampledTraceData pdata.Traces) { scaledSamplingRate := tsp.scaledSamplingRate @@ -115,6 +128,7 @@ func (tsp *tracesamplerprocessor) processTraces(resourceSpans pdata.ResourceSpan hash(tidBytes[:], tsp.hashSeed)&bitMaskHashBuckets < scaledSamplingRate if sampled { + tsp.updateSamplingProbability(span.Attributes()) spns.Append(span) } } diff --git a/processor/samplingprocessor/probabilisticsamplerprocessor/probabilisticsampler_test.go b/processor/samplingprocessor/probabilisticsamplerprocessor/probabilisticsampler_test.go index a898f1c5566..0aa440d792c 100644 --- a/processor/samplingprocessor/probabilisticsamplerprocessor/probabilisticsampler_test.go +++ b/processor/samplingprocessor/probabilisticsamplerprocessor/probabilisticsampler_test.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/translator/conventions" tracetranslator "go.opentelemetry.io/collector/translator/trace" ) @@ -71,6 +72,7 @@ func TestNewTraceProcessor(t *testing.T) { if !tt.wantErr { // The truncation below with uint32 cannot be defined at initialization (compiler error), performing it at runtime. tt.want.(*tracesamplerprocessor).scaledSamplingRate = uint32(tt.cfg.SamplingPercentage * percentageScaleFactor) + tt.want.(*tracesamplerprocessor).samplingProbability = float64(tt.cfg.SamplingPercentage) * 0.01 } got, err := newTraceProcessor(tt.nextConsumer, tt.cfg) if (err != nil) != tt.wantErr { @@ -227,15 +229,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange_MultipleResourceSpans(t // Test_tracesamplerprocessor_SpanSamplingPriority checks if handling of "sampling.priority" is correct. func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) { - singleSpanWithAttrib := func(key string, attribValue pdata.AttributeValue) pdata.Traces { - traces := pdata.NewTraces() - traces.ResourceSpans().Resize(1) - rs := traces.ResourceSpans().At(0) - rs.InstrumentationLibrarySpans().Resize(1) - instrLibrarySpans := rs.InstrumentationLibrarySpans().At(0) - instrLibrarySpans.Spans().Append(getSpanWithAttributes(key, attribValue)) - return traces - } + tests := []struct { name string cfg Config @@ -247,7 +241,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) { cfg: Config{ SamplingPercentage: 0.0, }, - td: singleSpanWithAttrib( + td: getTracesWithSpanWithAttribute( "sampling.priority", pdata.NewAttributeValueInt(2)), sampled: true, @@ -257,7 +251,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) { cfg: Config{ SamplingPercentage: 0.0, }, - td: singleSpanWithAttrib( + td: getTracesWithSpanWithAttribute( "sampling.priority", pdata.NewAttributeValueDouble(1)), sampled: true, @@ -267,7 +261,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) { cfg: Config{ SamplingPercentage: 0.0, }, - td: singleSpanWithAttrib( + td: getTracesWithSpanWithAttribute( "sampling.priority", pdata.NewAttributeValueString("1")), sampled: true, @@ -277,7 +271,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) { cfg: Config{ SamplingPercentage: 100.0, }, - td: singleSpanWithAttrib( + td: getTracesWithSpanWithAttribute( "sampling.priority", pdata.NewAttributeValueInt(0)), }, @@ -286,7 +280,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) { cfg: Config{ SamplingPercentage: 100.0, }, - td: singleSpanWithAttrib( + td: getTracesWithSpanWithAttribute( "sampling.priority", pdata.NewAttributeValueDouble(0)), }, @@ -295,7 +289,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) { cfg: Config{ SamplingPercentage: 100.0, }, - td: singleSpanWithAttrib( + td: getTracesWithSpanWithAttribute( "sampling.priority", pdata.NewAttributeValueString("0")), }, @@ -304,7 +298,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) { cfg: Config{ SamplingPercentage: 0.0, }, - td: singleSpanWithAttrib( + td: getTracesWithSpanWithAttribute( "no.sampling.priority", pdata.NewAttributeValueInt(2)), }, @@ -313,7 +307,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) { cfg: Config{ SamplingPercentage: 100.0, }, - td: singleSpanWithAttrib( + td: getTracesWithSpanWithAttribute( "no.sampling.priority", pdata.NewAttributeValueInt(2)), sampled: true, @@ -416,6 +410,59 @@ func Test_parseSpanSamplingPriority(t *testing.T) { } } +// Test_tracesamplerprocessor_SamplingProbabilityAttribute verifies if the attribute describing current sampling rate is included in sampled spans +func Test_tracesamplerprocessor_SamplingProbabilityAttribute(t *testing.T) { + cfg := Config{ + SamplingPercentage: 100.0, + } + + tests := []struct { + name string + traces pdata.Traces + wantSamplingProbabilityAttribute pdata.AttributeValue + }{ + { + name: "simple_span", + traces: getTracesWithSpanWithAttribute("foo", pdata.NewAttributeValueString("bar")), + wantSamplingProbabilityAttribute: pdata.NewAttributeValueDouble(1.0), + }, + { + name: "span_came_through_sampler_already", + traces: getTracesWithSpanWithAttribute(conventions.AttributeSamplingProbability, pdata.NewAttributeValueDouble(0.01)), + wantSamplingProbabilityAttribute: pdata.NewAttributeValueDouble(0.01), + }, + { + name: "simple_with_invalid_attribute_value", + traces: getTracesWithSpanWithAttribute(conventions.AttributeSamplingProbability, pdata.NewAttributeValueString("bar")), + wantSamplingProbabilityAttribute: pdata.NewAttributeValueDouble(1.0), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sink := new(consumertest.TracesSink) + tsp, err := newTraceProcessor(sink, cfg) + if err != nil { + t.Errorf("error when creating tracesamplerprocessor: %v", err) + return + } + + if err := tsp.ConsumeTraces(context.Background(), tt.traces); err != nil { + t.Errorf("tracesamplerprocessor.ConsumeTraceData() error = %v", err) + return + } + assert.Equal(t, 1, sink.SpansCount()) + for _, td := range sink.AllTraces() { + span := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0) + attrValue, found := span.Attributes().Get(conventions.AttributeSamplingProbability) + assert.True(t, found, "Sampling probability attribute not found") + assert.Equal(t, tt.wantSamplingProbabilityAttribute, attrValue) + } + sink.Reset() + }) + } +} + func getSpanWithAttributes(key string, value pdata.AttributeValue) pdata.Span { span := pdata.NewSpan() span.InitEmpty() @@ -424,6 +471,16 @@ func getSpanWithAttributes(key string, value pdata.AttributeValue) pdata.Span { return span } +func getTracesWithSpanWithAttribute(key string, attribValue pdata.AttributeValue) pdata.Traces { + traces := pdata.NewTraces() + traces.ResourceSpans().Resize(1) + rs := traces.ResourceSpans().At(0) + rs.InstrumentationLibrarySpans().Resize(1) + instrLibrarySpans := rs.InstrumentationLibrarySpans().At(0) + instrLibrarySpans.Spans().Append(getSpanWithAttributes(key, attribValue)) + return traces +} + // Test_hash ensures that the hash function supports different key lengths even if in // practice it is only expected to receive keys with length 16 (trace id length in OC proto). func Test_hash(t *testing.T) { diff --git a/translator/conventions/opentelemetry.go b/translator/conventions/opentelemetry.go index 3f3b81cdf55..ac01c889709 100644 --- a/translator/conventions/opentelemetry.go +++ b/translator/conventions/opentelemetry.go @@ -61,6 +61,7 @@ const ( AttributeProcessExecutablePath = "process.executable.path" AttributeProcessID = "process.pid" AttributeProcessOwner = "process.owner" + AttributeSamplingProbability = "sampling.probability" AttributeServiceInstance = "service.instance.id" AttributeServiceName = "service.name" AttributeServiceNamespace = "service.namespace"