Skip to content

Session persistence #302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions src/strands/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import random
from concurrent.futures import ThreadPoolExecutor
from typing import Any, AsyncGenerator, AsyncIterator, Callable, Mapping, Optional, Type, TypeVar, Union, cast
from uuid import uuid4

from opentelemetry import trace
from pydantic import BaseModel
Expand All @@ -32,6 +31,7 @@
)
from ..models.bedrock import BedrockModel
from ..models.model import Model
from ..session.session_manager import SessionManager
from ..telemetry.metrics import EventLoopMetrics
from ..telemetry.tracer import get_tracer
from ..tools.registry import ToolRegistry
Expand Down Expand Up @@ -62,6 +62,7 @@ class _DefaultCallbackHandlerSentinel:

_DEFAULT_CALLBACK_HANDLER = _DefaultCallbackHandlerSentinel()
_DEFAULT_AGENT_NAME = "Strands Agents"
_DEFAULT_AGENT_ID = "default"


class Agent:
Expand Down Expand Up @@ -207,6 +208,7 @@ def __init__(
description: Optional[str] = None,
state: Optional[Union[AgentState, dict]] = None,
hooks: Optional[list[HookProvider]] = None,
session_manager: Optional[SessionManager] = None,
):
"""Initialize the Agent with the specified configuration.

Expand Down Expand Up @@ -237,22 +239,24 @@ def __init__(
load_tools_from_directory: Whether to load and automatically reload tools in the `./tools/` directory.
Defaults to False.
trace_attributes: Custom trace attributes to apply to the agent's trace span.
agent_id: Optional ID for the agent, useful for multi-agent scenarios.
If None, a UUID is generated.
agent_id: Optional ID for the agent, useful for session management and multi-agent scenarios.
Defaults to "default".
name: name of the Agent
Defaults to None.
Defaults to "Strands Agents".
description: description of what the Agent does
Defaults to None.
state: stateful information for the agent. Can be either an AgentState object, or a json serializable dict.
Defaults to an empty AgentState object.
hooks: hooks to be added to the agent hook registry
Defaults to None.
session_manager: Manager for handling agent sessions including conversation history and state.
If provided, enables session-based persistence and state management.
"""
self.model = BedrockModel() if not model else BedrockModel(model_id=model) if isinstance(model, str) else model
self.messages = messages if messages is not None else []

self.system_prompt = system_prompt
self.agent_id = agent_id or str(uuid4())
self.agent_id = agent_id or _DEFAULT_AGENT_ID
self.name = name or _DEFAULT_AGENT_NAME
self.description = description

Expand Down Expand Up @@ -312,6 +316,12 @@ def __init__(
self.tool_caller = Agent.ToolCaller(self)

self.hooks = HookRegistry()

# Initialize session management functionality
self._session_manager = session_manager
if self._session_manager:
self.hooks.add_hook(self._session_manager)

if hooks:
for hook in hooks:
self.hooks.add_hook(hook)
Expand Down
226 changes: 226 additions & 0 deletions src/strands/session/file_session_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
"""File-based session manager for local filesystem storage."""

import json
import logging
import os
import shutil
import tempfile
from dataclasses import asdict
from typing import Any, Optional, cast

from ..types.exceptions import SessionException
from ..types.session import Session, SessionAgent, SessionMessage
from .repository_session_manager import RepositorySessionManager
from .session_repository import SessionRepository

logger = logging.getLogger(__name__)

SESSION_PREFIX = "session_"
AGENT_PREFIX = "agent_"
MESSAGE_PREFIX = "message_"


class FileSessionManager(RepositorySessionManager, SessionRepository):
"""File-based session manager for local filesystem storage.

Creates the following filesystem structure for the session storage:
/<sessions_dir>/
└── session_<session_id>/
├── session.json # Session metadata
└── agents/
└── agent_<agent_id>/
├── agent.json # Agent metadata
└── messages/
├── message_<created_timestamp>_<id1>.json
└── message_<created_timestamp>_<id2>.json

"""

def __init__(self, session_id: str, storage_dir: Optional[str] = None):
"""Initialize FileSession with filesystem storage.

Args:
session_id: ID for the session
storage_dir: Directory for local filesystem storage (defaults to temp dir)
"""
self.storage_dir = storage_dir or os.path.join(tempfile.gettempdir(), "strands/sessions")
os.makedirs(self.storage_dir, exist_ok=True)

super().__init__(session_id=session_id, session_repository=self)

def _get_session_path(self, session_id: str) -> str:
"""Get session directory path."""
return os.path.join(self.storage_dir, f"{SESSION_PREFIX}{session_id}")

def _get_agent_path(self, session_id: str, agent_id: str) -> str:
"""Get agent directory path."""
session_path = self._get_session_path(session_id)
return os.path.join(session_path, "agents", f"{AGENT_PREFIX}{agent_id}")

def _get_message_path(self, session_id: str, agent_id: str, message_id: str, timestamp: str) -> str:
"""Get message file path.

Args:
session_id: ID of the session
agent_id: ID of the agent
message_id: ID of the message
timestamp: ISO format timestamp to include in filename for sorting
Returns:
The filename for the message
"""
agent_path = self._get_agent_path(session_id, agent_id)
# Use timestamp for sortable filenames
# Replace colons and periods in ISO format with underscores for filesystem compatibility
filename_timestamp = timestamp.replace(":", "_").replace(".", "_")
return os.path.join(agent_path, "messages", f"{MESSAGE_PREFIX}{filename_timestamp}_{message_id}.json")

def _read_file(self, path: str) -> dict[str, Any]:
"""Read JSON file."""
try:
with open(path, "r", encoding="utf-8") as f:
return cast(dict[str, Any], json.load(f))
except json.JSONDecodeError as e:
raise SessionException(f"Invalid JSON in file {path}: {str(e)}") from e

def _write_file(self, path: str, data: dict[str, Any]) -> None:
"""Write JSON file."""
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)

