Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion tests/integration/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import os
import sys
from datetime import datetime
from importlib import reload
from time import sleep
from unittest import mock

Expand All @@ -37,10 +38,12 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowTaskTimeout
from airflow.executors import base_executor
from airflow.models.dag import DAG
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.operators.bash import BashOperator
from airflow.utils.state import State
from airflow.utils.state import State, TaskInstanceState
from tests.test_utils import db

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -107,6 +110,27 @@ def teardown_method(self) -> None:
db.clear_db_runs()
db.clear_db_jobs()

def test_change_state_back_compat(self):
# This represents the old implementation that an Airflow package may have
def _change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None) -> None:
pass

# Replace change_state function on base executor with the old version to force the backcompat edge
# case we're looking for
base_executor.BaseExecutor.change_state = _change_state
# Create an instance of celery executor while the base executor is modified
from airflow.providers.celery.executors import celery_executor

executor = celery_executor.CeleryExecutor()

# This will throw an exception if the backcompat is not properly handled
executor.change_state(
key=TaskInstanceKey("foo", "bar", "baz"), state=TaskInstanceState.QUEUED, info="test"
)
# Restore the base executor and celery modules
reload(base_executor)
reload(celery_executor)

@pytest.mark.flaky(reruns=3)
@pytest.mark.parametrize("broker_url", _prepare_test_bodies())
def test_celery_integration(self, broker_url):
Expand Down