Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b92672e
ci: use vertex model
nicoloboschi Feb 20, 2026
9371580
fix: allow vertexai provider without API key requirement
nicoloboschi Feb 20, 2026
6cfad76
fix: skip upgrade tests when using vertexai provider
nicoloboschi Feb 20, 2026
0726e17
fix: allow vertexai provider in embed smoke test
nicoloboschi Feb 20, 2026
41eac88
fix: skip API key check for vertexai in embed CLI command forwarding
nicoloboschi Feb 20, 2026
5ff6217
fix(ci): add GCP credentials setup step to test-api job
nicoloboschi Feb 20, 2026
9e73861
fix: support vertexai in LLMProvider factory methods and fix ADC test
nicoloboschi Feb 20, 2026
891777c
fix(ci): fix remaining test failures for GCP Vertex AI CI
nicoloboschi Feb 20, 2026
f08813b
fix(ci): stabilize flaky tests for Gemini-flash-lite and CI environment
nicoloboschi Feb 20, 2026
509753e
fix(ci): fix test isolation and skip SeaweedFS tests in CI
nicoloboschi Feb 20, 2026
40df4f4
fix(ci): fix remaining test failures
nicoloboschi Feb 20, 2026
255c63b
revert: simplify language instruction in fact extraction prompts
nicoloboschi Feb 20, 2026
7f2426e
refactor: add requires_api_key() to llm_wrapper and revert xfail markers
nicoloboschi Feb 20, 2026
ea53f55
refactor(embed): use shared PROVIDER_DEFAULT_MODELS map in cli.py
nicoloboschi Feb 20, 2026
f7dfcee
refactor(embed): use get_default_model_for_provider() instead of mirr…
nicoloboschi Feb 20, 2026
6db9dba
fix: address CI test failures with real root-cause fixes
nicoloboschi Feb 20, 2026
d9be116
fix: more CI test fixes and infrastructure improvements
nicoloboschi Feb 20, 2026
179bd9c
fix: strengthen directive and language handling in reflect
nicoloboschi Feb 20, 2026
5788bec
ci: add HuggingFace pre-download and increase timeout for client/CLI …
nicoloboschi Feb 20, 2026
1ac915f
fix(tests): add wait_for_background_tasks and fix directive isolation…
nicoloboschi Feb 20, 2026
731a05f
fix: global directives always apply in tagged reflect, improve multil…
nicoloboschi Feb 20, 2026
ee2f39d
fix(tests/agent): force search_mental_models first, relax model-depen…
nicoloboschi Feb 20, 2026
9db4682
fix: implement Gemini tool_choice support and use it to force search_…
nicoloboschi Feb 20, 2026
84bb6ac
fix: proper Gemini multi-turn history and language directive priority
nicoloboschi Feb 20, 2026
16e3991
fix(ci): increase client timeout and handle Gemini JSON control chara…
nicoloboschi Feb 20, 2026
87c92c5
fix(ci): fix consolidation JSON control chars and improve recall fall…
nicoloboschi Feb 20, 2026
a1668ad
refactor: centralize LLM JSON parsing, fix tags_match bug, remove tem…
nicoloboschi Feb 20, 2026
bec8d94
test: enable SeaweedFS S3 tests in CI
nicoloboschi Feb 20, 2026
20dcd39
fix: raise on malformed tool call args instead of silently using empt…
nicoloboschi Feb 20, 2026
d129495
feat(reflect): enforce search_observations then recall() when no ment…
nicoloboschi Feb 20, 2026
816486a
refactor: clean up consolidation pipeline and reflect agent
nicoloboschi Feb 20, 2026
a1726d3
fix: consolidation MemoryFact mapping error, directive tag isolation,…
nicoloboschi Feb 20, 2026
0647510
fix(gemini): group consecutive tool responses into a single Content f…
nicoloboschi Feb 20, 2026
5a48472
fix: add Gemini HTTP timeout, cap reflect consecutive errors, increas…
nicoloboschi Feb 20, 2026
db99d74
fix: use asyncio.wait_for(90s) instead of http_options timeout, fix f…
nicoloboschi Feb 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
359 changes: 283 additions & 76 deletions .github/workflows/test.yml

Large diffs are not rendered by default.

20 changes: 18 additions & 2 deletions docker/test-image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ else
fi

