Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion .github/actions/breeze/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ description: 'Sets up Python and Breeze'
inputs:
python-version:
description: 'Python version to use'
default: "3.9"
default: "3.10"
use-uv:
description: 'Whether to use uv tool'
required: true
Expand Down
8 changes: 1 addition & 7 deletions .github/actions/prepare_all_ci_images/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,8 @@ runs:
# TODO: Currently we cannot loop through the list of python versions and have dynamic list of
# tasks. Instead we hardcode all possible python versions and they - but
# this should be implemented in stash action as list of keys to download.
# That includes 3.8 - 3.12 as we are backporting it to v2-10-test branch
# That includes 3.9 - 3.12 as we are backporting it to v3-0-test branch
# This is captured in https://github.com/apache/airflow/issues/45268
- name: "Restore CI docker image ${{ inputs.platform }}:3.8"
uses: ./.github/actions/prepare_single_ci_image
with:
platform: ${{ inputs.platform }}
python: "3.8"
python-versions-list-as-string: ${{ inputs.python-versions-list-as-string }}
- name: "Restore CI docker image ${{ inputs.platform }}:3.9"
uses: ./.github/actions/prepare_single_ci_image
with:
Expand Down
4 changes: 0 additions & 4 deletions .github/workflows/test-providers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ jobs:
breeze release-management generate-issue-content-providers
--only-available-in-dist --disable-progress
if: matrix.package-format == 'wheel'
- name: Remove Python 3.9-incompatible provider distributions
run: |
echo "Removing Python 3.9-incompatible provider: cloudant"
rm -vf dist/*cloudant*
- name: "Generate source constraints from CI image"
shell: bash
run: >
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ARG AIRFLOW_USER_HOME_DIR=/home/airflow
# latest released version here
ARG AIRFLOW_VERSION="3.0.2"

ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm"
ARG PYTHON_BASE_IMAGE="python:3.10-slim-bookworm"


# You can swap comments between those two args to test pip from the main version
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#
# WARNING: THIS DOCKERFILE IS NOT INTENDED FOR PRODUCTION USE OR DEPLOYMENT.
#
ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm"
ARG PYTHON_BASE_IMAGE="python:3.10-slim-bookworm"

##############################################################################################
# This is the script image where we keep all inlined bash scripts needed in other segments
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Apache Airflow is tested with:

| | Main version (dev) | Stable version (3.0.2) |
|------------|------------------------|------------------------|
| Python | 3.9, 3.10, 3.11, 3.12 | 3.9, 3.10, 3.11, 3.12 |
| Python | 3.10, 3.11, 3.12 | 3.9, 3.10, 3.11, 3.12 |
| Platform | AMD64/ARM64(\*) | AMD64/ARM64(\*) |
| Kubernetes | 1.30, 1.31, 1.32, 1.33 | 1.30, 1.31, 1.32, 1.33 |
| PostgreSQL | 13, 14, 15, 16, 17 | 13, 14, 15, 16, 17 |
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/docs/extra-packages-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ with a consistent set of dependencies based on constraint files provided by Airf
:substitutions:

pip install apache-airflow[google,amazon,apache-spark]==|version| \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-|version|/constraints-3.9.txt"
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-|version|/constraints-3.10.txt"

Note, that this will install providers in the versions that were released at the time of Airflow |version| release. You can later
upgrade those providers manually if you want to use latest versions of the providers.
Expand Down
10 changes: 5 additions & 5 deletions airflow-core/docs/installation/installing-from-pypi.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Typical command to install Airflow from scratch in a reproducible way from PyPI

.. code-block:: bash

pip install "apache-airflow[celery]==|version|" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-|version|/constraints-3.9.txt"
pip install "apache-airflow[celery]==|version|" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-|version|/constraints-3.10.txt"


Typically, you can add other dependencies and providers as separate command after the reproducible
Expand Down Expand Up @@ -112,7 +112,7 @@ but you can pick your own set of extras and providers to install.

.. code-block:: bash

pip install "apache-airflow[celery]==|version|" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-|version|/constraints-3.9.txt"
pip install "apache-airflow[celery]==|version|" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-|version|/constraints-3.10.txt"


.. note::
Expand Down Expand Up @@ -143,15 +143,15 @@ performing dependency resolution.

.. code-block:: bash

pip install "apache-airflow[celery]==|version|" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-|version|/constraints-3.9.txt"
pip install "apache-airflow[celery]==|version|" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-|version|/constraints-3.10.txt"
pip install "apache-airflow==|version|" apache-airflow-providers-google==10.1.1

You can also downgrade or upgrade other dependencies this way - even if they are not compatible with
those dependencies that are stored in the original constraints file:

.. code-block:: bash

pip install "apache-airflow[celery]==|version|" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-|version|/constraints-3.9.txt"
pip install "apache-airflow[celery]==|version|" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-|version|/constraints-3.10.txt"
pip install "apache-airflow[celery]==|version|" dbt-core==0.20.0

.. warning::
Expand Down Expand Up @@ -194,7 +194,7 @@ one provided by the community.

.. code-block:: bash

pip install "apache-airflow[celery]==|version|" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-|version|/constraints-3.9.txt"
pip install "apache-airflow[celery]==|version|" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-|version|/constraints-3.10.txt"
pip install "apache-airflow==|version|" dbt-core==0.20.0
pip freeze > my-constraints.txt

Expand Down
4 changes: 2 additions & 2 deletions airflow-core/docs/start.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ This quick start guide will help you bootstrap an Airflow standalone instance on

Officially supported installation methods is with``pip`.

Run ``pip install apache-airflow[EXTRAS]==AIRFLOW_VERSION --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-AIRFLOW_VERSION/constraints-PYTHON_VERSION.txt"``, for example ``pip install "apache-airflow[celery]==3.0.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.0/constraints-3.9.txt"`` to install Airflow in a reproducible way.
Run ``pip install apache-airflow[EXTRAS]==AIRFLOW_VERSION --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-AIRFLOW_VERSION/constraints-PYTHON_VERSION.txt"``, for example ``pip install "apache-airflow[celery]==3.0.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.0/constraints-3.10.txt"`` to install Airflow in a reproducible way.



Expand Down Expand Up @@ -75,7 +75,7 @@ This quick start guide will help you bootstrap an Airflow standalone instance on
PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example this would install 3.0.0 with python 3.9: https://raw.githubusercontent.com/apache/airflow/constraints-|version|/constraints-3.9.txt
# For example this would install 3.0.0 with python 3.10: https://raw.githubusercontent.com/apache/airflow/constraints-|version|/constraints-3.10.txt

uv pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

Expand Down
4 changes: 2 additions & 2 deletions airflow-core/hatch_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import logging
import os
import shutil
from collections.abc import Iterable
from collections.abc import Callable, Iterable
from pathlib import Path
from subprocess import run
from typing import Any, Callable
from typing import Any

from hatchling.builders.config import BuilderConfig
from hatchling.builders.plugin.interface import BuilderInterface
Expand Down
6 changes: 3 additions & 3 deletions airflow-core/src/airflow/api_fastapi/auth/tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import time
import uuid
from base64 import urlsafe_b64encode
from collections.abc import Sequence
from collections.abc import Callable, Sequence
from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, Literal, overload
from typing import TYPE_CHECKING, Any, Literal, overload

import attrs
import httpx
Expand Down Expand Up @@ -68,7 +68,7 @@ def key_to_jwk_dict(key: AllowedKeys, kid: str | None = None):
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey
from jwt.algorithms import OKPAlgorithm, RSAAlgorithm

if isinstance(key, (RSAPrivateKey, Ed25519PrivateKey)):
if isinstance(key, RSAPrivateKey | Ed25519PrivateKey):
key = key.public_key()

if isinstance(key, RSAPublicKey):
Expand Down
21 changes: 9 additions & 12 deletions airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Iterable
from collections.abc import Callable, Iterable
from datetime import datetime
from enum import Enum
from typing import (
TYPE_CHECKING,
Annotated,
Any,
Callable,
Generic,
Literal,
Optional,
TypeVar,
Union,
overload,
)

Expand Down Expand Up @@ -267,7 +264,7 @@ def __init__(
self.filter_option: FilterOptionEnum = filter_option

def to_orm(self, select: Select) -> Select:
if isinstance(self.value, (list, str)) and not self.value and self.skip_none:
if isinstance(self.value, list | str) and not self.value and self.skip_none:
return select
if self.value is None and self.skip_none:
return select
Expand Down Expand Up @@ -520,14 +517,14 @@ def depends_float(

# Common Safe DateTime
DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]
OptionalDateTimeQuery = Annotated[Union[str, None], AfterValidator(_safe_parse_datetime_optional)]
OptionalDateTimeQuery = Annotated[str | None, AfterValidator(_safe_parse_datetime_optional)]

# DAG
QueryLimit = Annotated[LimitFilter, Depends(LimitFilter.depends)]
QueryOffset = Annotated[OffsetFilter, Depends(OffsetFilter.depends)]
QueryPausedFilter = Annotated[
FilterParam[Optional[bool]],
Depends(filter_param_factory(DagModel.is_paused, Optional[bool], filter_name="paused")),
FilterParam[bool | None],
Depends(filter_param_factory(DagModel.is_paused, bool | None, filter_name="paused")),
]
QueryExcludeStaleFilter = Annotated[_ExcludeStaleFilter, Depends(_ExcludeStaleFilter.depends)]
QueryDagIdPatternSearch = Annotated[
Expand All @@ -544,8 +541,8 @@ def depends_float(

# DagRun
QueryLastDagRunStateFilter = Annotated[
FilterParam[Optional[DagRunState]],
Depends(filter_param_factory(DagRun.state, Optional[DagRunState], filter_name="last_dag_run_state")),
FilterParam[DagRunState | None],
Depends(filter_param_factory(DagRun.state, DagRunState | None, filter_name="last_dag_run_state")),
]


Expand Down Expand Up @@ -696,8 +693,8 @@ def _optional_boolean(value: bool | None) -> bool | None:
return value if value is not None else False


QueryIncludeUpstream = Annotated[Union[bool], AfterValidator(_optional_boolean)]
QueryIncludeDownstream = Annotated[Union[bool], AfterValidator(_optional_boolean)]
QueryIncludeUpstream = Annotated[bool, AfterValidator(_optional_boolean)]
QueryIncludeDownstream = Annotated[bool, AfterValidator(_optional_boolean)]

state_priority: list[None | TaskInstanceState] = [
TaskInstanceState.FAILED,
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/src/airflow/api_fastapi/common/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

from __future__ import annotations

from typing import Any, Callable
from collections.abc import Callable
from typing import Any

from fastapi import APIRouter
from fastapi.types import DecoratedCallable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

from __future__ import annotations

from typing import Annotated, Callable
from collections.abc import Callable
from typing import Annotated

from pydantic import BeforeValidator, Field

Expand Down
3 changes: 2 additions & 1 deletion airflow-core/src/airflow/api_fastapi/core_api/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
# under the License.
from __future__ import annotations

from collections.abc import Callable
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Callable
from typing import TYPE_CHECKING, Annotated
from urllib.parse import ParseResult, urljoin, urlparse

from fastapi import Depends, HTTPException, Request, status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def handle_bulk_create(self, action: BulkCreateAction, results: BulkActionRespon

for variable in action.entities:
if variable.key in create_keys:
should_serialize_json = isinstance(variable.value, (dict, list))
should_serialize_json = isinstance(variable.value, dict | list)
Variable.set(
key=variable.key,
value=variable.value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def get_child_task_map(parent_task_id: str, task_node_map: dict[str, dict[str, A


def _count_tis(node: int | MappedTaskGroup | MappedOperator, run_id: str, session: SessionDep) -> int:
if not isinstance(node, (MappedTaskGroup, MappedOperator)):
if not isinstance(node, MappedTaskGroup | MappedOperator):
return node
with contextlib.suppress(NotFullyPopulated, NotMapped):
return DBBaseOperator.get_mapped_ti_count(node, run_id=run_id, session=session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import uuid
from datetime import timedelta
from enum import Enum
from typing import Annotated, Any, Literal, Union
from typing import Annotated, Any, Literal

from pydantic import (
AwareDatetime,
Expand Down Expand Up @@ -213,14 +213,12 @@ def ti_state_discriminator(v: dict[str, str] | StrictBaseModel) -> str:
# It is called "_terminal_" to avoid future conflicts if we added an actual state named "terminal"
# and "_other_" is a catch-all for all other states that are not covered by the other schemas.
TIStateUpdate = Annotated[
Union[
Annotated[TITerminalStatePayload, Tag("_terminal_")],
Annotated[TISuccessStatePayload, Tag("success")],
Annotated[TITargetStatePayload, Tag("_other_")],
Annotated[TIDeferredStatePayload, Tag("deferred")],
Annotated[TIRescheduleStatePayload, Tag("up_for_reschedule")],
Annotated[TIRetryStatePayload, Tag("up_for_retry")],
],
Annotated[TITerminalStatePayload, Tag("_terminal_")]
| Annotated[TISuccessStatePayload, Tag("success")]
| Annotated[TITargetStatePayload, Tag("_other_")]
| Annotated[TIDeferredStatePayload, Tag("deferred")]
| Annotated[TIRescheduleStatePayload, Tag("up_for_reschedule")]
| Annotated[TIRetryStatePayload, Tag("up_for_retry")],
Discriminator(ti_state_discriminator),
]

Expand Down
8 changes: 4 additions & 4 deletions airflow-core/src/airflow/api_fastapi/execution_api/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import sys
import time
from typing import Any, Optional
from typing import Any

import structlog
import svcs
Expand Down Expand Up @@ -55,8 +55,8 @@ class JWTBearer(HTTPBearer):

def __init__(
self,
path_param_name: Optional[str] = None,
required_claims: Optional[dict[str, Any]] = None,
path_param_name: str | None = None,
required_claims: dict[str, Any] | None = None,
):
super().__init__(auto_error=False)
self.path_param_name = path_param_name
Expand All @@ -66,7 +66,7 @@ async def __call__( # type: ignore[override]
self,
request: Request,
services=DepContainer,
) -> Optional[TIToken]:
) -> TIToken | None:
creds = await super().__call__(request)
if not creds:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing auth token")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ def _create_ti_state_update_query_and_update_state(
dag_bag: DagBagDep,
dag_id: str,
) -> tuple[Update, TaskInstanceState]:
if isinstance(ti_patch_payload, (TITerminalStatePayload, TIRetryStatePayload, TISuccessStatePayload)):
if isinstance(ti_patch_payload, TITerminalStatePayload | TIRetryStatePayload | TISuccessStatePayload):
ti = session.get(TI, ti_id_str)
updated_state = ti_patch_payload.state
query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

from __future__ import annotations

from typing import Optional

from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema

from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIRunContext
Expand All @@ -30,7 +28,7 @@ class DowngradeUpstreamMapIndexes(VersionChange):
description = __doc__

instructions_to_migrate_to_previous_version = (
schema(TIRunContext).field("upstream_map_indexes").had(type=Optional[dict[str, int]]), # type: ignore
schema(TIRunContext).field("upstream_map_indexes").had(type=dict[str, int] | None), # type: ignore
)

@convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type]
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/callbacks/callback_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Annotated, Literal, Union
from typing import TYPE_CHECKING, Annotated, Literal

from pydantic import BaseModel, Field

Expand Down Expand Up @@ -86,6 +86,6 @@ class DagCallbackRequest(BaseCallbackRequest):


CallbackRequest = Annotated[
Union[DagCallbackRequest, TaskCallbackRequest],
DagCallbackRequest | TaskCallbackRequest,
Field(discriminator="type"),
]
Loading
Loading