Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airflow-core/src/airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ def __getattr__(name):


__deprecated_classes = {
"abstractoperator": {
"AbstractOperator": "airflow.sdk.definitions._internal.abstractoperator.AbstractOperator",
"NotMapped": "airflow.sdk.definitions._internal.abstractoperator.NotMapped",
"TaskStateChangeCallback": "airflow.sdk.definitions._internal.abstractoperator.TaskStateChangeCallback",
"DEFAULT_OWNER": "airflow.sdk.definitions._internal.abstractoperator.DEFAULT_OWNER",
"DEFAULT_QUEUE": "airflow.sdk.definitions._internal.abstractoperator.DEFAULT_QUEUE",
"DEFAULT_TASK_EXECUTION_TIMEOUT": "airflow.sdk.definitions._internal.abstractoperator.DEFAULT_TASK_EXECUTION_TIMEOUT",
},
"param": {
"Param": "airflow.sdk.definitions.param.Param",
"ParamsDict": "airflow.sdk.definitions.param.ParamsDict",
Expand Down
34 changes: 0 additions & 34 deletions airflow-core/src/airflow/models/abstractoperator.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@
"bash_command": "echo {{ task.task_id }}",
"task_type": "BashOperator",
"_task_module": "airflow.providers.standard.operators.bash",
"owner": "airflow",
"pool": "default_pool",
"is_setup": False,
"is_teardown": False,
Expand Down Expand Up @@ -3162,6 +3163,7 @@ def test_handle_v1_serdag():
"_task_type": "BashOperator",
# Slightly difference from v2-10-stable here, we manually changed this path
"_task_module": "airflow.providers.standard.operators.bash",
"owner": "airflow",
"pool": "default_pool",
"is_setup": False,
"is_teardown": False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
from airflow.cli.cli_config import GroupCommand
from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.models.abstractoperator import DEFAULT_QUEUE

try:
from airflow.models.abstractoperator import DEFAULT_QUEUE
except (ImportError, AttributeError):
from airflow.sdk.definitions._internal.abstractoperator import DEFAULT_QUEUE
from airflow.models.taskinstance import TaskInstance, TaskInstanceState
from airflow.providers.edge3.cli.edge_command import EDGE_COMMANDS
from airflow.providers.edge3.models.edge_job import EdgeJobModel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@

TaskStateChangeCallback = Callable[[Context], None]

DEFAULT_OWNER: str = "airflow"
DEFAULT_OWNER: str = conf.get_mandatory_value("operators", "default_owner")
DEFAULT_POOL_SLOTS: int = 1
DEFAULT_POOL_NAME = "default_pool"
DEFAULT_PRIORITY_WEIGHT: int = 1
Expand All @@ -62,17 +62,23 @@
MINIMUM_PRIORITY_WEIGHT: int = -2147483648
MAXIMUM_PRIORITY_WEIGHT: int = 2147483647
DEFAULT_EXECUTOR: str | None = None
DEFAULT_QUEUE: str = conf.get("operators", "default_queue", "default")
DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue")
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False
DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING: bool = False
DEFAULT_RETRIES: int = 0
DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300)
MAX_RETRY_DELAY: int = 24 * 60 * 60
DEFAULT_RETRIES: int = conf.getint("core", "default_task_retries", fallback=0)
DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(
seconds=conf.getint("core", "default_task_retry_delay", fallback=300)
)
MAX_RETRY_DELAY: int = conf.getint("core", "max_task_retry_delay", fallback=24 * 60 * 60)

# TODO: Task-SDK -- these defaults should be overridable from the Airflow config
DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS
DEFAULT_WEIGHT_RULE: WeightRule = WeightRule.DOWNSTREAM
DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = None
DEFAULT_WEIGHT_RULE: WeightRule = WeightRule(
conf.get("core", "default_task_weight_rule", fallback=WeightRule.DOWNSTREAM)
)
DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = conf.gettimedelta(
"core", "default_task_execution_timeout"
)

log = logging.getLogger(__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ class TaskDecoratorCollection:
"""
# [END decorator_signature]
@overload
def kubernetes(
def kubernetes( # type: ignore[misc]
self,
*,
multiple_outputs: bool | None = None,
Expand Down Expand Up @@ -670,7 +670,7 @@ class TaskDecoratorCollection:
@overload
def kubernetes(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
@overload
def kubernetes_cmd(
def kubernetes_cmd( # type: ignore[misc]
self,
*,
args_only: bool = False, # Added by _KubernetesCmdDecoratedOperator.
Expand Down
7 changes: 3 additions & 4 deletions task-sdk/src/airflow/sdk/definitions/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import attrs
import methodtools

from airflow.models.abstractoperator import TaskStateChangeCallback
from airflow.sdk.definitions._internal.abstractoperator import (
DEFAULT_EXECUTOR,
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
Expand All @@ -42,6 +41,7 @@
DEFAULT_WEIGHT_RULE,
AbstractOperator,
NotMapped,
TaskStateChangeCallback,
)
from airflow.sdk.definitions._internal.expandinput import (
DictOfListsExpandInput,
Expand All @@ -51,7 +51,7 @@
from airflow.sdk.definitions._internal.types import NOTSET
from airflow.serialization.enums import DagAttributeTypes
from airflow.task.priority_strategy import PriorityWeightStrategy, validate_and_load_priority_weight_strategy
from airflow.typing_compat import Literal
from airflow.typing_compat import Literal, TypeAlias, TypeGuard
from airflow.utils.helpers import is_container, prevent_duplicates
from airflow.utils.xcom import XCOM_RETURN_KEY

Expand All @@ -73,13 +73,12 @@
from airflow.sdk.definitions.xcom_arg import XComArg
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.triggers.base import StartTriggerArgs
from airflow.typing_compat import TypeGuard
from airflow.utils.context import Context
from airflow.utils.operator_resources import Resources
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

TaskStateChangeCallbackAttrType = TaskStateChangeCallback | list[TaskStateChangeCallback] | None
TaskStateChangeCallbackAttrType: TypeAlias = TaskStateChangeCallback | list[TaskStateChangeCallback] | None
ValidationSource = Literal["expand"] | Literal["partial"]


Expand Down
Loading