Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions airflow-core/docs/administration-and-deployment/plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,7 @@ definitions in Airflow.
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator


# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
# Will show up in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
pass

Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ def integrate_macros_plugins() -> None:
global plugins
global macros_modules

from airflow import macros
from airflow.sdk.definitions import macros

if macros_modules is not None:
return
Expand All @@ -525,7 +525,7 @@ def integrate_macros_plugins() -> None:
if plugin.name is None:
raise AirflowPluginException("Invalid plugin name")

macros_module = make_module(f"airflow.macros.{plugin.name}", plugin.macros)
macros_module = make_module(f"airflow.sdk.definitions.macros.{plugin.name}", plugin.macros)

if macros_module:
macros_modules.append(macros_module)
Expand Down
15 changes: 8 additions & 7 deletions airflow-core/tests/unit/plugins/test_plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,17 +228,18 @@ def test_registering_plugin_macros(self, request):
"""
Tests whether macros that originate from plugins are being registered correctly.
"""
from airflow import macros
from airflow.plugins_manager import integrate_macros_plugins
from airflow.sdk.definitions import macros

def cleanup_macros():
"""Reloads the airflow.macros module such that the symbol table is reset after the test."""
"""Reloads the macros module such that the symbol table is reset after the test."""
# We're explicitly deleting the module from sys.modules and importing it again
# using import_module() as opposed to using importlib.reload() because the latter
# does not undo the changes to the airflow.macros module that are being caused by
# does not undo the changes to the airflow.sdk.definitions.macros module that are being caused by
# invoking integrate_macros_plugins()
del sys.modules["airflow.macros"]
importlib.import_module("airflow.macros")

del sys.modules["airflow.sdk.definitions.macros"]
importlib.import_module("airflow.sdk.definitions.macros")

request.addfinalizer(cleanup_macros)

Expand All @@ -253,12 +254,12 @@ class MacroPlugin(AirflowPlugin):
# Ensure the macros for the plugin have been integrated.
integrate_macros_plugins()
# Test whether the modules have been created as expected.
plugin_macros = importlib.import_module(f"airflow.macros.{MacroPlugin.name}")
plugin_macros = importlib.import_module(f"airflow.sdk.definitions.macros.{MacroPlugin.name}")
for macro in MacroPlugin.macros:
# Verify that the macros added by the plugin are being set correctly
# on the plugin's macro module.
assert hasattr(plugin_macros, macro.__name__)
# Verify that the symbol table in airflow.macros has been updated with an entry for
# Verify that the symbol table in airflow.sdk.definitions.macros has been updated with an entry for
# this plugin, this is necessary in order to allow the plugin's macros to be used when
# rendering templates.
assert hasattr(macros, MacroPlugin.name)
Expand Down
3 changes: 3 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ def __rich_repr__(self):
def get_template_context(self) -> Context:
# TODO: Move this to `airflow.sdk.execution_time.context`
# once we port the entire context logic from airflow/utils/context.py ?
from airflow.plugins_manager import integrate_macros_plugins

integrate_macros_plugins()

dag_run_conf: dict[str, Any] | None = None
if from_server := self._ti_context_from_server:
Expand Down