A real-time streaming analytics pipeline for cryptocurrency price tracking with Apache Kafka, PostgreSQL, and Streamlit.
```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  CoinGecko  │────▶│   Python    │────▶│   Apache    │────▶│ PostgreSQL  │
│     API     │     │  Producer   │     │    Kafka    │     │  Database   │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
                                               │
       ┌───────────────────────────────────────┤
       ▼                                       ▼
┌─────────────┐                         ┌─────────────┐
│    Price    │                         │  Streamlit  │
│   Alerts    │                         │  Dashboard  │
└─────────────┘                         └─────────────┘
```
- **Real-time Price Tracking** - Fetches crypto prices from the CoinGecko API every 30 seconds (see the sketch after this list)
- **Stream Processing** - Apache Kafka for reliable message streaming
- **Price Alerts** - Get notified when a price moves more than 5%
- **Interactive Dashboard** - Streamlit UI with live charts
- **Batch Aggregations** - 5-minute windowed statistics via PySpark
- **CI/CD Pipeline** - GitHub Actions for automated testing
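The polling loop lives in `producers/crypto_producer.py`. As a rough sketch of how such a producer can look, assuming `kafka-python`, the public CoinGecko `/simple/price` endpoint, and an illustrative `crypto_prices` topic (topic and field names are assumptions, not the repo's actual identifiers):

```python
# Illustrative sketch of the polling loop; the real code is in
# producers/crypto_producer.py. Topic and field names are assumptions.
import json
import time

import requests
from kafka import KafkaProducer  # kafka-python

COINS = ["bitcoin", "ethereum", "solana", "cardano", "ripple"]
COINGECKO_URL = "https://api.coingecko.com/api/v3/simple/price"

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

while True:
    # One request covers every tracked coin, priced in USD
    resp = requests.get(
        COINGECKO_URL,
        params={"ids": ",".join(COINS), "vs_currencies": "usd"},
        timeout=10,
    )
    resp.raise_for_status()
    for coin, quote in resp.json().items():
        producer.send(
            "crypto_prices",
            {"coin": coin, "price_usd": quote["usd"], "ts": time.time()},
        )
    producer.flush()
    time.sleep(30)  # CRYPTO_POLL_INTERVAL default
```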
You'll need:

- Docker & Docker Compose
- Python 3.10+
- Make (optional)
```sh
# 1. Clone the repository
git clone https://github.com/YOUR_USERNAME/pulse-stream.git
cd pulse-stream

# 2. Initialize project
make init

# 3. Start services
make up

# 4. Initialize database tables
docker exec -i pulse_postgres psql -U $POSTGRES_USER -d $POSTGRES_DB < database/init-db.sql

# 5. Run the pipeline (in separate terminals)
make produce    # Fetch crypto prices
make consume    # Write to database
make alerts     # Monitor price changes
make dashboard  # View at http://localhost:8501
```

```
pulse-stream/
├── producers/              # Data ingestion
│   ├── crypto_producer.py  # CoinGecko API → Kafka
│   └── reddit_producer.py  # Reddit API → Kafka (optional)
├── consumers/              # Stream processing
│   ├── db_writer.py        # Kafka → PostgreSQL
│   ├── alert_handler.py    # Price change alerts
│   └── spark_processor.py  # Windowed aggregations
├── common/                 # Shared utilities
│   ├── config.py           # Environment configuration
│   ├── utils.py            # Serialization, retries
│   └── logger.py           # Structured logging
├── database/               # Database layer
│   ├── models.py           # SQLAlchemy models
│   └── init-db.sql         # Schema initialization
├── dashboard/              # Visualization
│   └── app.py              # Streamlit dashboard
├── infra/                  # Infrastructure
│   └── docker-compose.yml  # Kafka, PostgreSQL, Zookeeper
├── tests/                  # Unit tests
├── .github/workflows/      # CI/CD
│   ├── test.yml            # Automated testing
│   └── lint.yml            # Code linting
├── .env.example            # Environment template
├── Makefile                # Common commands
└── requirements.txt        # Python dependencies
```
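As a concrete example of how these pieces fit, here is a minimal sketch of the consume-and-write loop in `consumers/db_writer.py`, assuming `kafka-python` and a SQLAlchemy model named `PriceTick` (the model, topic, and connection names are placeholders, not the repo's actual identifiers):

```python
# Minimal sketch only; PriceTick, the topic name, and the DSN are placeholders.
import json

from kafka import KafkaConsumer  # kafka-python
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from database.models import PriceTick  # hypothetical model name

engine = create_engine("postgresql://user:password@localhost:5432/pulse")

consumer = KafkaConsumer(
    "crypto_prices",                    # topic name is an assumption
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    auto_offset_reset="earliest",
    group_id="db-writer",
)

# Each Kafka message becomes one row in PostgreSQL
for message in consumer:
    tick = message.value
    with Session(engine) as session:
        session.add(PriceTick(coin=tick["coin"], price_usd=tick["price_usd"]))
        session.commit()
```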
Common commands are wrapped in the `Makefile`:

| Command | Description |
|---|---|
| `make init` | Initialize project |
| `make up` | Start Docker services |
| `make down` | Stop services |
| `make produce` | Run crypto producer |
| `make consume` | Run database writer |
| `make alerts` | Run price alert handler |
| `make spark-batch` | Run batch aggregations |
| `make dashboard` | Launch Streamlit dashboard |
| `make test` | Run unit tests |
| `make clean` | Stop services and remove data |
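`make spark-batch` drives `consumers/spark_processor.py`. A rough PySpark sketch of a 5-minute tumbling-window aggregation might look like the following (table, column, and connection details are placeholders, and the PostgreSQL JDBC driver must be on the Spark classpath):

```python
# Sketch of 5-minute windowed statistics; table/column names are placeholders.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("pulse-batch").getOrCreate()

# Read raw ticks from PostgreSQL over JDBC
ticks = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/pulse")
    .option("dbtable", "price_ticks")
    .option("user", "user")
    .option("password", "password")
    .load()
)

# 5-minute tumbling windows per coin
stats = (
    ticks.groupBy(F.window("created_at", "5 minutes"), "coin")
    .agg(
        F.avg("price_usd").alias("avg_price"),
        F.min("price_usd").alias("min_price"),
        F.max("price_usd").alias("max_price"),
    )
)
stats.show(truncate=False)
```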
Configuration is managed via environment variables in `.env`:

| Variable | Description | Default |
|---|---|---|
| `POSTGRES_USER` | Database username | Required |
| `POSTGRES_PASSWORD` | Database password | Required |
| `POSTGRES_DB` | Database name | Required |
| `KAFKA_BOOTSTRAP_SERVERS` | Kafka broker | `localhost:9092` |
| `CRYPTO_POLL_INTERVAL` | Fetch interval (seconds) | `30` |
| `ALERT_PRICE_THRESHOLD` | Alert threshold (%) | `5.0` |
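The last two variables drive polling and alerting. As a hedged sketch, the threshold check in `consumers/alert_handler.py` might reduce to something like this (the topic and field names are assumptions):

```python
# Sketch of the alert check; topic and field names are assumptions.
import json
import os

from kafka import KafkaConsumer  # kafka-python

THRESHOLD = float(os.getenv("ALERT_PRICE_THRESHOLD", "5.0"))

consumer = KafkaConsumer(
    "crypto_prices",
    bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

last_price: dict[str, float] = {}

for message in consumer:
    tick = message.value
    coin, price = tick["coin"], tick["price_usd"]
    prev = last_price.get(coin)
    if prev:
        change_pct = (price - prev) / prev * 100
        if abs(change_pct) > THRESHOLD:
            print(f"ALERT: {coin} moved {change_pct:+.2f}% (now ${price:,.2f})")
    last_price[coin] = price
```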
| Technology | Purpose |
|---|---|
| Apache Kafka | Message streaming |
| PostgreSQL | Data persistence |
| Streamlit | Real-time dashboard |
| SQLAlchemy | ORM |
| PySpark | Batch aggregations |
| Docker Compose | Container orchestration |
| GitHub Actions | CI/CD |
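For a sense of scale on the Streamlit side, `dashboard/app.py` could boil down to roughly this sketch (the table and column names are placeholders, not the repo's actual schema):

```python
# Sketch only; the table and column names are placeholders.
import pandas as pd
import streamlit as st
from sqlalchemy import create_engine

st.title("Pulse Stream: Live Crypto Prices")

engine = create_engine("postgresql://user:password@localhost:5432/pulse")

# Pull the most recent ticks from PostgreSQL
df = pd.read_sql(
    "SELECT coin, price_usd, created_at "
    "FROM price_ticks ORDER BY created_at DESC LIMIT 500",
    engine,
)

coin = st.selectbox("Coin", sorted(df["coin"].unique()))
history = df[df["coin"] == coin].sort_values("created_at")
st.line_chart(history, x="created_at", y="price_usd")
```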
Run the test suite with:

```sh
make test
```

Tests run automatically on every push via GitHub Actions.
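For flavor, here is a self-contained pytest example in the style the `tests/` folder might use; the `pct_change` helper is hypothetical and defined inline rather than imported from `common/utils.py`:

```python
# Illustrative pytest example; pct_change is hypothetical and defined inline.
import pytest


def pct_change(old: float, new: float) -> float:
    """Percentage change from old to new."""
    return (new - old) / old * 100


def test_pct_change_up():
    assert pct_change(100.0, 105.0) == pytest.approx(5.0)


def test_pct_change_down():
    assert pct_change(100.0, 95.0) == pytest.approx(-5.0)
```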
To track additional coins, edit the coin list in `common/config.py`:

```python
coins: list = field(default_factory=lambda: [
    "bitcoin", "ethereum", "solana", "cardano", "ripple",
    "dogecoin",  # Add here
])
```

To change the alert threshold, edit `.env`:

```sh
ALERT_PRICE_THRESHOLD=3.0  # Alert on 3% change
```

MIT License
Built for learning real-time data engineering 🚀