AI-powered code security analyzer.
This service exposes a FastAPI endpoint that accepts a source code file upload, stores a “task” record in a SQLAlchemy-backed database, and processes the file asynchronously. The code is first analyzed using Semgrep (static analysis) and the findings are then passed to Mistral AI for higher-level reasoning, prioritization, and recommendations (with Gemini as fallback).
The system also supports Server-Sent Events (SSE) to notify clients in real time when analysis tasks are completed or failed.
- File upload API for security analysis (`POST /analyze/`)
- Static analysis using Semgrep
- AI-based enrichment using Mistral AI (with Gemini fallback)
- Async/background processing using Celery + Redis
- Task tracking stored in a SQL database (SQLite supported via `DATABASE_URL`)
- Results retrieval by `task_id`
- Real-time SSE notifications for task status updates
- Structured JSON output (severity, vulnerabilities, recommendations, score)
- Client uploads a source code file via `POST /analyze/`
- API stores the file and creates a `Task` record with status `pending`
- Background worker processes the task:
  - Runs Semgrep on the uploaded file
  - Passes Semgrep findings + source context to Mistral AI
  - Mistral generates risk assessment, severity, and recommendations (falls back to Gemini if needed)
- Task status is updated to `completed` or `failed`
- Clients can:
  - Fetch results via REST (see the client example below)
  - Subscribe to SSE for real-time updates
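From a client's point of view, the flow looks roughly like the sketch below. It assumes a local dev server on port 8000, the `requests` library, and response field names (`task_id`, `status`) that are illustrative rather than taken from the project:

```python
import time

import requests

BASE = "http://localhost:8000"  # assumption: local development server

# 1. Upload a source file for analysis (multipart form field name is an assumption)
with open("example.py", "rb") as f:
    resp = requests.post(f"{BASE}/analyze/", files={"file": f})
resp.raise_for_status()
task_id = resp.json()["task_id"]  # assumption: upload response exposes the task id

# 2. Poll until the background worker finishes (SSE can replace this loop)
while True:
    result = requests.get(f"{BASE}/analyze/results/{task_id}").json()
    if result.get("status") in ("completed", "failed"):  # assumption: status field name
        break
    time.sleep(2)

print(result)
```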
- API layer: `app.py`, `apis/analyze_apis.py`
- Persistence: `db/database.py`, `db/models/tasks.py`
- Background processing: `celery_app.py`, `tasks/process_file.py`
- Static analysis: `semgrep` (invoked from the worker)
- AI integration: `core/ai.py`, `core/mistral.py`, `core/gemini.py`
- Notifications: SSE endpoint for task status updates
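Putting the pieces together, the repository layout implied by the modules above is roughly (annotations are inferred from the descriptions in this README, not copied from the tree):

```
app.py                  # FastAPI application entry point
celery_app.py           # Celery instance / worker configuration
config.py               # pydantic-settings configuration
apis/analyze_apis.py    # upload, results, tasks, and SSE endpoints
db/database.py          # SQLAlchemy engine/session setup
db/models/tasks.py      # Task model
tasks/process_file.py   # background analysis pipeline
core/ai.py              # provider selection / fallback logic
core/mistral.py         # Mistral integration
core/gemini.py          # Gemini integration
```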
Static analysis (Semgrep):
- Runs first on the uploaded source file
- Detects:
  - Common vulnerabilities
  - Insecure patterns
  - Language-specific security issues
- Produces structured findings (rules, locations, severity); see the invocation sketch below
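The worker drives Semgrep through its CLI and consumes the JSON report. A minimal sketch of such an invocation; the exact flags and result handling in `tasks/process_file.py` may differ, and `--config auto` is just one possible ruleset choice:

```python
import json
import subprocess


def run_semgrep(path: str) -> list[dict]:
    """Run Semgrep on a file and reduce its JSON report to a list of findings."""
    proc = subprocess.run(
        ["semgrep", "--config", "auto", "--json", path],
        capture_output=True,
        text=True,
    )
    report = json.loads(proc.stdout)
    findings = []
    for r in report.get("results", []):
        findings.append({
            "rule": r.get("check_id"),
            "path": r.get("path"),
            "line": r.get("start", {}).get("line"),
            "severity": r.get("extra", {}).get("severity"),
            "message": r.get("extra", {}).get("message"),
        })
    return findings
```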
AI layer (Mistral AI, with Gemini fallback):
- Receives:
  - Original source code
  - Semgrep findings
- Falls back to Gemini if Mistral is unavailable
- Performs:
  - Contextual reasoning
  - Risk prioritization
  - False-positive reduction
  - Actionable security recommendations
- Produces a final JSON report (see the call sketch below)
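Conceptually, the enrichment step concatenates an instruction prompt with the source code and the Semgrep findings and asks the model for a JSON report. A rough sketch of what `core/mistral.py` might look like, assuming the current `mistralai` SDK; the real prompt, model choice, and parsing live in the project:

```python
import json

from mistralai import Mistral

from config import settings  # assumption: settings exposes MISTRAL_API_KEY

client = Mistral(api_key=settings.MISTRAL_API_KEY)


def generate_content_mistral(instruction: str, context: str) -> dict:
    """Ask Mistral for a structured report; `context` carries source + Semgrep findings."""
    response = client.chat.complete(
        model="mistral-large-latest",  # assumption: actual model name may differ
        messages=[{"role": "user", "content": instruction + "\n\n" + context}],
    )
    # The prompt is expected to request JSON output; real parsing may need to be
    # more forgiving (the project ships a utils/to_json helper for this).
    return json.loads(response.choices[0].message.content)
```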
This hybrid approach combines deterministic static analysis with LLM-based reasoning.
API endpoints:

POST /analyze/
- Multipart file upload
- Creates a new task with status `pending`
- Enqueues background processing
GET /analyze/results/{task_id}
Returns:
- `pending` → analysis in progress
- `completed` → full analysis result
- `failed` → task failed (no result payload)
GET /analyze/tasks
Returns paginated task metadata (id, filename, status, timestamps).
GET /analyze/events
- Server-Sent Events stream
- Emits events when tasks are:
  - completed
  - failed
Useful for:
- Live dashboards
- Auto-refreshing UIs
- Removing polling on the frontend
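A client can consume the stream with any SSE-capable library. A minimal Python sketch using `httpx` streaming; the event names and payload fields are assumptions, so adapt it to what the endpoint actually emits:

```python
import httpx

# Connect to the SSE endpoint and print task status events as they arrive.
with httpx.stream("GET", "http://localhost:8000/analyze/events", timeout=None) as response:
    for line in response.iter_lines():
        # SSE frames are plain text lines; payloads are prefixed with "data:".
        if line.startswith("data:"):
            print("task update:", line[len("data:"):].strip())
```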
Example analysis result:

```json
{
  "analysis_id": "uuid",
  "overall_severity": "Low",
  "score": 90,
  "vulnerabilities": [],
  "recommendations": [
    "Validate and sanitize all user inputs.",
    "Avoid logging sensitive data."
  ]
}
```

Requirements:

- Python 3.10+ recommended
- Redis (required for Celery background processing)
- Semgrep (CLI must be available)
- Mistral API key (and a Google Gemini API key for the fallback path)
Key Python dependencies (see requirements.txt):
`fastapi`, `uvicorn`, `celery`, `redis`, `sqlalchemy`, `mistralai`, `google-genai`, `pydantic-settings`, `aiofiles`, `semgrep`
Note: dependencies are split into requirements1.txt and requirements2.txt to avoid conflicts between the OpenTelemetry packages and other libraries. Install both files sequentially.
This project uses pydantic-settings and reads environment variables from .env.
Create a .env file in the repo root:
```env
DATABASE_URL=sqlite:///./security_advisor.db
GEMINI_API_KEY=your_gemini_api_key_here
MISTRAL_API_KEY=your_mistral_api_key_here
REDIS_URL=redis://localhost:6379/0
REDIS_CELERY_BROKER=redis://localhost:6379/0
```

Design highlights:

- Semgrep provides fast, deterministic static analysis
- Mistral AI adds context-aware reasoning and prioritization (with Gemini fallback)
- Celery + Redis decouple API responsiveness from heavy analysis
- SSE enables real-time UX without polling
- Clean separation between API, analysis, and storage layers
Feel free to modify the system to use any LLM that is well suited to coding and vulnerability analysis! The architecture is designed for easy extension:
- Create a new provider file in `core/` (e.g., `core/openai.py`, `core/claude.py`)
- Implement the required function with this signature:

  ```python
  def generate_content_{provider}(instruction: str, context: str) -> dict:
      # Your LLM API call logic here
      # Return parsed JSON result or None on failure
      ...
  ```
- Add the provider to the model dictionary in `core/ai.py`:

  ```python
  model_providers = {
      "mistral": generate_content_mistral,
      "gemini": gemini_generate_content,
      "your_provider": generate_content_your_provider,  # Add this
  }
  ```
- Add the API key to config:

  ```python
  # config.py
  YOUR_PROVIDER_API_KEY: str
  ```
- Update `requirements.txt` with the necessary SDK
Example provider implementation:

```python
# core/openai.py
import logging

from openai import OpenAI

from config import settings
from utils.to_json import to_json

logger = logging.getLogger(__name__)

client = OpenAI(api_key=settings.OPENAI_API_KEY)


def generate_content_openai(instruction: str, context: str) -> dict:
    try:
        content = instruction + "\n\n" + context
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": content}],
        )
        return to_json(response.choices[0].message.content)
    except Exception as e:
        logger.error(f"Error generating content: {e}")
        return None
```

The system will automatically try all available providers in order and use the first one that succeeds, providing built-in redundancy and fallback support.
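That fallback behaviour can be as simple as iterating over the provider dictionary and returning the first usable result. A sketch of what the dispatch in `core/ai.py` might look like; names follow the `model_providers` dictionary shown earlier, and the real implementation may order, configure, or log providers differently:

```python
import logging

logger = logging.getLogger(__name__)


def generate_content(instruction: str, context: str) -> dict | None:
    """Try each configured provider in order; return the first successful result."""
    # model_providers is the provider dictionary shown above in core/ai.py.
    for name, provider in model_providers.items():
        try:
            result = provider(instruction, context)
        except Exception as exc:
            logger.warning("provider %s raised: %s", name, exc)
            continue
        if result is not None:
            logger.info("analysis produced by provider %s", name)
            return result
    return None
```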