airbyte-ci: run incremental acceptance tests step for community connectors (#39363)

## What
When the connector under test is a community connector, we want to fail acceptance tests only if the current branch introduces new test failures compared to the CAT results of the released image.

## How
* Introduce an `IncrementalAcceptanceTests` step which runs after `AcceptanceTests`.
* It runs `AcceptanceTests` on the released image of the connector.
* It compares the test reports from the released version and the current branch version, and fails if new failing tests are detected (see the sketch below).
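
To make the comparison concrete, here is a minimal sketch of the idea, assuming each acceptance tests run produces a pytest report log (one JSON object per line with `nodeid` and `outcome` fields, as in the step below); the file names are purely illustrative:

```python
import json
from pathlib import Path
from typing import Set


def failed_node_ids(report_log: Path) -> Set[str]:
    """Collect the pytest node ids of failed tests from a JSONL report log."""
    failed = set()
    for line in report_log.read_text().splitlines():
        entry = json.loads(line)
        # Only per-test entries carry both a node id and an outcome.
        if entry.get("outcome") == "failed" and "nodeid" in entry:
            failed.add(entry["nodeid"])
    return failed


# Illustrative file names: one report log from the current branch,
# one from a run against the latest released image.
new_failures = failed_node_ids(Path("branch_report_log.jsonl")) - failed_node_ids(
    Path("released_report_log.jsonl")
)
if new_failures:
    raise SystemExit(f"{len(new_failures)} new failing acceptance tests detected")
```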

## Some words about the "cost" of running acceptance tests twice
Acceptance tests will run on the connector container built from the branch and on a container built from the latest released image.
You could consider this as running acceptance tests twice, which is costly, but:
* Subsequent runs of acceptance tests on the released image will be cached by Dagger, so this should lead to a single run of CAT on the released image per PR.
* It's a good way to make sure we're comparing apples to apples: instead of pulling a previously generated test report from a potentially different version of CAT, we run the exact same test suite on both versions.
* The `IncrementalAcceptanceTests` step only runs for community connectors and is skipped for certified ones, so it adds no overhead for Airbyters maintaining certified connectors.
* It's a good way to run "just in time" tests instead of scheduling costly nightly / weekly builds on all community connectors on `master`. The released image tag is resolved on the fly from the connector's metadata on `master`, as sketched below.
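
For reference, a minimal sketch of how the released image reference is looked up from the connector's `metadata.yaml` on `master`, mirroring the new step in the diff (the connector name here is just an example):

```python
import requests
import yaml

# Example connector; the new step uses the technical name of the connector under test.
connector_technical_name = "source-faker"
metadata_url = (
    "https://raw.githubusercontent.com/airbytehq/airbyte/master/"
    f"airbyte-integrations/connectors/{connector_technical_name}/metadata.yaml"
)

# Read the metadata as it exists on master rather than the online connector registry,
# since some community connectors are not released to OSS or Airbyte Cloud.
metadata = yaml.safe_load(requests.get(metadata_url).text)["data"]
released_image = f"{metadata['dockerRepository']}:{metadata['dockerImageTag']}"
print(released_image)  # e.g. airbyte/source-faker:<latest released tag>
```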
alafanechere authored Jun 11, 2024
1 parent 1900f07 commit 8fd3a3c
Showing 4 changed files with 106 additions and 5 deletions.
@@ -9,6 +9,7 @@ class CONNECTOR_TEST_STEP_ID(str, Enum):
"""

ACCEPTANCE = "acceptance"
INCREMENTAL_ACCEPTANCE = "incremental_acceptance"
BUILD_NORMALIZATION = "build_normalization"
BUILD_TAR = "build_tar"
BUILD = "build"
@@ -5,13 +5,14 @@
"""This module groups steps made to run tests agnostic to a connector language."""

import datetime
import json
import os
import time
from abc import ABC, abstractmethod
from functools import cached_property
from pathlib import Path
from textwrap import dedent
from typing import ClassVar, List, Optional
from typing import ClassVar, List, Optional, Set

import requests # type: ignore
import semver
@@ -344,12 +345,100 @@ async def get_step_result(self, container: Container) -> StepResult:
status=self.get_step_status_from_exit_code(exit_code),
stderr=stderr,
stdout=stdout,
output=container,
output={"report_log": report_log_artifact},
artifacts=[report_log_artifact],
consider_in_overall_status=is_hard_failure,
)


class IncrementalAcceptanceTests(Step):
    """This step runs the acceptance tests on the released image of the connector and compares the results with the current acceptance tests report log.
    It fails if there are new failing tests in the current acceptance tests report log.
    """

    title = "Incremental Acceptance Tests"
    context: ConnectorContext

    async def get_failed_pytest_node_ids(self, current_acceptance_tests_report_log: Artifact) -> Set[str]:
        """Parse the report log of the acceptance tests and return the pytest node ids of the failed tests.
        Args:
            current_acceptance_tests_report_log (Artifact): The report log of the acceptance tests.
        Returns:
            Set[str]: The pytest node ids of the failed tests.
        """
        current_report_lines = (await current_acceptance_tests_report_log.content.contents()).splitlines()
        failed_nodes = set()
        for line in current_report_lines:
            single_test_report = json.loads(line)
            if "nodeid" not in single_test_report or "outcome" not in single_test_report:
                continue
            if single_test_report["outcome"] == "failed":
                failed_nodes.add(single_test_report["nodeid"])
        return failed_nodes

    async def get_result_log_on_master(self) -> Artifact:
        """Runs acceptance tests on the released image of the connector and returns the report log.
        The released image version is fetched from the master metadata file of the connector.
        We're not using the online connector registry here as some connectors might not be released to OSS or Airbyte Cloud.
        Thanks to Dagger caching, subsequent runs of this step will be cached if the released image did not change.
        Returns:
            Artifact: The report log of the acceptance tests run on the released image.
        """
        raw_master_metadata = requests.get(
            f"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations/connectors/{self.context.connector.technical_name}/metadata.yaml"
        )
        master_metadata = yaml.safe_load(raw_master_metadata.text)
        master_docker_image_tag = master_metadata["data"]["dockerImageTag"]
        released_image = f'{master_metadata["data"]["dockerRepository"]}:{master_docker_image_tag}'
        released_container = self.dagger_client.container().from_(released_image)
        self.logger.info(f"Running acceptance tests on released image: {released_image}")
        acceptance_tests_results_on_master = await AcceptanceTests(self.context, self.secrets).run(released_container)
        return acceptance_tests_results_on_master.output["report_log"]

    async def _run(self, current_acceptance_tests_result: StepResult) -> StepResult:
        """Compare the acceptance tests report log of the current image with the one of the released image.
        Fails if there are new failing tests in the current acceptance tests report log.
        """
        if current_acceptance_tests_result.consider_in_overall_status:
            return StepResult(
                step=self, status=StepStatus.SKIPPED, stdout="Skipping because the current acceptance tests are hard failures."
            )

        current_acceptance_tests_report_log = current_acceptance_tests_result.output["report_log"]
        current_failing_nodes = await self.get_failed_pytest_node_ids(current_acceptance_tests_report_log)
        if not current_failing_nodes:
            return StepResult(
                step=self, status=StepStatus.SKIPPED, stdout="No failing acceptance tests were detected on the current version."
            )

        master_failings = await self.get_failed_pytest_node_ids(await self.get_result_log_on_master())
        new_failing_nodes = current_failing_nodes - master_failings
        if not new_failing_nodes:
            return StepResult(
                step=self,
                status=StepStatus.SUCCESS,
                stdout=dedent(
                    f"""
                    No new failing acceptance tests were detected.
                    Acceptance tests are still failing with {len(current_failing_nodes)} failing tests, but the AcceptanceTests step is not a hard failure for this connector.
                    Please check out the original acceptance test failures and assess how critical they are.
                    """
                ),
            )
        else:
            return StepResult(
                step=self,
                status=StepStatus.FAILURE,
                stdout=f"{len(new_failing_nodes)} new failing acceptance tests detected:\n-"
                + "\n-".join(new_failing_nodes)
                + "\nPlease fix the new failing tests before merging this PR."
                + f"\nPlease also check the original {len(current_failing_nodes)} acceptance test failures and assess how critical they are.",
            )


class RegressionTests(Step):
"""A step to run regression tests for a connector."""

@@ -15,7 +15,7 @@
from pipelines.airbyte_ci.connectors.build_image.steps.python_connectors import BuildConnectorImages
from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID
from pipelines.airbyte_ci.connectors.test.context import ConnectorTestContext
from pipelines.airbyte_ci.connectors.test.steps.common import AcceptanceTests, RegressionTests
from pipelines.airbyte_ci.connectors.test.steps.common import AcceptanceTests, IncrementalAcceptanceTests, RegressionTests
from pipelines.consts import LOCAL_BUILD_PLATFORM
from pipelines.dagger.actions import secrets
from pipelines.dagger.actions.python.poetry import with_poetry
@@ -293,4 +293,12 @@ def get_test_steps(context: ConnectorTestContext) -> STEP_TREE:
                depends_on=[CONNECTOR_TEST_STEP_ID.BUILD],
            ),
        ],
        [
            StepToRun(
                id=CONNECTOR_TEST_STEP_ID.INCREMENTAL_ACCEPTANCE,
                step=IncrementalAcceptanceTests(context, secrets=context.get_secrets_for_step_id(CONNECTOR_TEST_STEP_ID.ACCEPTANCE)),
                args=lambda results: {"current_acceptance_tests_result": results[CONNECTOR_TEST_STEP_ID.ACCEPTANCE]},
                depends_on=[CONNECTOR_TEST_STEP_ID.ACCEPTANCE],
            )
        ],
    ]
@@ -177,7 +177,10 @@ def _step_dependencies_succeeded(step_to_eval: StepToRun, results: RESULTS_DICT)
f"Step {step_to_eval.id} depends on {step_id} which has not been run yet. This implies that the order of the steps is not correct. Please check that the steps are in the correct order."
)

return all(results[step_id] and results[step_id].status is StepStatus.SUCCESS for step_id in step_to_eval.depends_on)
return all(
results[step_id] and (results[step_id].status is StepStatus.SUCCESS or not results[step_id].consider_in_overall_status)
for step_id in step_to_eval.depends_on
)


def _filter_skipped_steps(steps_to_evaluate: STEP_TREE, skip_steps: List[str], results: RESULTS_DICT) -> Tuple[STEP_TREE, RESULTS_DICT]:
@@ -301,7 +304,7 @@ async def run_steps(
options.log_step_tree = False

# If any of the previous steps failed, skip the remaining steps
if options.fail_fast and any(result.status is StepStatus.FAILURE for result in results.values()):
if options.fail_fast and any(result.status is StepStatus.FAILURE and result.consider_in_overall_status for result in results.values()):
skipped_results = _skip_remaining_steps(runnables)
return {**results, **skipped_results}

