Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from __future__ import annotations

import pendulum
from workday import AfterWorkdayTimetable

from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

Expand Down
52 changes: 32 additions & 20 deletions airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from typing import TYPE_CHECKING, Any, Iterable

from airflow import settings
from airflow.configuration import conf
from airflow.task.priority_strategy import (
PriorityWeightStrategy,
airflow_priority_weight_strategies,
Expand All @@ -45,6 +46,7 @@
except ImportError:
from importlib import metadata # type: ignore[no-redef]
from types import ModuleType
from typing import Generator

from airflow.hooks.base import BaseHook
from airflow.listeners.listener import ListenerManager
Expand Down Expand Up @@ -262,28 +264,38 @@ def load_plugins_from_plugin_directory():
"""Load and register Airflow Plugins from plugins directory."""
global import_errors
log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)
files = find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore")
plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = [("", files)]

for file_path in find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore"):
path = Path(file_path)
if not path.is_file() or path.suffix != ".py":
continue
mod_name = path.stem
if conf.getboolean("core", "LOAD_EXAMPLES"):
log.debug("Note: Loading plugins from examples as well: %s", settings.PLUGINS_FOLDER)
from airflow.example_dags import plugins

try:
loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
spec = importlib.util.spec_from_loader(mod_name, loader)
mod = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = mod
loader.exec_module(mod)
log.debug("Importing plugin module %s", file_path)

for mod_attr_value in (m for m in mod.__dict__.values() if is_valid_plugin(m)):
plugin_instance = mod_attr_value()
plugin_instance.source = PluginsDirectorySource(file_path)
register_plugin(plugin_instance)
except Exception as e:
log.exception("Failed to import plugin %s", file_path)
import_errors[file_path] = str(e)
example_plugins_folder = next(iter(plugins.__path__))
example_files = find_path_from_directory(example_plugins_folder, ".airflowignore")
plugin_search_locations.append((plugins.__name__, example_files))

for module_prefix, plugin_files in plugin_search_locations:
for file_path in plugin_files:
path = Path(file_path)
if not path.is_file() or path.suffix != ".py":
continue
mod_name = f"{module_prefix}.{path.stem}" if module_prefix else path.stem

try:
loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
spec = importlib.util.spec_from_loader(mod_name, loader)
mod = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = mod
loader.exec_module(mod)

for mod_attr_value in (m for m in mod.__dict__.values() if is_valid_plugin(m)):
plugin_instance = mod_attr_value()
plugin_instance.source = PluginsDirectorySource(file_path)
register_plugin(plugin_instance)
except Exception as e:
log.exception("Failed to import plugin %s", file_path)
import_errors[file_path] = str(e)


def load_providers_plugins():
Expand Down
18 changes: 10 additions & 8 deletions tests/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,9 @@ def test_cli_backfill_depends_on_past_run_backwards(self, mock_run, cli_arg: str
disable_retry=False,
)

@mock.patch("workday.AfterWorkdayTimetable.get_next_workday")
@mock.patch("airflow.models.taskinstance.TaskInstance.dry_run")
@mock.patch("airflow.cli.commands.dag_command.DagRun")
def test_backfill_with_custom_timetable(self, mock_dagrun, mock_dry_run, mock_get_next_workday):
def test_backfill_with_custom_timetable(self, mock_dagrun, mock_dry_run):
"""
when calling `dags backfill` on dag with custom timetable, the DagRun object should be created with
data_intervals.
Expand All @@ -441,8 +440,6 @@ def test_backfill_with_custom_timetable(self, mock_dagrun, mock_dry_run, mock_ge
start_date + timedelta(days=1),
start_date + timedelta(days=2),
]
mock_get_next_workday.side_effect = workdays

cli_args = self.parser.parse_args(
[
"dags",
Expand All @@ -455,7 +452,10 @@ def test_backfill_with_custom_timetable(self, mock_dagrun, mock_dry_run, mock_ge
"--dry-run",
]
)
dag_command.dag_backfill(cli_args)
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable

with mock.patch.object(AfterWorkdayTimetable, "get_next_workday", side_effect=workdays):
dag_command.dag_backfill(cli_args)
assert "data_interval" in mock_dagrun.call_args.kwargs

def test_next_execution(self, tmp_path):
Expand Down Expand Up @@ -979,17 +979,19 @@ def test_dag_test_show_dag(self, mock_get_dag, mock_render_dag):
mock_render_dag.assert_has_calls([mock.call(mock_get_dag.return_value, tis=[])])
assert "SOURCE" in output

@mock.patch("workday.AfterWorkdayTimetable", side_effect=lambda: mock.MagicMock(active_runs_limit=None))
@mock.patch("airflow.models.dag._get_or_create_dagrun")
def test_dag_test_with_custom_timetable(self, mock__get_or_create_dagrun, _):
def test_dag_test_with_custom_timetable(self, mock__get_or_create_dagrun):
"""
when calling `dags test` on dag with custom timetable, the DagRun object should be created with
data_intervals.
"""
cli_args = self.parser.parse_args(
["dags", "test", "example_workday_timetable", DEFAULT_DATE.isoformat()]
)
dag_command.dag_test(cli_args)
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable

with mock.patch.object(AfterWorkdayTimetable, "get_next_workday", side_effect=[DEFAULT_DATE]):
dag_command.dag_test(cli_args)
assert "data_interval" in mock__get_or_create_dagrun.call_args.kwargs

@mock.patch("airflow.models.dag._get_or_create_dagrun")
Expand Down
5 changes: 3 additions & 2 deletions tests/plugins/test_plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def test_loads_filesystem_plugins(self, caplog):
with mock.patch("airflow.plugins_manager.plugins", []):
plugins_manager.load_plugins_from_plugin_directory()

assert 7 == len(plugins_manager.plugins)
assert len(plugins_manager.plugins) == 9
for plugin in plugins_manager.plugins:
if "AirflowTestOnLoadPlugin" in str(plugin):
assert "postload" == plugin.name
Expand All @@ -226,7 +226,7 @@ def test_loads_filesystem_plugins_exception(self, caplog, tmp_path):
with conf_vars({("core", "plugins_folder"): os.fspath(tmp_path)}):
plugins_manager.load_plugins_from_plugin_directory()

assert plugins_manager.plugins == []
assert len(plugins_manager.plugins) == 3 # three are loaded from examples

received_logs = caplog.text
assert "Failed to import plugin" in received_logs
Expand Down Expand Up @@ -386,6 +386,7 @@ def test_registering_plugin_listeners(self):
listener_names = [el.__name__ if inspect.ismodule(el) else qualname(el) for el in listeners]
# sort names as order of listeners is not guaranteed
assert [
"airflow.example_dags.plugins.event_listener",
"tests.listeners.class_listener.ClassBasedListener",
"tests.listeners.empty_listener",
] == sorted(listener_names)
Expand Down
28 changes: 0 additions & 28 deletions tests/plugins/workday.py

This file was deleted.