Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions airflow-ctl-tests/tests/airflowctl_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@

from tests_common.test_utils.fernet import generate_fernet_key_string

docker_client = None

class _CtlTestState:
docker_client: DockerClient | None = None


# Pytest hook to run at the start of the session
Expand Down Expand Up @@ -145,8 +147,6 @@ def docker_compose_up(tmp_path_factory):
"""Fixture to spin up Docker Compose environment for the test session."""
from shutil import copyfile

global docker_client

tmp_dir = tmp_path_factory.mktemp("airflow-ctl-test")
console.print(f"[yellow]Tests are run in {tmp_dir}")

Expand Down Expand Up @@ -174,23 +174,27 @@ def docker_compose_up(tmp_path_factory):
os.environ["FERNET_KEY"] = generate_fernet_key_string()

# Initialize Docker client
docker_client = DockerClient(compose_files=[str(tmp_docker_compose_file)])
_CtlTestState.docker_client = DockerClient(compose_files=[str(tmp_docker_compose_file)])

try:
console.print(f"[blue]Spinning up airflow environment using {DOCKER_IMAGE}")
docker_client.compose.up(detach=True, wait=True)
_CtlTestState.docker_client.compose.up(detach=True, wait=True)
console.print("[green]Docker compose started for airflowctl test\n")
except Exception:
print_diagnostics(docker_client.compose, docker_client.compose.version(), docker.version())
print_diagnostics(
_CtlTestState.docker_client.compose,
_CtlTestState.docker_client.compose.version(),
docker.version(),
)
debug_environment()
docker_compose_down()
raise


def docker_compose_down():
"""Tear down Docker Compose environment."""
if docker_client:
docker_client.compose.down(remove_orphans=True, volumes=True, quiet=True)
if _CtlTestState.docker_client:
_CtlTestState.docker_client.compose.down(remove_orphans=True, volumes=True, quiet=True)


def pytest_sessionfinish(session, exitstatus):
Expand Down
59 changes: 32 additions & 27 deletions airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import json
import os
from datetime import datetime
from pathlib import Path
from shutil import copyfile, copytree

import pytest
Expand All @@ -40,8 +41,11 @@
from tests_common.test_utils.fernet import generate_fernet_key_string

console = Console(width=400, color_system="standard")
compose_instance = None
airflow_logs_path = None


class _E2ETestState:
compose_instance: DockerCompose | None = None
airflow_logs_path: Path | None = None


def _setup_s3_integration(dot_env_file, tmp_dir):
Expand All @@ -64,9 +68,7 @@ def _setup_s3_integration(dot_env_file, tmp_dir):
os.environ["ENV_FILE_PATH"] = str(dot_env_file)


def spin_up_airflow_environment(tmp_path_factory):
global compose_instance
global airflow_logs_path
def spin_up_airflow_environment(tmp_path_factory: pytest.TempPathFactory):
tmp_dir = tmp_path_factory.mktemp("airflow-e2e-tests")

console.print(f"[yellow]Using docker compose file: {DOCKER_COMPOSE_PATH}")
Expand All @@ -79,7 +81,7 @@ def spin_up_airflow_environment(tmp_path_factory):
for subdir in subfolders:
(tmp_dir / subdir).mkdir()

airflow_logs_path = tmp_dir / "logs"
_E2ETestState.airflow_logs_path = tmp_dir / "logs"

console.print(f"[yellow]Copying dags to:[/ {tmp_dir / 'dags'}")
copytree(E2E_DAGS_FOLDER, tmp_dir / "dags", dirs_exist_ok=True)
Expand Down Expand Up @@ -108,33 +110,37 @@ def spin_up_airflow_environment(tmp_path_factory):

try:
console.print(f"[blue]Spinning up airflow environment using {DOCKER_IMAGE}")
compose_instance = DockerCompose(tmp_dir, compose_file_name=compose_file_names, pull=pull)
_E2ETestState.compose_instance = DockerCompose(
tmp_dir, compose_file_name=compose_file_names, pull=pull
)

compose_instance.start()
_E2ETestState.compose_instance.start()

compose_instance.wait_for(f"http://{DOCKER_COMPOSE_HOST_PORT}/api/v2/monitor/health")
compose_instance.exec_in_container(
_E2ETestState.compose_instance.wait_for(f"http://{DOCKER_COMPOSE_HOST_PORT}/api/v2/monitor/health")
_E2ETestState.compose_instance.exec_in_container(
command=["airflow", "dags", "reserialize"], service_name="airflow-dag-processor"
)

except Exception:
console.print("[red]Failed to start docker compose")
_print_logs(compose_instance)
compose_instance.stop()
if _E2ETestState.compose_instance:
_print_logs(_E2ETestState.compose_instance)
_E2ETestState.compose_instance.stop()
raise


def _print_logs(compose_instance):
def _print_logs(compose_instance: DockerCompose):
containers = compose_instance.get_containers()
for container in containers:
service = container.Service
stdout, _ = compose_instance.get_logs(service)
console.print(f"::group:: {service} Logs")
console.print(f"[red]{stdout}")
console.print("::endgroup::")
if service:
stdout, _ = compose_instance.get_logs(service)
console.print(f"::group:: {service} Logs")
console.print(f"[red]{stdout}")
console.print("::endgroup::")


def pytest_sessionstart(session):
def pytest_sessionstart(session: pytest.Session):
tmp_path_factory = session.config._tmp_path_factory
spin_up_airflow_environment(tmp_path_factory)

Expand Down Expand Up @@ -162,19 +168,18 @@ def pytest_runtest_makereport(item, call):
test_results.append(test_result)


def pytest_sessionfinish(session, exitstatus):
def pytest_sessionfinish(session: pytest.Session, exitstatus: int | pytest.ExitCode):
"""Generate report after all tests complete."""
generate_test_report(test_results)
if airflow_logs_path is not None:
copytree(airflow_logs_path, LOGS_FOLDER, dirs_exist_ok=True)

# If any test failures lets print the services logs
if any(r["status"] == "failed" for r in test_results):
_print_logs(compose_instance=compose_instance)
if _E2ETestState.airflow_logs_path is not None:
copytree(_E2ETestState.airflow_logs_path, LOGS_FOLDER, dirs_exist_ok=True)

if compose_instance:
if _E2ETestState.compose_instance:
# If any test failures lets print the services logs
if any(r["status"] == "failed" for r in test_results):
_print_logs(_E2ETestState.compose_instance)
if not os.environ.get("SKIP_DOCKER_COMPOSE_DELETION"):
compose_instance.stop()
_E2ETestState.compose_instance.stop()


def generate_test_report(results):
Expand Down