Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
abafe26
Move over plugins manager to a shared library
amoghrajesh Dec 31, 2025
0847f04
remove duplicates from core
amoghrajesh Dec 31, 2025
83a44c5
sdk wrapper for plugins manager
amoghrajesh Dec 31, 2025
d8d3a25
rewrite integrate-* functions to accept parameters
amoghrajesh Dec 31, 2025
6dc25ab
comsume integrate functions in core from shared
amoghrajesh Dec 31, 2025
10073c8
comsume integrate functions in sdk from shared
amoghrajesh Dec 31, 2025
d940997
eliminate settings and conf imports in shared library
amoghrajesh Dec 31, 2025
6a4b1a4
updating wrappers in core and sdk
amoghrajesh Dec 31, 2025
9a85829
comment about module_loading
amoghrajesh Dec 31, 2025
d7e21b1
re-export to support usages
amoghrajesh Dec 31, 2025
306360b
porting some tests over
amoghrajesh Dec 31, 2025
391413e
porting some tests over
amoghrajesh Dec 31, 2025
9f188e2
porting some tests over
amoghrajesh Dec 31, 2025
e03af9a
add method tools as dependency for plugins manager
amoghrajesh Dec 31, 2025
e99e60c
add pendulum
amoghrajesh Dec 31, 2025
e52b495
adding importlib metadata
amoghrajesh Dec 31, 2025
77d7bba
adding to compat sdk and use references in providers from compat sdk …
amoghrajesh Dec 31, 2025
f344a3f
Merge branch 'main' into move-plugins-manager-to-shared
amoghrajesh Jan 3, 2026
47161f8
getting rid of one airflow dependency
amoghrajesh Jan 3, 2026
56e9387
Merge branch 'main' into move-plugins-manager-to-shared
amoghrajesh Jan 5, 2026
0ae38db
inject another dependency
amoghrajesh Jan 5, 2026
14c13ca
fixing failing tests
amoghrajesh Jan 5, 2026
d61f3ee
re export for shorter paths
amoghrajesh Jan 5, 2026
8272fb0
comments from jarek
amoghrajesh Jan 5, 2026
d00baf1
better docstring
amoghrajesh Jan 5, 2026
454a75b
better docstring
amoghrajesh Jan 5, 2026
77bad06
fixing failing tests
amoghrajesh Jan 5, 2026
ad26e35
fixing failing tests
amoghrajesh Jan 5, 2026
9db1b38
fixing failing tests
amoghrajesh Jan 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ exclude = [
"../shared/secrets_backend/src/airflow_shared/secrets_backend" = "src/airflow/_shared/secrets_backend"
"../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/_shared/secrets_masker"
"../shared/timezones/src/airflow_shared/timezones" = "src/airflow/_shared/timezones"
"../shared/plugins_manager/src/airflow_shared/plugins_manager" = "src/airflow/_shared/plugins_manager"

[tool.hatch.build.targets.custom]
path = "./hatch_build.py"
Expand Down Expand Up @@ -306,4 +307,5 @@ shared_distributions = [
"apache-airflow-shared-secrets-backend",
"apache-airflow-shared-secrets-masker",
"apache-airflow-shared-timezones",
"apache-airflow-shared-plugins-manager",
]
1 change: 1 addition & 0 deletions airflow-core/src/airflow/_shared/plugins_manager
270 changes: 32 additions & 238 deletions airflow-core/src/airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,25 @@

from __future__ import annotations

import importlib.machinery
import importlib.util
import inspect
import logging
import os
import sys
import types
from collections.abc import Iterable
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING, Any

from airflow import settings
from airflow._shared.module_loading import (
entry_points_with_dist,
find_path_from_directory,
import_string,
qualname,
)
from airflow._shared.plugins_manager import (
AirflowPlugin,
AirflowPluginSource as AirflowPluginSource,
PluginsDirectorySource as PluginsDirectorySource,
_load_entrypoint_plugins,
_load_plugins_from_plugin_directory,
is_valid_plugin,
)
from airflow.configuration import conf
from airflow.task.priority_strategy import (
PriorityWeightStrategy,
Expand All @@ -46,210 +46,12 @@

if TYPE_CHECKING:
from airflow.lineage.hook import HookLineageReader

if sys.version_info >= (3, 12):
from importlib import metadata
else:
import importlib_metadata as metadata
from collections.abc import Generator
from types import ModuleType

from airflow.listeners.listener import ListenerManager
from airflow.timetables.base import Timetable

log = logging.getLogger(__name__)


class AirflowPluginSource:
"""Class used to define an AirflowPluginSource."""

def __str__(self):
raise NotImplementedError

def __html__(self):
raise NotImplementedError


class PluginsDirectorySource(AirflowPluginSource):
"""Class used to define Plugins loaded from Plugins Directory."""

def __init__(self, path):
self.path = os.path.relpath(path, settings.PLUGINS_FOLDER)

def __str__(self):
return f"$PLUGINS_FOLDER/{self.path}"

def __html__(self):
return f"<em>$PLUGINS_FOLDER/</em>{self.path}"


class EntryPointSource(AirflowPluginSource):
"""Class used to define Plugins loaded from entrypoint."""

def __init__(self, entrypoint: metadata.EntryPoint, dist: metadata.Distribution):
self.dist = dist.metadata["Name"] # type: ignore[index]
self.version = dist.version
self.entrypoint = str(entrypoint)

def __str__(self):
return f"{self.dist}=={self.version}: {self.entrypoint}"

def __html__(self):
return f"<em>{self.dist}=={self.version}:</em> {self.entrypoint}"


class AirflowPluginException(Exception):
"""Exception when loading plugin."""


class AirflowPlugin:
"""Class used to define AirflowPlugin."""

name: str | None = None
source: AirflowPluginSource | None = None
macros: list[Any] = []
admin_views: list[Any] = []
flask_blueprints: list[Any] = []
fastapi_apps: list[Any] = []
fastapi_root_middlewares: list[Any] = []
external_views: list[Any] = []
react_apps: list[Any] = []
menu_links: list[Any] = []
appbuilder_views: list[Any] = []
appbuilder_menu_items: list[Any] = []

# A list of global operator extra links that can redirect users to
# external systems. These extra links will be available on the
# task page in the form of buttons.
#
# Note: the global operator extra link can be overridden at each
# operator level.
global_operator_extra_links: list[Any] = []

# A list of operator extra links to override or add operator links
# to existing Airflow Operators.
# These extra links will be available on the task page in form of
# buttons.
operator_extra_links: list[Any] = []

# A list of timetable classes that can be used for DAG scheduling.
timetables: list[type[Timetable]] = []

# A list of listeners that can be used for tracking task and DAG states.
listeners: list[ModuleType | object] = []

# A list of hook lineage reader classes that can be used for reading lineage information from a hook.
hook_lineage_readers: list[type[HookLineageReader]] = []

# A list of priority weight strategy classes that can be used for calculating tasks weight priority.
priority_weight_strategies: list[type[PriorityWeightStrategy]] = []

@classmethod
def validate(cls):
"""Validate if plugin has a name."""
if not cls.name:
raise AirflowPluginException("Your plugin needs a name.")

@classmethod
def on_load(cls, *args, **kwargs):
"""
Execute when the plugin is loaded; This method is only called once during runtime.

:param args: If future arguments are passed in on call.
:param kwargs: If future arguments are passed in on call.
"""


def is_valid_plugin(plugin_obj) -> bool:
"""
Check whether a potential object is a subclass of the AirflowPlugin class.

:param plugin_obj: potential subclass of AirflowPlugin
:return: Whether or not the obj is a valid subclass of
AirflowPlugin
"""
if (
inspect.isclass(plugin_obj)
and issubclass(plugin_obj, AirflowPlugin)
and (plugin_obj is not AirflowPlugin)
):
plugin_obj.validate()
return True
return False


def _load_entrypoint_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
"""
Load and register plugins AirflowPlugin subclasses from the entrypoints.

The entry_point group should be 'airflow.plugins'.
"""
log.debug("Loading plugins from entrypoints")

plugins: list[AirflowPlugin] = []
import_errors: dict[str, str] = {}
for entry_point, dist in entry_points_with_dist("airflow.plugins"):
log.debug("Importing entry_point plugin %s", entry_point.name)
try:
plugin_class = entry_point.load()
if not is_valid_plugin(plugin_class):
continue

plugin_instance: AirflowPlugin = plugin_class()
plugin_instance.source = EntryPointSource(entry_point, dist)
plugins.append(plugin_instance)
except Exception as e:
log.exception("Failed to import plugin %s", entry_point.name)
import_errors[entry_point.module] = str(e)
return plugins, import_errors


def _load_plugins_from_plugin_directory() -> tuple[list[AirflowPlugin], dict[str, str]]:
"""Load and register Airflow Plugins from plugins directory."""
if settings.PLUGINS_FOLDER is None:
raise ValueError("Plugins folder is not set")
log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)
ignore_file_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")
files = find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore", ignore_file_syntax)
plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = [("", files)]

if conf.getboolean("core", "LOAD_EXAMPLES"):
log.debug("Note: Loading plugins from examples as well: %s", settings.PLUGINS_FOLDER)
from airflow.example_dags import plugins as example_plugins

example_plugins_folder = next(iter(example_plugins.__path__))
example_files = find_path_from_directory(example_plugins_folder, ".airflowignore", ignore_file_syntax)
plugin_search_locations.append((example_plugins.__name__, example_files))

plugins: list[AirflowPlugin] = []
import_errors: dict[str, str] = {}
for module_prefix, plugin_files in plugin_search_locations:
for file_path in plugin_files:
path = Path(file_path)
if not path.is_file() or path.suffix != ".py":
continue
mod_name = f"{module_prefix}.{path.stem}" if module_prefix else path.stem

try:
loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
spec = importlib.util.spec_from_loader(mod_name, loader)
if not spec:
log.error("Could not load spec for module %s at %s", mod_name, file_path)
continue
mod = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = mod
loader.exec_module(mod)

for mod_attr_value in (m for m in mod.__dict__.values() if is_valid_plugin(m)):
plugin_instance: AirflowPlugin = mod_attr_value()
plugin_instance.source = PluginsDirectorySource(file_path)
plugins.append(plugin_instance)
except Exception as e:
log.exception("Failed to import plugin %s", file_path)
import_errors[file_path] = str(e)
return plugins, import_errors


def _load_providers_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
from airflow.providers_manager import ProvidersManager

Expand All @@ -273,19 +75,6 @@ def _load_providers_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
return plugins, import_errors


def make_module(name: str, objects: list[Any]) -> ModuleType | None:
"""Create new module."""
if not objects:
return None
log.debug("Creating module %s", name)
name = name.lower()
module = types.ModuleType(name)
module._name = name.split(".")[-1] # type: ignore
module._objects = objects # type: ignore
module.__dict__.update((o.__name__, o) for o in objects)
return module


def ensure_plugins_loaded() -> None:
"""
Load plugins from plugins directory and entrypoints.
Expand Down Expand Up @@ -329,7 +118,16 @@ def __register_plugins(plugin_instances: list[AirflowPlugin], errors: dict[str,
import_errors.update(errors)

with Stats.timer() as timer:
__register_plugins(*_load_plugins_from_plugin_directory())
load_examples = conf.getboolean("core", "LOAD_EXAMPLES")
ignore_file_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")
__register_plugins(
*_load_plugins_from_plugin_directory(
plugins_folder=settings.PLUGINS_FOLDER,
load_examples=load_examples,
example_plugins_module="airflow.example_dags.plugins" if load_examples else None,
ignore_file_syntax=ignore_file_syntax,
)
)
__register_plugins(*_load_entrypoint_plugins())

if not settings.LAZY_LOAD_PROVIDERS:
Expand Down Expand Up @@ -492,31 +290,27 @@ def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
@cache
def integrate_macros_plugins() -> None:
"""Integrates macro plugins."""
from airflow._shared.plugins_manager import (
integrate_macros_plugins as _integrate_macros_plugins,
)
from airflow.sdk.execution_time import macros

log.debug("Integrate Macros plugins")

for plugin in _get_plugins()[0]:
if plugin.name is None:
raise AirflowPluginException("Invalid plugin name")

macros_module = make_module(f"airflow.sdk.execution_time.macros.{plugin.name}", plugin.macros)

if macros_module:
sys.modules[macros_module.__name__] = macros_module
# Register the newly created module on airflow.macros such that it
# can be accessed when rendering templates.
setattr(macros, plugin.name, macros_module)
plugins, _ = _get_plugins()
_integrate_macros_plugins(
target_macros_module=macros,
macros_module_name_prefix="airflow.sdk.execution_time.macros",
plugins=plugins,
)


def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
"""Add listeners from plugins."""
for plugin in _get_plugins()[0]:
if plugin.name is None:
raise AirflowPluginException("Invalid plugin name")
from airflow._shared.plugins_manager import (
integrate_listener_plugins as _integrate_listener_plugins,
)

for listener in plugin.listeners:
listener_manager.add_listener(listener)
plugins, _ = _get_plugins()
_integrate_listener_plugins(listener_manager, plugins=plugins)


def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str, Any]]:
Expand Down
Loading
Loading