Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ function start_api_server_with_examples(){
return
fi
export AIRFLOW__CORE__LOAD_EXAMPLES=True
export AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
export AIRFLOW__API__AUTH_BACKENDS=airflow.providers.fab.auth_manager.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
export AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
echo
echo "${COLOR_BLUE}Initializing database${COLOR_RESET}"
Expand Down
38 changes: 0 additions & 38 deletions airflow/api/auth/backend/session.py

This file was deleted.

9 changes: 3 additions & 6 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ def inversed_deprecated_sections(self):
"api": {
"auth_backends": (
re2.compile(r"^airflow\.api\.auth\.backend\.deny_all$|^$"),
"airflow.api.auth.backend.session",
"airflow.providers.fab.auth_manager.api.auth.backend.session",
"3.0",
),
},
Expand Down Expand Up @@ -686,10 +686,7 @@ def _upgrade_auth_backends(self):
This is required by the UI for ajax queries.
"""
old_value = self.get("api", "auth_backends", fallback="")
if (
old_value.find("airflow.api.auth.backend.session") == -1
and old_value.find("airflow.providers.fab.auth_manager.api.auth.backend.session") == -1
):
if "airflow.providers.fab.auth_manager.api.auth.backend.session" not in old_value:
new_value = old_value + ",airflow.providers.fab.auth_manager.api.auth.backend.session"
self._update_env_var(section="api", name="auth_backends", new_value=new_value)
self.upgraded_values[("api", "auth_backends")] = old_value
Expand All @@ -700,7 +697,7 @@ def _upgrade_auth_backends(self):
os.environ.pop(old_env_var, None)

warnings.warn(
"The auth_backends setting in [api] has had airflow.api.auth.backend.session added "
"The auth_backends setting in [api] missed airflow.providers.fab.auth_manager.api.auth.backend.session "
"in the running config, which is needed by the UI. Please update your config before "
"Apache Airflow 3.0.",
FutureWarning,
Expand Down
7 changes: 0 additions & 7 deletions airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,13 +475,6 @@ class PodReconciliationError(AirflowException): # type: ignore[no-redef]
"""Raised when an error is encountered while trying to merge pod configs."""


class RemovedInAirflow3Warning(DeprecationWarning):
"""Issued for usage of deprecated features that will be removed in Airflow3."""

deprecated_since: str | None = None
"Indicates the airflow version that started raising this deprecation warning"


class RemovedInAirflow4Warning(DeprecationWarning):
"""Issued for usage of deprecated features that will be removed in Airflow4."""

Expand Down
20 changes: 0 additions & 20 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@
from typing import TYPE_CHECKING, Any, Optional

import pendulum
from deprecated import deprecated

from airflow.cli.cli_config import DefaultHelpParser
from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import Log
from airflow.stats import Stats
Expand Down Expand Up @@ -587,24 +585,6 @@ def terminate(self):
"""Get called when the daemon receives a SIGTERM."""
raise NotImplementedError

@deprecated(
reason="Replaced by function `revoke_task`.",
category=RemovedInAirflow3Warning,
action="ignore",
)
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
"""
Handle remnants of tasks that were failed because they were stuck in queued.

Tasks can get stuck in queued. If such a task is detected, it will be marked
as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
if it doesn't.

:param tis: List of Task Instances to clean up
:return: List of readable task instances for a warning message
"""
raise NotImplementedError

def revoke_task(self, *, ti: TaskInstance):
"""
Attempt to remove task from executor.
Expand Down
32 changes: 1 addition & 31 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@
import time
from collections import Counter, defaultdict, deque
from collections.abc import Collection, Iterable, Iterator
from contextlib import ExitStack, suppress
from contextlib import ExitStack
from datetime import date, timedelta
from functools import lru_cache, partial
from itertools import groupby
from typing import TYPE_CHECKING, Any, Callable

from deprecated import deprecated
from sqlalchemy import and_, delete, exists, func, select, text, tuple_, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient, selectinload
Expand All @@ -42,7 +41,6 @@
from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.executors import workloads
from airflow.executors.base_executor import BaseExecutor
from airflow.executors.executor_loader import ExecutorLoader
Expand Down Expand Up @@ -1727,12 +1725,6 @@ def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
)
session.commit()
except NotImplementedError:
# this block only gets entered if the executor has not implemented `revoke_task`.
# in which case, we try the fallback logic
# todo: remove the call to _stuck_in_queued_backcompat_logic in airflow 3.0.
# after 3.0, `cleanup_stuck_queued_tasks` will be removed, so we should
# just continue immediately.
self._stuck_in_queued_backcompat_logic(executor, stuck_tis)
continue

