Skip to content

chore(ci_visibility): aggregate ITR skips in xdist main process #13639

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e7950ec
atr + xdist test improvements
gnufede Jun 10, 2025
6e67582
xdist + ITR tests
gnufede Jun 10, 2025
9a45af8
itr + xdist changes
gnufede Jun 10, 2025
1726ab1
plugin changes
gnufede Jun 10, 2025
c8277f3
integration tests for itr + xdist
gnufede Jun 10, 2025
6cb10a8
Revert "allow for test level skipping"
gnufede Jun 10, 2025
7cb7013
Merge branch 'main' into gnufede/SDTEST-2150-pytest-xdist-itr-skipped…
gnufede Jun 10, 2025
f804946
reduce test code
gnufede Jun 11, 2025
ec23911
add unit tests
gnufede Jun 11, 2025
4e422dd
finish regardless of itr
gnufede Jun 11, 2025
6e6c255
change place to set itr tags
gnufede Jun 11, 2025
d41b7c8
use proper channels to set distributed itr skip count
gnufede Jun 12, 2025
22bc855
details
gnufede Jun 12, 2025
6f7c2bc
xdist context propagation tests fixed
gnufede Jun 12, 2025
9785070
pass proper argument
gnufede Jun 12, 2025
6c3adec
Merge branch 'main' into gnufede/SDTEST-2150-pytest-xdist-itr-skipped…
gnufede Jun 12, 2025
7d87b6c
ensure we are in main process
gnufede Jun 12, 2025
70f665a
empty commit
gnufede Jun 12, 2025
c6d4421
fix test, and another
gnufede Jun 12, 2025
74db92d
style
gnufede Jun 12, 2025
28760c1
Merge branch 'main' into gnufede/SDTEST-2150-pytest-xdist-itr-skipped…
gnufede Jun 12, 2025
d8e9cba
Merge branch 'main' into gnufede/SDTEST-2150-pytest-xdist-itr-skipped…
gnufede Jun 13, 2025
94a5096
Merge branch 'main' into gnufede/SDTEST-2150-pytest-xdist-itr-skipped…
gnufede Jun 13, 2025
52c11c9
Merge branch 'main' into gnufede/SDTEST-2150-pytest-xdist-itr-skipped…
gnufede Jun 13, 2025
d021aac
Merge branch 'main' into gnufede/SDTEST-2150-pytest-xdist-itr-skipped…
gnufede Jun 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions ddtrace/contrib/internal/pytest/_plugin_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ def pytest_configure_node(self, node):

node.workerinput["root_span"] = root_span

@pytest.hookimpl
def pytest_testnodedown(self, node, error):
if hasattr(node, "workeroutput") and "itr_skipped_count" in node.workeroutput:
if not hasattr(pytest, "global_worker_itr_results"):
pytest.global_worker_itr_results = 0
pytest.global_worker_itr_results += node.workeroutput["itr_skipped_count"]


def _handle_itr_should_skip(item, test_id) -> bool:
"""Checks whether a test should be skipped
Expand All @@ -135,6 +142,13 @@ def _handle_itr_should_skip(item, test_id) -> bool:
InternalTest.mark_itr_skipped(test_id)
# Marking the test as skipped by ITR so that it appears in pytest's output
item.add_marker(pytest.mark.skip(reason=SKIPPED_BY_ITR_REASON)) # TODO don't rely on internal for reason

# If we're in a worker process, count the skipped test
if hasattr(item.config, "workeroutput"):
if "itr_skipped_count" not in item.config.workeroutput:
item.config.workeroutput["itr_skipped_count"] = 0
item.config.workeroutput["itr_skipped_count"] += 1

return True

return False
Expand Down Expand Up @@ -267,6 +281,10 @@ def pytest_configure(config: pytest_Config) -> None:

if config.pluginmanager.hasplugin("xdist"):
config.pluginmanager.register(XdistHooks())

if not hasattr(config, "workerinput") and os.environ.get("PYTEST_XDIST_WORKER") is None:
# Main process
pytest.global_worker_itr_results = 0
else:
# If the pytest ddtrace plugin is not enabled, we should disable CI Visibility, as it was enabled during
# pytest_load_initial_conftests
Expand Down Expand Up @@ -315,6 +333,7 @@ def pytest_sessionstart(session: pytest.Session) -> None:
InternalTestSession.set_library_capabilities(library_capabilities)

extracted_context = None
distributed_children = False
if hasattr(session.config, "workerinput"):
from ddtrace._trace.context import Context
from ddtrace.constants import USER_KEEP
Expand All @@ -332,8 +351,10 @@ def pytest_sessionstart(session: pytest.Session) -> None:
"pytest_sessionstart: Could not convert root_span %s to int",
received_root_span,
)
elif hasattr(pytest, "global_worker_itr_results"):
distributed_children = True

InternalTestSession.start(extracted_context)
InternalTestSession.start(distributed_children, extracted_context)

if InternalTestSession.efd_enabled() and not _pytest_version_supports_efd():
log.warning("Early Flake Detection disabled: pytest version is not supported")
Expand Down Expand Up @@ -453,7 +474,7 @@ def _pytest_runtest_protocol_post_yield(item, nextitem, coverage_collector):
InternalTestSuite.mark_itr_skipped(suite_id)
else:
_handle_coverage_dependencies(suite_id)
InternalTestSuite.finish(suite_id)
InternalTestSuite.finish(suite_id)
if nextitem is None or (next_test_id is not None and next_test_id.parent_id.parent_id != module_id):
InternalTestModule.finish(module_id)

Expand Down Expand Up @@ -767,6 +788,14 @@ def _pytest_sessionfinish(session: pytest.Session, exitstatus: int) -> None:
if ModuleCodeCollector.is_installed():
ModuleCodeCollector.uninstall()

# Count ITR skipped tests from workers if we're in the main process
if hasattr(pytest, "global_worker_itr_results"):
skipped_count = pytest.global_worker_itr_results
if skipped_count > 0:
# Update the session's internal _itr_skipped_count so that when _set_itr_tags() is called
# during session finishing, it will use the correct worker-aggregated count
InternalTestSession.set_itr_tags(skipped_count)

InternalTestSession.finish(
force_finish_children=True,
override_status=TestStatus.FAIL if session.exitstatus == pytest.ExitCode.TESTS_FAILED else None,
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/ext/test_visibility/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ def discover(

@staticmethod
@_catch_and_log_exceptions
def start(context: Optional[Context] = None):
def start(distributed_children: bool = False, context: Optional[Context] = None):
log.debug("Starting session")
core.dispatch("test_visibility.session.start", (context,))
core.dispatch("test_visibility.session.start", (distributed_children, context))

class FinishArgs(NamedTuple):
force_finish_children: bool
Expand Down
6 changes: 5 additions & 1 deletion ddtrace/internal/ci_visibility/api/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ def __init__(
) -> None:
super().__init__(name, session_settings, operation_name, initial_tags)
self._children: Dict[CIDT, CITEMT] = {}
self._distributed_children = False

def get_status(self) -> Union[TestStatus, SPECIAL_STATUS]:
"""Recursively computes status based on all children's status
Expand Down Expand Up @@ -659,5 +660,8 @@ def _set_itr_tags(self, itr_enabled: bool) -> None:
self.set_tag(test.ITR_TEST_SKIPPING_TESTS_SKIPPED, self._itr_skipped_count > 0)

