Apache Airflow is a tool for orchestrating complex workflows and data processing pipelines. The Kedro-Airflow plugin can be used for:
- Rapid pipeline creation in the prototyping phase. You can write Python functions in Kedro without worrying about schedulers, daemons, services or having to recreate the Airflow DAG file.
- Automatic dependency resolution in Kedro. This allows you to bypass Airflow's need to specify the order of your tasks.
- Distributing Kedro tasks across many workers. You can also enable monitoring and scheduling of the tasks' runtimes.
`kedro-airflow` is a Python plugin. To install it:

```bash
pip install kedro-airflow
```
You can use `kedro-airflow` to deploy a Kedro pipeline as an Airflow DAG by following these steps:
At the root directory of the Kedro project, run:

```bash
kedro airflow create
```

This command will generate an Airflow DAG file located in the `airflow_dags/` directory in your project.
You can pass a `--pipeline` flag to generate the DAG file for a specific Kedro pipeline and an `--env` flag to generate the DAG file for a specific Kedro environment. Passing `--all` will convert all registered Kedro pipelines to Airflow DAGs.
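For example, to generate a DAG for a single pipeline in a specific environment (the pipeline and environment names here are illustrative):

```bash
kedro airflow create --pipeline=data_science --env=airflow
```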
For more information about the DAGs folder, please visit the Airflow documentation. The Airflow DAG configuration can be customized by editing the generated DAG file.
After generating and deploying the DAG file, you will then need to package and install the Kedro pipeline into the Airflow executor's environment. Please visit the guide to Apache Airflow deployment for more details.
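A minimal sketch of that flow, assuming a standard Kedro project layout (the wheel filename is illustrative):

```bash
# Build the project as a Python package (creates an sdist and wheel under dist/)
kedro package

# Install the package into the environment where the Airflow executor runs
pip install dist/new_kedro_project-0.1-py3-none-any.whl
```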
By default, the generated DAG file is configured to live in the same directory as your project as per this template. If your DAG file is located in a different directory to your project, you will need to tweak this manually after running the `kedro airflow create` command.
You can use the additional command line argument `--jinja-file` (alias `-j`) to provide an alternative path to a Jinja2 template. Note that these files have to accept the same variables as those used in the default Jinja2 template.

```bash
kedro airflow create --jinja-file=./custom/template.j2
```
`kedro-airflow` picks up configuration from `airflow.yml` in `conf/base` or `conf/local` of your Kedro project, or from any folder whose name starts with `airflow`. The parameters are read by Kedro. Arguments can be specified globally, or per pipeline:
```yaml
# Global parameters
default:
    start_date: [2023, 1, 1]
    max_active_runs: 3
    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    schedule_interval: "@once"
    catchup: false
    # Default settings applied to all tasks
    owner: "airflow"
    depends_on_past: false
    email_on_failure: false
    email_on_retry: false
    retries: 1
    retry_delay: 5

# Arguments specific to the pipeline (overrides the parameters above)
data_science:
    owner: "airflow-ds"
```
Arguments can also be passed via `--params` in the command line:

```bash
kedro airflow create --params "schedule_interval='@weekly'"
```
These variables are passed to the Jinja2 template that creates an Airflow DAG from your pipeline.
In order to configure the config loader, update the `settings.py` file in your Kedro project. For instance, if you would like to use the name `scheduler`, then change the file as follows:

```python
CONFIG_LOADER_ARGS = {"config_patterns": {"airflow": ["scheduler*", "scheduler/**"]}}
```
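With this pattern in place, `kedro-airflow` would read configuration from files matching `scheduler*`, for example a hypothetical `conf/base/scheduler.yml`:

```yaml
default:
    schedule_interval: "@daily"
```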
Follow Kedro's official documentation to see how to add templating, custom resolvers, etc.
In order to pass arguments other than those specified in the default template, simply pass a custom template (see: "What if I want to use a different Jinja2 template?").
The syntax for arguments is:

```
{{ argument_name }}
```
In order to make arguments optional, one can use:

```
{{ argument_name | default("default_value") }}
```
For examples, please have a look at the default template (`airflow_dag_template.j2`).
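As an illustration, a fragment of a custom template might consume such variables when building the DAG's default arguments (a sketch, not the plugin's default template; `owner` and `retries` are variables you would supply via `airflow.yml` or `--params`):

```
default_args = {
    "owner": "{{ owner | default('airflow') }}",
    "retries": {{ retries | default(1) }},
}
```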
The default configuration pattern is `["airflow*", "airflow/**"]`.
In order to configure the `OmegaConfigLoader`, update the `settings.py` file in your Kedro project as follows:

```python
from kedro.config import OmegaConfigLoader

CONFIG_LOADER_CLASS = OmegaConfigLoader
CONFIG_LOADER_ARGS = {
    # other args
    "config_patterns": {  # configure the pattern for configuration files
        "airflow": ["airflow*", "airflow/**"]
    }
}
```
Follow Kedro's [official documentation](https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-do-templating-with-the-omegaconfigloader) to see how to add templating, custom resolvers, etc.
It is possible to pass parameters when triggering an Airflow DAG from the user interface. In order to use this feature, create a custom template using the Params syntax. See "What if I want to use a different Jinja2 template?" for instructions on using custom templates.
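A minimal sketch of what such a custom template could render to, assuming Airflow 2.2+ (the DAG id, parameter, and task are illustrative; this is not the plugin's default template):

```python
from datetime import datetime

from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="my_kedro_pipeline",  # illustrative DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval="@once",
    # `params` become editable in the Airflow UI when triggering the DAG
    params={"env": Param("base", type="string")},
) as dag:
    # The runtime parameter is available to tasks via Jinja templating
    show_env = BashOperator(
        task_id="show-env",
        bash_command="echo {{ params.env }}",
    )
```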
Which Airflow Operator to use depends on the environment your project is running in.
You can set the operator to use by providing a custom template.
See "What if I want to use a different Jinja2 template?" for instructions on using custom templates.
The rich offering of operators means that the `kedro-airflow` plugin provides templates for specific operators. The default template provided by `kedro-airflow` uses the `BaseOperator`.
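If your environment calls for it, a custom template could instead render each node as, say, a `BashOperator` that shells out to the Kedro CLI (a hedged sketch; the project path, task id, and node name are illustrative, and the exact `kedro run` flags may vary by Kedro version):

```python
from airflow.operators.bash import BashOperator

split_data = BashOperator(
    task_id="split-data",
    # Run a single Kedro node via the CLI; adjust flags to your Kedro version
    bash_command="cd /opt/airflow/my_kedro_project && kedro run --nodes=split_data",
)
```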
When running Kedro nodes using Airflow, MemoryDatasets are often not shared across operators, which will cause the DAG run to fail. MemoryDatasets may be used to provide logical separation between nodes in Kedro, without the overhead of writing to disk (and, in the case of distributed running, of needing multiple executors). Nodes that are connected through MemoryDatasets are grouped together via the `--group-in-memory` flag. This preserves the option to have logical separation in Kedro, with little computational overhead.
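For example:

```bash
kedro airflow create --group-in-memory
```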
It is possible to use task groups by changing the template. See "What if I want to use a different Jinja2 template?" for instructions on using custom templates.
Yes! Want to help build Kedro-Airflow? Check out our guide to contributing.
Kedro-Airflow is licensed under the Apache 2.0 License.
- Kedro-Airflow supports all Python versions that are actively maintained by the CPython core team. When a Python version reaches end of life, support for that version is dropped from `kedro-airflow`. This is not considered a breaking change.