Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ Embeds text and stores vectors locally.

Matches controls to supporting evidence via RAG, then enriches with LLM analysis.

- **`mapper.py`** — Core mapping: query expansion → vector search → min-score filtering
- **`enrichment.py`** — Orchestrates the 5-step LLM pipeline: relevance filter → rationale → meta-classify → gap → resolve. Entry point: `enrich_with_rationale()`
- **`mapper.py`** — Core mapping: query expansion → batch embedding → vector search → min-score filtering (`top_k=5`, `min_score=0.50`)
- **`enrichment.py`** — Streaming per-control async pipeline: merged relevance+rationale → meta-classify (7B, only unmapped) → gap rationale (7B, async) → resolve. Dual-model architecture: 14B for accuracy-critical evaluation, 7B for simple tasks.
- **`meta_requirements.py`** — Governance/documentation meta-requirement classification
- **`map_command.py`** — CLI wiring + format dispatch via `_FORMAT_REGISTRY`
- **`map_command.py`** — CLI wiring + format dispatch via `_FORMAT_REGISTRY`. Supports `--concurrency` and `--cache` flags.
- **`expansion_map.json`** — Domain synonym data for query expansion

**Performance:** Merged relevance+rationale prompt halves LLM calls per chunk. Model tiering (14B rationale, 7B meta/gap) doubles throughput for simple tasks. Batch embedding, concurrent `asyncio`, and optional SQLite cache (`--cache`) provide further gains. Embedder uses `@functools.cache` to share the model across pipeline stages.

**Output:** `MappedResult` objects with rationales.

### 4. Harmonize (`ctrlmap.map.cluster`)
Expand All @@ -61,14 +63,15 @@ Deduplicates overlapping controls across frameworks.
|--------|----------------|
| `ctrlmap.models.schemas` | Pydantic V2 data models (`ParsedChunk`, `SecurityControl`, `MappedResult`, etc.) |
| `ctrlmap.models.oscal` | OSCAL JSON catalog parser |
| `ctrlmap.llm.client` | Ollama client (connection handling, prompt formatting) |
| `ctrlmap.llm.client` | Ollama client with async support, transparent cache integration |
| `ctrlmap.llm.structured_output` | LLM response → `MappingRationale \| InsufficientEvidence` |
| `ctrlmap.llm._json_utils` | Shared JSON extraction utilities for LLM responses |
| `ctrlmap.llm.prompts/` | Externalized prompt templates (`.txt` files) |
| `ctrlmap.llm.cache` | SQLite-backed LLM response cache (wired into `call_llm_async`) |
| `ctrlmap.llm.prompts/` | Externalized prompt templates (`.txt` files) including merged relevance+rationale |
| `ctrlmap.export.*` | Output formatters (CSV, Markdown, OSCAL, HTML) |
| `ctrlmap.eval_command` | CLI subcommand for the RAG evaluation harness |
| `ctrlmap.eval_ragas` | RAGAS integration for retrieval quality metrics |
| `ctrlmap._defaults` | Centralized default constants (model names) |
| `ctrlmap._defaults` | Centralized default constants (`DEFAULT_LLM_MODEL`, `DEFAULT_FAST_MODEL`) |
| `ctrlmap._console` | Shared Rich console instances |

