Skip to content

Consolidate Log Group and Log Stream Config [WIP] #376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: genesis-dev-v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ def _initialize_components():
# This is done before calling _import_exporters which would try to load exporters
is_emf_enabled = _check_emf_exporter_enabled()

# Set up inferred headers before importing exporters
_setup_inferred_headers()

trace_exporters, metric_exporters, log_exporters = _import_exporters(
_get_exporter_names("traces"),
_get_exporter_names("metrics"),
Expand Down Expand Up @@ -407,11 +410,17 @@ def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> L
if _is_aws_otlp_endpoint(logs_endpoint, "logs"):
_logger.info("Detected using AWS OTLP Logs Endpoint.")

if isinstance(log_exporter, OTLPLogExporter) and _validate_logs_headers().is_valid:
headers_result = _validate_logs_headers()
if isinstance(log_exporter, OTLPLogExporter) and headers_result.is_valid:
# Get the headers from the OTLPLogExporter
headers = {}
if hasattr(log_exporter, "_headers") and log_exporter._headers:
headers.update(log_exporter._headers)

# Setting default compression mode to Gzip as this is the behavior in upstream's
# collector otlp http exporter:
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
return OTLPAwsLogExporter(endpoint=logs_endpoint)
return OTLPAwsLogExporter(endpoint=logs_endpoint, headers=headers)

_logger.warning(
"Improper configuration see: please export/set "
Expand Down Expand Up @@ -568,14 +577,65 @@ def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> b
return bool(re.match(pattern, otlp_endpoint.lower()))


def _setup_inferred_headers() -> None:
"""
Set up inferred OTLP logs headers from resource attributes if needed.
This must be called before exporters are imported.
"""
# Only infer if headers are not already set and agent observability is enabled
if not os.environ.get(OTEL_EXPORTER_OTLP_LOGS_HEADERS) and is_agent_observability_enabled():
resource_attrs = _parse_resource_attributes()

# Extract log group and stream from resource attributes
log_group_names = resource_attrs.get("aws.log.group.names", "")
log_stream_names = resource_attrs.get("aws.log.stream.names", "")

if log_group_names and log_stream_names:
# Construct and set the headers environment variable
inferred_headers = []
inferred_headers.append(f"{AWS_OTLP_LOGS_GROUP_HEADER}={log_group_names}")
inferred_headers.append(f"{AWS_OTLP_LOGS_STREAM_HEADER}={log_stream_names}")

os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = ",".join(inferred_headers)
_logger.info(
"Set OTEL_EXPORTER_OTLP_LOGS_HEADERS from OTEL_RESOURCE_ATTRIBUTES: %s",
os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS],
)


def _parse_resource_attributes() -> Dict[str, str]:
"""
Parse OTEL_RESOURCE_ATTRIBUTES environment variable into a dictionary.

Returns:
Dictionary of resource attributes
"""
resource_attributes = {}
resource_attrs_str = os.environ.get("OTEL_RESOURCE_ATTRIBUTES", "")

if not resource_attrs_str:
return resource_attributes

for pair in resource_attrs_str.split(","):
if "=" in pair:
key, value = pair.split("=", 1)
resource_attributes[key.strip()] = value.strip()

return resource_attributes


def _validate_logs_headers() -> OtlpLogHeaderSetting:
"""
Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to
AWS OTLP Logs endpoint.

Note: Header inference from OTEL_RESOURCE_ATTRIBUTES is now handled by _setup_inferred_headers()
which runs before exporters are created.

Returns:
LogHeadersResult with log_group, log_stream, namespace and is_valid flag
"""
# Headers should already be set by _setup_inferred_headers if inference was needed
logs_headers = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_HEADERS)

