Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions hindsight-api/hindsight_api/engine/memory_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,10 +536,15 @@ async def _handle_batch_retain(self, task_dict: dict[str, Any]):
f"[BATCH_RETAIN_TASK] Starting background batch retain for bank_id={bank_id}, {len(contents)} items"
)

# Use internal request context for background tasks (skips tenant auth when schema is pre-set)
# Restore tenant_id/api_key_id from task payload so downstream operations
# (e.g., consolidation and mental model refreshes) can attribute usage.
from hindsight_api.models import RequestContext

internal_context = RequestContext(internal=True)
internal_context = RequestContext(
internal=True,
tenant_id=task_dict.get("_tenant_id"),
api_key_id=task_dict.get("_api_key_id"),
)
await self.retain_batch_async(bank_id=bank_id, contents=contents, request_context=internal_context)

logger.info(f"[BATCH_RETAIN_TASK] Completed background batch retain for bank_id={bank_id}")
Expand All @@ -565,7 +570,13 @@ async def _handle_consolidation(self, task_dict: dict[str, Any]):

from .consolidation import run_consolidation_job

internal_context = RequestContext(internal=True)
# Restore tenant_id/api_key_id from task payload so downstream operations
# (e.g., mental model refreshes) can attribute usage to the correct org.
internal_context = RequestContext(
internal=True,
tenant_id=task_dict.get("_tenant_id"),
api_key_id=task_dict.get("_api_key_id"),
)
result = await run_consolidation_job(
memory_engine=self,
bank_id=bank_id,
Expand Down Expand Up @@ -5458,6 +5469,13 @@ async def submit_async_retain(
task_payload: dict[str, Any] = {"contents": contents}
if document_tags:
task_payload["document_tags"] = document_tags
# Pass tenant_id and api_key_id through task payload so the worker
# can propagate request context to downstream operations (e.g.,
# consolidation and mental model refreshes triggered after retain).
if request_context.tenant_id:
task_payload["_tenant_id"] = request_context.tenant_id
if request_context.api_key_id:
task_payload["_api_key_id"] = request_context.api_key_id

result = await self._submit_async_operation(
bank_id=bank_id,
Expand Down Expand Up @@ -5490,11 +5508,21 @@ async def submit_async_consolidation(
Dict with operation_id
"""
await self._authenticate_tenant(request_context)

# Pass tenant_id and api_key_id through task payload so the worker
# can provide request context to extension hooks (e.g., usage metering
# for mental model refreshes triggered by consolidation).
task_payload: dict[str, Any] = {}
if request_context.tenant_id:
task_payload["_tenant_id"] = request_context.tenant_id
if request_context.api_key_id:
task_payload["_api_key_id"] = request_context.api_key_id

return await self._submit_async_operation(
bank_id=bank_id,
operation_type="consolidation",
task_type="consolidation",
task_payload={},
task_payload=task_payload,
dedupe_by_bank=True,
)

Expand Down
Loading