# Check for required environment variables
if [ "$NEEDS_LLM" = true ] && [ -z "${HINDSIGHT_API_LLM_API_KEY:-}" ]; then
if [ "$NEEDS_LLM" = true ] && [ "$LLM_PROVIDER" != "vertexai" ] && [ -z "${HINDSIGHT_API_LLM_API_KEY:-}" ]; then
echo -e "${RED}Error: HINDSIGHT_API_LLM_API_KEY environment variable is required for API/standalone images${NC}"
echo "Set it with: export HINDSIGHT_API_LLM_API_KEY=your-api-key"
exit 2
Expand Down Expand Up @@ -123,9 +123,25 @@ else
# Build docker run command with required and optional env vars
DOCKER_CMD="docker run -d --name $CONTAINER_NAME"
DOCKER_CMD="$DOCKER_CMD -e HINDSIGHT_API_LLM_PROVIDER=$LLM_PROVIDER"
DOCKER_CMD="$DOCKER_CMD -e HINDSIGHT_API_LLM_API_KEY=${HINDSIGHT_API_LLM_API_KEY}"
if [ -n "${HINDSIGHT_API_LLM_API_KEY:-}" ]; then
DOCKER_CMD="$DOCKER_CMD -e HINDSIGHT_API_LLM_API_KEY=${HINDSIGHT_API_LLM_API_KEY}"
fi
DOCKER_CMD="$DOCKER_CMD -e HINDSIGHT_API_LLM_MODEL=$LLM_MODEL"

# Add Vertex AI config if provider is vertexai
if [ "$LLM_PROVIDER" = "vertexai" ]; then
if [ -n "${HINDSIGHT_API_LLM_VERTEXAI_SERVICE_ACCOUNT_KEY:-}" ]; then
DOCKER_CMD="$DOCKER_CMD -v ${HINDSIGHT_API_LLM_VERTEXAI_SERVICE_ACCOUNT_KEY}:/tmp/gcp-credentials.json:ro"
DOCKER_CMD="$DOCKER_CMD -e HINDSIGHT_API_LLM_VERTEXAI_SERVICE_ACCOUNT_KEY=/tmp/gcp-credentials.json"
fi
if [ -n "${HINDSIGHT_API_LLM_VERTEXAI_PROJECT_ID:-}" ]; then
DOCKER_CMD="$DOCKER_CMD -e HINDSIGHT_API_LLM_VERTEXAI_PROJECT_ID=${HINDSIGHT_API_LLM_VERTEXAI_PROJECT_ID}"
fi
if [ -n "${HINDSIGHT_API_LLM_VERTEXAI_REGION:-}" ]; then
DOCKER_CMD="$DOCKER_CMD -e HINDSIGHT_API_LLM_VERTEXAI_REGION=${HINDSIGHT_API_LLM_VERTEXAI_REGION}"
fi
fi

# Add optional embeddings provider config
if [ -n "${HINDSIGHT_API_EMBEDDINGS_PROVIDER:-}" ]; then
DOCKER_CMD="$DOCKER_CMD -e HINDSIGHT_API_EMBEDDINGS_PROVIDER=${HINDSIGHT_API_EMBEDDINGS_PROVIDER}"
Expand Down
2 changes: 1 addition & 1 deletion hindsight-api/hindsight_api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def normalize_config_dict(config: dict[str, Any]) -> dict[str, Any]:
"groq": "openai/gpt-oss-120b",
"ollama": "gemma3:12b",
"lmstudio": "local-model",
"vertexai": "gemini-2.0-flash-001",
"vertexai": "google/gemini-2.5-flash-lite",
"openai-codex": "gpt-5.2-codex",
"claude-code": "claude-sonnet-4-5-20250929",
"mock": "mock-model",
Expand Down
229 changes: 68 additions & 161 deletions hindsight-api/hindsight_api/engine/consolidation/consolidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from pydantic import BaseModel

from ...config import get_config
from ..memory_engine import fq_table
from ..retain import embedding_utils
Expand All @@ -31,10 +33,22 @@

from ...api.http import RequestContext
from ..memory_engine import MemoryEngine
from ..response_models import MemoryFact, RecallResult

logger = logging.getLogger(__name__)


class _ConsolidationAction(BaseModel):
    """One consolidation decision produced by the LLM for a single fact.

    Mirrors the JSON object shape the consolidation prompt asks the model
    to emit; parsed via Pydantic so malformed output raises instead of
    being silently accepted.
    """

    action: str  # "update" | "create"
    text: str  # observation text to create, or replacement text for an update
    reason: str = ""  # model's free-text justification; optional
    learning_id: str | None = None  # required for "update" actions


class _ConsolidationResponse(BaseModel):
    """Top-level structured LLM response: all consolidation actions for one fact."""

    actions: list[_ConsolidationAction]


class ConsolidationPerfLog:
"""Performance logging for consolidation operations."""

