Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ tokenizers
orjson
redis>=5.0.0
xxhash>=3.0.0
lz4>=4.0.0
# Tree-sitter 0.25+ with individual language packages
tree_sitter>=0.25.0
tree_sitter_python>=0.23.0
Expand Down
11 changes: 10 additions & 1 deletion scripts/indexing_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,16 @@ def _copy_repo_state_for_clone(
# Cache with TTL support
_COLLECTION_SCHEMA_CACHE: Dict[str, Dict[str, Any]] = {}
_COLLECTION_SCHEMA_CACHE_TS: Dict[str, float] = {} # Track cache timestamps
# TTL (seconds) for the collection-schema cache, read from
# SCHEMA_CACHE_TTL_SECS. Falls back to 300s (5 min) when the value is
# missing, non-numeric, non-finite (nan/inf), or non-positive.
import math

try:
    _ttl_raw = float(os.environ.get("SCHEMA_CACHE_TTL_SECS", "300"))
    if math.isfinite(_ttl_raw) and _ttl_raw > 0:
        _COLLECTION_SCHEMA_CACHE_TTL = _ttl_raw
    else:
        # nan/inf or <= 0 would make every cache entry instantly stale
        # (or never expire) — use the 5 min default instead.
        _COLLECTION_SCHEMA_CACHE_TTL = 300.0
except (ValueError, TypeError):
    _COLLECTION_SCHEMA_CACHE_TTL = 300.0  # 5 min default
_SNAPSHOT_REFRESHED: Set[str] = set()
_MAPPING_INDEX_CACHE: Dict[str, Any] = {"ts": 0.0, "work_dir": "", "value": {}}

Expand Down
27 changes: 24 additions & 3 deletions scripts/upload_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -908,9 +908,13 @@ async def admin_acl_grant(
async def admin_collections_status(request: Request):
_require_admin_session(request)
try:
collections = list_collections(include_deleted=False)
if AUTH_ENABLED:
collections = list_collections(include_deleted=False)
else:
# Demo mode: get collections directly from Qdrant
collections = list_qdrant_collections()
except AuthDisabledError:
raise HTTPException(status_code=404, detail="Auth disabled")
collections = list_qdrant_collections()
except Exception:
raise HTTPException(status_code=500, detail="Failed to load collections")

Expand All @@ -934,7 +938,11 @@ async def event_generator():
break

try:
collections = list_collections(include_deleted=False)
if AUTH_ENABLED:
collections = list_collections(include_deleted=False)
else:
# Demo mode: get collections directly from Qdrant
collections = list_qdrant_collections()
enriched = await asyncio.to_thread(
lambda: build_admin_collections_view(collections=collections, work_dir=WORK_DIR)
)
Expand All @@ -945,6 +953,19 @@ async def event_generator():
last_data = current_data
yield f"data: {json.dumps({'type': 'full', 'collections': enriched}, default=str)}\n\n"

except AuthDisabledError:
# Fallback to Qdrant collections
try:
collections = list_qdrant_collections()
enriched = await asyncio.to_thread(
lambda: build_admin_collections_view(collections=collections, work_dir=WORK_DIR)
)
current_data = json.dumps(enriched, default=str)
if current_data != last_data:
last_data = current_data
yield f"data: {json.dumps({'type': 'full', 'collections': enriched}, default=str)}\n\n"
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"

Expand Down
14 changes: 6 additions & 8 deletions scripts/watch_index_core/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

from __future__ import annotations

import hashlib
import logging
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
import xxhash

import scripts.ingest_code as idx
from scripts.workspace_state import (
Expand Down Expand Up @@ -128,7 +128,7 @@ def _maybe_handle_staging_file(
if not (is_staging_enabled() and state_env and collection):
return False

_text, file_hash = _read_text_and_sha1(path)
_text, file_hash = _read_text_and_hash(path)
if file_hash:
try:
cached_hash = get_cached_file_hash(str(path), repo_name) if repo_name else None
Expand Down Expand Up @@ -348,17 +348,15 @@ def _process_paths(
logger.debug(f"Suppressed exception: {e}")


def _read_text_and_hash(path: Path) -> tuple[Optional[str], str]:
    """Read *path* as UTF-8 text and compute its xxhash64 hex digest.

    Returns ``(text, file_hash)``. When the file cannot be read, ``text``
    is ``None``; when it is unreadable or empty, ``file_hash`` is ``""``.
    xxhash64 is used (rather than sha1) for consistency with the hashes
    produced by the ingest pipeline.
    """
    try:
        text = path.read_text(encoding="utf-8", errors="ignore")
    except Exception:
        # Missing file, permission error, etc. — caller treats None as "skip".
        text = None
    if not text:
        return text, ""
    file_hash = xxhash.xxh64(text.encode("utf-8", errors="ignore")).hexdigest()
    return text, file_hash


Expand All @@ -378,7 +376,7 @@ def _run_indexing_strategy(
except Exception as e:
logger.debug(f"Suppressed exception: {e}")

text, file_hash = _read_text_and_sha1(path)
text, file_hash = _read_text_and_hash(path)
ok = False
if text is not None:
try:
Expand Down
57 changes: 52 additions & 5 deletions scripts/workspace_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,20 +144,66 @@ def _redis_retry(fn, retries: int = 2, delay: float = 0.1):
raise last_err # type: ignore


# LZ4 compression for values stored in Redis.
# Gives roughly 50-70% memory savings for a small CPU cost.
_LZ4_AVAILABLE = False
try:
    import lz4.frame as lz4_frame
    _LZ4_AVAILABLE = True
except ImportError:
    lz4_frame = None  # type: ignore

# Marker prepended to LZ4-compressed values so reads can tell them apart
# from plain payloads.
_LZ4_PREFIX = b"LZ4:"

def _redis_compress(data: bytes) -> bytes:
    """Return *data* LZ4-compressed (marker-prefixed) when that helps.

    Falls back to the raw bytes if lz4 is unavailable, compression fails,
    or the compressed form would not actually be smaller.
    """
    if lz4_frame is None or not _LZ4_AVAILABLE:
        return data
    try:
        packed = lz4_frame.compress(data, compression_level=0)  # fastest level
    except Exception:
        return data
    # Keep the compressed form only when prefix + payload beats the original.
    if len(packed) + len(_LZ4_PREFIX) < len(data):
        return _LZ4_PREFIX + packed
    return data

def _redis_decompress(data: bytes) -> bytes:
    """Undo _redis_compress: strip the marker and decompress, else pass through."""
    if not data or not data.startswith(_LZ4_PREFIX):
        # Empty or plain (uncompressed) payload — return unchanged.
        return data
    if lz4_frame is None or not _LZ4_AVAILABLE:
        logger.warning("LZ4 compressed data found but lz4 not available")
        return data
    try:
        return lz4_frame.decompress(data[len(_LZ4_PREFIX):])
    except Exception as e:
        logger.debug(f"LZ4 decompress failed: {e}")
        return data


def _redis_get_json(kind: str, path: Path) -> Optional[Dict[str, Any]]:
client = _get_redis_client()
if client is None:
return None
key = _redis_key_for_path(kind, path)
try:
raw = _redis_retry(lambda: client.get(key))
# Get raw bytes for decompression
raw = _redis_retry(lambda: client.execute_command("GET", key))
except Exception as e:
logger.debug(f"Redis get failed for {key}: {e}")
return None
if not raw:
return None
try:
obj = json.loads(raw)
# Handle both string and bytes responses
if isinstance(raw, str):
raw = raw.encode("utf-8")
decompressed = _redis_decompress(raw)
obj = json.loads(decompressed.decode("utf-8"))
except Exception as e:
logger.debug(f"Redis JSON decode failed for {key}: {e}")
return None
Expand All @@ -172,12 +218,13 @@ def _redis_set_json(kind: str, path: Path, obj: Dict[str, Any]) -> bool:
return False
key = _redis_key_for_path(kind, path)
try:
payload = json.dumps(obj, ensure_ascii=False)
payload = json.dumps(obj, ensure_ascii=False).encode("utf-8")
compressed = _redis_compress(payload)
except Exception as e:
logger.debug(f"Failed to JSON serialize redis payload for {key}: {e}")
logger.debug(f"Failed to serialize/compress redis payload for {key}: {e}")
return False
try:
_redis_retry(lambda: client.set(key, payload))
_redis_retry(lambda: client.execute_command("SET", key, compressed))
return True
except Exception as e:
logger.debug(f"Redis set failed for {key}: {e}")
Expand Down
6 changes: 3 additions & 3 deletions tests/test_watch_index_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ def test_processor_handles_cache_read_errors(monkeypatch, tmp_path):
test_file = tmp_path / "test.txt"
test_file.write_text("test content", encoding="utf-8")

# Exercise _read_text_and_sha1 - should handle errors gracefully and return content + hash
text, sha1 = proc_mod._read_text_and_sha1(test_file)
# Exercise _read_text_and_hash - should handle errors gracefully and return content + hash
text, file_hash = proc_mod._read_text_and_hash(test_file)
assert text == "test content"
assert sha1 is not None and len(sha1) == 40 # SHA1 hex length
assert file_hash is not None and len(file_hash) == 16 # xxhash64 hex length


def test_handler_move_event_handles_cache_errors(monkeypatch, tmp_path):
Expand Down