
sensorflow-data-pipeline

Modular sensor data pipeline in Rust: fetch, transform, analyze, store, and expose via API

Summary

sensorflow-data-pipeline is a portfolio-quality demo of a modular, end-to-end data pipeline written in Rust for processing time-series sensor data. It fetches data from a secure, paginated API, performs real-time transformations and anomaly detection, aggregates results by mesh ID, stores them in PostgreSQL, and serves the data via a RESTful API.

This project is designed to demonstrate production-realistic architecture, with a focus on clarity, testability, and performance. All prior company-specific references have been removed to present this as an independent, standalone project.


🔥 Features

  • Secure ingestion from a paginated API
  • UTC timestamp normalization (client-side timezone conversion)
  • Anomaly detection (temperature, humidity, device status)
  • Aggregation by mesh_id
  • PostgreSQL-backed storage for summary data
  • Strategic database indexing for sub-second query performance
  • REST API to serve transformed sensor data
  • Docker Compose for full environment setup

🚀 Quickstart

Clone the repo and start the system using Docker:

git clone https://github.com/JohnBasrai/sensorflow-data-pipeline.git
cd sensorflow-data-pipeline
cp .env.example .env
# adjust values for your local setup if needed
docker compose up --build -d
# See Testing for how to run the tests.

This will:

  • Start the sensor data source API
  • Start PostgreSQL with optimized indexes for efficient filtering
  • Run the Rust backend to ingest, process, store, and expose transformed sensor data

You can then query the transformed data via:

API

GET /sql/readings

Returns sensor readings from Postgres (ingest-once; subsequent calls are fast).

Query params

  • device_id (aliases: device, deviceId, deviceID)
  • mesh_id (aliases: mesh, meshId, meshID)
  • timestamp_range – RFC3339 "start,end"; open ends allowed ("start,", ",end").
    Returns 422 on invalid input.
  • limit – max rows to return (default: 1000)

Examples

export BASE=http://localhost:8080
# by device
$ curl "$BASE/sql/readings?device=device-001&limit=10"

# by mesh
$ curl "$BASE/sql/readings?mesh=mesh-001&limit=10"

# by timestamp range (inclusive)
$ curl "$BASE/sql/readings?timestamp_range=2025-03-21T00:00:00Z,2025-03-21T12:00:00Z"

# Invalid timestamp_range β†’ 422 with JSON error
$ curl -i "$BASE/sql/readings?timestamp_range=not-a-timestamp"
HTTP/1.1 422 Unprocessable Entity
content-type: application/json
content-length: 119
date: Sat, 09 Aug 2025 00:03:53 GMT

{"error":"invalid timestamp_range","hint":"use RFC3339 \"start,end\" (e.g. 2025-03-21T00:00:00Z,2025-03-22T00:00:00Z)"}

📡 Input Dataset

Sensor data is served via a paginated API running in Docker:

GET /sensor-data

Each response includes up to 100 records and a next_cursor. Continue paging until next_cursor is absent.

Example record:

{
  "mesh_id": "mesh-001",
  "device_id": "device-A",
  "timestamp": "2025-03-26T13:45:00Z",
  "temperature_c": 22.4,
  "humidity": 41.2,
  "status": "ok"
}
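
A minimal ingestion loop over this cursor scheme might look like the sketch below (assuming reqwest and serde; the records envelope field and the cursor query parameter are assumptions, since only the record shape and next_cursor are specified above):

use serde::Deserialize;

#[derive(Deserialize)]
struct SensorRecord {
    mesh_id: String,
    device_id: String,
    timestamp: String,
    temperature_c: f64,
    humidity: f64,
    status: String,
}

#[derive(Deserialize)]
struct Page {
    records: Vec<SensorRecord>,  // assumed envelope key
    next_cursor: Option<String>, // absent on the final page
}

async fn fetch_all(base: &str) -> reqwest::Result<Vec<SensorRecord>> {
    let client = reqwest::Client::new();
    let mut all = Vec::new();
    let mut cursor: Option<String> = None;

    loop {
        // Pass the cursor from the previous page, if any.
        let mut req = client.get(format!("{base}/sensor-data"));
        if let Some(c) = &cursor {
            req = req.query(&[("cursor", c)]);
        }
        let page: Page = req.send().await?.error_for_status()?.json().await?;
        all.extend(page.records);

        // Keep paging until next_cursor is absent.
        match page.next_cursor {
            Some(next) => cursor = Some(next),
            None => break,
        }
    }
    Ok(all)
}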

🧪 Processing Pipeline

  1. Fetch & Store Raw Data

    • Ingest paginated API data and store all records in PostgreSQL with a normalized schema
  2. Transform

    • Store temperature in temperature_c only

    • Store timestamps in UTC (timezone conversion handled in frontend)

    • Flag anomalies (see the sketch after this list):

      • temperature_alert if < -10°C or > 60°C
      • humidity_alert if < 10% or > 90%
      • Optional: flag non-"ok" statuses
  3. Aggregate by mesh_id

    • Compute average temperature, average humidity, and count of readings
  4. Store Summary

    • Persist the transformed readings and per-mesh aggregates in PostgreSQL
  5. Expose Data API

    • Serve transformed records via GET /sql/readings
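
The alert thresholds in step 2 can be sketched as a small pure function (a sketch of the documented rules, not the repo's exact code):

// Sketch: apply the documented alert thresholds to one reading.
struct Flags {
    temperature_alert: bool,
    humidity_alert: bool,
    status_alert: bool,
}

fn flag_anomalies(temperature_c: f64, humidity: f64, status: &str) -> Flags {
    Flags {
        // temperature_alert if < -10°C or > 60°C
        temperature_alert: temperature_c < -10.0 || temperature_c > 60.0,
        // humidity_alert if < 10% or > 90%
        humidity_alert: !(10.0..=90.0).contains(&humidity),
        // optional: flag any non-"ok" status
        status_alert: status != "ok",
    }
}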

πŸ“ Project Structure

  • src/ – Rust backend source code
  • api/ – Mock data API (Python + FastAPI)
  • docker-compose.yml – Full local test environment
  • .cargo/audit.toml – Advisory exceptions for secure builds

🧱 Tech Stack

  • 🦀 Rust
  • 🐘 PostgreSQL
  • 🐳 Docker Compose
  • 📊 JSON API ingestion
  • 🔐 Secure API access (simulated)
  • 🚀 Axum web framework (Rust)

✅ Design and Implementation Goals

  • Code correctness and clarity
  • Modular and testable architecture
  • Docker setup and environment handling
  • PostgreSQL schema design and data integrity
  • Clean API design and filtering
  • Git hygiene and documentation

🧭 Design decisions

Timezone handling (UTC-only)

Original requirement: Store both UTC and EST.
Decision: Store UTC only; convert on the client.

Why: avoids redundancy/DST bugs, follows industry practice, supports global users.

Frontend example

const utc = sensor.timestamp_utc;               // e.g., "2025-03-21T00:00:00Z"
const local = new Date(utc).toLocaleString();   // renders in user's local timezone

Temperature units (Celsius-only)

Original requirement: Store temperature_c and temperature_f
Decision: Store/return Celsius only; clients convert to °F if needed.

Why: storing both is redundant and can drift (same smell as UTC+EST).

Client conversion example

const c = sensor.temperature_c;
const f = c * 9/5 + 32;

If consumers want server-side conversions without duplicating state, we can add presentation params (?units=imperial) or expose derived values via views/generated columns.

Ingest-once fast path

GET /sql/readings ingests from upstream only when the DB is empty, then serves from Postgres. Subsequent calls/tests are sub-second (~0.11s).
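
A sketch of that handler shape (assuming axum and sqlx; the Reading struct, the readings table name, and ingest_from_upstream are all illustrative, not the repo's exact code):

use axum::{extract::State, Json};
use sqlx::PgPool;

#[derive(serde::Serialize, sqlx::FromRow)]
struct Reading {
    device_id: String,
    mesh_id: String,
    timestamp_utc: chrono::DateTime<chrono::Utc>,
    temperature_c: f64,
    humidity: f64,
}

// Placeholder for the real ingest: page the upstream API, transform, INSERT.
async fn ingest_from_upstream(_pool: &PgPool) -> anyhow::Result<()> {
    Ok(())
}

async fn get_readings(State(pool): State<PgPool>) -> Json<Vec<Reading>> {
    // Ingest only when the table is empty; later calls skip straight to the SELECT.
    let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM readings")
        .fetch_one(&pool)
        .await
        .expect("count readings");

    if count == 0 {
        ingest_from_upstream(&pool).await.expect("initial ingest");
    }

    // Every call (including the first) serves from Postgres.
    let rows = sqlx::query_as::<_, Reading>(
        "SELECT device_id, mesh_id, timestamp_utc, temperature_c, humidity \
         FROM readings ORDER BY timestamp_utc LIMIT 1000",
    )
    .fetch_all(&pool)
    .await
    .expect("select readings");

    Json(rows)
}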

Validation & errors

  • timestamp_range must be RFC3339 "start,end" (open ends allowed: "start,", ",end").
  • Invalid input returns 422 with JSON { "error", "hint" }.
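
The parsing rule can be sketched like this (assuming chrono; the helper name is illustrative, and the behavior mirrors the unit tests listed under Testing):

use chrono::{DateTime, Utc};

// Parse "start,end" where either side may be empty ("start," / ",end").
// Err on a missing comma, bad RFC3339, or a reversed range (mapped to 422 upstream).
fn parse_timestamp_range(
    raw: &str,
) -> Result<(Option<DateTime<Utc>>, Option<DateTime<Utc>>), String> {
    let (start, end) = raw.split_once(',').ok_or("missing comma")?;

    let parse = |s: &str| -> Result<Option<DateTime<Utc>>, String> {
        let s = s.trim(); // tolerate surrounding whitespace
        if s.is_empty() {
            return Ok(None); // open end
        }
        DateTime::parse_from_rfc3339(s)
            .map(|t| Some(t.with_timezone(&Utc)))
            .map_err(|e| e.to_string())
    };

    let (start, end) = (parse(start)?, parse(end)?);
    if let (Some(a), Some(b)) = (&start, &end) {
        if a > b {
            return Err("reversed range".into());
        }
    }
    Ok((start, end))
}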

⚡ Performance

Database Optimization

  • Targeted SQL queries with database-level filtering
  • Strategic indexing: single-column (device_id, mesh_id, timestamp_utc) and composite indexes
  • PostgreSQL query planner automatically selects optimal indexes
  • Result: ~0.11s response times for filtered queries
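
The kind of DDL this implies, shown as a startup helper in Rust (a sketch; the index names and exact column sets are assumptions):

use sqlx::PgPool;

// One single-column index per filterable field, plus a composite index for
// the common "one device over a time range" query; the planner picks the best fit.
async fn create_indexes(pool: &PgPool) -> sqlx::Result<()> {
    for ddl in [
        "CREATE INDEX IF NOT EXISTS idx_readings_device_id ON readings (device_id)",
        "CREATE INDEX IF NOT EXISTS idx_readings_mesh_id ON readings (mesh_id)",
        "CREATE INDEX IF NOT EXISTS idx_readings_timestamp ON readings (timestamp_utc)",
        "CREATE INDEX IF NOT EXISTS idx_readings_device_ts ON readings (device_id, timestamp_utc)",
    ] {
        sqlx::query(ddl).execute(pool).await?;
    }
    Ok(())
}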

Caching Strategy

  • Ingest-once pattern: data loaded on first request, cached in PostgreSQL
  • Subsequent API calls serve directly from database without re-ingestion
  • Memory-efficient: no in-memory filtering of large datasets

Testing

  1. Start the full environment:

export BASE=http://localhost:8080
docker compose up --build -d

  2. Run all tests:

cargo test
    Finished `release` profile [optimized] target(s) in 0.22s
     Running unittests src/main.rs (target/release/deps/sensorflow-data-pipeline-ac928a43cce04ee7)

running 8 tests
test models::tests::data_preservation ... ok
test models::tests::humidity_alerts ... ok
test models::tests::temperature_alerts ... ok
test models::tests::utc_timestamp_preserved ... ok
test routes::get_readings::tests::parses_full_range_and_trims ... ok
test routes::get_readings::tests::parses_open_start ... ok
test routes::get_readings::tests::rejects_missing_comma ... ok
test routes::get_readings::tests::rejects_reversed_range ... ok

test result: ok. 8 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running tests/integration_test.rs (target/release/deps/integration_test-135dda86e65697c6)

running 3 tests
test filters_work_end_to_end ... ok
test timestamp_range_bad_returns_422 ... ok
test readings_endpoint_transforms_ok ... ok

test result: ok. 3 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.11s

# To run just integration tests (keeps unit output quiet)
cargo test --test integration_test -- --nocapture

# Tail backend logs during tests
docker compose logs -f sensor-api

# For verbose logging
RUST_LOG=info,sqlx::query=warn docker compose up -d --build

Integration tests run sequentially to avoid overlapping the initial ingest on /sql/readings. The first run ingests and caches data in Postgres (about 10s); subsequent runs are fast (~0.11s).
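
The exact serialization mechanism isn't shown here; one common way to keep integration tests from running in parallel is the serial_test crate (illustrative only, assuming a tokio test harness):

use serial_test::serial;

#[tokio::test]
#[serial] // runs exclusively with other #[serial] tests, so the first
          // test's ~10s ingest can't overlap a concurrent request
async fn readings_endpoint_transforms_ok() {
    // ... hit GET /sql/readings and assert on the transformed payload
}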

⚠️ Security Advisory Note

cargo audit reports a medium-severity advisory (RUSTSEC-2023-0071) affecting the rsa crate.

  • This crate is not used by this project directly. It is pulled in indirectly via sqlx-mysql, which is a transitive dependency of sqlx-macros-core, even though only the Postgres backend is enabled in Cargo.toml.
  • At this time, SQLx 0.8.x compiles all database backends unconditionally within its macros crate. This behavior is tracked upstream.
  • βœ… This application does not link to MySQL, does not use the rsa crate at runtime, and is not affected by the vulnerability.
  • The advisory is explicitly ignored via .cargo/audit.toml with rationale documented here.

📄 License

MIT License. See LICENSE for details.


✍️ Author

John Basrai john@basrai.dev
