Skip to content

[Feature] Resolve (some) templated fields during DAG Parsing #1289

Open
@tatiana

Description

Description

Allow users to configure DbtDag/DbtTaskGroup selectors via the Airflow UI (https://astronomer.github.io/astronomer-cosmos/configuration/selecting-excluding.html#selecting-excluding) and render the DAG based on this.

If we find a good solution for this, we could potentially extend it to other templated fields.

Use case/motivation

This request has popped up a few times, including by Emanuele in the #airflow-dbt channel on 29 October 2024:

Hello everyone! 🌞
I am currently testing Cosmos on Airflow and I am having serious issues using the select parameter. What I'd like to is to be able, from the Airflow UI, to run the DBT DAG with a specific select param so that only some models are run. Does someone have a working example that they could share?
The docs say that the select template field cannot be templated via DbtDag and DbtTaskGroup because both need to select dbt nodes during DAG parsing. (Docs) so I am not really sure how to do this.
Thanks a lot for the help!

However, I believe we didn't log this request - yet.

Possible solution

We could try to do something similar to the following, but this will likely mean the DAG processor to make additional calls to the database. Would love thoughts and other ideas.

class TemplatedDAG(DAG):
    template_fields = ['dag_param']  # Specify fields that should be templated

    def __init__(self, *args, dag_param=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.dag_param = dag_param
        self.parse_dag_template_fields()

    def parse_dag_template_fields(self):
        """Render template fields for the DAG itself at parse time."""
        # Use a context for parsing, including DAG-level parameters and Airflow macros
        context = {
            'ds': '{{ ds }}',  # Default Airflow macros for example
            'execution_date': days_ago(1),  # Can be any default date
        }
        
        # Render each template field defined in `template_fields`
        for field in self.template_fields:
            value = getattr(self, field, None)
            if isinstance(value, str):  # Ensure the field is a string (template) before rendering
                rendered_value = Template(value).render(context)
                setattr(self, field, rendered_value)

Related issues

No response

Are you willing to submit a PR?

  • Yes, I am willing to submit a PR!

Metadata

Assignees

No one assigned

    Labels

    area:parsingRelated to parsing DAG/DBT improvement, issues, or fixesarea:renderingRelated to rendering, like Jinja, Airflow tasks, etcarea:selectorRelated to selector, like DAG selector, DBT selector, etcdo-not-staleRelated to stale job and dosubotenhancementNew feature or requesttriage-neededItems need to be reviewed / assigned to milestone

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions