- Overview
- Architecture
- Features
- Prerequisites
- Installation
- Configuration
- Usage
- Project Structure
- API Documentation
- Database Schema
- Monitoring & Alerts
- License
This project implements a comprehensive Data Engineering pipeline that continuously extracts transportation data from the Buenos Aires Transport API, processes it, and loads it into a Redshift Data Warehouse. The pipeline focuses on collecting real-time information about buses, subways, and EcoBici (bike-sharing) stations within the Autonomous City of Buenos Aires (CABA).
- Real-time Data Collection: Extract dynamic transportation data every hour
- Data Integration: Combine API data with database information
- Data Warehouse: Store processed data in Amazon Redshift for analytics
- Monitoring: Implement email alerts for system status and anomalies
- Scalability: Containerized deployment with Apache Airflow orchestration
```
┌──────────────────┐     ┌────────────────┐     ┌────────────────┐
│    Transport     │     │     Apache     │     │     Amazon     │
│    API (CABA)    │────▶│    Airflow     │────▶│    Redshift    │
│                  │     │    (Docker)    │     │     (DWH)      │
└──────────────────┘     └────────────────┘     └────────────────┘
                                 │
                                 ▼
                        ┌─────────────────┐
                        │  Email Alerts   │
                        │  (SMTP/Gmail)   │
                        └─────────────────┘
```
- Extraction: Python scripts fetch data from Transport API endpoints
- Transformation: Data is cleaned, filtered, and structured
- Loading: Processed data is loaded into Redshift tables
- Monitoring: Email alerts notify about pipeline status
- Real-time vehicle positions from multiple bus companies
- Route information and trip details
- Speed and location tracking with timestamps
- Agency management for different transport companies
- Service alerts and status updates
- Line information and schedules
- Real-time monitoring of subway operations
- Station information (locations, capacities, neighborhoods)
- Bike availability (mechanical and electric bikes)
- Station status and operational data
- Neighborhood filtering for specific areas
- Email notifications for pipeline failures
- SMTP integration with Gmail
- Customizable alert messages
- Real-time monitoring of data quality
Before running this project, ensure you have the following installed:
- Python 3.8+
- Docker & Docker Compose
- Git
- Amazon Redshift cluster
- Buenos Aires Transport API credentials
- Gmail account (for email alerts)
```
pandas>=1.5.0
requests>=2.28.0
sqlalchemy==1.4.51
psycopg2-binary>=2.9.0
apache-airflow>=2.7.3
configparser
```

```bash
# Clone the repository
git clone https://github.com/yourusername/DataEngineeringCoder-.git
cd DataEngineeringCoder-

# Create virtual environment (optional but recommended)
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt
```

```bash
# Set Airflow user ID (Linux/Mac)
echo -e "AIRFLOW_UID=$(id -u)" > .env

# On Windows, set AIRFLOW_UID=50000
echo "AIRFLOW_UID=50000" > .env
```

```bash
# Initialize Airflow database
docker-compose up airflow-init

# Start all services
docker-compose up -d
```

Create a `config/pipeline.conf` file with your API credentials:
```ini
[api_transporte]
client_id = your_client_id_here
client_secret = your_client_secret_here

[RedShift]
host = your_redshift_cluster.amazonaws.com
port = 5439
dbname = your_database_name
user = your_username
pwd = your_password
```
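The scripts can read these credentials with `configparser` (listed in the requirements); a minimal sketch, assuming the file sits at `config/pipeline.conf` relative to the working directory (the exact loading code in `scripts/` may differ):

```python
from configparser import ConfigParser

# Load API and Redshift credentials from config/pipeline.conf
config = ConfigParser()
config.read("config/pipeline.conf")

client_id = config["api_transporte"]["client_id"]
client_secret = config["api_transporte"]["client_secret"]

redshift = config["RedShift"]
conn_params = {
    "host": redshift["host"],
    "port": redshift.getint("port"),
    "dbname": redshift["dbname"],
    "user": redshift["user"],
    "password": redshift["pwd"],
}
```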
Set up Airflow variables for email alerts:

```bash
# Access Airflow web UI at http://localhost:8080
# Go to Admin > Variables
# Add variable: GMAIL_SECRET = your_gmail_app_password
```
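Inside a running DAG or task, that value can then be read with Airflow's `Variable` API; a minimal sketch (variable name as configured above, usable only where the Airflow metadata database is reachable):

```python
from airflow.models import Variable

# Read the Gmail app password stored under Admin > Variables
gmail_app_password = Variable.get("GMAIL_SECRET")
```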
Create a `.env` file for additional configuration:

```env
AIRFLOW_UID=50000
AIRFLOW_IMAGE_NAME=apache/airflow:2.7.3
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD=airflow
```
- Start Airflow services:

  ```bash
  docker-compose up -d
  ```

- Access the Airflow web UI:
  - Open browser: http://localhost:8080
  - Login: airflow / airflow

- Enable DAGs:
  - Navigate to the DAGs section
  - Enable `my_daily_dag` for data ingestion (see the sketch below)
  - Enable `dag_smtp_email_automatico` for email alerts
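The ingestion DAG is defined in `dags/data_ingestion.py`; the sketch below shows what an hourly DAG of this shape looks like, assuming `scripts/` is importable inside the Airflow containers (task IDs and structure here are illustrative, not the project's exact code):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from scripts.main import data_ingestion  # main ETL entry point

# Illustrative DAG definition; the project's real DAG is dags/data_ingestion.py
with DAG(
    dag_id="my_daily_dag",
    start_date=datetime(2023, 1, 1),
    schedule="@hourly",  # the pipeline extracts transport data every hour
    catchup=False,
) as dag:
    PythonOperator(
        task_id="data_ingestion",
        python_callable=data_ingestion,
    )
```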
```bash
# Run data ingestion manually
python scripts/main.py

# Or execute specific functions
python -c "from scripts.main import data_ingestion; data_ingestion()"
```

- Airflow Web UI: Monitor DAG runs and task status
- Email Alerts: Check your configured email for notifications
- Logs: View detailed logs in the Airflow UI or the `./logs` directory
```
DataEngineeringCoder-/
├── dags/                   # Apache Airflow DAGs
│   ├── data_ingestion.py   # Main ETL pipeline DAG
│   └── dag_email.py        # Email alerts DAG
├── scripts/                # Core Python scripts
│   ├── main.py             # Main ETL logic
│   ├── utils.py            # Utility functions
│   ├── email_utils.py      # Email functionality
│   └── alert_utils.py      # Alert processing
├── config/                 # Configuration files
│   └── pipeline.conf       # API and DB credentials
├── logs/                   # Airflow logs (auto-generated)
├── docker-compose.yaml     # Docker services configuration
├── main.ipynb              # Development notebook
└── README.md               # This file
```
The pipeline integrates with the Buenos Aires Transport API:
- Endpoint: `/colectivos/vehiclePositionsSimple`
  - Parameters: `agency_id` (9, 145, 155)
  - Data: Real-time bus positions, routes, speeds
- Endpoint: `/ecobici/gbfs/stationInformation`
  - Data: Station locations, capacities, neighborhoods
- Endpoint: `/ecobici/gbfs/stationStatus`
  - Data: Bike availability, station status
- Endpoint: `/subtes/alerts`
  - Data: Service alerts, line status
- Primera Junta (agency_id: 145)
- La Nueva Metropol (agency_id: 9)
- TALP (agency_id: 155)
- EcoBici Stations (filtered by neighborhoods)
```sql
CREATE TABLE agencies (
    agency_id INTEGER,
    agency_name VARCHAR(100)
);

CREATE TABLE bus_positions (
    id INTEGER,
    agency_id INTEGER,
    route_id INTEGER,
    latitude NUMERIC,
    longitude NUMERIC,
    speed NUMERIC,
    timestamp TIMESTAMP,
    route_short_name VARCHAR,
    trip_headsign VARCHAR
);

CREATE TABLE ecobici_stations (
    station_id INTEGER,
    name VARCHAR,
    address VARCHAR,
    capacity INTEGER,
    lat NUMERIC,
    lon NUMERIC,
    neighborhood VARCHAR
);

CREATE TABLE ecobici_stations_status (
    station_id INTEGER,
    num_bikes_available_mechanical INTEGER,
    num_bikes_available_ebike INTEGER,
    num_bikes_available INTEGER,
    num_bikes_disabled INTEGER,
    status VARCHAR,
    last_reported TIMESTAMP
);
```
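Processed DataFrames can be appended to these tables with pandas and SQLAlchemy; a minimal sketch, assuming the Redshift credentials from `config/pipeline.conf` (the project's actual load logic lives in `scripts/`):

```python
import pandas as pd
from sqlalchemy import create_engine

# Redshift is reached through its PostgreSQL-compatible driver (psycopg2)
engine = create_engine(
    "postgresql+psycopg2://your_username:your_password"
    "@your_redshift_cluster.amazonaws.com:5439/your_database_name"
)

# Example row shaped like the ecobici_stations_status table above (illustrative values)
status_df = pd.DataFrame([{
    "station_id": 1,
    "num_bikes_available_mechanical": 5,
    "num_bikes_available_ebike": 2,
    "num_bikes_available": 7,
    "num_bikes_disabled": 0,
    "status": "IN_SERVICE",
    "last_reported": pd.Timestamp.utcnow(),
}])

# Append the rows into the existing Redshift table
status_df.to_sql("ecobici_stations_status", engine, if_exists="append", index=False)
```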
- SMTP Server: Gmail SMTP
- Frequency: Configurable (currently every minute)
- Content: Pipeline status, data quality alerts, error notifications
- Pipeline Failures: DAG execution errors
- Data Quality Issues: Missing or invalid data
- API Connection Problems: Network or authentication issues
- Database Errors: Connection or query failures
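A minimal sketch of how such an alert might be sent over Gmail SMTP with the standard library's `smtplib` (the project's actual helpers live in `scripts/email_utils.py` and `scripts/alert_utils.py`; the app password is assumed to come from the `GMAIL_SECRET` Airflow variable):

```python
import smtplib
from email.message import EmailMessage

def send_alert(subject: str, body: str, sender: str, recipient: str, app_password: str) -> None:
    """Send a plain-text alert email via Gmail SMTP (illustrative helper)."""
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content(body)

    with smtplib.SMTP("smtp.gmail.com", 587) as server:
        server.starttls()                   # upgrade the connection to TLS
        server.login(sender, app_password)  # Gmail app password, e.g. from GMAIL_SECRET
        server.send_message(msg)

# Example:
# send_alert("Pipeline failure", "DAG my_daily_dag failed",
#            "your_email@gmail.com", "your_email@gmail.com", gmail_app_password)
```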
```python
# Email settings in scripts/email_utils.py
SMTP_SERVER = 'smtp.gmail.com'
SMTP_PORT = 587
SENDER_EMAIL = 'your_email@gmail.com'
RECIPIENT_EMAIL = 'your_email@gmail.com'
```

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
Note: This project is designed for educational and development purposes. For production use, additional security measures, error handling, and monitoring should be implemented.