Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 5 additions & 30 deletions airflow-core/src/airflow/decorators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,11 @@
# under the License.
from __future__ import annotations

from typing import Any, Callable
from typing import Callable

from airflow.decorators.base import TaskDecorator
from airflow.decorators.bash import bash_task
from airflow.decorators.branch_external_python import branch_external_python_task
from airflow.decorators.branch_python import branch_task
from airflow.decorators.branch_virtualenv import branch_virtualenv_task
from airflow.decorators.condition import run_if, skip_if
from airflow.decorators.external_python import external_python_task
from airflow.decorators.python import python_task
from airflow.decorators.python_virtualenv import virtualenv_task
from airflow.decorators.sensor import sensor_task
from airflow.decorators.setup_teardown import setup_task, teardown_task
from airflow.decorators.short_circuit import short_circuit_task
from airflow.decorators.task_group import task_group
from airflow.models.dag import dag
from airflow.providers_manager import ProvidersManager
Expand All @@ -41,15 +32,6 @@
"dag",
"task",
"task_group",
"python_task",
"virtualenv_task",
"external_python_task",
"branch_task",
"branch_virtualenv_task",
"branch_external_python_task",
"short_circuit_task",
"sensor_task",
"bash_task",
"setup",
"teardown",
]
Expand All @@ -58,20 +40,9 @@
class TaskDecoratorCollection:
"""Implementation to provide the ``@task`` syntax."""

python = staticmethod(python_task)
virtualenv = staticmethod(virtualenv_task)
external_python = staticmethod(external_python_task)
branch = staticmethod(branch_task)
branch_virtualenv = staticmethod(branch_virtualenv_task)
branch_external_python = staticmethod(branch_external_python_task)
short_circuit = staticmethod(short_circuit_task)
sensor = staticmethod(sensor_task)
bash = staticmethod(bash_task)
run_if = staticmethod(run_if)
skip_if = staticmethod(skip_if)

__call__: Any = python # Alias '@task' to '@task.python'.

def __getattr__(self, name: str) -> TaskDecorator:
"""Dynamically get provider-registered task decorators, e.g. ``@task.docker``."""
if name.startswith("__"):
Expand All @@ -81,6 +52,10 @@ def __getattr__(self, name: str) -> TaskDecorator:
raise AttributeError(f"task decorator {name!r} not found")
return decorators[name]

def __call__(self, *args, **kwargs):
"""Alias '@task' to python dynamically."""
return self.__getattr__("python")(*args, **kwargs)


task = TaskDecoratorCollection()
setup: Callable = setup_task
Expand Down
18 changes: 0 additions & 18 deletions airflow-core/src/airflow/decorators/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,7 @@ from docker.types import Mount
from kubernetes.client import models as k8s

from airflow.decorators.base import FParams, FReturn, Task, TaskDecorator, _TaskDecorator
from airflow.decorators.bash import bash_task
from airflow.decorators.branch_external_python import branch_external_python_task
from airflow.decorators.branch_python import branch_task
from airflow.decorators.branch_virtualenv import branch_virtualenv_task
from airflow.decorators.condition import AnyConditionFunc
from airflow.decorators.external_python import external_python_task
from airflow.decorators.python import python_task
from airflow.decorators.python_virtualenv import virtualenv_task
from airflow.decorators.sensor import sensor_task
from airflow.decorators.short_circuit import short_circuit_task
from airflow.decorators.task_group import task_group
from airflow.models.dag import dag
from airflow.providers.cncf.kubernetes.secret import Secret
Expand All @@ -50,15 +41,6 @@ __all__ = [
"dag",
"task",
"task_group",
"python_task",
"virtualenv_task",
"external_python_task",
"branch_task",
"branch_virtualenv_task",
"branch_external_python_task",
"short_circuit_task",
"sensor_task",
"bash_task",
"setup",
"teardown",
]
Expand Down
7 changes: 6 additions & 1 deletion airflow-core/src/airflow/decorators/setup_teardown.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@
import types
from typing import TYPE_CHECKING, Callable

from airflow.decorators import python_task
from airflow.decorators.task_group import _TaskGroupFactory
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils.setup_teardown import SetupTeardownContext