log_group = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,115 @@ def customize_exporter_test(
for key in config.keys():
os.environ.pop(key, None)

def test_validate_logs_headers_infers_from_resource_attributes(self):
"""Test that _setup_inferred_headers can infer headers from OTEL_RESOURCE_ATTRIBUTES"""
from amazon.opentelemetry.distro.aws_opentelemetry_configurator import (
_validate_logs_headers,
_parse_resource_attributes,
_setup_inferred_headers,
)

# Test _parse_resource_attributes function
os.environ["OTEL_RESOURCE_ATTRIBUTES"] = (
"service.name=TestService,aws.log.group.names=/aws/genesis/TestAgent,aws.log.stream.names=test-stream"
)
attrs = _parse_resource_attributes()
self.assertEqual(attrs["service.name"], "TestService")
self.assertEqual(attrs["aws.log.group.names"], "/aws/genesis/TestAgent")
self.assertEqual(attrs["aws.log.stream.names"], "test-stream")

# Test inference when AGENT_OBSERVABILITY_ENABLED is true and OTEL_EXPORTER_OTLP_LOGS_HEADERS is not set
os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"
os.environ["OTEL_RESOURCE_ATTRIBUTES"] = (
"service.name=TestService,aws.log.group.names=/aws/genesis/TestAgent,aws.log.stream.names=test-stream"
)
os.environ.pop("OTEL_EXPORTER_OTLP_LOGS_HEADERS", None)

# Call _setup_inferred_headers to set the environment variable
_setup_inferred_headers()

# Verify the environment variable was set correctly
self.assertEqual(
os.environ.get("OTEL_EXPORTER_OTLP_LOGS_HEADERS"),
"x-aws-log-group=/aws/genesis/TestAgent,x-aws-log-stream=test-stream",
)

# Now validate headers should find the inferred values
result = _validate_logs_headers()
self.assertTrue(result.is_valid)
self.assertEqual(result.log_group, "/aws/genesis/TestAgent")
self.assertEqual(result.log_stream, "test-stream")

# Clean up
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
os.environ.pop("OTEL_RESOURCE_ATTRIBUTES", None)
os.environ.pop("OTEL_EXPORTER_OTLP_LOGS_HEADERS", None)

def test_validate_logs_headers_explicit_takes_priority(self):
"""Test that explicit OTEL_EXPORTER_OTLP_LOGS_HEADERS takes priority over inference"""
from amazon.opentelemetry.distro.aws_opentelemetry_configurator import (
_validate_logs_headers,
_setup_inferred_headers,
)

# Set both explicit headers and resource attributes
os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true"
os.environ["OTEL_RESOURCE_ATTRIBUTES"] = (
"aws.log.group.names=/aws/genesis/InferredAgent,aws.log.stream.names=inferred-stream"
)
os.environ["OTEL_EXPORTER_OTLP_LOGS_HEADERS"] = (
"x-aws-log-group=/aws/genesis/ExplicitAgent,x-aws-log-stream=explicit-stream"
)

# Call _setup_inferred_headers - it should not override explicit headers
_setup_inferred_headers()

# Verify explicit headers were not overridden
self.assertEqual(
os.environ.get("OTEL_EXPORTER_OTLP_LOGS_HEADERS"),
"x-aws-log-group=/aws/genesis/ExplicitAgent,x-aws-log-stream=explicit-stream",
)

result = _validate_logs_headers()
self.assertTrue(result.is_valid)
# Should use explicit headers, not inferred ones
self.assertEqual(result.log_group, "/aws/genesis/ExplicitAgent")
self.assertEqual(result.log_stream, "explicit-stream")

# Clean up
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
os.environ.pop("OTEL_RESOURCE_ATTRIBUTES", None)
os.environ.pop("OTEL_EXPORTER_OTLP_LOGS_HEADERS", None)

def test_validate_logs_headers_no_inference_when_agent_observability_disabled(self):
"""Test that no inference happens when AGENT_OBSERVABILITY_ENABLED is false"""
from amazon.opentelemetry.distro.aws_opentelemetry_configurator import (
_validate_logs_headers,
_setup_inferred_headers,
)

# Set resource attributes but keep agent observability disabled
os.environ["AGENT_OBSERVABILITY_ENABLED"] = "false"
os.environ["OTEL_RESOURCE_ATTRIBUTES"] = (
"aws.log.group.names=/aws/genesis/TestAgent,aws.log.stream.names=test-stream"
)
os.environ.pop("OTEL_EXPORTER_OTLP_LOGS_HEADERS", None)

# Call _setup_inferred_headers - it should not set headers when agent observability is disabled
_setup_inferred_headers()

# Verify no headers were set
self.assertIsNone(os.environ.get("OTEL_EXPORTER_OTLP_LOGS_HEADERS"))

result = _validate_logs_headers()
self.assertFalse(result.is_valid)
self.assertIsNone(result.log_group)
self.assertIsNone(result.log_stream)

# Clean up
os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None)
os.environ.pop("OTEL_RESOURCE_ATTRIBUTES", None)


def validate_distro_environ():
tc: TestCase = TestCase()
Expand Down