Skip to content

Configure Unsampled Span Pipeline for Genesis #390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,17 @@ def _export_unsampled_span_for_lambda(trace_provider: TracerProvider, resource:
)


def _export_unsampled_span_for_agent_observability(trace_provider: TracerProvider, resource: Resource = None):
if not is_agent_observability_enabled():
return
Comment on lines +269 to +270
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this general, as _export_unsampled_spans, so that this check is only needed where the function is called? We could pass in span_exporter if it is custom.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this refactor will be a bit risky given the tight timeline. The logic in _export_unsampled_span_for_lambda doesn't seem straightforward to combine with _export_unsampled_span_for_agent_observability.

Agree it would be good to combine both into _export_unsampled_spans, but maybe later on when there is more bandwidth to carefully test.

What do you think?


traces_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)

span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider())

trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter=span_exporter))


def _is_defer_to_workers_enabled():
    """Return True when the defer-to-workers environment flag is set to "true".

    The flag is read from the environment variable named by
    ``OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG``. Surrounding whitespace
    and letter case are ignored; an unset variable is treated as "false".
    """
    raw_flag = os.environ.get(OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG, "false")
    normalized_flag = raw_flag.strip().lower()
    return normalized_flag == "true"

Expand Down Expand Up @@ -408,9 +419,14 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) ->
if _is_lambda_environment():
provider.add_span_processor(AwsLambdaSpanProcessor())

# We always send 100% spans to Genesis platform for agent observability because
# AI applications typically have low throughput traffic patterns and require
# comprehensive monitoring to catch subtle failure modes like hallucinations
# and quality degradation that sampling could miss.
# Add session.id baggage attribute to span attributes to support AI Agent use cases
# enabling session ID tracking in spans.
if is_agent_observability_enabled():
_export_unsampled_span_for_agent_observability(provider, resource)

def session_id_predicate(baggage_key: str) -> bool:
return baggage_key == "session.id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
_customize_sampler,
_customize_span_exporter,
_customize_span_processors,
_export_unsampled_span_for_agent_observability,
_export_unsampled_span_for_lambda,
_init_logging,
_is_application_signals_enabled,
Expand Down Expand Up @@ -795,6 +796,81 @@ def test_export_unsampled_span_for_lambda(self):
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)

# pylint: disable=no-self-use
def test_export_unsampled_span_for_agent_observability(self):
    """Verify the unsampled-span processor is only registered when agent observability is enabled.

    Environment changes are made inside ``patch.dict`` blocks so that the
    original environment is restored even if an assertion fails, keeping this
    test from leaking settings into later tests.
    """
    mock_tracer_provider: TracerProvider = MagicMock()

    # Test when agent observability is disabled (default)
    with patch.dict(os.environ):
        # Drop any ambient value so the "disabled" case does not depend on the caller's environment
        os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
        _export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty())
        self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)

    # Test when agent observability is enabled with AWS endpoint (the default case)
    enabled_env = {
        "AGENT_OBSERVABILITY_ENABLED": "true",
        "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": "https://xray.us-east-1.amazonaws.com/v1/traces",
    }
    with patch.dict(os.environ, enabled_env):
        _export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty())
        self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 1)
        processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0]
        self.assertIsInstance(processor, BatchUnsampledSpanProcessor)

# pylint: disable=no-self-use
def test_export_unsampled_span_for_agent_observability_uses_aws_exporter(self):
    """Test that OTLPAwsSpanExporter is used for AWS endpoints"""
    mock_tracer_provider: TracerProvider = MagicMock()
    configurator_module = "amazon.opentelemetry.distro.aws_opentelemetry_configurator"
    traces_endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces"
    enabled_env = {
        "AGENT_OBSERVABILITY_ENABLED": "true",
        "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": traces_endpoint,
    }

    # patch.dict restores os.environ on exit, so a failing assertion cannot leak these settings
    with patch.dict(os.environ, enabled_env):
        with patch(f"{configurator_module}.OTLPAwsSpanExporter") as mock_aws_exporter, patch(
            f"{configurator_module}.BatchUnsampledSpanProcessor"
        ) as mock_processor, patch(f"{configurator_module}.get_logger_provider") as mock_logger_provider:
            _export_unsampled_span_for_agent_observability(mock_tracer_provider, Resource.get_empty())

            # Verify OTLPAwsSpanExporter is created with correct parameters
            mock_aws_exporter.assert_called_once_with(
                endpoint=traces_endpoint,
                logger_provider=mock_logger_provider.return_value,
            )
            # Verify BatchUnsampledSpanProcessor wraps the exporter
            mock_processor.assert_called_once_with(span_exporter=mock_aws_exporter.return_value)
            # Verify processor is added to tracer provider
            mock_tracer_provider.add_span_processor.assert_called_once_with(mock_processor.return_value)

# pylint: disable=no-self-use
def test_customize_span_processors_calls_export_unsampled_span(self):
    """Test that _customize_span_processors calls _export_unsampled_span_for_agent_observability"""
    mock_tracer_provider: TracerProvider = MagicMock()
    resource = Resource.get_empty()

    # patch.dict snapshots os.environ and restores it on exit, even if an assertion fails,
    # so neither the pop nor the assignment below leaks into other tests.
    with patch.dict(os.environ), patch(
        "amazon.opentelemetry.distro.aws_opentelemetry_configurator._export_unsampled_span_for_agent_observability"
    ) as mock_agent_observability:
        # Test that agent observability function is NOT called when disabled
        os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
        _customize_span_processors(mock_tracer_provider, resource)
        mock_agent_observability.assert_not_called()

        # Test that agent observability function is called when enabled
        mock_agent_observability.reset_mock()
        os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"
        _customize_span_processors(mock_tracer_provider, resource)
        mock_agent_observability.assert_called_once_with(mock_tracer_provider, resource)

def test_customize_metric_exporter(self):
metric_readers = []
views = []
Expand Down
Loading