A modern ELT (Extract, Load, Transform) pipeline implementation using dbt, Snowflake, and Apache Airflow. This project demonstrates industry best practices for data engineering, including dimensional modeling, testing, and orchestration.
- Modern ELT Architecture: Leverages Snowflake's compute power for transformations
- Dimensional Modeling: Implementation of fact tables and data marts
- Automated Testing: Built-in data quality checks and custom test cases
- Airflow Orchestration: Using Astronomer's Cosmos for dbt integration
- RBAC Implementation: Snowflake role-based access control
- Modular Design: Separated staging, intermediate, and mart layers
The pipeline architecture:

```mermaid
graph LR
    A[Source Data] -->|Extract & Load| B[Snowflake Raw]
    B -->|dbt Transform| C[Staging Layer]
    C -->|dbt Transform| D[Intermediate Layer]
    D -->|dbt Transform| E[Mart Layer]
    F[Airflow] -->|Orchestrate| B
    F -->|Orchestrate| C
    F -->|Orchestrate| D
    F -->|Orchestrate| E
```
- dbt: Data transformation and modeling
- Snowflake: Cloud data warehouse
- Apache Airflow: Workflow orchestration
- Astronomer Cosmos: dbt-Airflow integration
- Python: 3.8+
To get started:

1. Clone the repository.

2. Install dependencies:

   ```bash
   pip install -r requirements.txt
   ```

3. Configure Snowflake (see the additional grants sketched after this list):

   ```sql
   -- Run in Snowflake
   create warehouse dbt_wh with warehouse_size='x-small';
   create database if not exists dbt_db;
   create role if not exists dbt_role;
   grant usage on warehouse dbt_wh to role dbt_role;
   grant all on database dbt_db to role dbt_role;
   ```

4. Set up dbt:

   ```bash
   dbt init data_pipeline
   ```

5. Configure Airflow:

   ```bash
   mkdir dbt-dag
   cd dbt-dag
   astro dev init
   astro dev start
   ```
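The `grant all` in step 3 is deliberately broad for a demo. A hedged sketch of tightening it along RBAC lines (the `dbt_schema` name and `<your_user>` placeholder are assumptions, not taken from this repo):

```sql
-- Hypothetical follow-up grants: scope dbt builds to a single schema
-- and assign the role to the user dbt/Airflow connects as.
use role accountadmin;

create schema if not exists dbt_db.dbt_schema;
grant usage on database dbt_db to role dbt_role;
grant all on schema dbt_db.dbt_schema to role dbt_role;

grant role dbt_role to user <your_user>;  -- replace with your Snowflake user
```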
Project structure:

```
modern-elt-pipeline/
├── dbt/
│   ├── models/
│   │   ├── staging/
│   │   │   ├── stg_tpch_orders.sql
│   │   │   ├── stg_tpch_line_items.sql
│   │   │   └── tpch_sources.yml
│   │   └── marts/
│   │       ├── intermediate/
│   │       └── fct_orders.sql
│   ├── macros/
│   │   └── pricing.sql
│   ├── tests/
│   └── dbt_project.yml
├── airflow/
│   └── dags/
│       └── dbt_dag.py
├── requirements.txt
├── Dockerfile
└── README.md
```
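The `macros/pricing.sql` file suggests pricing logic shared across models; a minimal sketch of what such a macro could look like (the macro name and formula are illustrative assumptions, not the repo's actual code):

```sql
-- macros/pricing.sql -- hypothetical sketch
{% macro discounted_amount(extended_price, discount_percentage) %}
    ({{ extended_price }} * (1 - {{ discount_percentage }}))::decimal(16, 2)
{% endmacro %}
```

A model can then write `{{ discounted_amount('l_extendedprice', 'l_discount') }}` instead of repeating the arithmetic in every query.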
Model materializations are configured in `dbt_project.yml`:

```yaml
models:
  snowflake_workshop:
    staging:
      +materialized: view
    marts:
      +materialized: table
```
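Individual models can override these folder-level defaults with an in-file `config()` block. A minimal sketch, assuming a `tpch` source declared in `tpch_sources.yml` (the model chosen here is illustrative):

```sql
-- e.g. at the top of a staging model, to materialize just this
-- model as a table instead of inheriting the folder-level view default
{{ config(materialized='table') }}

select * from {{ source('tpch', 'orders') }}
```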
Snowflake connection settings (for example, as the `extra` field of an Airflow Snowflake connection):

```json
{
  "account": "<account_locator>-<account_name>",
  "warehouse": "dbt_wh",
  "database": "dbt_db",
  "role": "dbt_role",
  "insecure_mode": false
}
```
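Once the warehouse, database, and role exist, you can sanity-check the setup from a Snowflake worksheet (this assumes your user has been granted `dbt_role`):

```sql
-- Should run without errors and echo back dbt_role / dbt_wh / dbt_db
use role dbt_role;
use warehouse dbt_wh;
use database dbt_db;
select current_role(), current_warehouse(), current_database();
```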
Run the dbt tests:

```bash
dbt test
```

Run only the singular tests:

```bash
dbt test --select test_type:singular
```
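Singular tests live under the `tests/` directory as plain `select` statements; dbt marks a test as failing when its query returns any rows. A hypothetical example (the file name and the `order_key`/`order_date` columns are assumptions about the schema):

```sql
-- tests/assert_no_future_order_dates.sql -- hypothetical singular test:
-- any returned row (an order dated in the future) fails the test
select
    order_key,
    order_date
from {{ ref('fct_orders') }}
where order_date > current_date()
```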
Key models:

- `stg_tpch_orders`: Clean order data
- `stg_tpch_line_items`: Processed line items with surrogate keys
- `fct_orders`: Core fact table with order metrics
- `int_order_items`: Intermediate table for order analysis
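For reference, a staging model in this layout might look like the sketch below. Column names follow Snowflake's TPCH sample data; the surrogate-key macro assumes the `dbt_utils` package is installed, and the `tpch` source is assumed to be declared in `tpch_sources.yml`:

```sql
-- models/staging/stg_tpch_line_items.sql -- illustrative sketch
select
    {{ dbt_utils.generate_surrogate_key(['l_orderkey', 'l_linenumber']) }} as order_item_key,
    l_orderkey as order_key,
    l_partkey as part_key,
    l_quantity as quantity,
    l_extendedprice as extended_price,
    l_discount as discount_percentage
from {{ source('tpch', 'lineitem') }}
```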
This project is licensed under the MIT License - see the LICENSE file for details.