-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Add PostgreSQL support for long-term memory storage #2892
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,9 @@ | ||
from typing import Any, Dict, List | ||
from typing import Any, Dict, List, Optional | ||
|
||
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem | ||
from crewai.memory.memory import Memory | ||
from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage | ||
# Storage factory is used to create appropriate storage backend | ||
from crewai.memory.storage.ltm_storage_factory import LTMStorageFactory | ||
|
||
|
||
class LongTermMemory(Memory): | ||
|
@@ -14,14 +15,67 @@ class LongTermMemory(Memory): | |
LongTermMemoryItem instances. | ||
""" | ||
|
||
def __init__(self, storage=None, path=None): | ||
def __init__( | ||
self, | ||
storage=None, | ||
storage_type: str = "sqlite", | ||
path: Optional[str] = None, | ||
postgres_connection_string: Optional[str] = None, | ||
postgres_schema: Optional[str] = None, | ||
postgres_table_name: Optional[str] = None, | ||
postgres_min_pool_size: Optional[int] = None, | ||
postgres_max_pool_size: Optional[int] = None, | ||
postgres_use_connection_pool: Optional[bool] = None, | ||
): | ||
""" | ||
Initialize LongTermMemory with the specified storage backend. | ||
|
||
Args: | ||
storage: Optional pre-configured storage instance | ||
storage_type: Type of storage to use ('sqlite' or 'postgres') when creating a new storage | ||
path: Path to SQLite database file (only used with SQLite storage) | ||
postgres_connection_string: Postgres connection string (only used with Postgres storage) | ||
postgres_schema: Postgres schema name (only used with Postgres storage) | ||
postgres_table_name: Postgres table name (only used with Postgres storage) | ||
postgres_min_pool_size: Minimum connection pool size (only used with Postgres storage) | ||
postgres_max_pool_size: Maximum connection pool size (only used with Postgres storage) | ||
postgres_use_connection_pool: Whether to use connection pooling (only used with Postgres storage) | ||
""" | ||
if not storage: | ||
storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage() | ||
storage = LTMStorageFactory.create_storage( | ||
storage_type=storage_type, | ||
path=path, | ||
connection_string=postgres_connection_string, | ||
schema=postgres_schema, | ||
table_name=postgres_table_name, | ||
min_pool_size=postgres_min_pool_size, | ||
max_pool_size=postgres_max_pool_size, | ||
use_connection_pool=postgres_use_connection_pool, | ||
) | ||
super().__init__(storage=storage) | ||
|
||
def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory" | ||
metadata = item.metadata | ||
metadata.update({"agent": item.agent, "expected_output": item.expected_output}) | ||
""" | ||
Save a memory item to storage. | ||
|
||
Args: | ||
item: The LongTermMemoryItem to save | ||
""" | ||
# Create metadata dictionary with required values | ||
metadata = item.metadata.copy() if item.metadata else {} | ||
metadata.update({ | ||
"agent": item.agent, | ||
"expected_output": item.expected_output | ||
}) | ||
|
||
# Ensure quality is in metadata (from item.quality if available) | ||
if "quality" not in metadata and item.quality is not None: | ||
metadata["quality"] = item.quality | ||
|
||
# Check if quality is available | ||
if "quality" not in metadata: | ||
raise ValueError("Memory quality must be provided either in item.quality or item.metadata['quality']") | ||
|
||
Comment on lines
+71
to
+78
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we sending these info as metadata? Is it exclusive for LTStorage Postgress ? |
||
self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage" | ||
task_description=item.task, | ||
score=metadata["quality"], | ||
|
@@ -30,7 +84,61 @@ def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signat | |
) | ||
|
||
def search(self, task: str, latest_n: int = 3) -> List[Dict[str, Any]]: # type: ignore # signature of "search" incompatible with supertype "Memory" | ||
return self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load" | ||
""" | ||
Search for memory items by task. | ||
|
||
Args: | ||
task: The task description to search for | ||
latest_n: Maximum number of results to return | ||
|
||
Returns: | ||
List of memory items matching the search criteria | ||
""" | ||
return self.storage.load(task, latest_n) or [] # type: ignore # BUG?: "Storage" has no attribute "load" | ||
|
||
def reset(self) -> None: | ||
"""Reset the storage by deleting all memory items.""" | ||
self.storage.reset() | ||
|
||
def cleanup(self) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I misunderstood what this method was doing.. at first, I thought it was meant to reset created entries, but I realized it’s actually for closing any open connections. That said, I do have some concerns about the current API design, it feels a bit confusing. Here are a few thoughts off the top of my head:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm open to discuss the API usage if you want |
||
""" | ||
Clean up resources and connections. | ||
|
||
This method safely handles any exceptions that might occur during cleanup, | ||
ensuring resources are properly released. | ||
""" | ||
if hasattr(self.storage, 'close'): | ||
try: | ||
self.storage.close() | ||
except Exception as e: | ||
# Log the error but don't raise it to ensure cleanup continues | ||
print(f"WARNING: Error while closing memory storage: {e}") | ||
|
||
# Keep close() for backward compatibility | ||
def close(self) -> None: | ||
""" | ||
Close any resources held by the storage. | ||
|
||
For PostgreSQL storage with connection pooling enabled, this will | ||
close the connection pool. For other storage types, this is a no-op. | ||
|
||
This method safely handles any exceptions that might occur during closing. | ||
|
||
Note: This method is an alias for cleanup() and is maintained for backward compatibility. | ||
""" | ||
self.cleanup() | ||
|
||
def __enter__(self): | ||
"""Support for using LongTermMemory as a context manager.""" | ||
return self | ||
|
||
def __exit__(self, exc_type, exc_val, exc_tb): | ||
""" | ||
Clean up resources when exiting a context manager block. | ||
|
||
Args: | ||
exc_type: Exception type if an exception was raised in the context | ||
exc_val: Exception value if an exception was raised in the context | ||
exc_tb: Exception traceback if an exception was raised in the context | ||
""" | ||
self.cleanup() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,7 @@ | ||
"""Memory storage implementations for crewAI.""" | ||
|
||
from crewai.memory.storage.ltm_postgres_storage import LTMPostgresStorage | ||
from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage | ||
from crewai.memory.storage.ltm_storage_factory import LTMStorageFactory | ||
|
||
__all__ = ["LTMSQLiteStorage", "LTMPostgresStorage", "LTMStorageFactory"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like that
storage_type
can be used as a factory parameter. However, I think this class should remain storage-type agnostic. We could rely on**kwargs
instead of mapping all supported attributes for each storage type here. IMO, the factory (as the PostgresStorageFactory) should be responsible for handling those mappings.. not this class