def create_session(self, session: Session) -> Session:
"""Create a new session."""
session_dir = self._get_session_path(session.session_id)
if os.path.exists(session_dir):
raise SessionException(f"Session {session.session_id} already exists")

# Create directory structure
os.makedirs(session_dir, exist_ok=True)
os.makedirs(os.path.join(session_dir, "agents"), exist_ok=True)

# Write session file
session_file = os.path.join(session_dir, "session.json")
session_dict = asdict(session)
self._write_file(session_file, session_dict)

return session

def read_session(self, session_id: str) -> Optional[Session]:
"""Read session data."""
session_file = os.path.join(self._get_session_path(session_id), "session.json")
if not os.path.exists(session_file):
return None

session_data = self._read_file(session_file)
return Session.from_dict(session_data)

def create_agent(self, session_id: str, session_agent: SessionAgent) -> None:
"""Create a new agent in the session."""
agent_id = session_agent.agent_id

agent_dir = self._get_agent_path(session_id, agent_id)
os.makedirs(agent_dir, exist_ok=True)
os.makedirs(os.path.join(agent_dir, "messages"), exist_ok=True)

agent_file = os.path.join(agent_dir, "agent.json")
session_data = asdict(session_agent)
self._write_file(agent_file, session_data)

def delete_session(self, session_id: str) -> None:
"""Delete session and all associated data."""
session_dir = self._get_session_path(session_id)
if not os.path.exists(session_dir):
raise SessionException(f"Session {session_id} does not exist")

shutil.rmtree(session_dir)

def read_agent(self, session_id: str, agent_id: str) -> Optional[SessionAgent]:
"""Read agent data."""
agent_file = os.path.join(self._get_agent_path(session_id, agent_id), "agent.json")
if not os.path.exists(agent_file):
return None

agent_data = self._read_file(agent_file)
return SessionAgent.from_dict(agent_data)

def update_agent(self, session_id: str, session_agent: SessionAgent) -> None:
"""Update agent data."""
agent_id = session_agent.agent_id
previous_agent = self.read_agent(session_id=session_id, agent_id=agent_id)
if previous_agent is None:
raise SessionException(f"Agent {agent_id} in session {session_id} does not exist")

session_agent.created_at = previous_agent.created_at
agent_file = os.path.join(self._get_agent_path(session_id, agent_id), "agent.json")
self._write_file(agent_file, asdict(session_agent))

def create_message(self, session_id: str, agent_id: str, session_message: SessionMessage) -> None:
"""Create a new message for the agent."""
message_file = self._get_message_path(
session_id,
agent_id,
session_message.message_id,
session_message.created_at,
)
session_dict = asdict(session_message)
self._write_file(message_file, session_dict)

def read_message(self, session_id: str, agent_id: str, message_id: str) -> Optional[SessionMessage]:
"""Read message data."""
# Get the messages directory
messages_dir = os.path.join(self._get_agent_path(session_id, agent_id), "messages")
if not os.path.exists(messages_dir):
return None

# List files in messages directory, and check if the filename ends with the message id
for filename in os.listdir(messages_dir):
if filename.endswith(f"{message_id}.json"):
file_path = os.path.join(messages_dir, filename)
message_data = self._read_file(file_path)
return SessionMessage.from_dict(message_data)

return None

def update_message(self, session_id: str, agent_id: str, session_message: SessionMessage) -> None:
"""Update message data."""
message_id = session_message.message_id
previous_message = self.read_message(session_id=session_id, agent_id=agent_id, message_id=message_id)
if previous_message is None:
raise SessionException(f"Message {message_id} does not exist")

# Preserve the original created_at timestamp
session_message.created_at = previous_message.created_at
message_file = self._get_message_path(session_id, agent_id, message_id, session_message.created_at)
self._write_file(message_file, asdict(session_message))

def list_messages(
self, session_id: str, agent_id: str, limit: Optional[int] = None, offset: int = 0
) -> list[SessionMessage]:
"""List messages for an agent with pagination."""
messages_dir = os.path.join(self._get_agent_path(session_id, agent_id), "messages")
if not os.path.exists(messages_dir):
raise SessionException(f"Messages directory missing from agent: {agent_id} in session {session_id}")

# Read all message files
message_files: list[str] = []
for filename in os.listdir(messages_dir):
if filename.startswith(MESSAGE_PREFIX) and filename.endswith(".json"):
message_files.append(filename)

# Sort filenames - the timestamp in the file's name will sort chronologically
message_files.sort()

# Apply pagination to filenames
if limit is not None:
message_files = message_files[offset : offset + limit]
else:
message_files = message_files[offset:]

# Load only the message files
messages: list[SessionMessage] = []
for filename in message_files:
file_path = os.path.join(messages_dir, filename)
message_data = self._read_file(file_path)
messages.append(SessionMessage.from_dict(message_data))

return messages
Loading
Loading