Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,19 @@ def worker(args):
# This needs to be imported locally to not trigger Providers Manager initialization
from airflow.providers.celery.executors.celery_executor import app as celery_app

# Check if a worker with the same hostname already exists
if args.celery_hostname:
inspect = celery_app.control.inspect()
active_workers = inspect.active_queues()
if active_workers:
active_worker_names = list(active_workers.keys())
# Check if any worker ends with @hostname
if any(name.endswith(f"@{args.celery_hostname}") for name in active_worker_names):
raise SystemExit(
f"Error: A worker with hostname '{args.celery_hostname}' is already running. "
"Please use a different hostname or stop the existing worker first."
)

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.log import configure_logging

Expand Down
52 changes: 52 additions & 0 deletions providers/celery/tests/unit/celery/cli/test_celery_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,58 @@ def test_worker_failure_gracefull_shutdown(self, mock_celery_app, mock_popen):
mock_popen().terminate.assert_called()


@pytest.mark.backend("mysql", "postgres")
@pytest.mark.usefixtures("conf_stale_bundle_cleanup_disabled")
class TestWorkerDuplicateHostnameCheck:
@classmethod
def setup_class(cls):
with conf_vars({("core", "executor"): "CeleryExecutor"}):
importlib.reload(executor_loader)
importlib.reload(cli_parser)
cls.parser = cli_parser.get_parser()

@pytest.mark.db_test
@mock.patch("airflow.providers.celery.executors.celery_executor.app.control.inspect")
def test_worker_fails_when_hostname_already_exists(self, mock_inspect):
"""Test that worker command fails when trying to start a worker with a duplicate hostname."""
args = self.parser.parse_args(["celery", "worker", "--celery-hostname", "existing_host"])

# Mock the inspect to return an active worker with the same hostname
mock_instance = MagicMock()
mock_instance.active_queues.return_value = {
"celery@existing_host": [{"name": "queue1"}],
}
mock_inspect.return_value = mock_instance

# Test that SystemExit is raised with appropriate error message
with pytest.raises(SystemExit) as exc_info:
celery_command.worker(args)

assert "existing_host" in str(exc_info.value)
assert "already running" in str(exc_info.value)

@pytest.mark.db_test
@mock.patch("airflow.providers.celery.executors.celery_executor.app.control.inspect")
@mock.patch("airflow.providers.celery.cli.celery_command.Process")
@mock.patch("airflow.providers.celery.executors.celery_executor.app")
def test_worker_starts_when_hostname_is_unique(self, mock_celery_app, mock_popen, mock_inspect):
"""Test that worker command succeeds when the hostname is unique."""
args = self.parser.parse_args(["celery", "worker", "--celery-hostname", "new_host"])

# Mock the inspect to return active workers without the new hostname
mock_instance = MagicMock()
mock_instance.active_queues.return_value = {
"celery@existing_host": [{"name": "queue1"}],
}
mock_inspect.return_value = mock_instance

# Worker should start successfully
celery_command.worker(args)

# Verify that worker_main was called
assert mock_celery_app.worker_main.called


@pytest.mark.backend("mysql", "postgres")
@pytest.mark.usefixtures("conf_stale_bundle_cleanup_disabled")
class TestFlowerCommand:
Expand Down