if TYPE_CHECKING:
from airflow import XComArg

if AIRFLOW_V_3_0_PLUS:
from airflow.providers.standard.decorators.python import python_task
else:
from airflow.decorators import python_task # type: ignore


def setup_task(func: Callable) -> Callable:
# Using FunctionType here since _TaskDecorator is also a callable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2787,7 +2787,7 @@ def x(arg1, arg2, arg3):
"_is_empty": False,
"_is_mapped": True,
"_can_skip_downstream": False,
"_task_module": "airflow.decorators.python",
"_task_module": "airflow.providers.standard.decorators.python",
"task_type": "_PythonDecoratedOperator",
"_operator_name": "@task",
"downstream_task_ids": [],
Expand Down Expand Up @@ -2902,7 +2902,7 @@ def x(arg1, arg2, arg3):
"_is_empty": False,
"_is_mapped": True,
"_can_skip_downstream": False,
"_task_module": "airflow.decorators.python",
"_task_module": "airflow.providers.standard.decorators.python",
"task_type": "_PythonDecoratedOperator",
"_operator_name": "@task",
"python_callable_name": qualname(x),
Expand Down
20 changes: 20 additions & 0 deletions providers/standard/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,23 @@ config:
type: string
example: uv
default: auto

task-decorators:
- class-name: airflow.providers.standard.decorators.python.python_task
name: python
- class-name: airflow.providers.standard.decorators.bash.bash_task
name: bash
- class-name: airflow.providers.standard.decorators.branch_external_python.branch_external_python_task
name: branch_external_python
- class-name: airflow.providers.standard.decorators.branch_python.branch_task
name: branch
- class-name: airflow.providers.standard.decorators.branch_virtualenv.branch_virtualenv_task
name: branch_virtualenv
- class-name: airflow.providers.standard.decorators.external_python.external_python_task
name: external_python
- class-name: airflow.providers.standard.decorators.python_virtualenv.virtualenv_task
name: virtualenv
- class-name: airflow.providers.standard.decorators.sensor.sensor_task
name: sensor
- class-name: airflow.providers.standard.decorators.short_circuit.short_circuit_task
name: short_circuit
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import TYPE_CHECKING, Callable

from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.providers.standard.decorators.python import _PythonDecoratedOperator
from airflow.providers.standard.operators.python import BranchExternalPythonOperator

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import TYPE_CHECKING, Callable

from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.providers.standard.decorators.python import _PythonDecoratedOperator
from airflow.providers.standard.operators.python import BranchPythonOperator

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import TYPE_CHECKING, Callable

from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.providers.standard.decorators.python import _PythonDecoratedOperator
from airflow.providers.standard.operators.python import BranchPythonVirtualenvOperator

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import TYPE_CHECKING, Callable

from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.providers.standard.decorators.python import _PythonDecoratedOperator
from airflow.providers.standard.operators.python import ExternalPythonOperator

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import TYPE_CHECKING, Callable

from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.providers.standard.decorators.python import _PythonDecoratedOperator
from airflow.providers.standard.operators.python import PythonVirtualenvOperator

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import TYPE_CHECKING, Callable

from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.providers.standard.decorators.python import _PythonDecoratedOperator
from airflow.providers.standard.operators.python import ShortCircuitOperator

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,35 @@ def get_provider_info():
},
}
},
"task-decorators": [
{"class-name": "airflow.providers.standard.decorators.python.python_task", "name": "python"},
{"class-name": "airflow.providers.standard.decorators.bash.bash_task", "name": "bash"},
{
"class-name": "airflow.providers.standard.decorators.branch_external_python.branch_external_python_task",
"name": "branch_external_python",
},
{
"class-name": "airflow.providers.standard.decorators.branch_python.branch_task",
"name": "branch",
},
{
"class-name": "airflow.providers.standard.decorators.branch_virtualenv.branch_virtualenv_task",
"name": "branch_virtualenv",
},
{
"class-name": "airflow.providers.standard.decorators.external_python.external_python_task",
"name": "external_python",
},
{
"class-name": "airflow.providers.standard.decorators.python_virtualenv.virtualenv_task",
"name": "virtualenv",
},
{"class-name": "airflow.providers.standard.decorators.sensor.sensor_task", "name": "sensor"},
{
"class-name": "airflow.providers.standard.decorators.short_circuit.short_circuit_task",
"name": "short_circuit",
},
],
"dependencies": ["apache-airflow>=2.9.0"],
"devel-dependencies": [],
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from __future__ import annotations