## Data Flow
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,5 @@ dev = [
"pre-commit>=4.0.0",
"mkdocs-material>=9.0.0",
"mkdocstrings[python]>=0.24.0",
"pytest-asyncio>=1.3.0",
]
7 changes: 5 additions & 2 deletions src/ctrlmap/_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
"""

DEFAULT_LLM_MODEL: str = "qwen2.5:14b"
"""Default Ollama model name used for rationale generation, control
extraction, and meta-requirement classification."""
"""Default Ollama model for accuracy-critical compliance evaluation."""

DEFAULT_FAST_MODEL: str = "qwen2.5:7b"
"""Smaller Ollama model for simpler LLM tasks (meta-classification,
gap rationale, control extraction). Runs ~2x faster than the 14B model."""

DEFAULT_EMBEDDING_MODEL: str = "all-MiniLM-L6-v2"
"""Default Sentence-Transformers model for local text vectorization."""
4 changes: 2 additions & 2 deletions src/ctrlmap/index/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""

from ctrlmap.index.embedder import Embedder
from ctrlmap.index.query import QueryResult, query
from ctrlmap.index.query import QueryResult, query, query_by_embedding
from ctrlmap.index.vector_store import VectorStore

__all__ = ["Embedder", "QueryResult", "VectorStore", "query"]
__all__ = ["Embedder", "QueryResult", "VectorStore", "query", "query_by_embedding"]
16 changes: 15 additions & 1 deletion src/ctrlmap/index/embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,37 @@

from __future__ import annotations

import functools
from typing import cast

from sentence_transformers import SentenceTransformer

from ctrlmap._defaults import DEFAULT_EMBEDDING_MODEL


@functools.cache
def _load_model(model_name: str) -> SentenceTransformer:
    """Return the process-wide SentenceTransformer for ``model_name``.

    The result is memoised on ``model_name``: the first request for a
    given name constructs the model, and every later request for the
    same name gets that same instance back without reloading it.

    Args:
        model_name: The Sentence-Transformers model identifier to load.

    Returns:
        The (possibly cached) ``SentenceTransformer`` instance.
    """
    model = SentenceTransformer(model_name)
    return model


class Embedder:
"""Local embedding pipeline backed by Sentence-Transformers.

Args:
model_name: The Sentence-Transformers model to load.
Defaults to ``all-MiniLM-L6-v2`` (lightweight, CPU-friendly).

The underlying model is cached per ``model_name`` and shared across
all ``Embedder`` instances in the same process.
"""

def __init__(self, model_name: str = DEFAULT_EMBEDDING_MODEL) -> None:
self._model = SentenceTransformer(model_name)
self._model = _load_model(model_name)

def embed_text(self, text: str) -> list[float]:
"""Embed a single text string into a float vector.
Expand Down
61 changes: 61 additions & 0 deletions src/ctrlmap/index/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,64 @@ def query(
)

return query_results


def query_by_embedding(
    *,
    store: VectorStore,
    collection_name: str,
    embedding: list[float],
    top_k: int = 5,
    filters: dict[str, str] | None = None,
) -> list[QueryResult]:
    """Perform an ANN similarity search using a pre-computed embedding.

    This avoids recomputing the embedding when the caller has already
    embedded the query text (e.g. via :meth:`Embedder.embed_batch`).

    Args:
        store: The VectorStore instance to query.
        collection_name: Name of the ChromaDB collection to search.
        embedding: Pre-computed embedding vector.
        top_k: Maximum number of results to return (default: 5).
        filters: Optional metadata filters (AND logic). Each key/value
            pair becomes an ``$eq`` condition.

    Returns:
        A list of ``QueryResult`` objects, one per hit, in the order the
        collection returned them. Records stored without a document or
        without metadata yield ``raw_text=""`` and ``metadata={}``
        respectively instead of raising.
    """
    collection = store.get_or_create_collection(collection_name)

    # Build ChromaDB where clause from filters. A single condition is passed
    # bare; several conditions are combined under "$and".
    where: dict[str, Any] | None = None
    if filters:
        conditions: list[dict[str, Any]] = [{k: {"$eq": v}} for k, v in filters.items()]
        where = conditions[0] if len(conditions) == 1 else {"$and": conditions}

    results = collection.query(
        query_embeddings=[embedding],  # type: ignore[arg-type]
        n_results=top_k,
        where=where,
        include=["documents", "metadatas", "distances"],
    )

    # Only one query embedding is sent, so only the first inner list matters.
    # Missing or None fields degrade to an empty list.
    ids = (results.get("ids") or [[]])[0]
    documents = (results.get("documents") or [[]])[0]
    metadatas = (results.get("metadatas") or [[]])[0]
    distances = (results.get("distances") or [[]])[0]

    query_results: list[QueryResult] = []
    for i, chunk_id in enumerate(ids):
        distance = distances[i] if distances else 0.0
        # NOTE(review): "1 - distance" presumes a distance metric for which
        # this is a similarity (e.g. cosine) — confirm against the collection
        # configuration. Negative values are clamped to 0.
        score = max(0.0, 1.0 - distance)

        # Individual entries may be None when a record was stored without a
        # document or metadata; fall back to empty values rather than letting
        # dict(None) raise TypeError.
        document = documents[i] if documents else None
        metadata = metadatas[i] if metadatas else None

        query_results.append(
            QueryResult(
                chunk_id=chunk_id,
                raw_text=document or "",
                score=score,
                metadata=dict(metadata or {}),
            )
        )

    return query_results
