DQ-Flow is a lightweight, production-style data quality framework that validates transactional data.
Run the full data quality pipeline end-to-end:

```bash
python3 dq_flow/runner.py
```

In regulated environments (finance, trading, credit risk, etc.), "bad data" cannot be allowed to flow into reporting, dashboards, or regulatory submissions. DQ-Flow acts as a gate: it scans incoming data, flags issues, and produces a traceable, auditable record of data quality.
This is the type of control that risk, compliance, audit, and data governance teams expect in mature data orgs.
## 🔍 Key Features

- **Data ingestion and normalization**: Loads raw transaction data and FX reference data, normalizes types, parses timestamps, and standardizes currency codes.
- **Deterministic data quality checks**: Runs rule-based validation; a minimal sketch of one rule appears after this list. Checks include:
  - `amount_positive`: amount must be > 0
  - `valid_timestamp`: timestamps must parse
  - `currency_supported`: currency must exist in the approved FX table
  - `fx_mapped`: all foreign-currency trades must have an FX mapping
  - `no_null_trade_id`: trade IDs cannot be missing
- **Automated anomaly / outlier surfacing (extensible)**: The framework supports adding statistical or ML-driven anomaly checks (e.g. IsolationForest, z-score) for suspicious spikes; a z-score sketch also follows this list.
- **Audit logging & governance trail**: Every pipeline run is written to a local SQLite database (`dq_audit.db`). For each check, the system records:
  - `run_id` (timestamped batch ID)
  - which rule ran
  - status (PASS/FAIL)
  - number of impacted rows
  - sample IDs of bad records
  - UTC timestamp

  This simulates the type of evidence compliance and audit teams ask for during reviews.
- **Human-readable & machine-readable reporting**: Each run generates a JSON report in `reports/` with:
  - total rows scanned
  - list of all checks
  - failed row counts
  - severity levels
  - generation timestamp
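For illustration, here is a minimal pandas sketch of what a deterministic rule such as `amount_positive` could look like. The function name, return shape, and column names (`trade_id`, `amount`) are assumptions for the example and may differ from the actual `dq_flow/validators.py`.

```python
# Hypothetical sketch of one deterministic rule; the return shape and column
# names are assumptions, not the actual dq_flow/validators.py API.
import pandas as pd


def amount_positive(df: pd.DataFrame) -> dict:
    """Flag rows whose amount is missing or not strictly greater than zero."""
    failed = df[~(df["amount"] > 0)]  # NaN > 0 is False, so null amounts also fail
    return {
        "check_name": "amount_positive",
        "status": "PASS" if failed.empty else "FAIL",
        "failed_rows": int(len(failed)),
        "sample_ids": failed["trade_id"].head(5).tolist(),
    }
```

Keeping every rule to the same small result dictionary lets the runner treat checks uniformly and lets the audit logger persist them without special cases.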
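Since `dq_flow/anomaly.py` is a placeholder, the statistical layer is open-ended; the following sketch shows one way a z-score outlier check could be plugged in. The threshold, column name, and result format are assumptions, not part of the current framework.

```python
# Illustrative z-score outlier check; threshold, column, and result format are assumptions.
import pandas as pd


def zscore_outliers(df: pd.DataFrame, column: str = "amount", threshold: float = 3.0) -> dict:
    """Flag rows whose value lies more than `threshold` standard deviations from the mean."""
    values = df[column].astype(float)
    std = values.std()
    if pd.isna(std) or std == 0:
        flagged = df.iloc[0:0]  # constant or near-empty column: nothing to flag
    else:
        zscores = (values - values.mean()).abs() / std
        flagged = df[zscores > threshold]
    return {
        "check_name": f"zscore_outliers_{column}",
        "status": "PASS" if flagged.empty else "FAIL",
        "failed_rows": int(len(flagged)),
        "sample_ids": flagged["trade_id"].head(5).tolist(),
    }
```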
## ⚙️ Pipeline Steps

- Ingest raw data from `data/transactions_raw.csv` and FX mappings from `data/fx_rates.csv`.
- Normalize and standardize the data (`dq_flow/ingest.py`).
- Run all validation checks (`dq_flow/validators.py`).
- Generate a structured data quality report (`dq_flow/runner.py` → `reports/`).
- Persist the full audit trail to SQLite for traceability (`dq_flow/db.py`); a minimal persistence sketch follows this list.
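As a rough illustration of the audit-trail step, here is a minimal sketch of persisting one check result to SQLite using only the standard library. The table schema and function name are assumptions and may not match the actual `dq_flow/db.py`.

```python
# Hypothetical audit-log writer; the schema and function name are assumptions,
# not the actual dq_flow/db.py implementation.
import json
import sqlite3
from datetime import datetime, timezone


def log_check_result(db_path: str, run_id: str, result: dict) -> None:
    """Append one check result to the audit trail (e.g. dq_audit.db)."""
    conn = sqlite3.connect(db_path)
    with conn:  # commits on success, rolls back on error
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS dq_audit (
                run_id TEXT,
                check_name TEXT,
                status TEXT,
                failed_rows INTEGER,
                sample_ids TEXT,
                logged_at_utc TEXT
            )
            """
        )
        conn.execute(
            "INSERT INTO dq_audit VALUES (?, ?, ?, ?, ?, ?)",
            (
                run_id,
                result["check_name"],
                result["status"],
                result["failed_rows"],
                json.dumps(result.get("sample_ids", [])),
                datetime.now(timezone.utc).isoformat(),
            ),
        )
    conn.close()
```

Storing sample IDs as a JSON string keeps the schema flat while still letting reviewers trace individual bad records.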
## 📁 Project Structure

```
dq-flow/
├── dq_flow/
│   ├── __init__.py
│   ├── ingest.py        # data loading + normalization
│   ├── validators.py    # data quality rules
│   ├── anomaly.py       # placeholder for advanced anomaly detection
│   ├── db.py            # audit log persistence (SQLite)
│   └── runner.py        # pipeline orchestrator
├── data/
│   ├── transactions_raw.csv
│   └── fx_rates.csv
├── reports/
│   └── dq_report_<timestamp>.json
├── requirements.txt
├── .gitignore
└── README.md
```
## 📊 Results Snapshot
Below is a sample output generated by **DQ-Flow** after scanning a dataset of 10 records.
The report summarizes each validation check, the number of failed rows, and overall data health.
```json
{
"run_id": "20251025_011916",
"scanned_rows": 10,
"checks": [
{"check_name": "no_null_trade_id", "status": "PASS", "failed_rows": 0},
{"check_name": "amount_positive", "status": "FAIL", "failed_rows": 2},
{"check_name": "valid_timestamp", "status": "FAIL", "failed_rows": 1},
{"check_name": "currency_supported", "status": "FAIL", "failed_rows": 1},
{"check_name": "fx_mapped", "status": "FAIL", "failed_rows": 1}
],
"generated_at_utc": "2025-10-25T01:19:16.239369"
}
```

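Because the report is plain JSON, downstream jobs can gate on it directly. The snippet below is an illustrative example, not part of DQ-Flow itself, of failing a CI step whenever any check reports `FAIL`; the report path pattern is an assumption.

```python
# Illustrative CI-style gate on the latest DQ-Flow report; paths are assumptions.
import json
import sys
from pathlib import Path

report_path = sorted(Path("reports").glob("dq_report_*.json"))[-1]  # latest report
report = json.loads(report_path.read_text())

failed = [c for c in report["checks"] if c["status"] == "FAIL"]
for check in failed:
    print(f"{check['check_name']}: {check['failed_rows']} failed rows")

sys.exit(1 if failed else 0)
```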