import os
import stat
import warnings
from contextlib import nullcontext as no_raise
from pathlib import Path
Expand All @@ -29,15 +28,20 @@
from airflow.decorators import task
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models.renderedtifields import RenderedTaskInstanceFields
from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
from airflow.utils import timezone

from tests_common.test_utils.db import clear_db_dags, clear_db_runs, clear_rendered_ti_fields
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS

if TYPE_CHECKING:
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator
from airflow.providers.standard.operators.bash import BashOperator

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
else:
# bad hack but does the job
from airflow.utils.types import NOTSET as SET_DURING_EXECUTION # type: ignore[assignment]

DEFAULT_DATE = timezone.datetime(2023, 1, 1)

Expand Down Expand Up @@ -91,7 +95,10 @@ def bash(): ...
assert bash_task.operator.output_encoding == "utf-8"
assert bash_task.operator.skip_on_exit_code == [99]
assert bash_task.operator.cwd is None
assert bash_task.operator._is_inline_cmd is None
if AIRFLOW_V_3_0_PLUS:
assert bash_task.operator._is_inline_cmd is None
else:
assert bash_task.operator._init_bash_command_not_set is True

@pytest.mark.parametrize(
argnames=["command", "expected_command", "expected_return_val"],
Expand Down Expand Up @@ -289,7 +296,8 @@ def test_env_variables_in_bash_command_file(
"""Test the behavior of user-defined env vars when using an external file with a Bash command."""
cmd_file = tmp_path / "test_file.sh"
cmd_file.write_text("#!/usr/bin/env bash\necho AIRFLOW_HOME=$AIRFLOW_HOME\necho razz=$razz\n")
cmd_file.chmod(stat.S_IEXEC)
# setting chmod +x test_file.sh
cmd_file.chmod(0o755)

with self.dag:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
import pytest

from airflow.decorators import task
from airflow.exceptions import DownstreamTasksSkipped
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.exceptions import DownstreamTasksSkipped
else:
from airflow.utils.state import State

pytestmark = pytest.mark.db_test

Expand Down Expand Up @@ -74,8 +79,32 @@ def branch_operator():

dr = dag_maker.create_dagrun()
df.operator.run(start_date=dr.logical_date, end_date=dr.logical_date, ignore_ti_state=True)
with pytest.raises(DownstreamTasksSkipped) as exc_info:
if AIRFLOW_V_3_0_PLUS:
with pytest.raises(DownstreamTasksSkipped) as exc_info:
branchoperator.operator.run(
start_date=dr.logical_date, end_date=dr.logical_date, ignore_ti_state=True
)
assert exc_info.value.tasks == [(skipped_task_name, -1)]
else:
branchoperator.operator.run(
start_date=dr.logical_date, end_date=dr.logical_date, ignore_ti_state=True
)
assert exc_info.value.tasks == [(skipped_task_name, -1)]
task_1.operator.run(start_date=dr.logical_date, end_date=dr.logical_date, ignore_ti_state=True)
task_2.operator.run(start_date=dr.logical_date, end_date=dr.logical_date, ignore_ti_state=True)
tis = dr.get_task_instances()

for ti in tis:
if ti.task_id == "dummy_f":
assert ti.state == State.SUCCESS
if ti.task_id == "branching":
assert ti.state == State.SUCCESS

if ti.task_id == "task_1" and branch_task_name == "task_1":
assert ti.state == State.SUCCESS
elif ti.task_id == "task_1":
assert ti.state == State.SKIPPED

if ti.task_id == "task_2" and branch_task_name == "task_2":
assert ti.state == State.SUCCESS
elif ti.task_id == "task_2":
assert ti.state == State.SKIPPED
Loading