Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ repos:
language: python
entry: ./scripts/ci/pre_commit/check_init_decorator_arguments.py
pass_filenames: false
files: ^airflow-core/src/airflow/models/dag\.py$|^airflow-core/src/airflow/(?:decorators|utils)/task_group\.py$
files: ^task-sdk/src/airflow/sdk/definitions/dag\.py$|^task-sdk/src/airflow/sdk/definitions/decorators/task_group\.py$
additional_dependencies: ['rich>=12.4.4']
- id: check-template-context-variable-in-sync
name: Sync template context variable refs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ In this example, you have a regular data delivery to an S3 bucket and want to ap
from datetime import datetime

from airflow.sdk import DAG
from airflow.decorators import task
from airflow.sdk import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator

Expand Down
6 changes: 3 additions & 3 deletions airflow-core/docs/best-practices.rst
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ Not avoiding top-level DAG code:
import pendulum

from airflow.sdk import DAG
from airflow.decorators import task
from airflow.sdk import task


def expensive_api_call():
Expand Down Expand Up @@ -154,7 +154,7 @@ Avoiding top-level DAG code:
import pendulum

from airflow.sdk import DAG
from airflow.decorators import task
from airflow.sdk import task


def expensive_api_call():
Expand Down Expand Up @@ -536,7 +536,7 @@ It's easier to grab the concept with an example. Let's say that we have the foll
from datetime import datetime

from airflow.sdk import DAG
from airflow.decorators import task
from airflow.sdk import task
from airflow.exceptions import AirflowException
from airflow.providers.standard.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
Expand Down
8 changes: 4 additions & 4 deletions airflow-core/docs/core-concepts/dags.rst
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Or, you can use the ``@dag`` decorator to :ref:`turn a function into a DAG gener

import datetime

from airflow.decorators import dag
from airflow.sdk import dag
from airflow.providers.standard.operators.empty import EmptyOperator


Expand Down Expand Up @@ -458,7 +458,7 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality
# dags/branch_without_trigger.py
import pendulum

from airflow.decorators import task
from airflow.sdk import task
from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator

Expand Down Expand Up @@ -557,7 +557,7 @@ Dependency relationships can be applied across all tasks in a TaskGroup with the
.. code-block:: python
:emphasize-lines: 4,12

from airflow.decorators import task_group
from airflow.sdk import task_group


@task_group()
Expand All @@ -578,7 +578,7 @@ TaskGroup also supports ``default_args`` like DAG, it will overwrite the ``defau
import datetime

from airflow.sdk import DAG
from airflow.decorators import task_group
from airflow.sdk import task_group
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator

Expand Down
2 changes: 1 addition & 1 deletion airflow-core/docs/core-concepts/params.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Use a dictionary that maps Param names to either a :class:`~airflow.sdk.definiti
:emphasize-lines: 7-10

from airflow.sdk import DAG
from airflow.decorators import task
from airflow.sdk import task
from airflow.sdk import Param

