|
| 1 | +import pytest |
| 2 | +from pytest_docker_tools import build |
| 3 | +from pytest_docker_tools import container |
| 4 | +from pytest_docker_tools import fxtr |
| 5 | + |
| 6 | +from pytest_celery import DEFAULT_WORKER_CONTAINER_TIMEOUT |
| 7 | +from pytest_celery import DEFAULT_WORKER_VOLUME |
| 8 | +from pytest_celery import WORKER_DOCKERFILE_ROOTDIR |
| 9 | +from pytest_celery import CeleryTestSetup |
| 10 | +from pytest_celery import CeleryTestWorker |
| 11 | +from pytest_celery import CeleryWorkerCluster |
| 12 | +from pytest_celery import CeleryWorkerContainer |
| 13 | +from tests.conftest import get_celery_versions |
| 14 | + |
| 15 | +versions_range = get_celery_versions("v5.0.0", "v5.0.5") |
| 16 | +versions_list = ["v4.4.7", "v5.2.7", "v5.3.0"] |
| 17 | + |
| 18 | + |
| 19 | +def generate_workers(versions: list[str]) -> list[str]: |
| 20 | + worker_containers = list() |
| 21 | + for v in versions: |
| 22 | + img = f"worker_v{v.replace('.', '_')}_image" |
| 23 | + globals()[img] = build( |
| 24 | + path=WORKER_DOCKERFILE_ROOTDIR, |
| 25 | + tag=f"pytest-celery/examples/worker:v{v}", |
| 26 | + buildargs={ |
| 27 | + "CELERY_VERSION": v, |
| 28 | + "CELERY_LOG_LEVEL": fxtr("default_worker_celery_log_level"), |
| 29 | + "CELERY_WORKER_NAME": fxtr("default_worker_celery_worker_name"), |
| 30 | + "CELERY_WORKER_QUEUE": fxtr("default_worker_celery_worker_queue"), |
| 31 | + }, |
| 32 | + ) |
| 33 | + cnt = f"worker_v{v.replace('.', '_')}_container" |
| 34 | + globals()[cnt] = container( |
| 35 | + image="{" + f"{img}.id" + "}", |
| 36 | + environment=fxtr("default_worker_env"), |
| 37 | + network="{default_pytest_celery_network.name}", |
| 38 | + volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME}, |
| 39 | + wrapper_class=CeleryWorkerContainer, |
| 40 | + timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT, |
| 41 | + ) |
| 42 | + worker_containers.append(cnt) |
| 43 | + return worker_containers |
| 44 | + |
| 45 | + |
| 46 | +class TestClusterList: |
| 47 | + @pytest.fixture(params=[generate_workers(versions_list)]) |
| 48 | + def celery_worker_cluster(self, request: pytest.FixtureRequest) -> CeleryWorkerCluster: |
| 49 | + nodes: list[CeleryWorkerContainer] = [request.getfixturevalue(worker) for worker in request.param] |
| 50 | + cluster = CeleryWorkerCluster(*nodes) |
| 51 | + yield cluster |
| 52 | + cluster.teardown() |
| 53 | + |
| 54 | + def test_worker_cluster_with_fixed_list(self, celery_setup: CeleryTestSetup, subtests): |
| 55 | + worker: CeleryTestWorker |
| 56 | + for version, worker in zip(versions_list, celery_setup.worker_cluster): |
| 57 | + with subtests.test(msg=f"Found worker {version} in cluster"): |
| 58 | + assert f"{worker.hostname()} {version}" in worker.logs() |
| 59 | + |
| 60 | + |
| 61 | +class TestClusterRange: |
| 62 | + @pytest.fixture(params=[generate_workers(versions_range)]) |
| 63 | + def celery_worker_cluster(self, request: pytest.FixtureRequest) -> CeleryWorkerCluster: |
| 64 | + nodes: list[CeleryWorkerContainer] = [request.getfixturevalue(worker) for worker in request.param] |
| 65 | + cluster = CeleryWorkerCluster(*nodes) |
| 66 | + yield cluster |
| 67 | + cluster.teardown() |
| 68 | + |
| 69 | + def test_worker_cluster_with_versions_range(self, celery_setup: CeleryTestSetup, subtests): |
| 70 | + worker: CeleryTestWorker |
| 71 | + for version, worker in zip(versions_range, celery_setup.worker_cluster): |
| 72 | + with subtests.test(msg=f"Found worker v{version} in cluster"): |
| 73 | + assert f"{worker.hostname()} v{version}" in worker.logs() |
0 commit comments