-
Notifications
You must be signed in to change notification settings - Fork 204
Session persistence #302
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Session persistence #302
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,226 @@ | ||
"""File-based session manager for local filesystem storage.""" | ||
|
||
import json | ||
import logging | ||
import os | ||
import shutil | ||
import tempfile | ||
from dataclasses import asdict | ||
from typing import Any, Optional, cast | ||
|
||
from ..types.exceptions import SessionException | ||
from ..types.session import Session, SessionAgent, SessionMessage | ||
from .repository_session_manager import RepositorySessionManager | ||
from .session_repository import SessionRepository | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
SESSION_PREFIX = "session_" | ||
AGENT_PREFIX = "agent_" | ||
MESSAGE_PREFIX = "message_" | ||
|
||
|
||
class FileSessionManager(RepositorySessionManager, SessionRepository): | ||
"""File-based session manager for local filesystem storage. | ||
|
||
Creates the following filesystem structure for the session storage: | ||
/<sessions_dir>/ | ||
└── session_<session_id>/ | ||
├── session.json # Session metadata | ||
└── agents/ | ||
└── agent_<agent_id>/ | ||
├── agent.json # Agent metadata | ||
└── messages/ | ||
├── message_<created_timestamp>_<id1>.json | ||
└── message_<created_timestamp>_<id2>.json | ||
|
||
""" | ||
|
||
def __init__(self, session_id: str, storage_dir: Optional[str] = None): | ||
"""Initialize FileSession with filesystem storage. | ||
|
||
Args: | ||
session_id: ID for the session | ||
storage_dir: Directory for local filesystem storage (defaults to temp dir) | ||
""" | ||
self.storage_dir = storage_dir or os.path.join(tempfile.gettempdir(), "strands/sessions") | ||
os.makedirs(self.storage_dir, exist_ok=True) | ||
|
||
super().__init__(session_id=session_id, session_repository=self) | ||
|
||
def _get_session_path(self, session_id: str) -> str: | ||
"""Get session directory path.""" | ||
return os.path.join(self.storage_dir, f"{SESSION_PREFIX}{session_id}") | ||
|
||
def _get_agent_path(self, session_id: str, agent_id: str) -> str: | ||
"""Get agent directory path.""" | ||
session_path = self._get_session_path(session_id) | ||
return os.path.join(session_path, "agents", f"{AGENT_PREFIX}{agent_id}") | ||
|
||
def _get_message_path(self, session_id: str, agent_id: str, message_id: str, timestamp: str) -> str: | ||
"""Get message file path. | ||
|
||
Args: | ||
session_id: ID of the session | ||
agent_id: ID of the agent | ||
message_id: ID of the message | ||
timestamp: ISO format timestamp to include in filename for sorting | ||
Returns: | ||
The filename for the message | ||
""" | ||
agent_path = self._get_agent_path(session_id, agent_id) | ||
# Use timestamp for sortable filenames | ||
# Replace colons and periods in ISO format with underscores for filesystem compatibility | ||
filename_timestamp = timestamp.replace(":", "_").replace(".", "_") | ||
return os.path.join(agent_path, "messages", f"{MESSAGE_PREFIX}{filename_timestamp}_{message_id}.json") | ||
|
||
def _read_file(self, path: str) -> dict[str, Any]: | ||
"""Read JSON file.""" | ||
try: | ||
with open(path, "r", encoding="utf-8") as f: | ||
return cast(dict[str, Any], json.load(f)) | ||
except json.JSONDecodeError as e: | ||
raise SessionException(f"Invalid JSON in file {path}: {str(e)}") from e | ||
|
||
def _write_file(self, path: str, data: dict[str, Any]) -> None: | ||
"""Write JSON file.""" | ||
os.makedirs(os.path.dirname(path), exist_ok=True) | ||
with open(path, "w", encoding="utf-8") as f: | ||
json.dump(data, f, indent=2, ensure_ascii=False) | ||
|
||
def create_session(self, session: Session) -> Session: | ||
"""Create a new session.""" | ||
session_dir = self._get_session_path(session.session_id) | ||
if os.path.exists(session_dir): | ||
raise SessionException(f"Session {session.session_id} already exists") | ||
|
||
# Create directory structure | ||
os.makedirs(session_dir, exist_ok=True) | ||
os.makedirs(os.path.join(session_dir, "agents"), exist_ok=True) | ||
|
||
# Write session file | ||
session_file = os.path.join(session_dir, "session.json") | ||
session_dict = asdict(session) | ||
self._write_file(session_file, session_dict) | ||
|
||
return session | ||
|
||
def read_session(self, session_id: str) -> Optional[Session]: | ||
"""Read session data.""" | ||
session_file = os.path.join(self._get_session_path(session_id), "session.json") | ||
if not os.path.exists(session_file): | ||
return None | ||
|
||
session_data = self._read_file(session_file) | ||
return Session.from_dict(session_data) | ||
|
||
def create_agent(self, session_id: str, session_agent: SessionAgent) -> None: | ||
"""Create a new agent in the session.""" | ||
agent_id = session_agent.agent_id | ||
|
||
agent_dir = self._get_agent_path(session_id, agent_id) | ||
os.makedirs(agent_dir, exist_ok=True) | ||
os.makedirs(os.path.join(agent_dir, "messages"), exist_ok=True) | ||
|
||
agent_file = os.path.join(agent_dir, "agent.json") | ||
session_data = asdict(session_agent) | ||
self._write_file(agent_file, session_data) | ||
|
||
def delete_session(self, session_id: str) -> None: | ||
"""Delete session and all associated data.""" | ||
session_dir = self._get_session_path(session_id) | ||
if not os.path.exists(session_dir): | ||
raise SessionException(f"Session {session_id} does not exist") | ||
|
||
shutil.rmtree(session_dir) | ||
|
||
def read_agent(self, session_id: str, agent_id: str) -> Optional[SessionAgent]: | ||
"""Read agent data.""" | ||
agent_file = os.path.join(self._get_agent_path(session_id, agent_id), "agent.json") | ||
if not os.path.exists(agent_file): | ||
return None | ||
|
||
agent_data = self._read_file(agent_file) | ||
return SessionAgent.from_dict(agent_data) | ||
|
||
def update_agent(self, session_id: str, session_agent: SessionAgent) -> None: | ||
"""Update agent data.""" | ||
agent_id = session_agent.agent_id | ||
previous_agent = self.read_agent(session_id=session_id, agent_id=agent_id) | ||
if previous_agent is None: | ||
raise SessionException(f"Agent {agent_id} in session {session_id} does not exist") | ||
|
||
session_agent.created_at = previous_agent.created_at | ||
agent_file = os.path.join(self._get_agent_path(session_id, agent_id), "agent.json") | ||
self._write_file(agent_file, asdict(session_agent)) | ||
|
||
def create_message(self, session_id: str, agent_id: str, session_message: SessionMessage) -> None: | ||
"""Create a new message for the agent.""" | ||
message_file = self._get_message_path( | ||
session_id, | ||
agent_id, | ||
session_message.message_id, | ||
session_message.created_at, | ||
) | ||
session_dict = asdict(session_message) | ||
self._write_file(message_file, session_dict) | ||
|
||
def read_message(self, session_id: str, agent_id: str, message_id: str) -> Optional[SessionMessage]: | ||
"""Read message data.""" | ||
# Get the messages directory | ||
messages_dir = os.path.join(self._get_agent_path(session_id, agent_id), "messages") | ||
if not os.path.exists(messages_dir): | ||
return None | ||
|
||
# List files in messages directory, and check if the filename ends with the message id | ||
for filename in os.listdir(messages_dir): | ||
if filename.endswith(f"{message_id}.json"): | ||
file_path = os.path.join(messages_dir, filename) | ||
message_data = self._read_file(file_path) | ||
return SessionMessage.from_dict(message_data) | ||
|
||
return None | ||
|
||
def update_message(self, session_id: str, agent_id: str, session_message: SessionMessage) -> None: | ||
"""Update message data.""" | ||
message_id = session_message.message_id | ||
previous_message = self.read_message(session_id=session_id, agent_id=agent_id, message_id=message_id) | ||
if previous_message is None: | ||
raise SessionException(f"Message {message_id} does not exist") | ||
|
||
# Preserve the original created_at timestamp | ||
session_message.created_at = previous_message.created_at | ||
message_file = self._get_message_path(session_id, agent_id, message_id, session_message.created_at) | ||
self._write_file(message_file, asdict(session_message)) | ||
|
||
def list_messages( | ||
self, session_id: str, agent_id: str, limit: Optional[int] = None, offset: int = 0 | ||
) -> list[SessionMessage]: | ||
"""List messages for an agent with pagination.""" | ||
messages_dir = os.path.join(self._get_agent_path(session_id, agent_id), "messages") | ||
if not os.path.exists(messages_dir): | ||
raise SessionException(f"Messages directory missing from agent: {agent_id} in session {session_id}") | ||
|
||
# Read all message files | ||
message_files: list[str] = [] | ||
for filename in os.listdir(messages_dir): | ||
if filename.startswith(MESSAGE_PREFIX) and filename.endswith(".json"): | ||
message_files.append(filename) | ||
|
||
# Sort filenames - the timestamp in the file's name will sort chronologically | ||
message_files.sort() | ||
|
||
# Apply pagination to filenames | ||
if limit is not None: | ||
message_files = message_files[offset : offset + limit] | ||
else: | ||
message_files = message_files[offset:] | ||
|
||
# Load only the message files | ||
messages: list[SessionMessage] = [] | ||
for filename in message_files: | ||
file_path = os.path.join(messages_dir, filename) | ||
message_data = self._read_file(file_path) | ||
messages.append(SessionMessage.from_dict(message_data)) | ||
|
||
return messages |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.