Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions airflow-core/src/airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@
from typing import TYPE_CHECKING, Any

from airflow import settings
from airflow._shared.module_loading import entry_points_with_dist, import_string, qualname
from airflow._shared.module_loading import (
entry_points_with_dist,
find_path_from_directory,
import_string,
qualname,
)
from airflow.configuration import conf
from airflow.task.priority_strategy import (
PriorityWeightStrategy,
airflow_priority_weight_strategies,
)
from airflow.utils.file import find_path_from_directory

if TYPE_CHECKING:
from airflow.lineage.hook import HookLineageReader
Expand Down Expand Up @@ -205,15 +209,16 @@ def _load_plugins_from_plugin_directory() -> tuple[list[AirflowPlugin], dict[str
if settings.PLUGINS_FOLDER is None:
raise ValueError("Plugins folder is not set")
log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)
files = find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore")
ignore_file_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")
files = find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore", ignore_file_syntax)
plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = [("", files)]

if conf.getboolean("core", "LOAD_EXAMPLES"):
log.debug("Note: Loading plugins from examples as well: %s", settings.PLUGINS_FOLDER)
from airflow.example_dags import plugins as example_plugins

example_plugins_folder = next(iter(example_plugins.__path__))
example_files = find_path_from_directory(example_plugins_folder, ".airflowignore")
example_files = find_path_from_directory(example_plugins_folder, ".airflowignore", ignore_file_syntax)
plugin_search_locations.append((example_plugins.__name__, example_files))

plugins: list[AirflowPlugin] = []
Expand Down
193 changes: 23 additions & 170 deletions airflow-core/src/airflow/utils/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@
from collections.abc import Generator
from io import TextIOWrapper
from pathlib import Path
from re import Pattern
from typing import NamedTuple, Protocol, overload

from pathspec.patterns import GitWildMatchPattern
from typing import overload

from airflow.configuration import conf

Expand All @@ -38,93 +35,6 @@
# Template for the unique module name built by ``get_unique_dag_module_name``; the
# ``path_hash`` and ``module_name`` placeholders are filled in with ``str.format``.
MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"


class _IgnoreRule(Protocol):
    """Structural (duck-typed) interface that every ignore-rule implementation satisfies."""

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Turn a single pattern string into an ignore rule.

        Both ``base_dir`` and ``definition_file`` are expected to be absolute paths.
        """
        ...

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Check an absolute candidate ``path`` against the given list of ``rules``."""
        ...


class _RegexpIgnoreRule(NamedTuple):
    """Ignore rule backed by a compiled regular expression."""

    # Compiled expression searched against the candidate path.
    pattern: Pattern
    # Directory the candidate path is made relative to before searching.
    base_dir: Path

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Compile ``pattern`` as a regular expression and wrap it in a rule.

        An expression that fails to compile is reported with a warning naming
        ``definition_file`` and ``None`` is returned instead of a rule.
        """
        try:
            compiled = re.compile(pattern)
        except re.error as e:
            log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e)
            return None
        return _RegexpIgnoreRule(compiled, base_dir)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Return ``True`` as soon as any rule's expression is found in ``path``.

        The path is searched relative to each rule's ``base_dir``.

        :raises ValueError: if a rule is not a ``_RegexpIgnoreRule``.
        """
        for candidate in rules:
            if not isinstance(candidate, _RegexpIgnoreRule):
                raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(candidate)}")
            relative = str(path.relative_to(candidate.base_dir))
            if candidate.pattern.search(relative) is not None:
                return True
        return False