def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]:
Expand Down Expand Up @@ -1779,28 +1771,6 @@ def _maybe_requeue_stuck_ti(self, *, ti, session):
)
ti.set_state(TaskInstanceState.FAILED, session=session)

@deprecated(
reason="This is backcompat layer for older executor interface. Should be removed in 3.0",
category=RemovedInAirflow3Warning,
action="ignore",
)
def _stuck_in_queued_backcompat_logic(self, executor, stuck_tis):
"""
Try to invoke stuck in queued cleanup for older executor interface.

TODO: remove in airflow 3.0

Here we handle case where the executor pre-dates the interface change that
introduced `cleanup_tasks_stuck_in_queued` and deprecated `cleanup_stuck_queued_tasks`.

"""
with suppress(NotImplementedError):
for ti_repr in executor.cleanup_stuck_queued_tasks(tis=stuck_tis):
self.log.warning(
"Task instance %s stuck in queued. Will be set to failed.",
ti_repr,
)

def _reschedule_stuck_task(self, ti: TaskInstance, session: Session):
session.execute(
update(TI)
Expand Down
4 changes: 2 additions & 2 deletions clients/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -534,11 +534,11 @@ that uses the API to run the tests. To do that, you need to:

```ini
[api]
auth_backend = airflow.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
auth_backend = airflow.providers.fab.auth_manager.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
```

You can also set it by env variable:
`export AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth`
`export AIRFLOW__API__AUTH_BACKENDS=airflow.providers.fab.auth_manager.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth`

* configure your airflow webserver to load example dags
In the `[core]` section of your `airflow.cfg` set:
Expand Down
2 changes: 1 addition & 1 deletion clients/python/test_python_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
# configured also with the basic_auth as backend additionally to regular session backend needed
# by the UI. In the `[api]` section of your `airflow.cfg` set:
#
# auth_backend = airflow.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
# auth_backend = airflow.providers.fab.auth_manager.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
#
# Make sure that your user/name are configured properly - using the user/password that has admin
# privileges in Airflow
Expand Down
1 change: 0 additions & 1 deletion contributing-docs/testing/unit_tests.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ Handling warnings
By default, in the new tests selected warnings are prohibited:

* ``airflow.exceptions.AirflowProviderDeprecationWarning``
* ``airflow.exceptions.RemovedInAirflow3Warning``

That mean if one of this warning appear during test run and do not captured the test will failed.

Expand Down
2 changes: 1 addition & 1 deletion dev/README_RELEASE_PYTHON_CLIENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ and allows you to test the client in a real environment.
variable in `files/airflow-breeze-config/init.sh`:

```shell
export AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
export AIRFLOW__API__AUTH_BACKENDS=airflow.providers.fab.auth_manager.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
export AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
```

Expand Down
1 change: 0 additions & 1 deletion docs/apache-airflow/core-concepts/executor/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ The following methods aren't required to override to have a functional Airflow e
* ``start``: The Airflow scheduler job will call this method after it initializes the executor object. Any additional setup required by the executor can be completed here.
* ``end``: The Airflow scheduler job will call this method as it is tearing down. Any synchronous cleanup required to finish running jobs should be done here.
* ``terminate``: More forcefully stop the executor, even killing/stopping in-flight tasks instead of synchronously waiting for completion.
* ``cleanup_stuck_queued_tasks``: If tasks are stuck in the queued state for longer than ``task_queued_timeout`` then they are collected by the scheduler and provided to the executor to have an opportunity to handle them (perform any graceful cleanup/teardown) via this method and return the Task Instances for a warning message displayed to users.
* ``try_adopt_task_instances``: Tasks that have been abandoned (e.g. from a scheduler job that died) are provided to the executor to adopt or otherwise handle them via this method. Any tasks that cannot be adopted (by default the BaseExecutor assumes all cannot be adopted) should be returned.
* ``get_cli_commands``: Executors may vend CLI commands to users by implementing this method, see the `CLI`_ section below for more details.
* ``get_task_log``: Executors may vend log messages to Airflow task logs by implementing this method, see the `Logging`_ section below for more details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ x-airflow-common:
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKENDS: >-
airflow.providers.fab.auth_manager.api.auth.backend.basic_auth,airflow.api.auth.backend.session
airflow.providers.fab.auth_manager.api.auth.backend.basic_auth,airflow.providers.fab.auth_manager.api.auth.backend.session
AIRFLOW__WORKERS__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/'
# yamllint disable rule:line-length
# Use simple http server on scheduler for health checks
Expand Down
16 changes: 16 additions & 0 deletions newsfragments/47264.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
Removed leftover deprecations prior to 3.0.0.

* Removed the ``RemovedInAirflow3Warning`` warning class.
* Removed the deprecated module ``airflow.api.auth.backend.session``. Please use ``airflow.providers.fab.auth_manager.api.auth.backend.session`` instead.
* Removed the deprecated ``cleanup_stuck_queued_tasks`` method from the ``BaseExecutor`` interface. It is replaced by function ``revoke_task``.

* Types of change

* [ ] Dag changes
* [ ] Config changes
* [ ] API changes
* [ ] CLI changes
* [ ] Behaviour changes
* [ ] Plugin changes
* [ ] Dependency changes
* [x] Code interface changes
1 change: 0 additions & 1 deletion providers/edge/docs/edge_executor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ before use. The following features have been initially tested and are working:
optimized for scalability. This will need to be considered in future releases. A dedicated performance
assessment is to be completed ensuring that in a hybrid setup other executors are not impacted before
version 1.0.0 is to be released.
- Stuck tasks in queue are not explicitly handled as ``cleanup_stuck_queued_tasks()`` is not implemented.


Architecture
Expand Down
2 changes: 1 addition & 1 deletion providers/fab/docs/auth-manager/api-authentication.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ check the user session:
.. code-block:: ini

[api]
auth_backends = airflow.api.auth.backend.session
auth_backends = airflow.providers.fab.auth_manager.api.auth.backend.session

.. versionchanged:: 1.10.11

Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,6 @@ filterwarnings = [
# Instead of that, we use a separate parameter and dynamically add it into `filterwarnings` marker.
# Add airflow.exceptions.RemovedInAirflow4Warning when min provider version for providers is 3.0
forbidden_warnings = [
"airflow.exceptions.RemovedInAirflow3Warning",
"airflow.exceptions.AirflowProviderDeprecationWarning",
]
python_files = [
Expand Down
1 change: 0 additions & 1 deletion scripts/ci/pre_commit/check_deprecations.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@


allowed_warnings: dict[str, tuple[str, ...]] = {
"airflow": ("airflow.exceptions.RemovedInAirflow3Warning",),
"providers": ("airflow.exceptions.AirflowProviderDeprecationWarning",),
}
compatible_decorators: frozenset[tuple[str, ...]] = frozenset(
Expand Down
1 change: 0 additions & 1 deletion scripts/ci/testing/summarize_captured_warnings.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
"pydantic.warnings.PydanticDeprecatedSince20": "!!",
"celery.exceptions.CPendingDeprecationWarning": "!!",
"pytest.PytestWarning": "!!",
"airflow.exceptions.RemovedInAirflow3Warning": "!",
"airflow.exceptions.AirflowProviderDeprecationWarning": "!",
}
# Always print messages for these warning categories
Expand Down
2 changes: 1 addition & 1 deletion scripts/docker/entrypoint_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ function start_api_server_with_examples(){
return
fi
export AIRFLOW__CORE__LOAD_EXAMPLES=True
export AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
export AIRFLOW__API__AUTH_BACKENDS=airflow.providers.fab.auth_manager.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth
export AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
echo
echo "${COLOR_BLUE}Initializing database${COLOR_RESET}"
Expand Down
8 changes: 0 additions & 8 deletions tests/always/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,6 @@ def example_not_excluded_dags(xfail_db_exception: bool = False):
f"Skipping {candidate} because providers are not included for {default_branch} branch."
)
continue
# Do not raise an error for airflow.exceptions.RemovedInAirflow3Warning.
# We should not rush to enforce new syntax updates in providers
# because a version of Airflow that deprecates certain features may not yet be released.
# Instead, it is advisable to periodically review the warning reports and implement manual
# updates as needed.
param_marks.append(
pytest.mark.filterwarnings("default::airflow.exceptions.RemovedInAirflow3Warning")
)
if candidate.endswith(IGNORE_AIRFLOW_PROVIDER_DEPRECATION_WARNING):
param_marks.append(
pytest.mark.filterwarnings(
Expand Down
2 changes: 1 addition & 1 deletion tests/core/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ def test_auth_backends_adds_session(self):
"api": {
"auth_backends": (
re.compile(r"^airflow\.api\.auth\.backend\.deny_all$|^$"),
"airflow.api.auth.backend.session",
"airflow.providers.fab.auth_manager.api.auth.backend.session",
"3.0",
),
},
Expand Down
Loading