Merged
3 changes: 3 additions & 0 deletions airflow/__init__.py
@@ -0,0 +1,3 @@
"""
Airflow orchestration components for BNPL data pipeline
"""
118 changes: 118 additions & 0 deletions airflow/docker-compose.yml
@@ -0,0 +1,118 @@
version: '3.8'

x-airflow-common:
  &airflow-common
  image: apache/airflow:2.7.0-python3.11
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    GOOGLE_APPLICATION_CREDENTIALS: /opt/airflow/gcp/credentials.json
    # BNPL pipeline specific environment
    AIRFLOW__WEBSERVER__SECRET_KEY: 'bnpl_data_pipeline_secret_key_2024'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./plugins:/opt/airflow/plugins
    - ./utils:/opt/airflow/utils
    - ../scripts:/opt/airflow/scripts
    - ~/.gcp:/opt/airflow/gcp:ro
    - airflow-logs:/opt/airflow/logs
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
    ports:
      - "5432:5432"

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo -e "\033[1;31mERROR: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}"
          exit 1
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        # Initialize the metadata DB, then create the admin user.
        # Note: no `exec` before `airflow db init` -- exec would replace the
        # shell and the `airflow users create` step would never run.
        airflow db init
        airflow users create \
          --username admin \
          --firstname BNPL \
          --lastname Admin \
          --role Admin \
          --email admin@bnpl-pipeline.local \
          --password admin
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
    user: "0:0"
    volumes:
      - .:/sources

volumes:
  postgres-db-volume:
  airflow-logs:
126 changes: 126 additions & 0 deletions docs/BNPL_INFRASTRUCTURE_SETUP.md
@@ -0,0 +1,126 @@
# BNPL Pipeline Infrastructure Setup

## Overview

This document describes the infrastructure foundation for the BNPL (Buy Now, Pay Later) data engineering pipeline. This is Phase 1 of the broader BNPL analytics initiative, establishing production-grade data infrastructure for ingesting 1.8M historical transactions and enabling ML model development.

## Architecture

```
simtom API → BigQuery (raw) → dbt (transform) → Analytics/ML tables
```

**Key Components:**
- **Data Source**: simtom API (https://simtom-production.up.railway.app/stream/bnpl)
- **Storage**: BigQuery datasets for raw, intermediate, and mart layers
- **Orchestration**: Apache Airflow with LocalExecutor
- **Quality**: Great Expectations for data validation
- **Pattern**: ELT leveraging BigQuery compute

## Environment Setup

### Prerequisites
- Python 3.11+ with conda
- Docker and Docker Compose
- Google Cloud credentials configured
- Access to flit-data-platform BigQuery project

### Setup Commands
```bash
# Navigate to project
cd /Users/kevin/Documents/repos/flit-data-platform

# Activate environment
conda activate flit

# Set GCP credentials
export GOOGLE_APPLICATION_CREDENTIALS="/Users/kevin/Documents/repos/.gcp/flit-data-platform-dev-sa.json"

# Install dependencies
pip install -r requirements.txt
```

## BigQuery Infrastructure

### Datasets Created
- `flit_bnpl_raw` - Raw BNPL transaction data from simtom API
- `flit_bnpl_intermediate` - Intermediate BNPL data transformations
- `flit_bnpl_marts` - Analytics-ready BNPL data marts
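The setup scripts for these datasets are not shown in this PR; a minimal sketch of idempotent creation with the `google-cloud-bigquery` client is below. The dataset names come from the list above; the project id (`flit-data-platform`) and the `US` location are assumptions, and the function names are hypothetical:

```python
DATASETS = ["flit_bnpl_raw", "flit_bnpl_intermediate", "flit_bnpl_marts"]

def dataset_refs(project_id: str = "flit-data-platform") -> list[str]:
    """Fully qualified dataset ids for the three BNPL layers."""
    return [f"{project_id}.{name}" for name in DATASETS]

def create_datasets(location: str = "US") -> None:
    """Create each dataset; `exists_ok=True` makes reruns a no-op."""
    # Deferred import: needs GOOGLE_APPLICATION_CREDENTIALS at runtime.
    from google.cloud import bigquery

    client = bigquery.Client()
    for ref in dataset_refs(client.project):
        dataset = bigquery.Dataset(ref)
        dataset.location = location
        client.create_dataset(dataset, exists_ok=True)
```

Because creation is idempotent, this can safely run as part of an Airflow bootstrap task.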

## Directory Structure

```
flit-data-platform/
├── scripts/bnpl/
│   ├── __init__.py
│   └── api_client.py          # Production-grade API client
├── airflow/
│   ├── docker-compose.yml     # LocalExecutor + PostgreSQL
│   ├── .env                   # Environment configuration
│   ├── dags/                  # Airflow DAGs
│   ├── plugins/               # Custom Airflow plugins
│   └── utils/                 # Utility functions
├── great_expectations/
│   ├── expectations/          # Data quality expectations
│   └── checkpoints/           # Validation checkpoints
└── docs/                      # Project documentation
```

## API Client

The BNPL API client (`scripts/bnpl/api_client.py`) provides production-grade features:

- Exponential backoff retry logic
- Rate limiting and request throttling
- Comprehensive logging and monitoring
- Input validation and response verification
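The client's internals are not reproduced here, but the retry behavior can be sketched as exponential backoff with jitter. This is an illustrative stand-in, not the actual `api_client.py` implementation; the function names are hypothetical:

```python
import random
import time

import requests

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff schedule: 1s, 2s, 4s, ... capped at `cap` seconds."""
    return min(cap, base * (2 ** attempt))

def get_with_retry(url: str, max_retries: int = 3, timeout: float = 10.0) -> requests.Response:
    """GET with retries on transient failures (timeouts, 5xx responses)."""
    for attempt in range(max_retries + 1):
        try:
            resp = requests.get(url, timeout=timeout)
            resp.raise_for_status()
            return resp
        except requests.RequestException:
            if attempt == max_retries:
                raise
            # Jitter spreads out retries so concurrent clients don't
            # hammer the API in lockstep.
            time.sleep(backoff_delay(attempt) * random.uniform(0.5, 1.5))
```

Capping the delay keeps worst-case latency bounded while still easing pressure on the simtom API during outages.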

### Key Methods
- `get_bnpl_data()` - Core API interaction method
- `get_daily_batch()` - Simplified daily data retrieval
- `test_connection()` - API connectivity verification

### Testing Connectivity
```python
from scripts.bnpl.api_client import BNPLAPIClient
client = BNPLAPIClient()
print(client.test_connection())
```

## Airflow Setup

### Starting Airflow
```bash
cd airflow
docker-compose up -d
```

### Access
- Web UI: http://localhost:8080
- Credentials: admin/admin

### Configuration
- **Executor**: LocalExecutor (appropriate for a single-machine production deployment)
- **Database**: PostgreSQL for metadata storage
- **Volumes**: DAGs, plugins, scripts, and GCP credentials mounted

## Data Volume Strategy

- **Target**: 1.8M transactions for ML training
- **Distribution**: Leverages simtom's realistic business patterns (weekends, holidays, seasonal variations)
- **Ingestion**: Daily batches simulating production operations
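As a back-of-envelope check on batch sizing: reaching 1.8M rows via daily batches over an assumed two-year history window (730 days is a hypothetical figure, not stated elsewhere in this doc) works out to roughly 2,466 rows per day on average. Actual batches will vary with simtom's weekend, holiday, and seasonal patterns:

```python
TARGET_ROWS = 1_800_000

def avg_daily_batch(target_rows: int = TARGET_ROWS, history_days: int = 730) -> int:
    """Average rows per daily batch needed to hit the target over the window."""
    return -(-target_rows // history_days)  # ceiling division
```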

## Next Steps

1. **PR 2**: Data Ingestion Engine - Historical data acquisition
2. **PR 3**: dbt Transformation Pipeline - Data modeling and transformations
3. **PR 4**: Data Quality Framework - Great Expectations implementation
4. **PR 5**: Airflow Orchestration - Production DAGs and scheduling
5. **PR 6**: ML Feature Engineering - Analytics-ready feature tables

## Important Design Decisions

1. **LocalExecutor over CeleryExecutor** - Right tool for single-machine deployment
2. **ELT over ETL** - Leverage BigQuery compute, preserve raw data
3. **Trust simtom patterns** - Don't duplicate business logic already provided by simtom
4. **Production patterns** - All architecture choices reflect real-world data engineering practices
11 changes: 11 additions & 0 deletions requirements.txt
@@ -16,6 +16,17 @@ pyarrow==21.0.0
python-dateutil==2.8.2
pytz==2023.3

# BNPL Pipeline Dependencies
# API client for data ingestion
requests==2.31.0
urllib3==2.0.7

# Airflow for orchestration
apache-airflow==2.7.0  # pinned to match the docker-compose image (apache/airflow:2.7.0-python3.11)

# Data quality and validation
great-expectations==0.18.19

# Optional: For local development and testing
pytest==7.4.0
python-dotenv==1.0.0
8 changes: 8 additions & 0 deletions scripts/bnpl/__init__.py
@@ -0,0 +1,8 @@
"""
BNPL Data Pipeline

This package contains data ingestion, processing, and utility modules
for the BNPL (Buy Now, Pay Later) transaction data pipeline.
"""

__version__ = "1.0.0"