🚇 Data Engineering Project - Buenos Aires Transport Pipeline

Built with: Python · Apache Airflow · Docker · Amazon Redshift

📋 Table of Contents

  • Overview
  • Architecture
  • Features
  • Prerequisites
  • Installation
  • Configuration
  • Usage
  • Project Structure
  • API Documentation
  • Database Schema
  • Monitoring & Alerts
  • License

🎯 Overview

This project implements a comprehensive Data Engineering pipeline that continuously extracts transportation data from the Buenos Aires Transport API, processes it, and loads it into a Redshift Data Warehouse. The pipeline focuses on collecting real-time information about buses, subways, and EcoBici (bike-sharing) stations within the Autonomous City of Buenos Aires (CABA).

Key Objectives

  • Real-time Data Collection: Extract dynamic transportation data every hour
  • Data Integration: Combine API data with database information
  • Data Warehouse: Store processed data in Amazon Redshift for analytics
  • Monitoring: Implement email alerts for system status and anomalies
  • Scalability: Containerized deployment with Apache Airflow orchestration

🏗️ Architecture

┌────────────────┐     ┌──────────────┐    ┌──────────────┐
│   Transport    │     │   Apache     │    │   Amazon     │
│   API (CABA)   │────▶│   Airflow    │───▶│   Redshift   │
│                │     │   (Docker)   │    │   (DWH)      │
└────────────────┘     └──────────────┘    └──────────────┘
                              │
                              ▼
                     ┌─────────────────┐
                     │   Email Alerts  │
                     │   (SMTP/Gmail)  │
                     └─────────────────┘

Data Flow

  1. Extraction: Python scripts fetch data from Transport API endpoints
  2. Transformation: Data is cleaned, filtered, and structured
  3. Loading: Processed data is loaded into Redshift tables
  4. Monitoring: Email alerts notify about pipeline status
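
A minimal sketch of these steps, with illustrative function names and simplified logic (the project's actual implementation lives in scripts/main.py and the DAGs under dags/):

import pandas as pd
import requests
from sqlalchemy import create_engine

def extract(url, params):
    # 1. Extraction: fetch raw JSON from a Transport API endpoint
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()
    return response.json()

def transform(records):
    # 2. Transformation: structure the records and drop incomplete rows
    return pd.DataFrame(records).dropna()

def load(df, table_name, conn_str):
    # 3. Loading: append the processed rows to a Redshift table
    engine = create_engine(conn_str)
    df.to_sql(table_name, engine, if_exists="append", index=False)

Step 4 (monitoring) is handled separately by the email alert DAG described in the Monitoring & Alerts section below.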

✨ Features

🚌 Bus Data Collection

  • Real-time vehicle positions from multiple bus companies
  • Route information and trip details
  • Speed and location tracking with timestamps
  • Agency management for different transport companies

🚇 Subway Information

  • Service alerts and status updates
  • Line information and schedules
  • Real-time monitoring of subway operations

🚲 EcoBici Station Data

  • Station information (locations, capacities, neighborhoods)
  • Bike availability (mechanical and electric bikes)
  • Station status and operational data
  • Neighborhood filtering for specific areas
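
The neighborhood filtering can be expressed as a simple pandas filter over the station data; the neighborhood list below is illustrative, not the project's actual selection:

import pandas as pd

# Hypothetical subset of CABA neighborhoods; the pipeline defines its own list
NEIGHBORHOODS = ["Palermo", "Recoleta", "Caballito"]

def filter_stations(stations_df):
    # Keep only EcoBici stations located in the selected neighborhoods
    return stations_df[stations_df["neighborhood"].isin(NEIGHBORHOODS)]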

📧 Alert System

  • Email notifications for pipeline failures
  • SMTP integration with Gmail
  • Customizable alert messages
  • Real-time monitoring of data quality

🔧 Prerequisites

Before running this project, ensure you have the following installed:

Required Software

  • Python 3.8+
  • Docker & Docker Compose
  • Git

Required Accounts & Services

  • Amazon Redshift cluster
  • Buenos Aires Transport API credentials
  • Gmail account (for email alerts)

Python Dependencies

pandas>=1.5.0
requests>=2.28.0
sqlalchemy==1.4.51
psycopg2-binary>=2.9.0
apache-airflow>=2.7.3
configparser

🚀 Installation

1. Clone the Repository

git clone https://github.com/yourusername/DataEngineeringCoder-.git
cd DataEngineeringCoder-

2. Set Up Environment

# Create virtual environment (optional but recommended)
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

3. Configure Docker Environment

# Set Airflow user ID (Linux/Mac)
echo -e "AIRFLOW_UID=$(id -u)" > .env

# On Windows, set AIRFLOW_UID=50000
echo "AIRFLOW_UID=50000" > .env

4. Start Airflow Services

# Initialize Airflow database
docker-compose up airflow-init

# Start all services
docker-compose up -d

⚙️ Configuration

1. API Credentials

Create a config/pipeline.conf file with your API credentials:

[api_transporte]
client_id = your_client_id_here
client_secret = your_client_secret_here

[RedShift]
host = your_redshift_cluster.amazonaws.com
port = 5439
dbname = your_database_name
user = your_username
pwd = your_password
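
These values can then be read with configparser (listed in the dependencies above); a sketch of how the credentials and a SQLAlchemy connection string might be built from this file:

from configparser import ConfigParser

parser = ConfigParser()
parser.read("config/pipeline.conf")

# Transport API credentials
client_id = parser["api_transporte"]["client_id"]
client_secret = parser["api_transporte"]["client_secret"]

# SQLAlchemy connection string for Redshift (psycopg2 driver)
redshift = parser["RedShift"]
conn_str = (
    f"postgresql+psycopg2://{redshift['user']}:{redshift['pwd']}"
    f"@{redshift['host']}:{redshift['port']}/{redshift['dbname']}"
)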

2. Airflow Variables

Set up Airflow variables for email alerts:

# Access Airflow web UI at http://localhost:8080
# Go to Admin > Variables
# Add variable: GMAIL_SECRET = your_gmail_app_password
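
Inside a task, the secret can then be retrieved with Airflow's Variable API, for example:

from airflow.models import Variable

# Read the Gmail app password stored as an Airflow Variable
gmail_secret = Variable.get("GMAIL_SECRET")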

3. Environment Variables

Create a .env file for additional configuration:

AIRFLOW_UID=50000
AIRFLOW_IMAGE_NAME=apache/airflow:2.7.3
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD=airflow

📊 Usage

Starting the Pipeline

  1. Start Airflow Services:

    docker-compose up -d
  2. Access Airflow Web UI:

    • Open browser: http://localhost:8080
    • Login: airflow / airflow
  3. Enable DAGs:

    • Navigate to DAGs section
    • Enable my_daily_dag for data ingestion
    • Enable dag_smtp_email_automatico for email alerts
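
The DAGs enabled in step 3 are defined under dags/; for reference, a stripped-down sketch of what the ingestion DAG can look like (the schedule and default arguments here are assumptions, not necessarily the project's exact settings):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

from scripts.main import data_ingestion  # main ETL entry point

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="my_daily_dag",
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule="@daily",  # assumed; adjust to the desired extraction frequency
    catchup=False,
) as dag:
    PythonOperator(
        task_id="data_ingestion",
        python_callable=data_ingestion,
    )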

Manual Execution

# Run data ingestion manually
python scripts/main.py

# Or execute specific functions
python -c "from scripts.main import data_ingestion; data_ingestion()"

Monitoring Pipeline

  • Airflow Web UI: Monitor DAG runs and task status
  • Email Alerts: Check your configured email for notifications
  • Logs: View detailed logs in Airflow UI or ./logs directory

📁 Project Structure

DataEngineeringCoder-/
├── dags/                           # Apache Airflow DAGs
│   ├── data_ingestion.py           # Main ETL pipeline DAG
│   └── dag_email.py                # Email alerts DAG
├── scripts/                        # Core Python scripts
│   ├── main.py                     # Main ETL logic
│   ├── utils.py                    # Utility functions
│   ├── email_utils.py              # Email functionality
│   └── alert_utils.py              # Alert processing
├── config/                         # Configuration files
│   └── pipeline.conf               # API and DB credentials
├── logs/                           # Airflow logs (auto-generated)
├── docker-compose.yaml             # Docker services configuration
├── main.ipynb                      # Development notebook
└── README.md                       # This file

🔌 API Documentation

Transport API Endpoints

The pipeline integrates with the Buenos Aires Transport API:

Bus Data

  • Endpoint: /colectivos/vehiclePositionsSimple
  • Parameters: agency_id (9, 145, 155)
  • Data: Real-time bus positions, routes, speeds
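
A sketch of such a request with the requests library; the base URL and the passing of client_id/client_secret as query parameters are assumptions to be checked against the official API documentation:

import requests

# Assumed base URL for the Buenos Aires Transport API
BASE_URL = "https://apitransporte.buenosaires.gob.ar"

def fetch_bus_positions(client_id, client_secret, agency_id):
    # Real-time vehicle positions for a single bus agency
    response = requests.get(
        f"{BASE_URL}/colectivos/vehiclePositionsSimple",
        params={
            "agency_id": agency_id,
            "client_id": client_id,
            "client_secret": client_secret,
        },
        timeout=30,
    )
    response.raise_for_status()
    return response.json()

# Example: Primera Junta (agency_id 145)
# positions = fetch_bus_positions("my_client_id", "my_client_secret", 145)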

EcoBici Data

  • Endpoint: /ecobici/gbfs/stationInformation
  • Data: Station locations, capacities, neighborhoods
  • Endpoint: /ecobici/gbfs/stationStatus
  • Data: Bike availability, station status

Subway Data

  • Endpoint: /subtes/alerts
  • Data: Service alerts, line status

Data Sources

  • Primera Junta (agency_id: 145)
  • La Nueva Metropol (agency_id: 9)
  • TALP (agency_id: 155)
  • EcoBici Stations (filtered by neighborhoods)

🗄️ Database Schema

Redshift Tables

agencies

CREATE TABLE agencies (
    agency_id INTEGER,
    agency_name VARCHAR(100)
);

bus_positions

CREATE TABLE bus_positions (
    id INTEGER,
    agency_id INTEGER,
    route_id INTEGER,
    latitude NUMERIC,
    longitude NUMERIC,
    speed NUMERIC,
    timestamp TIMESTAMP,
    route_short_name VARCHAR,
    trip_headsign VARCHAR
);

ecobici_stations

CREATE TABLE ecobici_stations (
    station_id INTEGER,
    name VARCHAR,
    address VARCHAR,
    capacity INTEGER,
    lat NUMERIC,
    lon NUMERIC,
    neighborhood VARCHAR
);

ecobici_stations_status

CREATE TABLE ecobici_stations_status (
    station_id INTEGER,
    num_bikes_available_mechanical INTEGER,
    num_bikes_available_ebike INTEGER,
    num_bikes_available INTEGER,
    num_bikes_disabled INTEGER,
    status VARCHAR,
    last_reported TIMESTAMP
);
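
Data is loaded into these tables from pandas DataFrames; a minimal loading sketch using a SQLAlchemy connection string built from config/pipeline.conf (the agency rows come from the Data Sources list above):

import pandas as pd
from sqlalchemy import create_engine

def load_agencies(conn_str):
    # Rows matching the agencies table defined above
    agencies = pd.DataFrame(
        [
            {"agency_id": 9, "agency_name": "La Nueva Metropol"},
            {"agency_id": 145, "agency_name": "Primera Junta"},
            {"agency_id": 155, "agency_name": "TALP"},
        ]
    )
    engine = create_engine(conn_str)  # e.g. postgresql+psycopg2://user:pwd@host:5439/dbname
    agencies.to_sql("agencies", engine, if_exists="append", index=False)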

📧 Monitoring & Alerts

Email Alert System

  • SMTP Server: Gmail SMTP
  • Frequency: Configurable (currently every minute)
  • Content: Pipeline status, data quality alerts, error notifications

Alert Types

  • Pipeline Failures: DAG execution errors
  • Data Quality Issues: Missing or invalid data
  • API Connection Problems: Network or authentication issues
  • Database Errors: Connection or query failures

Configuration

# Email settings in scripts/email_utils.py
SMTP_SERVER = 'smtp.gmail.com'
SMTP_PORT = 587
SENDER_EMAIL = 'your_email@gmail.com'
RECIPIENT_EMAIL = 'your_email@gmail.com'
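
A minimal sending sketch built on these settings, assuming smtplib with STARTTLS and the Gmail app password stored as the GMAIL_SECRET Airflow Variable (the project's own implementation in scripts/email_utils.py may differ):

import smtplib
from email.message import EmailMessage

def send_alert(subject, body, gmail_secret):
    # SMTP_SERVER, SMTP_PORT, SENDER_EMAIL, RECIPIENT_EMAIL as defined above
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = SENDER_EMAIL
    msg["To"] = RECIPIENT_EMAIL
    msg.set_content(body)

    # Connect to Gmail over STARTTLS and authenticate with the app password
    with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
        server.starttls()
        server.login(SENDER_EMAIL, gmail_secret)
        server.send_message(msg)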

📄 License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.


Note: This project is designed for educational and development purposes. For production use, additional security measures, error handling, and monitoring should be implemented.
