Skip to content

fix(ci_visibility): split payload in chunks if needed #13825

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 22 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 68 additions & 27 deletions ddtrace/internal/ci_visibility/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from typing import Dict # noqa:F401
from typing import List # noqa:F401
from typing import Optional # noqa:F401
from typing import Tuple # noqa:F401

from ddtrace._trace.span import Span # noqa:F401

Expand All @@ -42,13 +43,15 @@ class CIVisibilityEncoderV01(BufferedEncoder):
TEST_SUITE_EVENT_VERSION = 1
TEST_EVENT_VERSION = 2
ENDPOINT_TYPE = ENDPOINT.TEST_CYCLE
_MAX_PAYLOAD_SIZE = 5 * 1024 * 1024 # 5MB

def __init__(self, *args):
    # DEV: args are not used here, but are used by BufferedEncoder's __cinit__() method,
    # which is called implicitly by Cython.
    super(CIVisibilityEncoderV01, self).__init__()
    self._lock = threading.RLock()
    # Maps event type -> metadata tags merged into every payload envelope.
    self._metadata = {}  # type: Dict[str, Dict[str, str]]
    # NOTE(review): the xdist-worker check is evaluated once at construction
    # instead of per payload -- confirm the env var cannot change afterwards.
    self._is_not_xdist_worker = os.getenv("PYTEST_XDIST_WORKER") is None
    self._init_buffer()

def __len__(self):
Expand All @@ -68,16 +71,18 @@ def put(self, spans):
self.buffer.append(spans)

def encode_traces(self, traces):
    # type: (List[List[Span]]) -> Optional[bytes]
    """Encode *traces* into a single payload, returning only the bytes.

    NOTE(review): ``_build_payload`` may consume only a prefix of *traces*
    when the encoded payload exceeds the size limit; the ``[0]`` here drops
    the count, so any unconsumed traces are silently lost on this path --
    confirm callers never pass oversized trace lists.
    """
    return self._build_payload(traces=traces)[0]

def encode(self):
    # type: () -> Tuple[Optional[bytes], int]
    """Serialize buffered traces into one payload.

    Returns ``(payload, traces_consumed)``; ``(None, 0)`` when the buffer
    is empty.  Consumed traces are removed from the buffer, and traces may
    remain when the payload had to be split, so callers should keep calling
    encode() until the buffer is drained.
    """
    with self._lock:
        if not self.buffer:
            return None, 0
        with StopWatch() as sw:
            payload, count = self._build_payload(self.buffer)
        record_endpoint_payload_events_serialization_time(endpoint=self.ENDPOINT_TYPE, seconds=sw.elapsed())
        if count:
            # Keep only the traces that were not consumed by this payload.
            self.buffer = self.buffer[count:]
        return payload, count

def _get_parent_session(self, traces):
for trace in traces:
Expand All @@ -87,29 +92,65 @@ def _get_parent_session(self, traces):
return 0

def _build_payload(self, traces):
new_parent_session_span_id = self._get_parent_session(traces)
is_not_xdist_worker = os.getenv("PYTEST_XDIST_WORKER") is None
normalized_spans = [
self._convert_span(span, trace[0].context.dd_origin, new_parent_session_span_id)
for trace in traces
for span in trace
if (is_not_xdist_worker or span.get_tag(EVENT_TYPE) != SESSION_TYPE)
]
if not normalized_spans:
return None
record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(normalized_spans))
# type: (List[List[Span]]) -> Tuple[Optional[bytes], int]
if not traces:
return None, 0

# TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size.
new_parent_session_span_id = self._get_parent_session(traces)
return self._send_all_or_half_spans(traces, new_parent_session_span_id)