# Only parent items set skipped counts because tests would always be 1 or 0
if self._children:
if self._children or self._distributed_children:
self.set_tag(test.ITR_TEST_SKIPPING_COUNT, self._itr_skipped_count)

def set_distributed_children(self) -> None:
self._distributed_children = True
4 changes: 4 additions & 0 deletions ddtrace/internal/ci_visibility/api/_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ def _telemetry_record_event_finished(self):
def add_coverage_data(self, *args, **kwargs):
raise NotImplementedError("Coverage data cannot be added to session.")

def set_skipped_count(self, skipped_count: int):
self._itr_skipped_count = skipped_count
self._set_itr_tags(self._session_settings.itr_test_skipping_enabled)

def set_covered_lines_pct(self, coverage_pct: float):
self.set_tag(test.TEST_LINES_PCT, coverage_pct)

Expand Down
11 changes: 10 additions & 1 deletion ddtrace/internal/ci_visibility/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1103,10 +1103,12 @@ def _on_discover_session(discover_args: TestSession.DiscoverArgs) -> None:


@_requires_civisibility_enabled
def _on_start_session(context: Optional[Context] = None) -> None:
def _on_start_session(distributed_children: bool = False, context: Optional[Context] = None) -> None:
log.debug("Handling start session")
session = CIVisibility.get_session()
session.start(context)
if distributed_children:
session.set_distributed_children()


@_requires_civisibility_enabled
Expand Down Expand Up @@ -1171,6 +1173,12 @@ def _on_session_set_library_capabilities(capabilities: LibraryCapabilities) -> N
CIVisibility.set_library_capabilities(capabilities)


@_requires_civisibility_enabled
def _on_session_set_itr_skipped_count(skipped_count: int) -> None:
log.debug("Setting skipped count: %d", skipped_count)
CIVisibility.get_session().set_skipped_count(skipped_count)


@_requires_civisibility_enabled
def _on_session_get_path_codeowners(path: Path) -> Optional[List[str]]:
log.debug("Getting codeowners for path %s", path)
Expand Down Expand Up @@ -1203,6 +1211,7 @@ def _register_session_handlers() -> None:
)
core.on("test_visibility.session.set_covered_lines_pct", _on_session_set_covered_lines_pct)
core.on("test_visibility.session.set_library_capabilities", _on_session_set_library_capabilities)
core.on("test_visibility.session.set_itr_skipped_count", _on_session_set_itr_skipped_count)


@_requires_civisibility_enabled
Expand Down
6 changes: 6 additions & 0 deletions ddtrace/internal/test_visibility/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ def get_path_codeowners(path: Path) -> t.Optional[t.List[str]]:
def set_library_capabilities(capabilities: LibraryCapabilities) -> None:
core.dispatch("test_visibility.session.set_library_capabilities", (capabilities,))

@staticmethod
@_catch_and_log_exceptions
def set_itr_tags(skipped_count: int):
log.debug("Setting itr session tags: %d", skipped_count)
core.dispatch("test_visibility.session.set_itr_skipped_count", (skipped_count,))


class InternalTestModule(ext_api.TestModule, InternalTestBase):
pass
Expand Down
Loading
Loading