Expand Down Expand Up @@ -445,8 +459,7 @@ async def _process_memory(
# Find related observations using the full recall system
# SECURITY: Pass tags to ensure observations don't leak across security boundaries
t0 = time.time()
related_observations = await _find_related_observations(
conn=conn,
recall_result = await _find_related_observations(
memory_engine=memory_engine,
bank_id=bank_id,
query=fact_text,
Expand All @@ -462,7 +475,7 @@ async def _process_memory(
actions = await _consolidate_with_llm(
memory_engine=memory_engine,
fact_text=fact_text,
observations=related_observations, # Can be empty list
recall_result=recall_result,
mission=mission,
)
if perf:
Expand All @@ -483,7 +496,7 @@ async def _process_memory(
bank_id=bank_id,
memory_id=memory_id,
action=action,
observations=related_observations,
observations=recall_result.results,
source_fact_tags=fact_tags, # Pass source fact's tags for security
source_occurred_start=memory.get("occurred_start"),
source_occurred_end=memory.get("occurred_end"),
Expand Down Expand Up @@ -537,7 +550,7 @@ async def _execute_update_action(
bank_id: str,
memory_id: uuid.UUID,
action: dict[str, Any],
observations: list[dict[str, Any]],
observations: list["MemoryFact"],
source_fact_tags: list[str] | None = None,
source_occurred_start: datetime | None = None,
source_occurred_end: datetime | None = None,
Expand Down Expand Up @@ -566,28 +579,27 @@ async def _execute_update_action(
return {"action": "skipped", "reason": "missing_learning_id_or_text"}

# Find the observation
model = next((m for m in observations if str(m["id"]) == learning_id), None)
model = next((m for m in observations if m.id == learning_id), None)
if not model:
return {"action": "skipped", "reason": "learning_not_found"}

# Build history entry
history = list(model.get("history", []))
history.append(
# Build history entry (history is fetched fresh from DB on update to avoid stale state)
history = [
{
"previous_text": model["text"],
"previous_text": model.text,
"changed_at": datetime.now(timezone.utc).isoformat(),
"reason": reason,
"source_memory_id": str(memory_id),
}
)
]

# Update source_memory_ids
source_ids = list(model.get("source_memory_ids", []))
source_ids = list(model.source_fact_ids or [])
source_ids.append(memory_id)

# SECURITY: Merge source fact's tags into existing observation tags
# This ensures all contributors can see the observation they contributed to
existing_tags = set(model.get("tags", []) or [])
existing_tags = set(model.tags or [])
source_tags = set(source_fact_tags or [])
merged_tags = list(existing_tags | source_tags) # Union of both tag sets
if source_tags and source_tags != existing_tags:
Expand Down Expand Up @@ -723,13 +735,12 @@ async def _create_memory_links(


async def _find_related_observations(
conn: "Connection",
memory_engine: "MemoryEngine",
bank_id: str,
query: str,
request_context: "RequestContext",
tags: list[str] | None = None,
) -> list[dict[str, Any]]:
) -> "RecallResult":
"""
Find observations related to the given query using optimized recall.

Expand Down Expand Up @@ -774,96 +785,51 @@ async def _find_related_observations(
request_context=request_context,
tags=tags, # Filter by source memory's tags
tags_match=tags_match, # Use strict matching for security
include_source_facts=True, # Embed source facts so we avoid a separate DB fetch
max_source_facts_tokens=-1, # No token limit — we need all source facts for consolidation
_quiet=True, # Suppress logging
)
finally:
if recall_span:
recall_span.end()

# If no observations returned, return empty list
if not recall_result.results:
return []

# Batch fetch all observations in a single query (no artificial limit)
observation_ids = [uuid.UUID(obs.id) for obs in recall_result.results]

rows = await conn.fetch(
f"""
SELECT id, text, proof_count, history, tags, source_memory_ids, created_at, updated_at,
occurred_start, occurred_end, mentioned_at
FROM {fq_table("memory_units")}
WHERE id = ANY($1) AND bank_id = $2 AND fact_type = 'observation'
""",
observation_ids,
bank_id,
)

# Build results list preserving recall order
id_to_row = {row["id"]: row for row in rows}
results = []

for obs in recall_result.results:
obs_id = uuid.UUID(obs.id)
if obs_id not in id_to_row:
continue

row = id_to_row[obs_id]
history = row["history"]
if isinstance(history, str):
history = json.loads(history)
elif history is None:
history = []
return recall_result

# Fetch source memories to include their text and dates
source_memory_ids = row["source_memory_ids"] or []
source_memories = []

if source_memory_ids:
source_rows = await conn.fetch(
f"""
SELECT text, occurred_start, occurred_end, mentioned_at, event_date
FROM {fq_table("memory_units")}
WHERE id = ANY($1) AND bank_id = $2
ORDER BY created_at ASC
LIMIT 5
""",
source_memory_ids[:5], # Limit to first 5 source memories for token efficiency
bank_id,
)

for src_row in source_rows:
source_memories.append(
{
"text": src_row["text"],
"occurred_start": src_row["occurred_start"],
"occurred_end": src_row["occurred_end"],
"mentioned_at": src_row["mentioned_at"],
"event_date": src_row["event_date"],
}
)

results.append(
{
"id": row["id"],
"text": row["text"],
"proof_count": row["proof_count"] or 1,
"tags": row["tags"] or [],
"source_memories": source_memories,
"occurred_start": row["occurred_start"],
"occurred_end": row["occurred_end"],
"mentioned_at": row["mentioned_at"],
"created_at": row["created_at"],
"updated_at": row["updated_at"],
}
)

return results
def _build_observations_for_llm(
observations: "list[MemoryFact]",
source_facts: "dict[str, MemoryFact]",
) -> list[dict[str, Any]]:
"""Serialize MemoryFact observations into dicts for the consolidation LLM prompt."""
obs_list = []
for obs in observations:
obs_data: dict[str, Any] = {
"id": obs.id,
"text": obs.text,
"proof_count": len(obs.source_fact_ids or []) or 1,
"tags": obs.tags or [],
}
if obs.occurred_start:
obs_data["occurred_start"] = obs.occurred_start
if obs.occurred_end:
obs_data["occurred_end"] = obs.occurred_end
if obs.mentioned_at:
obs_data["mentioned_at"] = obs.mentioned_at
source_memories = [
{"text": sf.text, "occurred_start": sf.occurred_start}
for sid in (obs.source_fact_ids or [])[:3]
if (sf := source_facts.get(sid)) is not None
]
if source_memories:
obs_data["source_memories"] = source_memories
obs_list.append(obs_data)
return obs_list


async def _consolidate_with_llm(
memory_engine: "MemoryEngine",
fact_text: str,
observations: list[dict[str, Any]],
recall_result: "RecallResult",
mission: str,
) -> list[dict[str, Any]]:
"""
Expand All @@ -884,40 +850,11 @@ async def _consolidate_with_llm(
- {"action": "create", "text": "...", "reason": "..."}
- [] if fact is purely ephemeral (no durable knowledge)
"""
# Format observations as JSON with source memories and dates
if observations:
obs_list = []
for obs in observations:
obs_data = {
"id": str(obs["id"]),
"text": obs["text"],
"proof_count": obs["proof_count"],
"tags": obs["tags"],
"created_at": obs["created_at"].isoformat() if obs.get("created_at") else None,
"updated_at": obs["updated_at"].isoformat() if obs.get("updated_at") else None,
}

# Include temporal info if available
if obs.get("occurred_start"):
obs_data["occurred_start"] = obs["occurred_start"].isoformat()
if obs.get("occurred_end"):
obs_data["occurred_end"] = obs["occurred_end"].isoformat()
if obs.get("mentioned_at"):
obs_data["mentioned_at"] = obs["mentioned_at"].isoformat()

# Include source memories (up to 3 for brevity)
if obs.get("source_memories"):
obs_data["source_memories"] = [
{
"text": sm["text"],
"event_date": sm["event_date"].isoformat() if sm.get("event_date") else None,
"occurred_start": sm["occurred_start"].isoformat() if sm.get("occurred_start") else None,
}
for sm in obs["source_memories"][:3] # Limit to 3 for token efficiency
]

obs_list.append(obs_data)
observations = recall_result.results
source_facts = recall_result.source_facts or {}

if observations:
obs_list = _build_observations_for_llm(observations, source_facts)
observations_text = json.dumps(obs_list, indent=2)
else:
observations_text = "[]"
Expand All @@ -942,42 +879,12 @@ async def _consolidate_with_llm(
{"role": "user", "content": user_prompt},
]

try:
result = await memory_engine._consolidation_llm_config.call(
messages=messages,
skip_validation=True, # Raw JSON response
scope="consolidation",
)
# Parse JSON response - should be an array
if isinstance(result, str):
# Strip markdown code fences (some models wrap JSON in ```json ... ```)
clean = result.strip()
if clean.startswith("```"):
clean = clean.split("\n", 1)[1] if "\n" in clean else clean[3:]
if clean.endswith("```"):
clean = clean[:-3]
clean = clean.strip()
result = json.loads(clean)
# Ensure result is a list
if isinstance(result, list):
return result
# Handle legacy single-action format for backward compatibility
if isinstance(result, dict):
if result.get("related_ids") and result.get("consolidated_text"):
# Convert old format to new format
return [
{
"action": "update",
"learning_id": result["related_ids"][0],
"text": result["consolidated_text"],
"reason": result.get("reason", ""),
}
]
return []
return []
except Exception as e:
logger.warning(f"Error in consolidation LLM call: {e}")
return []
response: _ConsolidationResponse = await memory_engine._consolidation_llm_config.call(
messages=messages,
response_format=_ConsolidationResponse,
scope="consolidation",
)
return [a.model_dump() for a in response.actions]


async def _create_observation_directly(
Expand Down
Loading