Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions devel-common/src/tests_common/test_utils/version_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)

if AIRFLOW_V_3_1_PLUS:
from airflow.models.xcom import XCOM_RETURN_KEY
else:
from airflow.utils.xcom import XCOM_RETURN_KEY # type: ignore[no-redef]


def get_sqlalchemy_version_tuple() -> tuple[int, int, int]:
import sqlalchemy
Expand All @@ -47,3 +52,13 @@ def get_sqlalchemy_version_tuple() -> tuple[int, int, int]:

SQLALCHEMY_V_1_4 = (1, 4, 0) <= get_sqlalchemy_version_tuple() < (2, 0, 0)
SQLALCHEMY_V_2_0 = (2, 0, 0) <= get_sqlalchemy_version_tuple() < (2, 1, 0)


__all__ = [
"AIRFLOW_V_3_0_PLUS",
"AIRFLOW_V_3_0_1",
"AIRFLOW_V_3_1_PLUS",
"SQLALCHEMY_V_1_4",
"SQLALCHEMY_V_2_0",
"XCOM_RETURN_KEY",
]
Comment on lines +57 to +64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding this (which is also missing the fns etc) change the import to from ... import XCOM_RETURN_KEY as XCOM_RETURN_KEY - that should remove the unused import warning

Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,10 @@
container_is_succeeded,
get_container_termination_message,
)
from airflow.providers.cncf.kubernetes.version_compat import BaseOperator
from airflow.providers.cncf.kubernetes.version_compat import XCOM_RETURN_KEY, BaseOperator
from airflow.settings import pod_mutation_hook
from airflow.utils import yaml
from airflow.utils.helpers import prune_dict, validate_key
from airflow.utils.xcom import XCOM_RETURN_KEY
from airflow.version import version as airflow_version

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)

if AIRFLOW_V_3_1_PLUS:
from airflow.models.xcom import XCOM_RETURN_KEY
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're changing this, it's should be changed to use something from sdk, not core.

Also this reminds me- the airflow version check in providers needs to go soon too - we ideally want to "duck type" off what we can I the sdk (i.e. try import except)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also this reminds me- the airflow version check in providers needs to go soon too - we ideally want to "duck type" off what we can I the sdk (i.e. try import except)

Question: are we going to have task-sdk installable also in Airflow 2 or is it going to be optional dependency of providers? Currently it's not that's why have all the "ifs" and they have to be part of provider's code - so such a version check in sdk will only be really possible to remove from providers if task-sdk is also installable on Airflow 2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be installable in AF2 and if adding an optional dep in providers is what leads to, I think we should.

from airflow.sdk import BaseHook, BaseOperator
else:
from airflow.hooks.base import BaseHook # type: ignore[attr-defined,no-redef]
from airflow.models import BaseOperator # type: ignore[no-redef]
from airflow.utils.xcom import XCOM_RETURN_KEY # type: ignore[no-redef]

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseSensorOperator
Expand All @@ -61,4 +63,5 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
"DecoratedOperator",
"TaskDecorator",
"task_decorator_factory",
"XCOM_RETURN_KEY",
]
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
from airflow.utils.context import Context
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType
from airflow.utils.xcom import XCOM_RETURN_KEY

from tests_common.test_utils import db
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
Expand Down Expand Up @@ -1575,7 +1574,7 @@ def test_xcom_push_failed_pod(self, remote_pod, mock_await, mock_extract_xcom):
with pytest.raises(AirflowException):
k.execute(context=context)

context["ti"].xcom_push.assert_called_with(XCOM_RETURN_KEY, {"Test key": "Test value"})
context["ti"].xcom_push.assert_called_with("return_value", {"Test key": "Test value"})

@pytest.mark.asyncio
@pytest.mark.parametrize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@
from airflow.providers.common.io.xcom.backend import XComObjectStorageBackend
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.utils import timezone
from airflow.utils.xcom import XCOM_RETURN_KEY

from tests_common.test_utils import db
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, XCOM_RETURN_KEY

pytestmark = [pytest.mark.db_test]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
MSGraphTrigger,
ResponseSerializer,
)
from airflow.providers.microsoft.azure.version_compat import BaseOperator
from airflow.utils.xcom import XCOM_RETURN_KEY
from airflow.providers.microsoft.azure.version_compat import XCOM_RETURN_KEY, BaseOperator

if TYPE_CHECKING:
from io import BytesIO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)

if AIRFLOW_V_3_1_PLUS:
from airflow.sdk import BaseHook
else:
from airflow.hooks.base import BaseHook # type: ignore[attr-defined,no-redef]

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import (
BaseOperator,
Expand All @@ -50,10 +45,19 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
from airflow.models import BaseOperator, BaseOperatorLink # type: ignore[no-redef]
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

if AIRFLOW_V_3_1_PLUS:
from airflow.models.xcom import XCOM_RETURN_KEY
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto - sdk please

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR: #53180

from airflow.sdk import BaseHook
else:
from airflow.hooks.base import BaseHook # type: ignore[attr-defined,no-redef]
from airflow.utils.xcom import XCOM_RETURN_KEY # type: ignore[no-redef]

__all__ = [
"AIRFLOW_V_3_0_PLUS",
"AIRFLOW_V_3_1_PLUS",
"BaseHook",
"BaseOperator",
"BaseOperatorLink",
"BaseSensorOperator",
"XCOM_RETURN_KEY",
]
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
from airflow.utils import timezone
from airflow.utils.task_instance_session import set_current_task_instance_session
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.xcom import XCOM_RETURN_KEY

from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_1, AIRFLOW_V_3_0_PLUS
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_1, AIRFLOW_V_3_0_PLUS, XCOM_RETURN_KEY
from unit.standard.operators.test_python import BasePythonTest

if AIRFLOW_V_3_0_PLUS:
Expand Down