Production-ready ETL (Extract, Transform, Load) pipeline for automated data processing. Built with Python for reliability, scalability, and ease of maintenance.
┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Extract   │ →  │  Transform  │ →  │    Load     │ →  │  Database   │
│             │    │             │    │             │    │             │
│ • CSV       │    │ • Clean     │    │ • Batch     │    │ • PostgreSQL│
│ • API       │    │ • Validate  │    │ • Insert    │    │ • MySQL     │
│ • Database  │    │ • Transform │    │ • Update    │    │ • SQLite    │
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
       │                  │                  │
   [Logging]         [Validation]       [Monitoring]
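The three stages in the diagram map onto small, composable steps. Below is a minimal sketch of how they might be wired together; the function names and the SQLite demo target are illustrative assumptions, not the repository's actual API.

```python
# Sketch only: extract -> transform -> load wiring. Function names and the
# SQLite demo target are assumptions, not the repository's actual API.
import logging

import pandas as pd
from sqlalchemy import create_engine

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)s - %(message)s")
log = logging.getLogger("etl")


def extract_data() -> pd.DataFrame:
    # Read one of the supported sources (CSV here).
    return pd.read_csv("data/input.csv")


def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    # Basic cleaning: drop fully empty rows, normalise column names.
    df = df.dropna(how="all")
    df.columns = [c.strip().lower() for c in df.columns]
    return df


def load_data(df: pd.DataFrame) -> None:
    # Load into SQLite so the sketch is self-contained.
    engine = create_engine("sqlite:///demo.db")
    df.to_sql("records", engine, if_exists="append", index=False)


def run_pipeline() -> None:
    log.info("STEP 1: Extract")
    df = extract_data()
    log.info("STEP 2: Transform")
    df = transform_data(df)
    log.info("STEP 3: Load")
    load_data(df)
    log.info("Loaded %d rows", len(df))


if __name__ == "__main__":
    run_pipeline()
```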
- Multi-source Extraction - CSV, databases, APIs
- Data Transformation - Cleaning, validation, aggregation
- Batch Loading - Efficient bulk inserts (see the sketch after this list)
- Error Handling - Comprehensive error management
- Logging - Detailed execution logs
- Configuration - YAML-based configuration
- Data Validation - Quality checks before loading
- Incremental Updates - Process only new/changed data
- Scheduled Execution - Cron/systemd compatible
- Notifications - Email alerts on failures
- Modular Design - Easy to extend and customize
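As a concrete illustration of the batch-loading feature flagged above, here is a hedged sketch using pandas and SQLAlchemy; the table name and connection URL are placeholders.

```python
# Sketch of batch loading with pandas + SQLAlchemy; the table name and
# connection URL are placeholders.
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://dbuser:secret@localhost:5432/mydb")
df = pd.read_csv("data/input.csv")

# chunksize limits rows per batch; method="multi" packs many rows into
# a single INSERT statement for fewer round trips.
df.to_sql(
    "target_table",
    engine,
    if_exists="append",
    index=False,
    chunksize=1000,
    method="multi",
)
```

Larger batches mean fewer round trips but more memory per statement, so the batch_size setting in config.yaml is worth tuning against your database.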
- Python 3.8+
- pandas - Data manipulation
- SQLAlchemy - Database ORM (see the connection URL sketch after this list)
- PyYAML - Configuration management
- psycopg2 - PostgreSQL adapter
- PyMySQL - MySQL adapter
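The database adapters listed above are normally selected through the SQLAlchemy connection URL rather than imported directly; a minimal sketch with placeholder credentials and hosts:

```python
# The driver is chosen via the SQLAlchemy URL scheme; credentials and hosts
# below are placeholders.
from sqlalchemy import create_engine

pg_engine = create_engine("postgresql+psycopg2://dbuser:secret@localhost:5432/mydb")
mysql_engine = create_engine("mysql+pymysql://dbuser:secret@localhost:3306/mydb")
sqlite_engine = create_engine("sqlite:///local.db")  # built-in, no extra driver
```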
# Ubuntu/Debian
sudo apt-get update
sudo apt-get install python3 python3-pip python3-venv
# Optional: Database clients for testing
sudo apt-get install postgresql-client mysql-client

# Python dependencies
pip install -r requirements.txt

# Clone repository
git clone https://github.com/YOUR_USERNAME/etl-data-pipeline.git
cd etl-data-pipeline
# Create virtual environment
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt

# Copy example configuration
cp config/config.example.yaml config/config.yaml
# Edit configuration
nano config/config.yaml

Example config.yaml:
source:
  type: csv
  path: data/input.csv

database:
  host: localhost
  port: 5432
  name: mydb
  user: dbuser
  password: ${DB_PASSWORD}  # Use environment variable

pipeline:
  batch_size: 1000
  validate_data: true
  log_level: INFO

# Set database password
export DB_PASSWORD='your_password'
# Run pipeline
python3 src/pipeline.py
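The ${DB_PASSWORD} placeholder in config.yaml implies environment-variable substitution when the configuration is loaded. One possible implementation, shown as a sketch (the repository's loader may differ):

```python
# Sketch of config loading with ${VAR} expansion from the environment;
# the repository's loader may implement this differently.
import os

import yaml


def load_config(path: str = "config/config.yaml") -> dict:
    with open(path) as f:
        raw = f.read()
    # os.path.expandvars replaces ${DB_PASSWORD}-style references with
    # the corresponding environment variable values.
    return yaml.safe_load(os.path.expandvars(raw))


config = load_config()
print(config["database"]["host"])
```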
Project structure:

etl-data-pipeline/
├── config/
│   └── config.example.yaml    # Configuration template
├── src/
│   ├── extract.py             # Data extraction module
│   ├── transform.py           # Data transformation module
│   ├── load.py                # Data loading module
│   └── pipeline.py            # Main pipeline orchestrator
├── logs/                      # Execution logs
├── tests/                     # Unit tests
├── requirements.txt           # Python dependencies
└── README.md                  # This file
# config.yaml
source:
  type: csv
  path: sales_data.csv

database:
  host: localhost
  port: 5432
  name: analytics
  user: etl_user
  password: ${DB_PASSWORD}

pipeline:
  batch_size: 1000
  validate_data: true

python3 src/pipeline.py
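With validate_data: true, quality checks run before loading. The sketch below shows hypothetical checks; the actual rules live in the pipeline code, and the column names are assumptions.

```python
# Hypothetical quality checks behind validate_data: true; column names
# and rules are assumptions.
import pandas as pd


def validate(df: pd.DataFrame, required_columns) -> pd.DataFrame:
    missing = set(required_columns) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")
    if df.duplicated().any():
        raise ValueError("Duplicate rows detected")
    if df[list(required_columns)].isnull().any().any():
        raise ValueError("Null values in required columns")
    return df


df = validate(pd.read_csv("sales_data.csv"),
              required_columns=["order_id", "amount", "date"])
```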
# Custom extraction from source DB
from extract import DataExtractor

config = {
    'source': {
        'type': 'database',
        'query': 'SELECT * FROM orders WHERE date >= CURRENT_DATE - 1'
    }
}

extractor = DataExtractor(config)
data = extractor.extract()
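A corresponding transform step could clean the extracted frame before loading. This sketch is hypothetical; the column names and rules are assumptions rather than the contents of transform.py.

```python
# Hypothetical cleaning step for the extracted orders; column names are
# assumptions, not the contents of transform.py.
import pandas as pd


def clean_orders(df: pd.DataFrame) -> pd.DataFrame:
    df = df.drop_duplicates()
    df = df.dropna(subset=["order_id"])
    df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")
    df["amount"] = pd.to_numeric(df["amount"], errors="coerce").fillna(0.0)
    return df


cleaned = clean_orders(data)  # `data` comes from the extractor above
```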
# Extend extractor for API support
import requests
import pandas as pd

class APIExtractor(DataExtractor):
    def extract_from_api(self, url):
        response = requests.get(url)
        return pd.DataFrame(response.json())
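The subclass might then be used like this; the endpoint URL is a placeholder.

```python
# Hypothetical usage of the APIExtractor defined above.
api_extractor = APIExtractor(config)
orders = api_extractor.extract_from_api("https://api.example.com/v1/orders")
print(f"Fetched {len(orders)} rows from the API")
```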
# Edit crontab
crontab -e
# Add daily execution at 2 AM
0 2 * * * cd /path/to/etl-data-pipeline && /path/to/venv/bin/python3 src/pipeline.py >> logs/cron.log 2>&1

Create /etc/systemd/system/etl-pipeline.service:
[Unit]
Description=ETL Data Pipeline
After=network.target
[Service]
Type=oneshot
User=etl_user
WorkingDirectory=/path/to/etl-data-pipeline
Environment="DB_PASSWORD=your_password"
ExecStart=/path/to/venv/bin/python3 src/pipeline.py
[Install]
WantedBy=multi-user.target

Create timer /etc/systemd/system/etl-pipeline.timer:
[Unit]
Description=Run ETL Pipeline Daily
[Timer]
OnCalendar=*-*-* 02:00:00
Persistent=true
[Install]
WantedBy=timers.target

Enable and start:
sudo systemctl enable etl-pipeline.timer
sudo systemctl start etl-pipeline.timer

Log files:

- Pipeline logs: logs/pipeline_YYYYMMDD.log
- Error logs: logs/error_YYYYMMDD.log
2024-01-15 02:00:01 - INFO - Starting ETL Pipeline
2024-01-15 02:00:02 - INFO - STEP 1: Extract
2024-01-15 02:00:05 - INFO - Extracted 10000 rows
2024-01-15 02:00:05 - INFO - STEP 2: Transform
2024-01-15 02:00:08 - INFO - Transformed 10000 rows
2024-01-15 02:00:08 - INFO - STEP 3: Load
2024-01-15 02:00:12 - INFO - Successfully loaded 10000 rows
2024-01-15 02:00:12 - INFO - Pipeline completed in 11.5s
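A logging setup that produces the file names and timestamp format shown above could look like the following sketch; the repository's actual configuration may differ.

```python
# Sketch of a logging setup matching the file names and timestamp format
# shown above; the repository's actual configuration may differ.
import logging
import os
from datetime import datetime


def setup_logging(log_dir: str = "logs") -> logging.Logger:
    os.makedirs(log_dir, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d")
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s",
                                  datefmt="%Y-%m-%d %H:%M:%S")

    logger = logging.getLogger("etl")
    logger.setLevel(logging.INFO)

    pipeline_handler = logging.FileHandler(f"{log_dir}/pipeline_{stamp}.log")
    pipeline_handler.setFormatter(formatter)
    logger.addHandler(pipeline_handler)

    error_handler = logging.FileHandler(f"{log_dir}/error_{stamp}.log")
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(formatter)
    logger.addHandler(error_handler)
    return logger


log = setup_logging()
log.info("Starting ETL Pipeline")
```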
# Run tests (when implemented)
python3 -m pytest tests/
# Run with coverage
python3 -m pytest --cov=src tests/

Troubleshooting:

Issue: ModuleNotFoundError: No module named 'pandas'
# Solution: Install dependencies
pip install -r requirements.txt

Issue: Database connection failed
# Solution: Check credentials and connection
psql -h localhost -U dbuser -d mydb  # Test connection

Issue: Permission denied on logs directory
# Solution: Create logs directory
mkdir -p logs
chmod 755 logs

Use cases:

- Daily Data Imports - Automated daily data ingestion (see the incremental-load sketch after this list)
- API to Database Sync - Real-time or scheduled API data sync
- Data Migration - One-time or recurring data migrations
- Report Generation - Transform raw data for reporting
- Data Warehouse Loading - ETL for data warehouse population
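For the daily-import and sync use cases above, the incremental-updates feature could track a watermark between runs. A hedged sketch with assumed table, column, and connection details:

```python
# Sketch of incremental ("only new/changed data") extraction using a stored
# watermark; table, column, and connection details are assumptions.
import json
import os

import pandas as pd
from sqlalchemy import create_engine, text

WATERMARK_FILE = "logs/last_run.json"
engine = create_engine("postgresql+psycopg2://dbuser:secret@localhost:5432/mydb")


def read_watermark(default: str = "1970-01-01") -> str:
    if os.path.exists(WATERMARK_FILE):
        with open(WATERMARK_FILE) as f:
            return json.load(f)["last_loaded_at"]
    return default


def write_watermark(value: str) -> None:
    with open(WATERMARK_FILE, "w") as f:
        json.dump({"last_loaded_at": value}, f)


# Pull only rows changed since the last successful run.
df = pd.read_sql(
    text("SELECT * FROM orders WHERE updated_at > :last"),
    engine,
    params={"last": read_watermark()},
)
if not df.empty:
    write_watermark(str(df["updated_at"].max()))
```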
- Never commit passwords - Use environment variables
- Secure configuration files - Restrict file permissions
- Use connection pooling - For production deployments
- Validate input data - Prevent SQL injection (see the parameterized-query sketch after this list)
- Encrypt sensitive data - In transit and at rest
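To illustrate the SQL-injection point flagged above, a self-contained example of binding user-supplied values as parameters instead of interpolating them into the SQL string:

```python
# Parameter binding keeps user-supplied values out of the SQL text itself.
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE orders (id INTEGER, order_date TEXT)"))
    conn.execute(text("INSERT INTO orders VALUES (1, '2024-02-01')"))

user_supplied = "2024-01-01'; DROP TABLE orders; --"  # hostile input example

with engine.connect() as conn:
    # Safe: :cutoff is bound as data, never interpreted as SQL.
    rows = conn.execute(
        text("SELECT * FROM orders WHERE order_date >= :cutoff"),
        {"cutoff": user_supplied},
    ).fetchall()
print(rows)
```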
Feel free to fork this repository and customize for your needs. This is a template designed to be extended.
MIT License - Free for personal and commercial use
Available for freelance ETL and data engineering projects.
Specialties:
- Data pipeline development
- Database optimization
- Automation scripting
- Linux system integration
For issues or questions:
- Open an issue on GitHub
- Check logs in the logs/ directory
- Review configuration in config/
- Support for more data sources (MongoDB, S3, etc.)
- Web dashboard for monitoring
- Parallel processing for large datasets
- Data quality metrics
- Automated data profiling