This repository provides a working, dockerized reference implementation of end-to-end observability for distributed energy resources (DER): solar inverters, batteries, and EV chargers. It demonstrates how to ingest → normalize → store → analyze → visualize high-frequency telemetry with production-like patterns.
- FastAPI ingestion API with /health and /ingest endpoints (app/ingestion_api.py).
- Pydantic TelemetryPacket model with ts, site_id, vendor, device_type, device_id, data, meta (app/models.py).
- Normalization pipeline to coerce mixed vendor packets into canonical rows with explicit metric, value, unit (app/normalize.py).
- TimescaleDB/Postgres persistence with JSONB meta and upsert for latest telemetry (app/db.py, docker-compose.yml).
- Streamlit dashboard with live refresh showing Fleet Power Flow, Power Metrics, Battery SOC & Inverter Temp, Recent Telemetry, and Telemetry Gaps (dashboard/app.py).
- Gap/anomaly utilities for data quality and availability risk estimation (app/anomalies.py).
- Deterministic simulators for solar inverter, battery, and EVSE to generate test telemetry (simulator/*.py).
- Dockerized stack: TimescaleDB, API (uvicorn), Dashboard (Streamlit) wired via docker-compose.yml.
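As a rough illustration of the packet shape listed above, the sketch below uses a standard-library dataclass as a stand-in for the Pydantic model. The field names come from this README; the field types, the optional meta default, and the parse_packet helper are assumptions and may not match app/models.py.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any


@dataclass
class TelemetryPacket:
    """Stand-in for the Pydantic model; field types here are assumed."""

    ts: datetime
    site_id: str
    vendor: str
    device_type: str
    device_id: str
    data: dict[str, Any]
    meta: dict[str, Any] = field(default_factory=dict)  # assumed optional


def parse_packet(raw: dict[str, Any]) -> TelemetryPacket:
    """Hypothetical helper: build a packet from one decoded JSON object."""
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11,
    # so map it to an explicit UTC offset first.
    ts = datetime.fromisoformat(raw["ts"].replace("Z", "+00:00"))
    return TelemetryPacket(
        ts=ts,
        site_id=raw["site_id"],
        vendor=raw["vendor"],
        device_type=raw["device_type"],
        device_id=raw["device_id"],
        data=dict(raw["data"]),
        meta=dict(raw.get("meta", {})),
    )
```

With Pydantic, the same validation would normally come from the model class itself rather than a hand-written parser.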
[Simulators] ──HTTP JSON──▶ [FastAPI Ingestion] ──normalize──▶ [TimescaleDB]
│
└──▶ [Streamlit Dashboard]
- Ingestion API: Receives a list of TelemetryPacket JSON objects and normalizes them into tidy rows.
- Storage: Persists to Postgres/TimescaleDB (time-series optimized). JSONB meta preserves vendor specifics.
- Analysis: Gap and outlier detection (anomalies.py), including estimated lost DR capacity from gaps.
- Visualization: Real-time charts plus Telemetry Gaps panel for data quality and availability.
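The normalization step can be pictured as fanning one packet out into one row per reading. The sketch below is only an illustration: the suffix-to-unit table, the split_metric helper, and the rule of skipping non-numeric readings are all assumptions, and the actual logic in app/normalize.py may differ.

```python
from typing import Any, Optional

# Hypothetical suffix-to-unit table; the real mapping may differ.
UNIT_SUFFIXES = {"_kw": "kW", "_v": "V", "_a": "A", "_pct": "%"}


def split_metric(key: str) -> tuple[str, Optional[str]]:
    """Split 'ac_power_kw' into ('ac_power', 'kW'); unit is None if unknown."""
    for suffix, unit in UNIT_SUFFIXES.items():
        if key.endswith(suffix):
            return key[: -len(suffix)], unit
    return key, None


def normalize(packet: dict[str, Any]) -> list[dict[str, Any]]:
    """Turn one packet into tidy rows, one per numeric entry in packet['data']."""
    rows = []
    for key, value in packet["data"].items():
        # Skip booleans and non-numeric readings in this sketch.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            continue
        metric, unit = split_metric(key)
        rows.append(
            {
                "ts": packet["ts"],
                "site_id": packet["site_id"],
                "device_type": packet["device_type"],
                "device_id": packet["device_id"],
                "metric": metric,
                "value": float(value),
                "unit": unit,
                "meta": packet.get("meta", {}),
            }
        )
    return rows
```

Keeping the unit as its own column means queries can filter on it directly without parsing it back out of the metric name.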
Prereqs: Docker + Docker Compose
# build and start containers in background
docker compose up --build -d
# check logs
docker compose logs -f api
docker compose logs -f dashboard
docker compose logs -f simulator
# stop and remove containers/volumes
docker compose down -v
This starts three services:
- db – TimescaleDB on localhost:5432 (db: telemetry, user/pass: postgres/postgres)
- api – FastAPI on http://localhost:8000
- dashboard – Streamlit on http://localhost:8501
Health: GET http://localhost:8000/health → {"status":"ok"}
Schema is defined in sql/init.sql. To load it:
docker compose exec db psql -U postgres -d telemetry -f sql/init.sql
POST /ingest accepts a JSON array of packets:
[
{
"ts": "2025-01-01T00:00:01Z",
"site_id": "SITE_001",
"vendor": "acme",
"device_type": "inverter",
"device_id": "INV-01",
"data": {"ac_power_kw": 4.2, "dc_voltage_v": 380},
"meta": {"fw": "1.2.3"}
}
]
curl -X POST http://localhost:8000/ingest -H "Content-Type: application/json" -d '[{"ts":"2025-01-01T00:00:01Z","site_id":"SITE_001","vendor":"acme",
"device_type":"inverter","device_id":"INV-01",
"data":{"ac_power_kw":4.2},"meta":{"fw":"1.2.3"}}]'
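The same request can be sent from Python using only the standard library. This is a minimal sketch: build_ingest_request and post_packets are hypothetical helper names, and because the response body of /ingest is not documented here, only the HTTP status code is returned.

```python
import json
import urllib.request

INGEST_URL = "http://localhost:8000/ingest"


def build_ingest_request(packets, url=INGEST_URL):
    """Serialize a list of packet dicts into a JSON POST request."""
    body = json.dumps(packets).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post_packets(packets, url=INGEST_URL, timeout=5.0):
    """Send the packets and return the HTTP status code."""
    request = build_ingest_request(packets, url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


if __name__ == "__main__":
    packet = {
        "ts": "2025-01-01T00:00:01Z",
        "site_id": "SITE_001",
        "vendor": "acme",
        "device_type": "inverter",
        "device_id": "INV-01",
        "data": {"ac_power_kw": 4.2},
        "meta": {"fw": "1.2.3"},
    }
    print(post_packets([packet]))
```

Running it requires the api service to be up; urlopen raises urllib.error.HTTPError if the API rejects the payload.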
- DATABASE_URL – API DSN (default: postgresql://postgres:postgres@db:5432/telemetry)
- DSN – dashboard DSN (default: same as above)
- API_URL – simulator target API (default: http://api:8000/ingest)
- HEALTH_URL – simulator health check (default: http://api:8000/health)
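One plausible way for a service to resolve these variables is a plain lookup with fallbacks, sketched below. The variable names and defaults are taken from the list above; the load_settings helper is hypothetical, and in the repository each service presumably reads only the variables it needs.

```python
import os

DEFAULT_DSN = "postgresql://postgres:postgres@db:5432/telemetry"


def load_settings(env=None):
    """Resolve the four variables from env, falling back to the defaults."""
    env = os.environ if env is None else env
    return {
        "DATABASE_URL": env.get("DATABASE_URL", DEFAULT_DSN),
        "DSN": env.get("DSN", DEFAULT_DSN),
        "API_URL": env.get("API_URL", "http://api:8000/ingest"),
        "HEALTH_URL": env.get("HEALTH_URL", "http://api:8000/health"),
    }
```

The defaults use the Compose service names db and api as hostnames, so they resolve only inside the Compose network; from the host, override them with localhost.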
Generate realistic test data without hardware:
- simulator/solar_inverter.py
- simulator/battery.py
- simulator/ev_charger.py
- simulator/run_all.py (runs all of the above; waits for API health, then streams packets at configurable Hz)
Example:
python -m distributed-energy-observability.simulator.run_all --hz 15 --site SITE_001
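The "wait for health, then stream at a fixed rate" behaviour could look roughly like the sketch below. All function names and parameters here (api_is_healthy, wait_until, stream, attempts, delay_s) are hypothetical and not taken from simulator/run_all.py; the sleep and clock arguments are injectable only so the loop can be exercised without real delays.

```python
import time
import urllib.request


def api_is_healthy(health_url, timeout=2.0):
    """Return True if the health endpoint answers with HTTP 200."""
    try:
        with urllib.request.urlopen(health_url, timeout=timeout) as response:
            return response.status == 200
    except OSError:  # covers URLError, connection refused, and timeouts
        return False


def wait_until(probe, attempts=30, delay_s=1.0, sleep=time.sleep):
    """Poll probe() until it returns True or the attempts run out."""
    for _ in range(attempts):
        if probe():
            return True
        sleep(delay_s)
    return False


def stream(send, hz, count, sleep=time.sleep, clock=time.monotonic):
    """Call send(i) count times, pacing against a fixed schedule."""
    period = 1.0 / hz
    start = clock()
    for i in range(count):
        send(i)
        # Sleep until the next scheduled tick rather than a fixed period,
        # so time spent inside send() does not accumulate as drift.
        remaining = start + (i + 1) * period - clock()
        if remaining > 0:
            sleep(remaining)
```

A caller would pair them along the lines of `wait_until(lambda: api_is_healthy(HEALTH_URL))` followed by `stream(send_packet, hz=15, count=...)`, where HEALTH_URL and send_packet are supplied by the caller.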
Open http://localhost:8501 to view:
- Fleet Power Flow (1s avg)
- Power Metrics (1s avg)
- Battery SOC & Inverter Temp (past 3 min)
- Recent Telemetry
- Telemetry Gaps (with max_gap_s threshold)
Auto-refresh cadence and lookback window are adjustable in the UI.
Columns: ts, site_id, device_type, device_id, metric, value, unit, meta
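To make the "upsert for latest telemetry" idea concrete with these columns, the sketch below uses an in-memory SQLite database as a stand-in for Postgres/TimescaleDB. The table name latest_telemetry, the conflict key (site_id, device_id, metric), and the rule of ignoring older timestamps are assumptions; the real schema lives in sql/init.sql and may differ. meta is stored as JSON text because SQLite has no JSONB column type.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE latest_telemetry (
        ts TEXT NOT NULL,
        site_id TEXT NOT NULL,
        device_type TEXT NOT NULL,
        device_id TEXT NOT NULL,
        metric TEXT NOT NULL,
        value REAL,
        unit TEXT,
        meta TEXT,
        PRIMARY KEY (site_id, device_id, metric)  -- assumed conflict key
    )
    """
)

UPSERT_SQL = """
INSERT INTO latest_telemetry
    (ts, site_id, device_type, device_id, metric, value, unit, meta)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (site_id, device_id, metric) DO UPDATE SET
    ts = excluded.ts,
    value = excluded.value,
    unit = excluded.unit,
    meta = excluded.meta
WHERE excluded.ts > latest_telemetry.ts
"""


def upsert_latest(conn, row):
    """Insert a row, or overwrite the stored one if the new ts is later."""
    conn.execute(
        UPSERT_SQL,
        (
            row["ts"], row["site_id"], row["device_type"], row["device_id"],
            row["metric"], row["value"], row["unit"],
            json.dumps(row.get("meta", {})),
        ),
    )


row = {
    "ts": "2025-01-01T00:00:01Z", "site_id": "SITE_001",
    "device_type": "inverter", "device_id": "INV-01",
    "metric": "ac_power", "value": 4.2, "unit": "kW", "meta": {"fw": "1.2.3"},
}
upsert_latest(conn, row)
upsert_latest(conn, {**row, "ts": "2025-01-01T00:00:02Z", "value": 4.5})
upsert_latest(conn, {**row, "ts": "2025-01-01T00:00:00Z", "value": 9.9})  # stale
```

PostgreSQL accepts the same ON CONFLICT ... DO UPDATE ... WHERE form, with EXCLUDED referring to the proposed row. The string comparison on ts is safe here only because every timestamp uses the same ISO 8601 format.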
Utilities in app/anomalies.py:
- gap_windows(df, max_gap_s=5.0)
- zscore_outliers(df, threshold=3.0)
- estimate_lost_dr_kw(gap_df)
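The idea behind the first two utilities can be shown without a DataFrame. The functions below are pure-Python stand-ins with different names (find_gaps, zscore_flags); they are not the repository's implementations, which take DataFrames and may define gaps and outliers differently. Only the default thresholds (5.0 seconds, 3.0 standard deviations) are taken from the signatures above.

```python
from datetime import datetime, timedelta
from statistics import mean, pstdev


def find_gaps(timestamps, max_gap_s=5.0):
    """Return (start, end, seconds) for each pair of consecutive samples
    that are more than max_gap_s apart."""
    ordered = sorted(timestamps)
    gaps = []
    for previous, current in zip(ordered, ordered[1:]):
        delta = (current - previous).total_seconds()
        if delta > max_gap_s:
            gaps.append((previous, current, delta))
    return gaps


def zscore_flags(values, threshold=3.0):
    """Flag values whose distance from the mean exceeds threshold
    population standard deviations."""
    if not values:
        return []
    mu = mean(values)
    sigma = pstdev(values)
    if sigma == 0:
        return [False] * len(values)  # constant series has no outliers
    return [abs(v - mu) / sigma > threshold for v in values]


if __name__ == "__main__":
    t0 = datetime(2025, 1, 1)
    samples = [t0 + timedelta(seconds=s) for s in (0, 1, 2, 12)]
    print(find_gaps(samples))
    print(zscore_flags([10.0] * 20 + [100.0]))
```

In the demo, the 10 s jump between the third and fourth samples exceeds the 5 s threshold and is reported as a single gap window.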
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
Run services manually:
export DATABASE_URL=postgresql://postgres:postgres@localhost:5432/telemetry
uvicorn distributed-energy-observability.app.ingestion_api:app --reload --port 8000
streamlit run distributed-energy-observability/dashboard/app.py
Use Black, Ruff, isort with config in pyproject.toml.
- Add Kafka/Redpanda buffer for backpressure
- Add IsolationForest anomaly detection with labeled faults
- Add Grafana dashboards alongside Streamlit
- Add DR event simulator to show derated capacity under telemetry loss
- Add multi-site roll-ups and service-level objectives (% devices meeting freshness targets)
distributed-energy-observability/
├─ app/
├─ dashboard/
├─ simulator/
├─ sql/
│ └─ init.sql
├─ assets/
│ ├─ dashboard-hero.png
│ └─ docker-up-demo.gif
├─ Dockerfile
├─ docker-compose.yml
└─ requirements.txt
MIT