99 changes: 99 additions & 0 deletions src/ctrlmap/llm/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""Content-addressable LLM response cache.

Stores LLM responses keyed by SHA-256(model + prompt) in a SQLite
database for fast retrieval during iterative development cycles.

Usage::

cache = LLMCache(cache_dir=Path(".ctrlmap_cache"))
cached = cache.get(model="llama3", prompt="...")
if cached is None:
response = llm_call(...)
cache.put(model="llama3", prompt="...", response=response)
"""

from __future__ import annotations

import hashlib
import sqlite3
from pathlib import Path


class LLMCache:
    """SQLite-backed LLM response cache.

    Responses are stored in ``llm_cache.db`` inside ``cache_dir``, keyed by
    a SHA-256 digest derived from the model name and the prompt. Hit and
    miss counters are kept in memory for the lifetime of the instance.

    The instance owns one SQLite connection. Release it with
    :meth:`close`, or use the cache as a context manager::

        with LLMCache(cache_dir=Path(".ctrlmap_cache")) as cache:
            cache.put(model="m", prompt="p", response="r")

    Args:
        cache_dir: Directory for the SQLite database file. Created
            (including parents) if it does not exist.
    """

    def __init__(self, cache_dir: Path) -> None:
        cache_dir.mkdir(parents=True, exist_ok=True)
        self._db_path = cache_dir / "llm_cache.db"
        self._conn = sqlite3.connect(str(self._db_path))
        try:
            # The connection context manager commits on success and rolls
            # back on error; it does not close the connection.
            with self._conn:
                self._conn.execute(
                    "CREATE TABLE IF NOT EXISTS cache (key TEXT PRIMARY KEY, response TEXT NOT NULL)"
                )
        except Exception:
            # Do not leak the handle if the schema cannot be created.
            self._conn.close()
            raise
        self._hits = 0
        self._misses = 0

    def __enter__(self) -> LLMCache:
        """Return the cache itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Close the underlying connection when leaving a ``with`` block."""
        self.close()

    def close(self) -> None:
        """Close the underlying SQLite connection.

        Safe to call more than once. The cache must not be used for
        lookups or writes after it has been closed.
        """
        self._conn.close()

    @staticmethod
    def _make_key(model: str, prompt: str) -> str:
        """Generate a SHA-256 cache key from model + prompt.

        The model name is length-prefixed so that the boundary between
        model and prompt is unambiguous: with a plain separator, pairs
        such as ``("a::", "b")`` and ``("a", "::b")`` would produce the
        same key and could return each other's responses.

        Args:
            model: The LLM model name.
            prompt: The full prompt text.

        Returns:
            A hex-encoded SHA-256 digest.
        """
        material = f"{len(model)}:{model}{prompt}"
        return hashlib.sha256(material.encode()).hexdigest()

    def get(self, *, model: str, prompt: str) -> str | None:
        """Look up a cached response.

        Increments the hit counter when an entry is found and the miss
        counter otherwise.

        Args:
            model: The LLM model name.
            prompt: The full prompt text.

        Returns:
            The cached response string, or ``None`` on a miss.
        """
        key = self._make_key(model, prompt)
        row = self._conn.execute("SELECT response FROM cache WHERE key = ?", (key,)).fetchone()
        if row is None:
            self._misses += 1
            return None
        self._hits += 1
        return str(row[0])

    def put(self, *, model: str, prompt: str, response: str) -> None:
        """Store a response in the cache.

        An existing entry for the same model and prompt is overwritten.

        Args:
            model: The LLM model name.
            prompt: The full prompt text.
            response: The LLM response to cache.
        """
        key = self._make_key(model, prompt)
        with self._conn:
            self._conn.execute(
                "INSERT OR REPLACE INTO cache (key, response) VALUES (?, ?)",
                (key, response),
            )

    def clear(self) -> None:
        """Remove all cached entries and reset the hit/miss counters."""
        with self._conn:
            self._conn.execute("DELETE FROM cache")
        self._hits = 0
        self._misses = 0

    def stats(self) -> dict[str, int]:
        """Return cache hit/miss statistics.

        Returns:
            A dict with ``hits`` and ``misses`` counts.
        """
        return {"hits": self._hits, "misses": self._misses}
Loading