Simulated multi-source data environment for pipeline development and testing.
A single `docker compose up -d` gives you 6 realistic data sources covering all major source types: relational, REST API, flat file, event stream, object storage, and document store.
Built for the datainsight.at DE Prompt Toolkit use cases.
```bash
git clone https://github.com/datainsightat/virtual_data_source.git
cd virtual_data_source
docker compose up -d
```

All services start in the background. The MinIO initializer and Kafka producer start automatically once their dependencies are ready.
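Once the stack is up, you can sanity-check that all six host ports are reachable. A minimal stdlib-only sketch; the service names and the `SIM_SOURCES` mapping below are taken from the service table, while the helper itself is illustrative, not part of the repo:

```python
import socket

# Host ports published by docker compose (see the service table below).
SIM_SOURCES = {
    "postgres": 5432,
    "fastapi": 8000,
    "nginx_csv": 8080,
    "kafka": 9094,
    "minio": 9000,
    "mongodb": 27017,
}

def is_listening(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        return s.connect_ex((host, port)) == 0

if __name__ == "__main__":
    for name, port in SIM_SOURCES.items():
        state = "up" if is_listening("localhost", port) else "DOWN"
        print(f"{name:10} :{port:<6} {state}")
```

If a service shows `DOWN`, check `docker compose ps` and the container logs before debugging your pipeline code.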
| Service | Port | Type | Description |
|---|---|---|---|
| PostgreSQL | 5432 | Relational | E-commerce schema: users, orders, order_items, addresses |
| FastAPI | 8000 | REST API | Product catalog with categories, products, reviews |
| Nginx CSV | 8080 | Flat File | Transaction logs with intentional edge cases |
| Kafka | 9094 (host) / 9092 (internal) | Event Stream | Live order, user, and inventory events |
| MinIO | 9000/9001 | Object Storage | S3-compatible data lake with Parquet/CSV/NDJSON |
| MongoDB | 27017 | Document Store | Clickstream sessions, product reviews, pipeline run logs |
Connection:
Host: localhost
Port: 5432
User: admin
Password: password
Database: sim_db
Schema:
- `users` — customer accounts with loyalty points and activity flags
- `addresses` — multiple addresses per user, with default flag
- `orders` — order header with status, amounts, shipping address
- `order_items` — line items linked to orders and product SKUs
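These tables join naturally for quick checks such as revenue per user. A hedged sketch assuming `psycopg2` is installed; the column names (`user_id`, `total_amount`, `status`) and the `SIM_SOURCES_UP` guard are assumptions for illustration, so verify them against the actual schema first:

```python
import os

# Assumed column names; confirm with \d orders in psql before relying on this.
TOP_SPENDERS_SQL = """
SELECT u.user_id, SUM(o.total_amount) AS revenue
FROM users u
JOIN orders o ON o.user_id = u.user_id
WHERE o.status <> 'cancelled'
GROUP BY u.user_id
ORDER BY revenue DESC
LIMIT 5;
"""

if os.environ.get("SIM_SOURCES_UP"):  # set only when the stack is running
    import psycopg2  # third-party: pip install psycopg2-binary

    with psycopg2.connect(
        host="localhost", port=5432, user="admin",
        password="password", dbname="sim_db",
    ) as conn:
        with conn.cursor() as cur:
            cur.execute(TOP_SPENDERS_SQL)
            for row in cur.fetchall():
                print(row)
```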
```bash
PGPASSWORD=password psql -h localhost -p 5432 -U admin -d sim_db
```

Base URL: http://localhost:8000
| Endpoint | Description |
|---|---|
| `GET /api/status` | Health check |
| `GET /api/categories` | All product categories |
| `GET /api/products` | Products (filter: `category_id`, `min_price`, `max_price`, `in_stock_only`) |
| `GET /api/products/{id}` | Single product with nested specifications |
| `GET /api/products/{id}/reviews` | Product reviews |
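The filter parameters compose into an ordinary query string, so the URL can be built rather than hand-written. A small stdlib-only sketch; the `SIM_SOURCES_UP` guard is an illustrative convention, not part of the repo:

```python
import json
import os
from urllib.parse import urlencode
from urllib.request import urlopen

BASE_URL = "http://localhost:8000"

def products_url(**filters) -> str:
    """Build a /api/products URL from the documented filter parameters."""
    query = urlencode(sorted(filters.items()))
    return f"{BASE_URL}/api/products" + (f"?{query}" if query else "")

if os.environ.get("SIM_SOURCES_UP"):  # set only when the stack is running
    with urlopen(products_url(category_id=2, in_stock_only="true")) as resp:
        print(json.loads(resp.read()))
```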
```bash
curl "http://localhost:8000/api/products?in_stock_only=true"
curl "http://localhost:8000/api/products/1/reviews"
```

Base URL: http://localhost:8080
| File | Description |
|---|---|
| `data.csv` | Transaction logs with edge cases |
Columns: transaction_id, order_number, user_id, timestamp, amount, currency, payment_method, card_last_4, status, merchant_account, ip_address, device_type, location_country
Edge cases included: duplicate transaction_ids, null card_last_4 on card payments, geolocation conflicts, mixed currencies, orphaned order references.
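Those edge cases can be screened for before loading. A minimal sketch using only the stdlib; the payment-method values (anything containing `card`) are an assumption about the file, not a guarantee:

```python
import csv
import io

def screen_transactions(fh):
    """Flag duplicate transaction_ids and card payments missing card_last_4."""
    seen, duplicates, missing_card = set(), [], []
    for row in csv.DictReader(fh):
        tid = row["transaction_id"]
        if tid in seen:
            duplicates.append(tid)
        seen.add(tid)
        # Assumed payment_method naming; inspect the file to confirm.
        if "card" in row["payment_method"] and not row["card_last_4"]:
            missing_card.append(tid)
    return duplicates, missing_card

sample = io.StringIO(
    "transaction_id,payment_method,card_last_4\n"
    "t1,credit_card,1234\n"
    "t1,credit_card,1234\n"
    "t2,debit_card,\n"
)
print(screen_transactions(sample))  # → (['t1'], ['t2'])
```

In practice you would pass it the live file, e.g. `screen_transactions(io.TextIOWrapper(urlopen("http://localhost:8080/data.csv")))`.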
```bash
curl http://localhost:8080/data.csv
```

Bootstrap Server (from host): localhost:9094
Bootstrap Server (from Docker containers): kafka:9092
Three topics with continuous event production:
| Topic | Event Types |
|---|---|
| `orders.events` | order_created, order_updated, order_shipped, order_delivered, order_cancelled |
| `users.events` | user_registered, user_updated, loyalty_points_changed, user_login |
| `inventory.events` | stock_updated, product_low_stock, product_restocked, product_out_of_stock |
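A consumer usually branches on the event type, which maps cleanly onto a dispatch table. A sketch; the envelope shape (a dict carrying an `event_type` key) and the payload fields are assumptions about the producer, so check a live message first:

```python
# Assumed envelope: {"event_type": ..., ...payload...}; verify with kafkacat.
def route(event: dict, handlers: dict, default=None):
    """Dispatch an event dict to the handler registered for its type."""
    handler = handlers.get(event.get("event_type"), default)
    return handler(event) if handler else None

handlers = {
    "order_created": lambda e: f"new order {e.get('order_id')}",
    "product_low_stock": lambda e: f"reorder {e.get('sku')}",
}

print(route({"event_type": "order_created", "order_id": 42}, handlers))
# → new order 42
```

Unregistered event types fall through to `default` (here `None`), which is usually the right behavior for a simulated stream that may grow new event types.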
Consume with kafkacat:

```bash
# Install: apt install kafkacat
kafkacat -b localhost:9094 -t orders.events -C -o end
kafkacat -b localhost:9094 -t users.events -C -o end
kafkacat -b localhost:9094 -t inventory.events -C -o end
```

Consume with Python:
```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders.events',
    bootstrap_servers='localhost:9094',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest'
)

for msg in consumer:
    print(msg.value)
```

API Endpoint: http://localhost:9000
Web Console: http://localhost:9001 (minioadmin / minioadmin)
Bucket: data-lake
```
data-lake/
├── raw/
│   ├── orders/year=2024/month=01/orders.parquet
│   ├── users/year=2024/month=01/users.parquet
│   ├── events/date=2024-01-15/events.ndjson
│   └── transactions/year=2024/month=01/transactions.csv
└── processed/
    ├── marts/orders_mart/orders_mart.parquet
    └── marts/user_segments/user_segments.parquet
```
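The Hive-style `year=`/`month=` layout is regular enough to generate rather than hard-code. A small sketch; the file-naming pattern is taken from the tree above, and the helper name is illustrative:

```python
def partition_key(dataset: str, year: int, month: int, ext: str = "parquet") -> str:
    """Build a raw-zone object key matching the data-lake layout above."""
    return f"raw/{dataset}/year={year:04d}/month={month:02d}/{dataset}.{ext}"

print(partition_key("orders", 2024, 1))
# → raw/orders/year=2024/month=01/orders.parquet
```

Zero-padding the month (`month=01`, not `month=1`) matters: S3-style listings and partition pruning both rely on lexicographic prefixes.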
Access with Python (boto3):

```python
import boto3

s3 = boto3.client(
    's3',
    endpoint_url='http://localhost:9000',
    aws_access_key_id='minioadmin',
    aws_secret_access_key='minioadmin'
)

# List files
objects = s3.list_objects_v2(Bucket='data-lake', Prefix='raw/orders/')
for obj in objects.get('Contents', []):
    print(obj['Key'])
```

Access with DuckDB (httpfs):
```sql
INSTALL httpfs;
LOAD httpfs;
SET s3_endpoint='localhost:9000';
SET s3_access_key_id='minioadmin';
SET s3_secret_access_key='minioadmin';
SET s3_use_ssl=false;
SET s3_url_style='path';
SELECT * FROM read_parquet('s3://data-lake/raw/orders/**/*.parquet');
```

Connection:
URI: mongodb://admin:password@localhost:27017/sim_db?authSource=admin
Database: sim_db
Collections:
| Collection | Description |
|---|---|
| `user_sessions` | Web/app sessions with nested event sequences, UTM params, conversion data |
| `product_reviews` | Extended reviews with sentiment scores, tags, locale |
| `pipeline_runs` | Pipeline execution metadata with per-task breakdown |
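The same queries work from Python. A hedged sketch assuming `pymongo` is installed; the aggregation counts failed runs per pipeline, and the field name `pipeline_name` (like the `SIM_SOURCES_UP` guard) is an assumption to verify against a real document:

```python
import os

# Assumed field names; inspect a document with findOne() to confirm.
FAILED_RUNS_BY_PIPELINE = [
    {"$match": {"status": "failed"}},
    {"$group": {"_id": "$pipeline_name", "failures": {"$sum": 1}}},
    {"$sort": {"failures": -1}},
]

if os.environ.get("SIM_SOURCES_UP"):  # set only when the stack is running
    from pymongo import MongoClient  # third-party: pip install pymongo

    client = MongoClient("mongodb://admin:password@localhost:27017/?authSource=admin")
    db = client["sim_db"]
    for doc in db.pipeline_runs.aggregate(FAILED_RUNS_BY_PIPELINE):
        print(doc)
```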
```bash
# Connect with mongosh
mongosh "mongodb://admin:password@localhost:27017/sim_db?authSource=admin"
```

Example queries:

```javascript
db.user_sessions.find({ converted: true })
db.product_reviews.find({ "sentiment.label": "positive" }).sort({ helpful_votes: -1 })
db.pipeline_runs.find({ status: "failed" })
```

```bash
docker compose down     # Stop and remove containers
docker compose down -v  # Also remove all data volumes (fresh start)
```

This environment is the foundation for these use cases on datainsight.at:
| Case | Title | Sources Used |
|---|---|---|
| #009 | Multi-Source ELT Pipeline | PostgreSQL + FastAPI + CSV |
| #010 | Data Quality Gauntlet | CSV (edge cases) + PostgreSQL |
| #011 | API-to-Warehouse Ingestion | FastAPI |
| #012 | Agentic Data Pipeline | All 6 sources |
- Docker
- Docker Compose v2
Built and maintained by H.A.R.L.I.E. 🌀 — the autonomous Data Engineering Collective