A professional-grade, agentic blueprint for enterprise-level Text-to-SQL analytics, built on the Open WebUI Pipelines framework. This system uses LangGraph to orchestrate specialized LLM agents that generate safe SQL, query enterprise databases, run higher-level analytics (trends/anomalies), and return a structured response with optional matplotlib visualizations.
Architecture Overview
- Host Framework: Open WebUI Pipelines
- Orchestration: LangGraph (Router → Experts → Aggregation)
- Pipeline Type: Manifold (exposes multiple model IDs such as `gpt-4o-mini`, `o3-mini`)
- Database: Agnostic (Oracle, PostgreSQL, SQL Server via SQLAlchemy)
Who this is for
- AI Engineers building agentic analytics over enterprise databases
- Data/Platform teams that want a modular Text-to-SQL stack with guardrails
- Anyone who wants a clean reference architecture for Router → Experts → Aggregation → Visualization
- Key Concepts
- High-Level Architecture
- Repository Layout
- Open WebUI Integration
- Execution Flow: What Happens on a Query
- Agents
- State Model
- SQL Safety & Guardrails
- Visualization & Output Contracts
- Configuration
- Local Setup
- Running the App
- Testing
- Docker
- Examples
- Extending the System
- Troubleshooting
- Contributing
- License
This project is implemented as a Manifold Pipeline. Unlike a standard pipe, a manifold allows a single backend script to register multiple models in the Open WebUI interface. In this case, it exposes different LLM "brains" for the same SQL analytics engine.
The system uses a Router to classify user intent and dispatch the query to one specialized Expert Agent (or sometimes multiple, depending on your graph design). Each expert is optimized for a particular domain:
- Audit trail / activity logs
- Error log analysis (ELMAH-style)
- Trends / time-series analytics
- Anomaly detection
LangGraph models the workflow as a state machine:
- Each node reads/writes a shared `AgentState`
- Conditional edges route to the right expert agent
- You can add loops (repair/refine) when SQL fails or outputs are incomplete
```mermaid
graph TD
    UI[Open WebUI Interface] --> Gateway[FastAPI Pipeline Gateway]
    Gateway --> Pipeline[StaffConnect Manifold Pipeline]
    subgraph LangGraph Orchestrator
        Pipeline --> Router{Intent Router}
        Router -- "Audit" --> AA[Audit Trail Agent]
        Router -- "Errors" --> EA[ELMAH Agent]
        Router -- "Trends" --> TA[Trend Agent]
        Router -- "Anomaly" --> ANA[Anomaly Agent]
        AA & EA & TA & ANA --> Collector[Unified Context Collector]
    end
    Collector --> Viz[Visualization Engine]
    Viz --> UI
```
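The Router → Experts → Collector dispatch above reduces, conceptually, to a routed function call over shared state. Here is a dependency-free sketch of that pattern; the real graph is wired with LangGraph's `StateGraph`, and `classify_intent` plus its keyword rules are purely illustrative:

```python
# Minimal sketch of the Router -> Expert dispatch pattern, without the
# LangGraph dependency. `classify_intent` and the agent bodies are illustrative.
from typing import Any, Callable, Dict

AgentState = Dict[str, Any]  # stand-in for the real schemas.AgentState

def classify_intent(state: AgentState) -> str:
    """Toy router: keyword match on the user query."""
    q = str(state["query"]).lower()
    if "error" in q or "exception" in q:
        return "errors"
    if "trend" in q or "per day" in q:
        return "trends"
    if "outlier" in q or "anomal" in q:
        return "anomaly"
    return "audit"

def audit_agent(state: AgentState) -> AgentState:
    # A real expert would generate and execute SQL here.
    return {**state, "route": "audit", "sql": "SELECT ... FROM audit_log"}

EXPERTS: Dict[str, Callable[[AgentState], AgentState]] = {
    "audit": audit_agent,
    # "errors": elmah_agent, "trends": trend_agent, "anomaly": anomaly_agent
}

def run_graph(state: AgentState) -> AgentState:
    route = classify_intent(state)          # the Router node
    expert = EXPERTS.get(route, audit_agent)  # the conditional edge
    return expert(state)
```

In LangGraph proper, `classify_intent` becomes the condition function passed to `add_conditional_edges`, and each expert becomes a node on the graph.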
Top-level folders and files in this repo:
```text
.github/                    # CI workflows and repo automation
blueprints/                 # Reusable LangGraph blueprints / templates
docs/                       # Documentation notes / design references
examples/                   # Example queries, scripts, or sample runs
pipelines/                  # Core agent graph(s) + agent implementations
  staffconnect_chat.py      # Main Open WebUI Pipeline entry point
  staffconnect_chat_files/  # Agent logic, chains, and LangGraph definitions
  common_files/             # Modularized utilities (SQL, Viz, LLM, Logging)
tests/                      # Unit/integration tests
utils/pipelines/            # Framework-level helpers (Auth, Misc, Stream)
config.py                   # Central configuration loader (env, flags)
schemas.py                  # Pydantic / typed schemas used across agents
main.py                     # Entry point (FastAPI Gateway serving the pipelines)
docker-compose.yaml         # Container orchestration
Dockerfile                  # Container build
env.example                 # Example env vars
dev.sh / start.sh           # Convenience scripts
CONTRIBUTING.md             # Contribution guide
LICENSE                     # MIT license
```
If you’re new to the codebase, read in this order:
1. `main.py` → app entry, graph build, execution loop
2. `pipelines/staffconnect_chat.py` → Open WebUI Pipeline entry & Gateway wiring
3. `pipelines/staffconnect_chat_files/` → router + agent nodes + graph wiring
4. `schemas.py` → state + response contracts
5. `pipelines/common_files/` → SQL cleaning, execution, chart creation
6. `tests/` → what "correct behavior" means
Configuration is handled via Open WebUI Valves. You can set these directly in the WebUI settings:
- `DATABASE_URL`: Generic SQLAlchemy connection string.
- `OPENAI_API_KEY`: LLM access.
- `CHART_DIRECTORY_STAFFCONNECT`: Path where generated analytic plots are stored.
The main.py file at the root acts as a standalone FastAPI server that is OpenAI-API compatible. You can connect Open WebUI to this gateway to host your custom pipelines.
This is the conceptual runtime pipeline (matches the repo’s structure and file responsibilities):
1. User query enters the system: CLI loop, API layer in `main.py`, or via Open WebUI.
2. State is initialized: an `AgentState` with the raw user input, conversation history (optional), and DB connection settings.
3. Router node runs: classifies intent (audit/errors/trends/anomaly/general SQL) and writes `state.route = <agent_name>`.
4. Expert agent runs: loads schema context, generates SQL or an analysis plan, executes SQL safely, and returns structured data + explanation.
5. Collector node aggregates: normalizes outputs into one common contract and attaches metadata (timing, rows, warnings).
6. Visualization node optionally runs: if the agent output requests a plot (or the data looks plottable), it creates chart images / a base64 payload.
7. Final formatter returns: a human-readable answer + machine-readable JSON payload + optional chart payload.
This repo’s modular design supports four core expert agents.

Audit Trail Agent. Intent: audit/activity investigation. Typical outputs:
- top events by user/action/time window
- suspicious access patterns
- drill-down by entity IDs
ELMAH Agent. Intent: error logs / failures / stack traces. Typical outputs:
- error frequency trends
- top exception types + root cause hints
- correlation with deployments/time windows
Trend Agent. Intent: time-series / KPI-style analytics. Typical outputs:
- daily/weekly aggregation
- moving averages
- seasonality hints
- plot-ready series
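As a concrete example of the plot-ready series this agent emits, a trailing moving average can be computed in pure Python; the window size and return shape here are illustrative, not the repo's exact implementation:

```python
from typing import List

def moving_average(values: List[float], window: int = 3) -> List[float]:
    """Trailing moving average; early points use a shorter window."""
    out: List[float] = []
    for i in range(len(values)):
        lo = max(0, i - window + 1)          # clamp the window at the series start
        out.append(sum(values[lo:i + 1]) / (i - lo + 1))
    return out
```

The smoothed series pairs naturally with the raw series in a line chart handed to the visualization layer.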
Anomaly Agent. Intent: detecting outliers. Typical outputs:
- z-score/IQR based outliers
- abrupt shifts
- rare event spikes
- “why flagged” explanations
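The z-score variant of those checks fits in a few lines of stdlib Python; the threshold value is a common default, not necessarily what the repo uses:

```python
import statistics
from typing import List

def zscore_outliers(values: List[float], threshold: float = 3.0) -> List[int]:
    """Return indices of points whose z-score exceeds the threshold."""
    mean = statistics.fmean(values)
    stdev = statistics.pstdev(values)
    if stdev == 0:
        return []  # constant series has no outliers
    return [i for i, v in enumerate(values) if abs(v - mean) / stdev > threshold]
```

An IQR-based check follows the same shape, substituting quartile bounds for the mean/stdev test.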
Implementation note (recommended pattern)
Each agent should implement a consistent interface, such as `run(state: AgentState) -> AgentState` or `invoke(state) -> dict` whose result is merged into the state. Refer to `BaseAgent` in `pipelines/staffconnect_chat_files/base_agent.py`.
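A hedged sketch of that interface (the actual `BaseAgent` in the repo may differ in detail):

```python
from abc import ABC, abstractmethod
from typing import Any, Dict

AgentState = Dict[str, Any]  # stand-in for the real schemas.AgentState

class BaseAgent(ABC):
    """Common interface: take the shared state, return an updated copy."""

    name: str = "base"

    @abstractmethod
    def run(self, state: AgentState) -> AgentState:
        ...

class EchoAgent(BaseAgent):
    """Trivial example agent: records which expert handled the query."""
    name = "echo"

    def run(self, state: AgentState) -> AgentState:
        return {**state, "route": self.name}
```

Keeping every expert behind the same `run(state)` signature is what lets the Router swap agents with a single conditional edge.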
State is the backbone of LangGraph workflows. Refer to schemas.py for the actual schema.
- `query`: user input
- `route`: router decision
- `messages`: conversation history (optional)
- `sql`: generated SQL
- `params`: bound parameters (if supported)
- `rows`: query results (normalized)
- `analysis`: computed analytics (trend/anomaly)
- `chart`: chart payload (path/base64/bytes)
- `errors`: error stack / LLM failures
- `warnings`: safety filters triggered
- `metadata`: timing, rowcount, tokens, model name, etc.
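A minimal sketch of such a state schema as a `TypedDict` (field names mirror the list above; the authoritative definitions live in `schemas.py`):

```python
from typing import Any, Dict, List, Optional, TypedDict

class AgentState(TypedDict, total=False):
    query: str                      # user input
    route: str                      # router decision
    messages: List[Dict[str, str]]  # conversation history (optional)
    sql: str                        # generated SQL
    params: Dict[str, Any]          # bound parameters
    rows: List[Dict[str, Any]]      # normalized query results
    analysis: Dict[str, Any]        # trend/anomaly output
    chart: Optional[str]            # path or base64 payload
    errors: List[str]               # DB / LLM failures
    warnings: List[str]             # safety filters triggered
    metadata: Dict[str, Any]        # timing, rowcount, tokens, model
```

`total=False` lets each node populate only the fields it owns, which is the usual pattern for LangGraph state merging.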
Text-to-SQL systems fail in predictable ways; this repo includes tests aimed at SQL cleaning + LLM parsing.
- Read-only by default: allow only `SELECT`.
- Block keywords: `DROP`, `TRUNCATE`, `ALTER`, `GRANT`, `REVOKE`, etc.
- Enforce row limits (`FETCH FIRST N ROWS ONLY`).
- Enforce a schema/table allowlist (optional).
- Parameterize values whenever possible.
- Validate against the known schema (table/column existence).
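The first three checks above can be sketched as a single validation function; the keyword list and `FETCH FIRST` syntax are illustrative, and the repo's actual cleaning utilities live in `pipelines/common_files/`:

```python
import re

# Illustrative blocklist; extend per your database and threat model.
BLOCKED = {"DROP", "TRUNCATE", "ALTER", "GRANT", "REVOKE", "DELETE", "UPDATE", "INSERT"}

def validate_sql(sql: str, max_rows: int = 500) -> str:
    """Reject non-SELECT statements and append a row limit if missing."""
    stripped = sql.strip().rstrip(";")
    if not stripped.upper().startswith("SELECT"):
        raise ValueError("Only SELECT statements are allowed")
    tokens = set(re.findall(r"[A-Z_]+", stripped.upper()))
    hit = tokens & BLOCKED
    if hit:
        raise ValueError(f"Blocked keyword(s): {sorted(hit)}")
    if "FETCH FIRST" not in stripped.upper() and "LIMIT" not in tokens:
        stripped += f" FETCH FIRST {max_rows} ROWS ONLY"
    return stripped
```

Token-level blocking is deliberately conservative: it also rejects a legitimate column named `DELETE_FLAG`, which is usually the right trade-off for an LLM-generated query path.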
If SQL fails:
- Capture the DB error (e.g. `ORA-xxxxx`).
- Feed the error back to the agent (repair prompt).
- Regenerate SQL with constraints.
- Retry with capped attempts.
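That loop can be sketched as a capped retry wrapper; `generate_sql` and `execute` are stand-ins for the real LLM chain and DB execution layer:

```python
from typing import Any, Callable, List, Tuple

def run_with_repair(
    generate_sql: Callable[[str, List[str]], str],  # (question, prior errors) -> SQL
    execute: Callable[[str], Any],                  # raises on DB errors
    question: str,
    max_attempts: int = 3,
) -> Tuple[str, Any]:
    """Regenerate SQL with accumulated error feedback, up to max_attempts."""
    errors: List[str] = []
    for _ in range(max_attempts):
        sql = generate_sql(question, errors)  # errors become the repair prompt
        try:
            return sql, execute(sql)
        except Exception as exc:              # e.g. ORA-xxxxx from the driver
            errors.append(str(exc))
    raise RuntimeError(f"SQL failed after {max_attempts} attempts: {errors}")
```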
- Agent returns data + suggested plot type (line/bar/hist).
- Viz layer generates matplotlib chart.
- Chart is returned inline (base64) or saved to disk (path) depending on configuration in Valves.
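The inline path amounts to base64-encoding the rendered PNG bytes. A stdlib-only helper, with matplotlib rendering omitted and the `data:` URI format used as one common convention (not necessarily the repo's exact payload shape):

```python
import base64

def chart_to_inline_payload(png_bytes: bytes) -> dict:
    """Wrap rendered chart bytes as a base64 data-URI payload."""
    encoded = base64.b64encode(png_bytes).decode("ascii")
    return {"type": "image/png", "data": f"data:image/png;base64,{encoded}"}
```

With matplotlib, `png_bytes` would typically come from `fig.savefig(buf, format="png")` into an `io.BytesIO` buffer.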
Your final response should be both:
- Human readable: explanation + key insights.
- Machine readable: JSON including route used, SQL executed, rows summary, chart payload, and warnings/errors.
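A hedged example of what the machine-readable half might look like; the field names mirror the state model above, and all values are illustrative:

```python
import json

# Illustrative final payload; the real contract is defined in schemas.py.
response = {
    "answer": "Errors spiked on 2024-05-03, driven by NullReferenceException.",
    "route": "errors",
    "sql": "SELECT ... FROM elmah_error FETCH FIRST 500 ROWS ONLY",
    "rows_summary": {"rowcount": 412, "truncated": False},
    "chart": {"type": "image/png", "data": "data:image/png;base64,..."},
    "warnings": [],
    "errors": [],
}
payload = json.dumps(response, indent=2)
```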
Configuration is centralized in config.py and Open WebUI Valves.
```env
# LLM
OPENAI_API_KEY=...
OPENAI_MODEL=...

# Database (SQLAlchemy URL)
DATABASE_URL=your_database_url_here

# Runtime
LOG_LEVEL=INFO
MAX_ROWS=500
SQL_READ_ONLY=true
```

`env.example` is the canonical list; keep it updated as you add features.
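A minimal sketch of how `config.py` might read these values; the names match the env vars above, while the defaults and `Settings` class are illustrative:

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class Settings:
    database_url: str
    openai_api_key: str
    log_level: str = "INFO"
    max_rows: int = 500
    sql_read_only: bool = True

    @classmethod
    def from_env(cls) -> "Settings":
        """Load settings from environment variables with safe defaults."""
        return cls(
            database_url=os.getenv("DATABASE_URL", ""),
            openai_api_key=os.getenv("OPENAI_API_KEY", ""),
            log_level=os.getenv("LOG_LEVEL", "INFO"),
            max_rows=int(os.getenv("MAX_ROWS", "500")),
            sql_read_only=os.getenv("SQL_READ_ONLY", "true").lower() == "true",
        )
```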
- Python 3.11+
- Appropriate database drivers (e.g., Oracle Instant Client, `psycopg2` for PostgreSQL, `pyodbc` for SQL Server)
- An OpenAI API Key
```bash
git clone https://github.com/s8m21/Multi-Agent-SQL-Pipeline.git
cd Multi-Agent-SQL-Pipeline
pip install -r requirements.txt
cp env.example .env
# edit .env with your values
```

Example SQLAlchemy connection strings:

- Oracle: `oracle+cx_oracle://user:password@host:port/?service_name=service`
- PostgreSQL: `postgresql://user:password@host:port/dbname`
- SQL Server: `mssql+pyodbc://user:password@host:port/dbname?driver=ODBC+Driver+17+for+SQL+Server`

Run the gateway:

```bash
python main.py
```

Add `http://localhost:9099` to Open WebUI as an OpenAI connection.
For testing purposes, you can execute individual agent scripts or the main entry.
A test suite is included in tests/.
```bash
pytest
```

Focus on: SQL cleaning functions, Router classification, and repair-loop behavior.
- Build: `docker build -t staffconnect-sql-pipeline .`
- Compose: `docker compose up --build`
Check examples/ for runnable demo scripts or sample prompts.
- "Show top 10 users by failed logins last 7 days"
- "Plot error count by day for the last 30 days"
- Add an Agent: create it in `pipelines/staffconnect_chat_files/`, implement `run(state)`, and register it with the Router.
- Add a Tool: place it in `pipelines/common_files/` and keep it pure and safe-by-default.
- Database client: Ensure the appropriate client library (e.g., Oracle Instant Client) is in your PATH.
- LLM parsing: Use strict output prompts or JSON repair passes.
- SQL failures: Feed database-specific error codes back to the repair loop.
See CONTRIBUTING.md.
MIT. See LICENSE.