Skip to content

Commit 441d093

Browse files
committed
chore: refactor tracer partial flushing logic
itr:noskip This simplify the code and removes the edge case where we tag a complete trace chunk with unncessary partial flush tag
1 parent bf2f416 commit 441d093

File tree

1 file changed

+25
-23
lines changed

1 file changed

+25
-23
lines changed

ddtrace/_trace/processor/__init__.py

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -263,9 +263,20 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
263263

264264

265265
class _Trace:
266-
def __init__(self, spans=None, num_finished=0):
267-
self.spans = spans if spans is not None else []
268-
self.num_finished = num_finished
266+
__slots__ = ("spans", "num_finished")
267+
268+
def __init__(self, spans: Optional[List[Span]] = None, num_finished: int = 0):
269+
self.spans: List[Span] = spans if spans is not None else []
270+
self.num_finished: int = num_finished
271+
272+
def remove_finished(self) -> List[Span]:
273+
# perf: Avoid Span.finished which is a computed property and has function call overhead
274+
# so check Span.duration_ns manually.
275+
finished = [s for s in self.spans if s.duration_ns is not None]
276+
if finished:
277+
self.spans[:] = [s for s in self.spans if s.duration_ns is None]
278+
self.num_finished = 0
279+
return finished
269280

270281

271282
class SpanAggregator(SpanProcessor):
@@ -350,31 +361,22 @@ def on_span_finish(self, span: Span) -> None:
350361
return
351362

352363
trace = self._traces[span.trace_id]
353-
num_buffered = len(trace.spans)
354364
trace.num_finished += 1
355-
should_partial_flush = self.partial_flush_enabled and trace.num_finished >= self.partial_flush_min_spans
356-
is_trace_complete = trace.num_finished >= len(trace.spans)
357-
if not is_trace_complete and not should_partial_flush:
358-
return
359-
360-
if not is_trace_complete:
361-
finished = [s for s in trace.spans if s.finished]
362-
if not finished:
363-
return
364-
trace.spans[:] = [s for s in trace.spans if not s.finished] # In-place update
365-
trace.num_finished = 0
366-
else:
365+
num_buffered = len(trace.spans)
366+
is_trace_complete = trace.num_finished >= num_buffered
367+
num_finished = trace.num_finished
368+
should_partial_flush = False
369+
if is_trace_complete:
367370
finished = trace.spans
368371
del self._traces[span.trace_id]
369372
# perf: Flush span finish metrics to the telemetry writer after the trace is complete
370373
self._queue_span_count_metrics("spans_finished", "integration_name")
371-
372-
num_finished = len(finished)
373-
if should_partial_flush:
374-
# FIXME(munir): should_partial_flush should return false if all the spans in the trace are finished.
375-
# For example if partial flushing min spans is 10 and the trace has 10 spans, the trace should
376-
# not have a partial flush metric. This trace was processed in its entirety.
377-
finished[0].set_metric("_dd.py.partial_flush", num_finished)
374+
elif self.partial_flush_enabled and num_finished >= self.partial_flush_min_spans:
375+
should_partial_flush = True
376+
finished = trace.remove_finished()
377+
finished[0].set_metric("_dd.py.partial_flush", num_finished)
378+
else:
379+
return
378380

379381
# perf: Process spans outside of the span aggregator lock
380382
spans = finished

0 commit comments

Comments
 (0)