def _send_all_or_half_spans(self, traces, new_parent_session_span_id):
    # type: (List[List[Span]], Optional[int]) -> Tuple[Optional[bytes], int]
    """Build a payload for as many of *traces* as fit in ``_MAX_PAYLOAD_SIZE``.

    Traces are converted to event dicts exactly once; if the packed payload
    exceeds the limit, the trace count is repeatedly halved -- reusing the
    already-converted events instead of re-converting them on every step --
    until the payload fits or a single trace remains.

    Returns ``(payload, traces_consumed)``; payload is None when filtering
    removed every span.
    """
    all_spans_with_trace_info = self._convert_traces_to_spans(traces, new_parent_session_span_id)

    count = len(traces)
    while True:
        # Flatten the (pre-converted) spans of the first `count` traces.
        spans = [span for _, trace_spans in all_spans_with_trace_info[:count] for span in trace_spans]

        if not spans:
            log.debug("No spans to encode after filtering, returning empty payload")
            return None, count

        payload = self._create_payload_from_spans(spans)
        if len(payload) <= self._MAX_PAYLOAD_SIZE or count <= 1:
            record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(spans))
            return payload, count

        # Payload too large: retry with the first half of the traces only;
        # the remainder stays in the caller's buffer for the next encode().
        count = (count + 1) // 2

def _convert_traces_to_spans(self, traces, new_parent_session_span_id):
    # type: (List[List[Span]], Optional[int]) -> List[Tuple[int, List[Dict[str, Any]]]]
    """Convert all traces to event dicts with xdist filtering applied.

    Returns one ``(trace_index, converted_spans)`` pair per input trace so
    callers can split payloads on trace boundaries.  Session-type events
    are skipped when running as a pytest-xdist worker.
    """
    all_spans_with_trace_info = []
    for trace_idx, trace in enumerate(traces):
        if not trace:
            # Guard: trace[0] below would raise IndexError on an empty trace.
            all_spans_with_trace_info.append((trace_idx, []))
            continue
        # dd_origin is per-trace; hoist the lookup out of the comprehension.
        dd_origin = trace[0].context.dd_origin
        trace_spans = [
            self._convert_span(span, dd_origin, new_parent_session_span_id)
            for span in trace
            if self._is_not_xdist_worker or span.get_tag(EVENT_TYPE) != SESSION_TYPE
        ]
        all_spans_with_trace_info.append((trace_idx, trace_spans))

    return all_spans_with_trace_info

def _create_payload_from_spans(self, spans):
    # type: (List[Dict[str, Any]]) -> bytes
    """Pack the given event dicts into a msgpack payload envelope."""
    envelope = {
        "version": self.PAYLOAD_FORMAT_VERSION,
        "metadata": self._metadata,
        "events": spans,
    }
    return CIVisibilityEncoderV01._pack_payload(envelope)

@staticmethod
def _pack_payload(payload):
return msgpack_packb(payload)

