Skip to content

Add PostgreSQL support for long-term memory storage #2892

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
730 changes: 730 additions & 0 deletions docs/how-to/postgres-long-term-memory.mdx

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ docling = [
aisuite = [
"aisuite>=0.1.10",
]
postgres = [
"psycopg[pool]>=3.1.12",
]

[tool.uv]
dev-dependencies = [
Expand All @@ -87,6 +90,7 @@ dev-dependencies = [
"pytest-recording>=0.13.2",
"pytest-randomly>=3.16.0",
"pytest-timeout>=2.3.1",
"psycopg[pool]>=3.1.12",
]

[project.scripts]
Expand All @@ -102,4 +106,4 @@ exclude_dirs = ["src/crewai/cli/templates"]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
build-backend = "hatchling.build"
122 changes: 115 additions & 7 deletions src/crewai/memory/long_term/long_term_memory.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.memory import Memory
from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
# Storage factory is used to create appropriate storage backend
from crewai.memory.storage.ltm_storage_factory import LTMStorageFactory


class LongTermMemory(Memory):
Expand All @@ -14,14 +15,67 @@ class LongTermMemory(Memory):
LongTermMemoryItem instances.
"""

def __init__(self, storage=None, path=None):
def __init__(
self,
storage=None,
storage_type: str = "sqlite",
path: Optional[str] = None,
postgres_connection_string: Optional[str] = None,
postgres_schema: Optional[str] = None,
postgres_table_name: Optional[str] = None,
postgres_min_pool_size: Optional[int] = None,
postgres_max_pool_size: Optional[int] = None,
postgres_use_connection_pool: Optional[bool] = None,
):
Comment on lines +18 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that storage_type can be used as a factory parameter. However, I think this class should remain storage-type agnostic. We could rely on **kwargs instead of mapping all supported attributes for each storage type here. IMO, the factory (as the PostgresStorageFactory) should be responsible for handling those mappings.. not this class

"""
Initialize LongTermMemory with the specified storage backend.

Args:
storage: Optional pre-configured storage instance
storage_type: Type of storage to use ('sqlite' or 'postgres') when creating a new storage
path: Path to SQLite database file (only used with SQLite storage)
postgres_connection_string: Postgres connection string (only used with Postgres storage)
postgres_schema: Postgres schema name (only used with Postgres storage)
postgres_table_name: Postgres table name (only used with Postgres storage)
postgres_min_pool_size: Minimum connection pool size (only used with Postgres storage)
postgres_max_pool_size: Maximum connection pool size (only used with Postgres storage)
postgres_use_connection_pool: Whether to use connection pooling (only used with Postgres storage)
"""
if not storage:
storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage()
storage = LTMStorageFactory.create_storage(
storage_type=storage_type,
path=path,
connection_string=postgres_connection_string,
schema=postgres_schema,
table_name=postgres_table_name,
min_pool_size=postgres_min_pool_size,
max_pool_size=postgres_max_pool_size,
use_connection_pool=postgres_use_connection_pool,
)
super().__init__(storage=storage)

def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
metadata = item.metadata
metadata.update({"agent": item.agent, "expected_output": item.expected_output})
"""
Save a memory item to storage.

Args:
item: The LongTermMemoryItem to save
"""
# Create metadata dictionary with required values
metadata = item.metadata.copy() if item.metadata else {}
metadata.update({
"agent": item.agent,
"expected_output": item.expected_output
})

# Ensure quality is in metadata (from item.quality if available)
if "quality" not in metadata and item.quality is not None:
metadata["quality"] = item.quality

# Check if quality is available
if "quality" not in metadata:
raise ValueError("Memory quality must be provided either in item.quality or item.metadata['quality']")

Comment on lines +71 to +78
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sending these info as metadata? Is it exclusive for LTStorage Postgress ?

self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage"
task_description=item.task,
score=metadata["quality"],
Expand All @@ -30,7 +84,61 @@ def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signat
)

def search(self, task: str, latest_n: int = 3) -> List[Dict[str, Any]]: # type: ignore # signature of "search" incompatible with supertype "Memory"
return self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
"""
Search for memory items by task.

Args:
task: The task description to search for
latest_n: Maximum number of results to return

Returns:
List of memory items matching the search criteria
"""
return self.storage.load(task, latest_n) or [] # type: ignore # BUG?: "Storage" has no attribute "load"

def reset(self) -> None:
"""Reset the storage by deleting all memory items."""
self.storage.reset()

def cleanup(self) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I misunderstood what this method was doing.. at first, I thought it was meant to reset created entries, but I realized it’s actually for closing any open connections.

That said, I do have some concerns about the current API design, it feels a bit confusing. Here are a few thoughts off the top of my head:

  • If users want to use long-term memory with Postgres, we should always encourage them to use context managers.
  • Let’s consider removing this cleanup method, as its name could be easily confused with something like reset. If we decide to keep it, we should rename it to something clearer—maybe just close

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to discuss the API usage if you want

"""
Clean up resources and connections.

This method safely handles any exceptions that might occur during cleanup,
ensuring resources are properly released.
"""
if hasattr(self.storage, 'close'):
try:
self.storage.close()
except Exception as e:
# Log the error but don't raise it to ensure cleanup continues
print(f"WARNING: Error while closing memory storage: {e}")

# Keep close() for backward compatibility
def close(self) -> None:
"""
Close any resources held by the storage.

For PostgreSQL storage with connection pooling enabled, this will
close the connection pool. For other storage types, this is a no-op.

This method safely handles any exceptions that might occur during closing.

Note: This method is an alias for cleanup() and is maintained for backward compatibility.
"""
self.cleanup()

def __enter__(self):
"""Support for using LongTermMemory as a context manager."""
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""
Clean up resources when exiting a context manager block.

Args:
exc_type: Exception type if an exception was raised in the context
exc_val: Exception value if an exception was raised in the context
exc_tb: Exception traceback if an exception was raised in the context
"""
self.cleanup()
6 changes: 6 additions & 0 deletions src/crewai/memory/storage/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
"""Memory storage implementations for crewAI."""

from crewai.memory.storage.ltm_postgres_storage import LTMPostgresStorage
from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
from crewai.memory.storage.ltm_storage_factory import LTMStorageFactory

__all__ = ["LTMSQLiteStorage", "LTMPostgresStorage", "LTMStorageFactory"]
Loading