chore: refactor SpanAggregator <> TraceWriter interface #13894

Open · wants to merge 13 commits into base: main
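In brief (an overview inferred from the diff below, since the PR description is not shown): SpanAggregator no longer chooses between LogWriter and AgentWriter itself. That logic moves into a new create_trace_writer() factory in ddtrace/internal/writer/writer.py, and TraceWriter.recreate() gains an optional appsec_enabled parameter so the aggregator no longer has to stop the writer and poke at its _api_version directly. A minimal sketch of the resulting surface (names and signatures are taken from the diff; bodies are elided, and AgentResponse is simplified to Any here):

```python
from typing import Any, Callable, Optional


class TraceWriter:
    """Abstract base in ddtrace/internal/writer/writer.py."""

    def recreate(self, appsec_enabled: Optional[bool] = None) -> "TraceWriter":
        # Returns a fresh writer configured like this one; HTTPWriter forces
        # API version v0.4 when appsec_enabled is True, LogWriter ignores the flag.
        ...


def create_trace_writer(response_callback: Optional[Callable[[Any], None]] = None) -> TraceWriter:
    # Picks LogWriter or AgentWriter based on the environment (agent env vars,
    # AWS Lambda extension, GCP/Azure functions), checks that previously lived
    # as static methods on SpanAggregator.
    ...
```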
79 changes: 3 additions & 76 deletions ddtrace/_trace/processor/__init__.py
@@ -1,7 +1,6 @@
import abc
from collections import defaultdict
from itertools import chain
from os import environ
from threading import RLock
from typing import Any
from typing import Dict
@@ -22,25 +21,15 @@
from ddtrace.internal.constants import HIGHER_ORDER_TRACE_ID_BITS
from ddtrace.internal.constants import LAST_DD_PARENT_ID_KEY
from ddtrace.internal.constants import MAX_UINT_64BITS
from ddtrace.internal.dogstatsd import get_dogstatsd_client
from ddtrace.internal.logger import get_logger
from ddtrace.internal.sampling import SpanSamplingRule
from ddtrace.internal.sampling import get_span_sampling_rules
from ddtrace.internal.sampling import is_single_span_sampled
from ddtrace.internal.serverless import has_aws_lambda_agent_extension
from ddtrace.internal.serverless import in_aws_lambda
from ddtrace.internal.serverless import in_azure_function
from ddtrace.internal.serverless import in_gcp_function
from ddtrace.internal.service import ServiceStatusError
from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL
from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
from ddtrace.internal.utils.http import verify_url
from ddtrace.internal.writer import AgentResponse
from ddtrace.internal.writer import AgentWriter
from ddtrace.internal.writer import AgentWriterInterface
from ddtrace.internal.writer import LogWriter
from ddtrace.internal.writer import TraceWriter
from ddtrace.settings._agent import config as agent_config
from ddtrace.internal.writer import create_trace_writer
from ddtrace.settings._config import config
from ddtrace.settings.asm import config as asm_config

@@ -292,20 +281,7 @@ def __init__(
self.tags_processor = TraceTagsProcessor()
self.dd_processors = dd_processors or []
self.user_processors = user_processors or []
if SpanAggregator._use_log_writer():
self.writer: TraceWriter = LogWriter()
else:
verify_url(agent_config.trace_agent_url)
self.writer = AgentWriter(
intake_url=agent_config.trace_agent_url,
dogstatsd=get_dogstatsd_client(agent_config.dogstatsd_url),
sync_mode=SpanAggregator._use_sync_mode(),
headers={"Datadog-Client-Computed-Stats": "yes"}
if (config._trace_compute_stats or asm_config._apm_opt_out)
else {},
report_metrics=not asm_config._apm_opt_out,
response_callback=self._agent_response_callback,
)
self.writer = create_trace_writer(response_callback=self._agent_response_callback)
# Initialize the trace buffer and lock
self._traces: DefaultDict[int, _Trace] = defaultdict(lambda: _Trace())
self._lock: RLock = RLock()
@@ -426,44 +402,6 @@ def _agent_response_callback(self, resp: AgentResponse) -> None:
except ValueError as e:
log.error("Failed to set agent service sample rates: %s", str(e))

@staticmethod
def _use_log_writer() -> bool:
"""Returns whether the LogWriter should be used in the environment by
default.

The LogWriter is required by default in AWS Lambdas when the Datadog Agent extension
is not available in the Lambda.
"""
if (
environ.get("DD_AGENT_HOST")
or environ.get("DATADOG_TRACE_AGENT_HOSTNAME")
or environ.get("DD_TRACE_AGENT_URL")
):
# If one of these variables is set, we definitely have an agent
return False
elif in_aws_lambda() and has_aws_lambda_agent_extension():
# If the Agent Lambda extension is available then an AgentWriter is used.
return False
elif in_gcp_function() or in_azure_function():
return False
else:
return in_aws_lambda()

@staticmethod
def _use_sync_mode() -> bool:
"""Returns, if an `AgentWriter` is to be used, whether it should be run
in synchronous mode by default.

There are only two cases in which this is desirable:

- AWS Lambdas can have the Datadog agent installed via an extension.
When it's available traces must be sent synchronously to ensure all
are received before the Lambda terminates.
- Google Cloud Functions and Azure Functions have a mini-agent spun up by the tracer.
Similarly to AWS Lambdas, sync mode should be used to avoid data loss.
"""
return (in_aws_lambda() and has_aws_lambda_agent_extension()) or in_gcp_function() or in_azure_function()

def shutdown(self, timeout: Optional[float]) -> None:
"""
This will stop the background writer/worker and flush any finished traces in the buffer. The tracer cannot be
@@ -526,19 +464,8 @@ def reset(
This method is typically used after a process fork or during runtime reconfiguration.
Arguments that are None will not override existing values.
"""
try:
# Stop the writer to ensure it is not running while we reconfigure it.
self.writer.stop()
except ServiceStatusError:
# Writers like AgentWriter may not start until the first trace is encoded.
# Stopping them before that will raise a ServiceStatusError.
pass

if isinstance(self.writer, AgentWriterInterface) and appsec_enabled:
# Ensure AppSec metadata is encoded by setting the API version to v0.4.
self.writer._api_version = "v0.4"
# Re-create the writer to ensure it is consistent with updated configurations (ex: api_version)
self.writer = self.writer.recreate()
self.writer = self.writer.recreate(appsec_enabled=appsec_enabled)

# Recreate the sampling processor using new or existing config values.
# If an argument is None, the current value is preserved.
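The net effect on the aggregator side, condensed into a short illustrative sketch (only the writer-related pieces are shown; the class name and scaffolding here are placeholders, not the real SpanAggregator):

```python
from typing import Optional

from ddtrace.internal.writer import AgentResponse, create_trace_writer


class WriterOwnerSketch:
    """Condensed view of how SpanAggregator now drives its writer."""

    def __init__(self) -> None:
        # Environment detection, the dogstatsd client, sync mode and the
        # Datadog-Client-Computed-Stats header are all handled by the factory now.
        self.writer = create_trace_writer(response_callback=self._agent_response_callback)

    def _agent_response_callback(self, resp: AgentResponse) -> None:
        ...  # unchanged: applies agent-provided service sample rates

    def reset(self, appsec_enabled: Optional[bool] = None) -> None:
        # Stopping the old writer and switching to API v0.4 for AppSec
        # now happen inside recreate() rather than here.
        self.writer = self.writer.recreate(appsec_enabled=appsec_enabled)
```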
3 changes: 1 addition & 2 deletions ddtrace/internal/ci_visibility/writer.py
@@ -168,8 +168,7 @@ def stop(self, timeout=None):
if self.status != service.ServiceStatus.STOPPED:
super(CIVisibilityWriter, self).stop(timeout=timeout)

def recreate(self):
# type: () -> HTTPWriter
def recreate(self, appsec_enabled: Optional[bool] = None) -> "CIVisibilityWriter":
return self.__class__(
intake_url=self.intake_url,
processing_interval=self._interval,
2 changes: 2 additions & 0 deletions ddtrace/internal/writer/__init__.py
@@ -7,6 +7,7 @@
from .writer import Response
from .writer import TraceWriter
from .writer import _human_size
from .writer import create_trace_writer
from .writer_client import WriterClientBase


@@ -21,4 +22,5 @@
"TraceWriter",
"WriterClientBase",
"_human_size",
"create_trace_writer",
]
95 changes: 84 additions & 11 deletions ddtrace/internal/writer/writer.py
@@ -21,21 +21,26 @@
from ddtrace.settings.asm import config as asm_config

from ...constants import _KEEP_SPANS_RATE_KEY
from ...internal.utils.formats import parse_tags_str
from ...internal.utils.http import Response
from ...internal.utils.time import StopWatch
from .. import compat
from .. import periodic
from .. import service
from .._encoding import BufferFull
from .._encoding import BufferItemTooLarge
from ..agent import get_connection
from ..constants import _HTTPLIB_NO_TRACE_REQUEST
from ..dogstatsd import get_dogstatsd_client
from ..encoding import JSONEncoderV2
from ..logger import get_logger
from ..serverless import has_aws_lambda_agent_extension
from ..serverless import in_aws_lambda
from ..serverless import in_azure_function
from ..serverless import in_gcp_function
from ..service import ServiceStatusError
from ..sma import SimpleMovingAverage
from ..utils.formats import parse_tags_str
from ..utils.http import Response
from ..utils.http import verify_url
from ..utils.time import StopWatch
from .writer_client import WRITER_CLIENTS
from .writer_client import AgentWriterClientV4
from .writer_client import WriterClientBase
@@ -79,9 +84,10 @@ def _human_size(nbytes: float) -> str:


class TraceWriter(metaclass=abc.ABCMeta):
# TODO: `appsec_enabled` is used by ASM to dynamically enable ASM at runtime.
# Find an alternative way to do this without having to pass the parameter/recreating the writer
@abc.abstractmethod
def recreate(self):
# type: () -> TraceWriter
def recreate(self, appsec_enabled: Optional[bool] = None) -> "TraceWriter":
pass

@abc.abstractmethod
@@ -106,8 +112,7 @@ def __init__(
self.encoder = JSONEncoderV2()
self.out = out

def recreate(self):
# type: () -> LogWriter
def recreate(self, appsec_enabled: Optional[bool] = None) -> "LogWriter":
"""Create a new instance of :class:`LogWriter` using the same settings from this instance

:rtype: :class:`LogWriter`
@@ -561,21 +566,31 @@ def __init__(
report_metrics=report_metrics,
)

def recreate(self) -> HTTPWriter:
new_instance = self.__class__(
def recreate(self, appsec_enabled: Optional[bool] = None) -> HTTPWriter:
# When AppSec is enabled, the API version is forced to v0.4 below so that AppSec metadata is encoded.
try:
# Stop the writer to ensure it is not running while we reconfigure it.
self.stop()
except ServiceStatusError:
# Writers like AgentWriter may not start until the first trace is encoded.
# Stopping them before that will raise a ServiceStatusError.
pass

api_version = "v0.4" if appsec_enabled else self._api_version

return self.__class__(
intake_url=self.intake_url,
processing_interval=self._interval,
buffer_size=self._buffer_size,
max_payload_size=self._max_payload_size,
timeout=self._timeout,
dogstatsd=self.dogstatsd,
sync_mode=self._sync_mode,
api_version=self._api_version,
api_version=api_version,
headers=self._headers,
report_metrics=self._report_metrics,
response_callback=self._response_cb,
)
return new_instance

@property
def _agent_endpoint(self):
@@ -640,3 +655,61 @@ def before_fork(self) -> None:

def set_test_session_token(self, token: Optional[str]) -> None:
self._headers["X-Datadog-Test-Session-Token"] = token or ""


def _use_log_writer() -> bool:
"""Returns whether the LogWriter should be used in the environment by
default.

The LogWriter is required by default in AWS Lambdas when the Datadog Agent extension
is not available in the Lambda.
"""
if (
os.environ.get("DD_AGENT_HOST")
or os.environ.get("DATADOG_TRACE_AGENT_HOSTNAME")
or os.environ.get("DD_TRACE_AGENT_URL")
):
# If one of these variables is set, we definitely have an agent
return False
elif in_aws_lambda() and has_aws_lambda_agent_extension():
# If the Agent Lambda extension is available then an AgentWriter is used.
return False
elif in_gcp_function() or in_azure_function():
return False
else:
return in_aws_lambda()


def _use_sync_mode() -> bool:
"""Returns, if an `AgentWriter` is to be used, whether it should be run
in synchronous mode by default.

There are only two cases in which this is desirable:

- AWS Lambdas can have the Datadog agent installed via an extension.
When it's available traces must be sent synchronously to ensure all
are received before the Lambda terminates.
- Google Cloud Functions and Azure Functions have a mini-agent spun up by the tracer.
Similarly to AWS Lambdas, sync mode should be used to avoid data loss.
"""
return (in_aws_lambda() and has_aws_lambda_agent_extension()) or in_gcp_function() or in_azure_function()


def create_trace_writer(response_callback: Optional[Callable[[AgentResponse], None]] = None) -> TraceWriter:
if _use_log_writer():
return LogWriter()

verify_url(agent_config.trace_agent_url)

headers: Dict[str, str] = {}
if config._trace_compute_stats or asm_config._apm_opt_out:
headers["Datadog-Client-Computed-Stats"] = "yes"

return AgentWriter(
intake_url=agent_config.trace_agent_url,
dogstatsd=get_dogstatsd_client(agent_config.dogstatsd_url),
sync_mode=_use_sync_mode(),
headers=headers,
report_metrics=not asm_config._apm_opt_out,
response_callback=response_callback,
)
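For context, a hedged illustration of how the selection rules above play out. The scenarios are hypothetical; the decision rules simply mirror _use_log_writer() and _use_sync_mode(), and the env var names are the real ones checked by those helpers:

```python
# Scenario summary (mirrors the helpers above):
#   1. DD_AGENT_HOST / DD_TRACE_AGENT_URL / DATADOG_TRACE_AGENT_HOSTNAME set
#        -> AgentWriter, background (async) flushing.
#   2. AWS Lambda with the Datadog Agent Lambda extension
#        -> AgentWriter with sync_mode=True so traces flush before the function freezes.
#   3. AWS Lambda without the extension and no agent env vars
#        -> LogWriter (traces are emitted via logs).
#   4. Google Cloud Functions / Azure Functions
#        -> AgentWriter targeting the tracer-managed mini-agent, sync_mode=True.
import os

os.environ.setdefault("DD_TRACE_AGENT_URL", "http://localhost:8126")  # scenario 1

from ddtrace.internal.writer import create_trace_writer

writer = create_trace_writer()      # response_callback is optional
print(type(writer).__name__)        # expected to print "AgentWriter" in scenario 1
```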
2 changes: 1 addition & 1 deletion tests/utils.py
@@ -609,7 +609,7 @@ def pop(self):
flush_test_tracer_spans(self)
return spans

def recreate(self):
def recreate(self, appsec_enabled: Optional[bool] = None) -> "DummyWriter":
return self.__class__(trace_flush_enabled=self._trace_flush_enabled)

