Skip to content

Conversation

@amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented May 15, 2025

closes: #48476

What the issue is?

Airflow is supposed to support macros -- both present internally like: uuid, datetime.now, ds_add etc in conjunction with the user defined macros that are registered via plugins.

Check references here: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html and a usage example here: https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/plugins.html#example.

Why is it important?

Airflow 2 supported this feature and wouldve built tons of dags around it which would break for those users during if not this is not supported in airflow 3 and would serve as a blocker for those users.

Some examples of callouts: https://apache-airflow.slack.com/archives/CCZRF2U5A/p1746707742015169 which we referred to this issue: #48476.

Approach

Using the older approach itself where we have a utility: integrate_macros_plugins present that registers the user defined and global macros with airflow core by making the module by users under: airflow.sdk.definitions and it can be accessed using MacrosAccessor in the context:

  • The macros module in integrate_macros_plugins has been updated to refer to the macros module in airflow.sdk.definitions instead of the older one: airflow.macros

Testing

Steps

1. Creating few macros:

  • emojiify_macro that takes some text and converts it into an emoji and returns
  • support_ticket_macro returns a support ticket link based on dag id and task id

2. Define these in an airflow plugin:

from airflow.plugins_manager import AirflowPlugin

def emojiify_macro(text):
    return f"🎉 {text} 🚀"

def support_ticket_macro(dag_id, task_id):
    base_url = "https://support.mycompany.com/tickets"
    ticket_id = f"{dag_id}:{task_id}"
    return f"{base_url}/{ticket_id}"

class MyMacroPlugin(AirflowPlugin):
    name = "amogh_macro_plugin"

    macros = [
        emojiify_macro,
        support_ticket_macro,
    ]

3. Ensure that the plugin has been loaded:

image

4. Now check if the macros are loaded in that plugin:

Run airflow plugin in CLI:

root@f3dfe327e53c:/opt/airflow# airflow plugins
                   | operator_extra_li |                   |                    |
name               | nks               | macros            | appbuilder_views   | source
===================+===================+===================+====================+===================
amogh_macro_plugin |                   | macros_plugin.emo |                    | $PLUGINS_FOLDER/ma
                   |                   | jiify_macro,macro |                    | cros_plugin.py
                   |                   | s_plugin.support_ |                    |
                   |                   | ticket_macro      |                    |

5. Define dags that can check the plugins out

DAG:

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator

def print_macro_examples(**context):
    emoji_message = context["templates_dict"]["emoji_message"]
    ticket_url = context["templates_dict"]["ticket_url"]

    print(emoji_message)
    print(f"If something fails, open a support ticket at: {ticket_url}")

with DAG(
    dag_id="amogh-plugin-dag",
    schedule=None,
    catchup=False,
) as dag:
    task_use_macros = PythonOperator(
        task_id="use_plugin_macros",
        python_callable=print_macro_examples,
        templates_dict={
            "emoji_message": "{{ macros.amogh_macro_plugin.emojiify_macro('The task: `use_plugin_macros` is running!') }}",
            "ticket_url": "{{ macros.amogh_macro_plugin.support_ticket_macro(dag_id='amogh-plugin-dag', task_id='use_plugin_macros') }}",
        },
    )


Result:
image

Logs:

Log message source details: sources=["/root/airflow/logs/dag_id=amogh-plugin-dag/run_id=manual__2025-05-15T08:37:03.977081+00:00/task_id=use_plugin_macros/attempt=1.log"]
[[2](http://localhost:28080/dags/amogh-plugin-dag/runs/manual__2025-05-15T08:37:03.977081+00:00/tasks/use_plugin_macros?try_number=1#2)025-05-15, 14:07:04] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-15, 14:07:04] INFO - Filling up the DagBag from /files/dags/dags/amogh-plugin-test.py: source="airflow.models.dagbag.DagBag"
[2025-05-15, 1[4](http://localhost:28080/dags/amogh-plugin-dag/runs/manual__2025-05-15T08:37:03.977081+00:00/tasks/use_plugin_macros?try_number=1#4):07:04] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-05-1[5](http://localhost:28080/dags/amogh-plugin-dag/runs/manual__2025-05-15T08:37:03.977081+00:00/tasks/use_plugin_macros?try_number=1#5), 14:07:05] INFO - 🎉 The task: `use_plugin_macros` is running! 🚀: chan="stdout": source="task"
[2025-05-15, 14:07:05] INFO - If something fails, open a support ticket at: https://support.mycompany.com/tickets/amogh-plugin-dag:use_plugin_macros: chan="stdout": source="task"

TODO:

  • Write some tests supporting this change in task runner

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@amoghrajesh
Copy link
Contributor Author

Working on the test failures

@amoghrajesh amoghrajesh added this to the Airflow 3.0.2 milestone May 16, 2025
Copy link
Contributor

@zach-overflow zach-overflow left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @amoghrajesh, I'm excited to see this! One small suggestion: It probably would make sense to remove the doc warning I added in https://github.com/apache/airflow/pull/50357/files

@amoghrajesh amoghrajesh self-assigned this May 17, 2025
@amoghrajesh
Copy link
Contributor Author

Thanks @zach-overflow! I will revert that off once I land this one

@amoghrajesh amoghrajesh changed the title Support user-defined macros via plugins in Airflow 3 Support macros defined via plugins in Airflow 3 May 19, 2025
@amoghrajesh amoghrajesh merged commit f008411 into apache:main May 19, 2025
71 checks passed
@amoghrajesh amoghrajesh deleted the integrate-user-defined-macros branch May 19, 2025 14:56
amoghrajesh added a commit that referenced this pull request May 20, 2025
(cherry picked from commit f008411)

Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com>
potiuk pushed a commit that referenced this pull request May 21, 2025
dadonnelly316 pushed a commit to dadonnelly316/airflow that referenced this pull request May 26, 2025
kaxil pushed a commit that referenced this pull request Jun 3, 2025
sanederchik pushed a commit to sanederchik/airflow that referenced this pull request Jun 7, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement user defined macros registered via plugin in task SDK

4 participants