class _GlobIgnoreRule(NamedTuple):
    """Typed namedtuple with utility functions for glob (gitignore-style) ignore rules."""

    # Compiled gitignore-style pattern; its ``include`` flag tells whether a hit ignores or re-includes.
    wild_match_pattern: GitWildMatchPattern
    # Directory the candidate is made relative to; ``None`` means only the bare file name is matched.
    relative_to: Path | None = None

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Build an ignore rule from the supplied glob pattern.

        A bare ``/`` pattern is a no-op; it is skipped with a warning and ``None`` is returned.
        Patterns containing a separator at the beginning or in the middle are anchored to the
        directory holding ``definition_file``; all others may match at any depth.
        """
        relative_to: Path | None = None
        if pattern.strip() == "/":
            # "/" doesn't match anything in gitignore
            log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
            return None
        if pattern.startswith("/") or "/" in pattern.rstrip("/"):
            # See https://git-scm.com/docs/gitignore
            # > If there is a separator at the beginning or middle (or both) of the pattern, then the
            # > pattern is relative to the directory level of the particular .gitignore file itself.
            # > Otherwise the pattern may also match at any level below the .gitignore level.
            relative_to = definition_file.parent

        ignore_pattern = GitWildMatchPattern(pattern)
        return _GlobIgnoreRule(wild_match_pattern=ignore_pattern, relative_to=relative_to)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """
        Match a list of ignore rules against the supplied path.

        Rules are evaluated in order and the last one that hits decides the outcome, so a later
        exclusion (negated) rule can override an earlier match.

        :raises ValueError: if a rule is not a ``_GlobIgnoreRule``.
        """
        if not rules:
            return False

        # Whether the candidate is a directory does not depend on the rule, so hit the
        # filesystem once instead of once per rule. Directories get a trailing slash.
        dir_suffix = "/" if path.is_dir() else ""

        matched = False
        for rule in rules:
            if not isinstance(rule, _GlobIgnoreRule):
                raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(rule)}")
            rel_obj = path.relative_to(rule.relative_to) if rule.relative_to else Path(path.name)
            rel_path = f"{rel_obj.as_posix()}{dir_suffix}"
            wild_match = rule.wild_match_pattern
            if wild_match.include is not None and wild_match.match_file(rel_path) is not None:
                matched = wild_match.include

        return matched


# Splits a location into an optional leading ``<something>.zip`` path (group 2), which must be
# followed by the OS path separator, and the remainder of the string (group 3).
ZIP_REGEX = re.compile(rf"((.*\.zip){re.escape(os.sep)})?(.*)")


Expand Down Expand Up @@ -164,84 +74,6 @@ def open_maybe_zipped(fileloc, mode="r"):
return open(fileloc, mode=mode)


def _find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_rule_type: type[_IgnoreRule],
) -> Generator[str, None, None]:
    """
    Recursively walk the base path and lazily yield the file paths that should not be ignored.

    Symlinked directories are followed. Every directory may contain its own ignore file; the
    rules it defines apply to that directory and are inherited by all of its subdirectories.
    ``#`` starts a comment in the ignore file and empty lines are skipped.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: name of the per-directory file listing the patterns to ignore; the
        pattern syntax is whatever ``ignore_rule_type`` understands
    :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface.

    :return: a generator of file paths which should not be ignored.
    :raises RuntimeError: if the same resolved directory is reached more than once while walking,
        which indicates a symlink loop.
    """
    base_dir = Path(base_dir_path)
    # Rules inherited by each directory, keyed using resolved, absolute paths
    patterns_by_dir: dict[Path, list[_IgnoreRule]] = {}

    for root, dirs, files in os.walk(base_dir_path, followlinks=True):
        root_path = Path(root)
        patterns: list[_IgnoreRule] = patterns_by_dir.get(root_path.resolve(), [])

        ignore_file_path = root_path / ignore_file_name
        if ignore_file_path.is_file():
            with open(ignore_file_path) as ifile:
                uncommented_lines = [re.sub(r"\s*#.*", "", line) for line in ifile.read().split("\n")]
            # Evaluation order of patterns is important with negation, so new rules are appended
            # in file order and later patterns can override earlier ones.
            for raw_pattern in uncommented_lines:
                if not raw_pattern:
                    continue
                rule = ignore_rule_type.compile(raw_pattern, base_dir, ignore_file_path)
                # ``None`` marks an invalid or no-op pattern; drop it
                if rule is not None:
                    patterns.append(rule)

        # Prune ignored directories in place so that os.walk does not descend into them
        dirs[:] = [subdir for subdir in dirs if not ignore_rule_type.match(root_path / subdir, patterns)]
        # explicit loop for infinite recursion detection since we are following symlinks in this walk
        for sd in dirs:
            dirpath = (root_path / sd).resolve()
            if dirpath in patterns_by_dir:
                raise RuntimeError(
                    "Detected recursive loop when walking DAG directory "
                    f"{base_dir_path}: {dirpath} has appeared more than once."
                )
            # Each subdirectory gets its own copy so sibling directories do not share additions
            patterns_by_dir[dirpath] = patterns.copy()

        for file in files:
            if file == ignore_file_name:
                continue
            abs_file_path = root_path / file
            if not ignore_rule_type.match(abs_file_path, patterns):
                yield str(abs_file_path)


def find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_file_syntax: str | None = None,
) -> Generator[str, None, None]:
    """
    Recursively search the base path for a list of file paths that should not be ignored.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the name of the file which specifies the patterns of files/dirs to be ignored
    :param ignore_file_syntax: the syntax of patterns in the ignore file: ``regexp`` or ``glob``.
        When omitted (``None``), the ``[core] DAG_IGNORE_FILE_SYNTAX`` setting is read at call time,
        falling back to ``glob``. An empty string is treated as ``glob``.

    :return: a generator of file paths.
    :raises ValueError: if ``ignore_file_syntax`` is neither ``glob`` nor ``regexp``.
    """
    if ignore_file_syntax is None:
        # Resolve the setting lazily: a default evaluated in the signature would be frozen at
        # import time and would force a configuration read as a side effect of importing.
        ignore_file_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")

    if ignore_file_syntax == "glob" or not ignore_file_syntax:
        return _find_path_from_directory(base_dir_path, ignore_file_name, _GlobIgnoreRule)
    if ignore_file_syntax == "regexp":
        return _find_path_from_directory(base_dir_path, ignore_file_name, _RegexpIgnoreRule)
    raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")


def list_py_file_paths(
directory: str | os.PathLike[str] | None,
safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
Expand All @@ -268,9 +100,12 @@ def list_py_file_paths(

def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> list[str]:
"""Find file paths of all DAG files."""
from airflow._shared.module_loading.file_discovery import find_path_from_directory

file_paths = []
ignore_file_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")

for file_path in find_path_from_directory(directory, ".airflowignore"):
for file_path in find_path_from_directory(directory, ".airflowignore", ignore_file_syntax):
path = Path(file_path)
try:
if path.is_file() and (path.suffix == ".py" or zipfile.is_zipfile(path)):
Expand Down Expand Up @@ -353,3 +188,21 @@ def get_unique_dag_module_name(file_path: str) -> str:
org_mod_name = re.sub(r"[.-]", "_", Path(file_path).stem)
return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, module_name=org_mod_name)
raise ValueError("file_path should be a string to generate unique module name")


def __getattr__(name: str):
    """
    Module-level attribute hook that keeps the old ``find_path_from_directory`` import path working.

    Accessing ``find_path_from_directory`` on this module emits a ``DeprecatedImportWarning`` and
    returns the function from ``airflow._shared.module_loading``. Any other unknown name raises
    ``AttributeError``.
    """
    if name != "find_path_from_directory":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    # Imported lazily so the cost is only paid when the deprecated name is actually used.
    import warnings

    from airflow._shared.module_loading import find_path_from_directory as relocated
    from airflow.utils.deprecation_tools import DeprecatedImportWarning

    warnings.warn(
        "Importing find_path_from_directory from airflow.utils.file is deprecated "
        "and will be removed in a future version. "
        "Use airflow._shared.module_loading.find_path_from_directory instead.",
        DeprecatedImportWarning,
        stacklevel=2,
    )
    return relocated
2 changes: 1 addition & 1 deletion airflow-core/tests/unit/plugins/test_plugin_ignore.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from unittest.mock import patch

from airflow import settings
from airflow.utils.file import find_path_from_directory
from airflow._shared.module_loading import find_path_from_directory


def populate_dir(root_path):
Expand Down
Loading
Loading