def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
# type: (Span, str, Optional[int]) -> Dict[str, Any]
def _convert_span(self, span, dd_origin=None, new_parent_session_span_id=0):
# type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
sp = JSONEncoderV2._span_to_dict(span)
sp = JSONEncoderV2._normalize_span(sp)
sp["type"] = span.get_tag(EVENT_TYPE) or span.span_type
Expand Down Expand Up @@ -218,7 +259,7 @@ def _build_body(self, data):
def _build_data(self, traces):
# type: (List[List[Span]]) -> Optional[bytes]
normalized_covs = [
self._convert_span(span, "")
self._convert_span(span)
for trace in traces
for span in trace
if (COVERAGE_TAG_NAME in span.get_tags() or span.get_struct_tag(COVERAGE_TAG_NAME) is not None)
Expand All @@ -230,14 +271,14 @@ def _build_data(self, traces):
return msgpack_packb({"version": self.PAYLOAD_FORMAT_VERSION, "coverages": normalized_covs})

def _build_payload(self, traces):
    # type: (List[List[Span]]) -> Tuple[Optional[bytes], int]
    """Build the multipart coverage payload for *traces*.

    Returns ``(payload, traces_consumed)``.  All input traces are consumed
    in a single payload, so the count is ``len(traces)``: returning the
    payload's byte length here (as a previous revision did) would make the
    caller's buffer trimming and sent-trace metrics wildly inaccurate.
    """
    data = self._build_data(traces)
    if not data:
        return None, 0
    return b"\r\n".join(self._build_body(data)), len(traces)

def _convert_span(self, span, dd_origin, new_parent_session_span_id=0):
# type: (Span, str, Optional[int]) -> Dict[str, Any]
def _convert_span(self, span, dd_origin=None, new_parent_session_span_id=0):
# type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
# DEV: new_parent_session_span_id is unused here, but it is used in super class
files: Dict[str, Any] = {}

Expand Down
77 changes: 41 additions & 36 deletions ddtrace/internal/writer/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,47 +371,52 @@ def flush_queue(self, raise_exc: bool = False):
self._set_drop_rate()

def _flush_queue_with_client(self, client: WriterClientBase, raise_exc: bool = False) -> None:
    """Flush the client's encoder, sending one payload per loop iteration.

    The encoder may split its buffer into multiple payloads (e.g. when one
    would exceed the intake size limit); ``encode()`` reports how many
    traces each payload consumed, and we loop until every trace buffered at
    entry has been attempted.
    """
    total_traces = len(client.encoder)
    encoded_traces = total_traces  # fallback value for metrics if encode() raises
    sent_traces = 0
    while total_traces > sent_traces:
        try:
            encoded, encoded_traces = client.encoder.encode()

            if encoded is None:
                # Nothing (left) to send.
                return

            # Guard against a zero-progress encoder: without this, a payload
            # reported as consuming 0 traces would spin this loop forever.
            if encoded_traces <= 0:
                log.error("encoder %r reported no progress, aborting flush", client.encoder)
                return

            # Compress the payload when the intake accepts gzip.
            if self._intake_accepts_gzip:
                original_size = len(encoded)
                encoded = gzip.compress(encoded, compresslevel=6)
                log.debug("Original size in bytes: %s, Compressed size: %s", original_size, len(encoded))
                # And add the header
                self._headers["Content-Encoding"] = "gzip"

        except Exception:
            # FIXME(munir): if client.encoder raises an Exception encoded_traces may not be accurate
            # due to race conditions
            log.error("failed to encode trace with encoder %r", client.encoder, exc_info=True)
            self._metrics_dist("encoder.dropped.traces", encoded_traces)
            return

        try:
            self._send_payload_with_backoff(encoded, encoded_traces, client)
            sent_traces += encoded_traces
        except Exception:
            self._metrics_dist("http.errors", tags=["type:err"])
            self._metrics_dist("http.dropped.bytes", len(encoded))
            self._metrics_dist("http.dropped.traces", encoded_traces)
            if raise_exc:
                raise
            else:
                log.error(
                    "failed to send, dropping %d traces to intake at %s after %d retries",
                    encoded_traces,
                    self._intake_endpoint(client),
                    self.RETRY_ATTEMPTS,
                )
        finally:
            # NOTE(review): these "sent" metrics also run on the failure path,
            # counting dropped payloads as sent -- pre-existing behavior, confirm intent.
            self._metrics_dist("http.sent.bytes", len(encoded))
            self._metrics_dist("http.sent.traces", encoded_traces)

def periodic(self):
self.flush_queue(raise_exc=False)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fixes:
- |
CI Visibility: Fixed an issue where payloads exceeding 5MB could fail to be sent to the intake. Payloads are now automatically split when they exceed the size limit.
4 changes: 2 additions & 2 deletions tests/ci_visibility/test_ci_visibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -1417,7 +1417,7 @@ def tearDown(self):
def assert_test_session_name(self, name):
"""Check that the payload metadata contains the test session name attributes."""
payload = msgpack.loads(
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[0]
)
assert payload["metadata"]["test_session_end"] == {"test_session.name": name}
assert payload["metadata"]["test_suite_end"] == {"test_session.name": name}
Expand Down Expand Up @@ -1493,7 +1493,7 @@ def test_set_library_capabilities(self):
)

payload = msgpack.loads(
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])
CIVisibility._instance.tracer._span_aggregator.writer._clients[0].encoder._build_payload([[Span("foo")]])[0]
)
assert payload["metadata"]["test"] == {
"_dd.library_capabilities.early_flake_detection": "1",
Expand Down
Loading
Loading