diff --git a/airflow/compat/asyncio.py b/airflow/compat/asyncio.py deleted file mode 100644 index a999ce604d268c..00000000000000 --- a/airflow/compat/asyncio.py +++ /dev/null @@ -1,28 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -try: - from asyncio import create_task -except ImportError: - # create_task is not present in Python 3.6. Once Airflow is at 3.7+, we can - # remove this helper. - def create_task(*args, **kwargs): # type: ignore - """A version of create_task that always errors.""" - raise RuntimeError("Airflow's async functionality is only available on Python 3.7+") - - -__all__ = ["create_task"] diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py index 2c0a0bd5bffeed..ac7d22a6b1da93 100644 --- a/airflow/jobs/triggerer_job.py +++ b/airflow/jobs/triggerer_job.py @@ -26,7 +26,6 @@ from sqlalchemy import func -from airflow.compat.asyncio import create_task from airflow.configuration import conf from airflow.jobs.base_job import BaseJob from airflow.models.trigger import Trigger @@ -236,7 +235,7 @@ async def arun(self): The loop in here runs trigger addition/deletion/cleanup. Actual triggers run in their own separate coroutines. """ - watchdog = create_task(self.block_watchdog()) + watchdog = asyncio.create_task(self.block_watchdog()) last_status = time.time() while not self.stop: # Run core logic @@ -263,7 +262,7 @@ async def create_triggers(self): trigger_id, trigger_instance = self.to_create.popleft() if trigger_id not in self.triggers: self.triggers[trigger_id] = { - "task": create_task(self.run_trigger(trigger_id, trigger_instance)), + "task": asyncio.create_task(self.run_trigger(trigger_id, trigger_instance)), "name": f"{trigger_instance!r} (ID {trigger_id})", "events": 0, } diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 823287dcb18568..4370f36c3af63c 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -78,7 +78,7 @@ from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable from airflow.timetables.simple import NullTimetable, OnceTimetable -from airflow.typing_compat import Literal, RePatternType +from airflow.typing_compat import Literal from airflow.utils import timezone from airflow.utils.dag_cycle_tester import check_cycle from airflow.utils.dates import cron_presets, date_range as utils_date_range @@ -1998,7 +1998,7 @@ def sub_dag(self, *args, **kwargs): def partial_subset( self, - task_ids_or_regex: Union[str, RePatternType, Iterable[str]], + task_ids_or_regex: Union[str, re.Pattern, Iterable[str]], include_downstream=False, include_upstream=True, include_direct_upstream=False, @@ -2026,7 +2026,7 @@ def partial_subset( memo = {id(self.task_dict): None, id(self._task_group): None} dag = copy.deepcopy(self, memo) # type: ignore - if isinstance(task_ids_or_regex, (str, RePatternType)): + if isinstance(task_ids_or_regex, (str, re.Pattern)): matched_tasks = [t for t in self.tasks if re.findall(task_ids_or_regex, t.task_id)] else: matched_tasks = [t for t in self.tasks if t.task_id in task_ids_or_regex] diff --git a/airflow/typing_compat.py b/airflow/typing_compat.py index 9f1185f76c2959..163889b8a2975d 100644 --- a/airflow/typing_compat.py +++ b/airflow/typing_compat.py @@ -28,12 +28,3 @@ from typing import Literal, Protocol, TypedDict, runtime_checkable # type: ignore except ImportError: from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable # type: ignore # noqa - - -# Before Py 3.7, there is no re.Pattern class -try: - from re import Pattern as RePatternType # type: ignore -except ImportError: - import re - - RePatternType = type(re.compile('', 0)) # type: ignore diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py index c4c1c390b2cc74..de038be48b1adb 100644 --- a/airflow/utils/log/secrets_masker.py +++ b/airflow/utils/log/secrets_masker.py @@ -24,8 +24,6 @@ from airflow.compat.functools import cache, cached_property if TYPE_CHECKING: - from airflow.typing_compat import RePatternType - RedactableItem = Union[str, Dict[Any, Any], Tuple[Any, ...], List[Any]] @@ -115,7 +113,7 @@ def _secrets_masker() -> "SecretsMasker": class SecretsMasker(logging.Filter): """Redact secrets from logs""" - replacer: Optional["RePatternType"] = None + replacer: Optional[re.Pattern] = None patterns: Set[str] ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered" diff --git a/tests/triggers/test_temporal.py b/tests/triggers/test_temporal.py index 8f2f222d71b499..1f22694faea381 100644 --- a/tests/triggers/test_temporal.py +++ b/tests/triggers/test_temporal.py @@ -21,7 +21,6 @@ import pendulum import pytest -from airflow.compat.asyncio import create_task from airflow.triggers.base import TriggerEvent from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.utils import timezone @@ -72,7 +71,7 @@ async def test_datetime_trigger_timing(): # Create a task that runs the trigger for a short time then cancels it trigger = DateTimeTrigger(future_moment) - trigger_task = create_task(trigger.run().__anext__()) + trigger_task = asyncio.create_task(trigger.run().__anext__()) await asyncio.sleep(0.5) # It should not have produced a result @@ -81,7 +80,7 @@ async def test_datetime_trigger_timing(): # Now, make one waiting for en event in the past and do it again trigger = DateTimeTrigger(past_moment) - trigger_task = create_task(trigger.run().__anext__()) + trigger_task = asyncio.create_task(trigger.run().__anext__()) await asyncio.sleep(0.5) assert trigger_task.done() is True