with DAG(
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/docs/core-concepts/taskflow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ If you write most of your dags using plain Python code rather than Operators, th

TaskFlow takes care of moving inputs and outputs between your Tasks using XComs for you, as well as automatically calculating dependencies - when you call a TaskFlow function in your DAG file, rather than executing it, you will get an object representing the XCom for the result (an ``XComArg``), that you can then use as inputs to downstream tasks or operators. For example::

from airflow.decorators import task
from airflow.sdk import task
from airflow.providers.email import EmailOperator

@task
Expand Down Expand Up @@ -112,7 +112,7 @@ a ``Asset``, which is ``@attr.define`` decorated, together with TaskFlow.
import requests

from airflow import Asset
from airflow.decorators import dag, task
from airflow.sdk import dag, task

SRC = Asset(
"https://www.ncei.noaa.gov/access/monitoring/climate-at-a-glance/global/time-series/globe/land_ocean/ytd/12/1880-2022.json"
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/docs/core-concepts/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ If you'd like to reproduce task instance heartbeat timeouts for development/test

.. code-block:: python

from airflow.decorators import dag
from airflow.sdk import dag
from airflow.providers.standard.operators.bash import BashOperator
from datetime import datetime

Expand Down
4 changes: 2 additions & 2 deletions airflow-core/docs/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ until ``min_file_process_interval`` is reached since DAG Parser will look for mo
:name: dag_loader.py

from airflow.sdk import DAG
from airflow.decorators import task
from airflow.sdk import task

import pendulum

Expand Down Expand Up @@ -413,7 +413,7 @@ upstream task.

import pendulum

from airflow.decorators import dag, task
from airflow.sdk import dag, task
from airflow.exceptions import AirflowException
from airflow.utils.trigger_rule import TriggerRule

Expand Down
10 changes: 5 additions & 5 deletions airflow-core/docs/howto/create-custom-decorator.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ tasks. The steps to create and register ``@task.foo`` are:
.. code-block:: python

from typing import TYPE_CHECKING
from airflow.decorators.base import task_decorator_factory
from airflow.sdk.bases.decorator import task_decorator_factory

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator
from airflow.sdk.bases.decorator import TaskDecorator


def foo_task(
Expand Down Expand Up @@ -96,11 +96,11 @@ tasks. The steps to create and register ``@task.foo`` are:
For better or worse, Python IDEs can not auto-complete dynamically
generated methods (see `JetBrain's write up on the subject <https://intellij-support.jetbrains.com/hc/en-us/community/posts/115000665110-auto-completion-for-dynamic-module-attributes-in-python>`_).

To hack around this problem, a type stub ``airflow/decorators/__init__.pyi`` is provided to statically declare
To hack around this problem, a type stub ``airflow/sdk/definitions/decorators/__init__.pyi`` is provided to statically declare
the type signature of each task decorator. A newly added task decorator should declare its signature stub
like this:

.. exampleinclude:: /../src/airflow/decorators/__init__.pyi
.. exampleinclude:: ../../../task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi
:language: python
:start-after: [START decorator_signature]
:end-before: [END decorator_signature]
Expand All @@ -114,7 +114,7 @@ If the new decorator can be used without arguments (e.g. ``@task.python`` instea
You should also add an overload that takes a single callable immediately after the "real" definition so mypy
can recognize the function as a "bare decorator":

.. exampleinclude:: /../src/airflow/decorators/__init__.pyi
.. exampleinclude:: ../../../task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi
:language: python
:start-after: [START mixin_for_typing]
:end-before: [END mixin_for_typing]
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/docs/howto/dynamic-dag-generation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ and Airflow will automatically register them.
.. code-block:: python

from datetime import datetime
from airflow.decorators import dag, task
from airflow.sdk import dag, task

configs = {
"config1": {"message": "first DAG will receive this message"},
Expand Down
6 changes: 3 additions & 3 deletions airflow-core/docs/tutorial/pipeline.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ Here we retrieve data, save it to a file on our Airflow instance, and load the d

import os
import requests
from airflow.decorators import task
from airflow.sdk import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


Expand Down Expand Up @@ -160,7 +160,7 @@ Here we select completely unique records from the retrieved data, then we check

.. code-block:: python

from airflow.decorators import task
from airflow.sdk import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


Expand Down Expand Up @@ -219,7 +219,7 @@ Putting all of the pieces together, we have our completed DAG.
import os

import requests
from airflow.decorators import dag, task
from airflow.sdk import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator

Expand Down
4 changes: 2 additions & 2 deletions airflow-core/docs/tutorial/taskflow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ Below is an example of how you can reuse a decorated task in multiple dags:

.. code-block:: python

from airflow.decorators import task, dag
from airflow.sdk import task, dag
from datetime import datetime


Expand Down Expand Up @@ -216,7 +216,7 @@ Suppose the ``add_task`` code lives in a file called ``common.py``. You can do t
.. code-block:: python

from common import add_task
from airflow.decorators import dag
from airflow.sdk import dag
from datetime import datetime


Expand Down
23 changes: 22 additions & 1 deletion airflow-core/newsfragments/aip-72.significant.rst
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ As part of this change the following breaking changes have occurred:
different secrets backend.
Priority defined as workers backend > workers env > secrets backend on API server > API server env > metadata DB.

- All the decorators have been moved to the task SDK.

Old imports:

.. code-block:: python

from airflow.decorators import dag, task, task_group, setup, teardown

New imports:

.. code-block:: python

from airflow.sdk import dag, task, task_group, setup, teardown


* Types of change

Expand All @@ -111,4 +125,11 @@ As part of this change the following breaking changes have occurred:

* AIR302

* [x] ``airflow.models.baseoperatorlink.BaseOperatorLink`` → ``airflow.sdk.definitions.baseoperatorlink.BaseOperatorLink``
* [x] ``airflow.models.baseoperatorlink.BaseOperatorLink`` → ``airflow.sdk.BaseOperatorLink``
* [ ] ``airflow.models.dag.DAG`` → ``airflow.sdk.DAG``
* [ ] ``airflow.models.DAG`` → ``airflow.sdk.DAG``
* [ ] ``airflow.decorators.dag`` → ``airflow.sdk.dag``
* [ ] ``airflow.decorators.task`` → ``airflow.sdk.task``
* [ ] ``airflow.decorators.task_group`` → ``airflow.sdk.task_group``
* [ ] ``airflow.decorators.setup`` → ``airflow.sdk.setup``
* [ ] ``airflow.decorators.teardown`` → ``airflow.sdk.teardown``
64 changes: 20 additions & 44 deletions airflow-core/src/airflow/decorators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,47 +16,23 @@
# under the License.
from __future__ import annotations

from typing import Callable

from airflow.decorators.base import TaskDecorator
from airflow.decorators.condition import run_if, skip_if
from airflow.decorators.setup_teardown import setup_task, teardown_task
from airflow.decorators.task_group import task_group
from airflow.models.dag import dag
from airflow.providers_manager import ProvidersManager

# Please keep this in sync with the .pyi's __all__.
__all__ = [
"TaskDecorator",
"TaskDecoratorCollection",
"dag",
"task",
"task_group",
"setup",
"teardown",
]


class TaskDecoratorCollection:
"""Implementation to provide the ``@task`` syntax."""

run_if = staticmethod(run_if)
skip_if = staticmethod(skip_if)

def __getattr__(self, name: str) -> TaskDecorator:
"""Dynamically get provider-registered task decorators, e.g. ``@task.docker``."""
if name.startswith("__"):
raise AttributeError(f"{type(self).__name__} has no attribute {name!r}")
decorators = ProvidersManager().taskflow_decorators
if name not in decorators:
raise AttributeError(f"task decorator {name!r} not found")
return decorators[name]

def __call__(self, *args, **kwargs):
"""Alias '@task' to python dynamically."""
return self.__getattr__("python")(*args, **kwargs)


task = TaskDecoratorCollection()
setup: Callable = setup_task
teardown: Callable = teardown_task
from airflow.sdk.definitions.decorators import (
TaskDecorator as TaskDecorator,
TaskDecoratorCollection as TaskDecoratorCollection,
dag as dag,
setup as setup,
task as task,
task_group as task_group,
teardown as teardown,
)
from airflow.utils.deprecation_tools import add_deprecated_classes

__deprecated_classes = {
"base": {
"DecoratedMappedOperator": "airflow.sdk.bases.decorator.DecoratedMappedOperator",
"DecoratedOperator": "airflow.sdk.bases.decorator.DecoratedOperator",
"TaskDecorator": "airflow.sdk.bases.decorator.TaskDecorator",
"task_decorator_factory": "airflow.sdk.bases.decorator.task_decorator_factory",
},
}
add_deprecated_classes(__deprecated_classes, __name__)
3 changes: 1 addition & 2 deletions airflow-core/src/airflow/example_dags/example_asset_alias.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@

import pendulum

from airflow.decorators import task
from airflow.sdk import DAG, Asset, AssetAlias
from airflow.sdk import DAG, Asset, AssetAlias, task

with DAG(
dag_id="asset_s3_bucket_producer",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@

import pendulum

from airflow.decorators import dag, task
from airflow.sdk import Asset, asset
from airflow.sdk import Asset, asset, dag, task


@asset(uri="s3://bucket/asset1_producer", schedule=None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@

from __future__ import annotations

from airflow.decorators import task
from airflow.providers.standard.triggers.file import FileDeleteTrigger
from airflow.sdk import DAG, Asset, AssetWatcher, chain
from airflow.sdk import DAG, Asset, AssetWatcher, chain, task

file_path = "/tmp/test"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

import pendulum

from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.utils.weekday import WeekDay
from airflow.sdk import chain
from airflow.sdk import chain, dag, task
from airflow.utils.trigger_rule import TriggerRule


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@

import pendulum

from airflow.decorators import task
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG
from airflow.sdk import DAG, task
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@

import pendulum

from airflow.decorators import task
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG
from airflow.sdk import DAG, task


@task.branch()
Expand Down
Loading