Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions src/agentready/services/llm_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ def get(self, cache_key: str) -> DiscoveredSkill | None:
Returns:
Cached DiscoveredSkill or None if miss/expired
"""
cache_file = self.cache_dir / f"{cache_key}.json"
# Security: Validate cache_key to prevent path traversal
cache_file = self._get_safe_cache_path(cache_key)
if cache_file is None:
logger.warning(f"Invalid cache key rejected: {cache_key}")
return None

if not cache_file.exists():
logger.debug(f"Cache miss: {cache_key}")
Expand Down Expand Up @@ -65,7 +69,11 @@ def set(self, cache_key: str, skill: DiscoveredSkill):
cache_key: Unique cache key
skill: DiscoveredSkill to cache
"""
cache_file = self.cache_dir / f"{cache_key}.json"
# Security: Validate cache_key to prevent path traversal
cache_file = self._get_safe_cache_path(cache_key)
if cache_file is None:
logger.warning(f"Invalid cache key rejected: {cache_key}")
return

try:
data = {
Expand All @@ -81,6 +89,38 @@ def set(self, cache_key: str, skill: DiscoveredSkill):
except Exception as e:
logger.warning(f"Cache write error for {cache_key}: {e}")

def _get_safe_cache_path(self, cache_key: str) -> Path | None:
"""Validate cache key and return safe path.

Security: Prevents path traversal attacks by validating cache_key
contains no directory separators and resolves within cache_dir.

Args:
cache_key: Cache key to validate

Returns:
Validated Path or None if invalid
"""
# Reject keys with path separators (/, \)
if "/" in cache_key or "\\" in cache_key:
return None

# Reject keys with null bytes or other dangerous characters
if "\0" in cache_key or ".." in cache_key:
return None

# Construct path and resolve to canonical form
cache_file = (self.cache_dir / f"{cache_key}.json").resolve()

# Ensure resolved path is within cache directory
try:
cache_file.relative_to(self.cache_dir.resolve())
except ValueError:
# Path is outside cache_dir
return None

return cache_file

@staticmethod
def generate_key(attribute_id: str, score: float, evidence_hash: str) -> str:
"""Generate cache key from finding attributes.
Expand Down
Loading