# virtual_data_source

Simulated multi-source data environment for pipeline development and testing.

A single `docker compose up -d` gives you six realistic data sources covering all major source types: relational database, REST API, flat file, event stream, object storage, and document store.

Built for the datainsight.at DE Prompt Toolkit use cases.


## Quick Start

```sh
git clone https://github.com/datainsightat/virtual_data_source.git
cd virtual_data_source
docker compose up -d
```

All services start in the background. The MinIO initializer and Kafka producer start automatically once their dependencies are ready.


## Available Data Sources

| Service | Port | Type | Description |
|---|---|---|---|
| PostgreSQL | 5432 | Relational | E-commerce schema: users, orders, order_items, addresses |
| FastAPI | 8000 | REST API | Product catalog with categories, products, reviews |
| Nginx CSV | 8080 | Flat File | Transaction logs with intentional edge cases |
| Kafka | 9094 (host) / 9092 (internal) | Event Stream | Live order, user, and inventory events |
| MinIO | 9000/9001 | Object Storage | S3-compatible data lake with Parquet/CSV/NDJSON |
| MongoDB | 27017 | Document Store | Clickstream sessions, product reviews, pipeline run logs |

## Source Details

### 1. PostgreSQL — E-Commerce Schema

Connection:

```
Host:     localhost
Port:     5432
User:     admin
Password: password
Database: sim_db
```

Schema:

- `users` — customer accounts with loyalty points and activity flags
- `addresses` — multiple addresses per user, with default flag
- `orders` — order header with status, amounts, shipping address
- `order_items` — line items linked to orders and product SKUs

Connect with psql:

```sh
PGPASSWORD=password psql -h localhost -p 5432 -U admin -d sim_db
```

### 2. FastAPI — Product Catalog

Base URL: http://localhost:8000

| Endpoint | Description |
|---|---|
| `GET /api/status` | Health check |
| `GET /api/categories` | All product categories |
| `GET /api/products` | Products (filters: `category_id`, `min_price`, `max_price`, `in_stock_only`) |
| `GET /api/products/{id}` | Single product with nested specifications |
| `GET /api/products/{id}/reviews` | Product reviews |

```sh
curl "http://localhost:8000/api/products?in_stock_only=true"
curl "http://localhost:8000/api/products/1/reviews"
```

### 3. Nginx CSV — Transaction Logs

Base URL: http://localhost:8080

| File | Description |
|---|---|
| `data.csv` | Transaction logs with edge cases |

Columns: `transaction_id`, `order_number`, `user_id`, `timestamp`, `amount`, `currency`, `payment_method`, `card_last_4`, `status`, `merchant_account`, `ip_address`, `device_type`, `location_country`

Edge cases included: duplicate `transaction_id`s, null `card_last_4` on card payments, geolocation conflicts, mixed currencies, orphaned order references.

```sh
curl http://localhost:8080/data.csv
```
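Two of the documented edge cases (duplicate `transaction_id`s, null `card_last_4` on card payments) can be caught with a short validation pass. A stdlib-only sketch over an illustrative inline sample — the rows below are invented; real data comes from `data.csv`:

```python
import csv
import io
from collections import Counter

# Invented sample mirroring a subset of the documented columns:
sample = """transaction_id,payment_method,card_last_4,amount,currency
t1,card,1234,10.00,EUR
t1,card,1234,10.00,EUR
t2,card,,25.50,USD
t3,paypal,,5.00,EUR
"""

rows = list(csv.DictReader(io.StringIO(sample)))

# Edge case: duplicate transaction_ids
counts = Counter(r["transaction_id"] for r in rows)
duplicates = [tid for tid, n in counts.items() if n > 1]

# Edge case: null card_last_4 on card payments
missing_card = [r["transaction_id"]
                for r in rows
                if r["payment_method"] == "card" and not r["card_last_4"]]

print(duplicates)    # ['t1']
print(missing_card)  # ['t2']
```

The same checks run unchanged against the real file by replacing `io.StringIO(sample)` with the downloaded CSV.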

### 4. Kafka — Event Streams

Bootstrap server (from host): `localhost:9094`
Bootstrap server (from Docker containers): `kafka:9092`

Three topics with continuous event production:

| Topic | Event Types |
|---|---|
| `orders.events` | order_created, order_updated, order_shipped, order_delivered, order_cancelled |
| `users.events` | user_registered, user_updated, loyalty_points_changed, user_login |
| `inventory.events` | stock_updated, product_low_stock, product_restocked, product_out_of_stock |

Consume with kafkacat:

```sh
# Install: apt install kafkacat
kafkacat -b localhost:9094 -t orders.events -C -o end
kafkacat -b localhost:9094 -t users.events -C -o end
kafkacat -b localhost:9094 -t inventory.events -C -o end
```

Consume with Python (requires the third-party `kafka-python` package):

```python
# pip install kafka-python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders.events',
    bootstrap_servers='localhost:9094',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest'
)
for msg in consumer:
    print(msg.value)
```

### 5. MinIO — S3-Compatible Data Lake

API Endpoint: http://localhost:9000
Web Console: http://localhost:9001 (minioadmin / minioadmin)

Bucket: `data-lake`

```
data-lake/
├── raw/
│   ├── orders/year=2024/month=01/orders.parquet
│   ├── users/year=2024/month=01/users.parquet
│   ├── events/date=2024-01-15/events.ndjson
│   └── transactions/year=2024/month=01/transactions.csv
└── processed/
    ├── marts/orders_mart/orders_mart.parquet
    └── marts/user_segments/user_segments.parquet
```
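The `key=value` segments in the object keys follow the hive-style partitioning convention, so partition columns can be recovered from a key with a few lines of stdlib Python. A small sketch; the helper name `partition_values` is ours:

```python
def partition_values(key: str) -> dict:
    """Extract hive-style partition columns (e.g. year=2024) from an object key."""
    parts = {}
    for segment in key.split("/"):
        if "=" in segment:
            name, _, value = segment.partition("=")
            parts[name] = value
    return parts

print(partition_values("raw/orders/year=2024/month=01/orders.parquet"))
# {'year': '2024', 'month': '01'}
```

Engines like DuckDB apply the same convention automatically when reading `s3://data-lake/raw/orders/**/*.parquet`.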

Access with Python (boto3):

```python
import boto3

s3 = boto3.client(
    's3',
    endpoint_url='http://localhost:9000',
    aws_access_key_id='minioadmin',
    aws_secret_access_key='minioadmin'
)
# List files
objects = s3.list_objects_v2(Bucket='data-lake', Prefix='raw/orders/')
for obj in objects.get('Contents', []):
    print(obj['Key'])
```

Access with DuckDB (httpfs):

```sql
INSTALL httpfs;
LOAD httpfs;
SET s3_endpoint='localhost:9000';
SET s3_access_key_id='minioadmin';
SET s3_secret_access_key='minioadmin';
SET s3_use_ssl=false;
SET s3_url_style='path';

SELECT * FROM read_parquet('s3://data-lake/raw/orders/**/*.parquet');
```

### 6. MongoDB — Document Store

Connection:

```
URI:      mongodb://admin:password@localhost:27017/sim_db?authSource=admin
Database: sim_db
```

Collections:

| Collection | Description |
|---|---|
| `user_sessions` | Web/app sessions with nested event sequences, UTM params, conversion data |
| `product_reviews` | Extended reviews with sentiment scores, tags, locale |
| `pipeline_runs` | Pipeline execution metadata with per-task breakdown |

Connect with mongosh:

```sh
mongosh "mongodb://admin:password@localhost:27017/sim_db?authSource=admin"
```

Example queries (inside the mongosh shell):

```js
db.user_sessions.find({ converted: true })
db.product_reviews.find({ "sentiment.label": "positive" }).sort({ helpful_votes: -1 })
db.pipeline_runs.find({ status: "failed" })
```
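The same queries translate directly to Python, since the filter documents are plain dicts. A sketch assuming the third-party `pymongo` driver (`pip install pymongo`); the variable names are ours:

```python
# Filter documents matching the mongosh queries above:
converted_sessions = {"converted": True}
positive_reviews = {"sentiment.label": "positive"}
failed_runs = {"status": "failed"}

# With pymongo installed and the stack running:
# from pymongo import MongoClient
# db = MongoClient("mongodb://admin:password@localhost:27017/?authSource=admin")["sim_db"]
# db.user_sessions.find(converted_sessions)
# db.product_reviews.find(positive_reviews).sort("helpful_votes", -1)
# db.pipeline_runs.find(failed_runs)
```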

## Stopping

```sh
docker compose down           # Stop and remove containers
docker compose down -v        # Also remove all data volumes (fresh start)
```

## Use Cases (datainsight.at)

This environment is the foundation for these use cases on datainsight.at:

| Case | Title | Sources Used |
|---|---|---|
| #009 | Multi-Source ELT Pipeline | PostgreSQL + FastAPI + CSV |
| #010 | Data Quality Gauntlet | CSV (edge cases) + PostgreSQL |
| #011 | API-to-Warehouse Ingestion | FastAPI |
| #012 | Agentic Data Pipeline | All 6 sources |

## Prerequisites

- Docker
- Docker Compose v2

Built and maintained by H.A.R.L.I.E. 🌀 — the autonomous Data Engineering Collective
