Skip to content

Commit

Permalink
SpanExporter to not receive span if not sampled (#1070)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored Sep 9, 2020
1 parent 6be4ae3 commit 4726bbf
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from enum import Enum

from opentelemetry.context import attach, detach, set_value
from opentelemetry.sdk.trace import sampling
from opentelemetry.util import time_ns

from .. import Span, SpanProcessor
Expand Down Expand Up @@ -74,6 +75,8 @@ def on_start(self, span: Span) -> None:
pass

def on_end(self, span: Span) -> None:
if not span.context.trace_flags.sampled:
return
token = attach(set_value("suppress_instrumentation", True))
try:
self.span_exporter.export((span,))
Expand Down Expand Up @@ -156,6 +159,8 @@ def on_end(self, span: Span) -> None:
if self.done:
logger.warning("Already shutdown, dropping span.")
return
if not span.context.trace_flags.sampled:
return
if len(self.queue) == self.max_queue_size:
if not self._spans_dropped:
logger.warning("Queue is full, likely spans will be dropped.")
Expand Down
52 changes: 51 additions & 1 deletion opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,35 @@ def test_simple_span_processor_no_context(self):

self.assertListEqual(["xxx", "bar", "foo"], spans_names_list)

def test_simple_span_processor_not_sampled(self):
tracer_provider = trace.TracerProvider(
sampler=trace.sampling.ALWAYS_OFF
)
tracer = tracer_provider.get_tracer(__name__)

spans_names_list = []

my_exporter = MySpanExporter(destination=spans_names_list)
span_processor = export.SimpleExportSpanProcessor(my_exporter)
tracer_provider.add_span_processor(span_processor)

with tracer.start_as_current_span("foo"):
with tracer.start_as_current_span("bar"):
with tracer.start_as_current_span("xxx"):
pass

self.assertListEqual([], spans_names_list)


def _create_start_and_end_span(name, span_processor):
span = trace.Span(
name,
mock.Mock(spec=trace_api.SpanContext),
trace_api.SpanContext(
0xDEADBEEF,
0xDEADBEEF,
is_remote=False,
trace_flags=trace_api.TraceFlags(trace_api.TraceFlags.SAMPLED),
),
span_processor=span_processor,
)
span.start()
Expand Down Expand Up @@ -219,6 +243,7 @@ def test_batch_span_processor_lossless(self):
for _ in range(512):
_create_start_and_end_span("foo", span_processor)

time.sleep(1)
self.assertTrue(span_processor.force_flush())
self.assertEqual(len(spans_names_list), 512)
span_processor.shutdown()
Expand Down Expand Up @@ -247,6 +272,31 @@ def test_batch_span_processor_many_spans(self):
self.assertEqual(len(spans_names_list), 1024)
span_processor.shutdown()

def test_batch_span_processor_not_sampled(self):
tracer_provider = trace.TracerProvider(
sampler=trace.sampling.ALWAYS_OFF
)
tracer = tracer_provider.get_tracer(__name__)
spans_names_list = []

my_exporter = MySpanExporter(
destination=spans_names_list, max_export_batch_size=128
)
span_processor = export.BatchExportSpanProcessor(
my_exporter,
max_queue_size=256,
max_export_batch_size=64,
schedule_delay_millis=100,
)
tracer_provider.add_span_processor(span_processor)
with tracer.start_as_current_span("foo"):
pass
time.sleep(0.05) # give some time for the exporter to upload spans

self.assertTrue(span_processor.force_flush())
self.assertEqual(len(spans_names_list), 0)
span_processor.shutdown()

def test_batch_span_processor_scheduled_delay(self):
"""Test that spans are exported each schedule_delay_millis"""
spans_names_list = []
Expand Down

0 comments on commit 4726bbf

Please sign in to comment.