
Pulse-Stream 📈

A real-time streaming analytics pipeline for cryptocurrency price tracking with Apache Kafka, PostgreSQL, and Streamlit.

🏗️ Architecture

```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  CoinGecko  │────▶│   Python    │────▶│   Apache    │────▶│  PostgreSQL │
│    API      │     │  Producer   │     │    Kafka    │     │   Database  │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
                                                                    │
                            ┌───────────────────────────────────────┤
                            ▼                                       ▼
                    ┌─────────────┐                         ┌─────────────┐
                    │   Price     │                         │  Streamlit  │
                    │   Alerts    │                         │  Dashboard  │
                    └─────────────┘                         └─────────────┘
```

✨ Features

  • Real-time Price Tracking - Fetches crypto prices every 30 seconds
  • Stream Processing - Apache Kafka for reliable message streaming
  • Price Alerts - Get notified when prices change > 5%
  • Interactive Dashboard - Streamlit UI with live charts
  • Batch Aggregations - 5-minute windowed statistics
  • CI/CD Pipeline - GitHub Actions for automated testing
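The price-alert feature above reduces to a percent-change check against a threshold. A minimal sketch, assuming a signed percent-change comparison; the real `alert_handler.py` may structure this differently:

```python
def pct_change(old: float, new: float) -> float:
    """Signed percent change from old to new."""
    return (new - old) / old * 100.0


def should_alert(old: float, new: float, threshold: float = 5.0) -> bool:
    """True when the absolute price move meets or exceeds the threshold (%)."""
    return abs(pct_change(old, new)) >= threshold
```

With the default 5% threshold, a move from 100 to 106 (or down to 94) triggers an alert, while a move to 104 does not.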

🚀 Quick Start

Prerequisites

  • Docker & Docker Compose
  • Python 3.10+
  • Make (optional)

Setup

```shell
# 1. Clone the repository
git clone https://github.com/YOUR_USERNAME/pulse-stream.git
cd pulse-stream

# 2. Initialize project
make init

# 3. Start services
make up

# 4. Initialize database tables
docker exec -i pulse_postgres psql -U $POSTGRES_USER -d $POSTGRES_DB < database/init-db.sql

# 5. Run the pipeline (in separate terminals)
make produce    # Fetch crypto prices
make consume    # Write to database
make alerts     # Monitor price changes
make dashboard  # View at http://localhost:8501
```

📁 Project Structure

pulse-stream/
├── producers/              # Data ingestion
│   ├── crypto_producer.py  # CoinGecko API → Kafka
│   └── reddit_producer.py  # Reddit API → Kafka (optional)
├── consumers/              # Stream processing
│   ├── db_writer.py        # Kafka → PostgreSQL
│   ├── alert_handler.py    # Price change alerts
│   └── spark_processor.py  # Windowed aggregations
├── common/                 # Shared utilities
│   ├── config.py           # Environment configuration
│   ├── utils.py            # Serialization, retries
│   └── logger.py           # Structured logging
├── database/               # Database layer
│   ├── models.py           # SQLAlchemy models
│   └── init-db.sql         # Schema initialization
├── dashboard/              # Visualization
│   └── app.py              # Streamlit dashboard
├── infra/                  # Infrastructure
│   └── docker-compose.yml  # Kafka, PostgreSQL, Zookeeper
├── tests/                  # Unit tests
├── .github/workflows/      # CI/CD
│   ├── test.yml            # Automated testing
│   └── lint.yml            # Code linting
├── .env.example            # Environment template
├── Makefile                # Common commands
└── requirements.txt        # Python dependencies

🛠️ Commands

| Command | Description |
| --- | --- |
| `make init` | Initialize project |
| `make up` | Start Docker services |
| `make down` | Stop services |
| `make produce` | Run crypto producer |
| `make consume` | Run database writer |
| `make alerts` | Run price alert handler |
| `make spark-batch` | Run batch aggregations |
| `make dashboard` | Launch Streamlit dashboard |
| `make test` | Run unit tests |
| `make clean` | Stop services and remove data |
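`make spark-batch` computes the 5-minute windowed statistics. A pure-Python sketch of what a tumbling-window aggregation computes (min/max/avg price per coin per window); the real `spark_processor.py` presumably uses PySpark DataFrame windowing instead, and the record fields here are illustrative:

```python
from collections import defaultdict
from statistics import mean

WINDOW_SECONDS = 300  # 5-minute tumbling windows


def window_stats(records: list[dict]) -> dict[tuple[str, int], dict]:
    """Group records by (coin, window start) and aggregate prices."""
    buckets: dict[tuple[str, int], list[float]] = defaultdict(list)
    for r in records:
        # Truncate the timestamp to the start of its 5-minute window
        window_start = int(r["fetched_at"]) // WINDOW_SECONDS * WINDOW_SECONDS
        buckets[(r["coin"], window_start)].append(r["price_usd"])
    return {
        key: {"min": min(p), "max": max(p), "avg": mean(p), "count": len(p)}
        for key, p in buckets.items()
    }
```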

🔧 Configuration

Configuration is managed via environment variables in .env:

| Variable | Description | Default |
| --- | --- | --- |
| `POSTGRES_USER` | Database username | Required |
| `POSTGRES_PASSWORD` | Database password | Required |
| `POSTGRES_DB` | Database name | Required |
| `KAFKA_BOOTSTRAP_SERVERS` | Kafka broker | `localhost:9092` |
| `CRYPTO_POLL_INTERVAL` | Fetch interval (seconds) | `30` |
| `ALERT_PRICE_THRESHOLD` | Alert threshold (%) | `5.0` |

📊 Technologies

| Technology | Purpose |
| --- | --- |
| Apache Kafka | Message streaming |
| PostgreSQL | Data persistence |
| Streamlit | Real-time dashboard |
| SQLAlchemy | ORM |
| PySpark | Batch aggregations |
| Docker Compose | Container orchestration |
| GitHub Actions | CI/CD |

🧪 Testing

```shell
make test
```

Tests run automatically on every push via GitHub Actions.
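A unit test in this style typically exercises a pure helper, such as the message serialization mentioned under `common/utils.py`. A hedged sketch of what such a test file might look like (the helper names here are hypothetical, not the project's actual API):

```python
import json


def serialize(record: dict) -> bytes:
    """UTF-8 JSON encoding for Kafka message values (illustrative)."""
    return json.dumps(record, sort_keys=True).encode("utf-8")


def deserialize(raw: bytes) -> dict:
    return json.loads(raw.decode("utf-8"))


def test_round_trip():
    record = {"coin": "bitcoin", "price_usd": 67000.0}
    assert deserialize(serialize(record)) == record


def test_key_order_is_stable():
    # sort_keys=True makes the wire format deterministic regardless of dict order
    assert serialize({"a": 1, "b": 2}) == serialize({"b": 2, "a": 1})
```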

📈 Extending

Add a New Cryptocurrency

Edit common/config.py:

```python
coins: list = field(default_factory=lambda: [
    "bitcoin", "ethereum", "solana", "cardano", "ripple",
    "dogecoin"  # Add here
])
```

Change Alert Threshold

Edit .env:

```shell
ALERT_PRICE_THRESHOLD=3.0  # Alert on 3% change
```

📝 License

MIT License


Built for learning real-time data engineering 🚀
