Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The following packages are hosted in this repository:

### LangGraph

- **Checkpointers**: Provides a custom checkpointing solution for LangGraph agents using either the [AgentCore Memory Service](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/memory.html) or the [AWS Bedrock Session Management Service](https://docs.aws.amazon.com/bedrock/latest/userguide/sessions.html).
- **Checkpointers**: Provides a custom checkpointing solution for LangGraph agents using the [AgentCore Memory Service](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/memory.html), the [AWS Bedrock Session Management Service](https://docs.aws.amazon.com/bedrock/latest/userguide/sessions.html), or the [ElastiCache Valkey Service](https://aws.amazon.com/elasticache/).
- **Memory Stores** - Provides a memory store solution for saving, processing, and retrieving intelligent long term memories using the [AgentCore Memory Service](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/memory.html).

...and more to come. This repository will continue to expand and offer additional components for various AWS services as development progresses.
Expand Down
232 changes: 204 additions & 28 deletions libs/langgraph-checkpoint-aws/README.md
Original file line number Diff line number Diff line change
@@ -1,33 +1,43 @@
# LangGraph Checkpoint AWS

A custom LangChain checkpointer implementation that uses Bedrock AgentCore Memory to enable stateful and resumable LangGraph agents through efficient state persistence and retrieval.
A custom AWS-based persistence solution for LangGraph agents that provides multiple storage backends including Bedrock AgentCore Memory and high-performance Valkey (Redis-compatible) storage.

## Overview
This package provides multiple persistence solutions for LangGraph agents:

This package provides a custom checkpointing solution for LangGraph agents using AWS Bedrock AgentCore Memory Service. It enables:

### AWS Bedrock AgentCore Memory Service
1. Stateful conversations and interactions
2. Resumable agent sessions
3. Efficient state persistence and retrieval
4. Seamless integration with AWS Bedrock

### Valkey Storage Solutions
1. **Checkpoint storage** with Valkey (Redis-compatible)

## Installation

You can install the package using pip:

```bash
# Base package (includes Bedrock AgentCore Memory components)
pip install langgraph-checkpoint-aws
```

## Requirements
# With Valkey support
pip install 'langgraph-checkpoint-aws[valkey]'

```text
Python >=3.9
langgraph >=0.2.55
boto3 >=1.39.7
```

## Usage - Checkpointer
## Components

This package provides three main components:

1. **AgentCoreMemorySaver** - AWS Bedrock-based checkpoint storage
2. **ValkeySaver** - Valkey checkpoint storage
3. **AgentCoreMemoryStore** - AWS Bedrock-based document store


## Usage

### 1. Bedrock Session Management

```python
# Import LangGraph and LangChain components
Expand Down Expand Up @@ -70,7 +80,7 @@ response = graph.invoke(
)
```

## Usage - Memory Store
### 2. Bedrock Memory Store

```python
# Import LangGraph and LangChain components
Expand Down Expand Up @@ -138,6 +148,92 @@ response = graph.invoke(
)
```

### 3. Valkey Checkpoint Storage

```python
from langgraph.graph import StateGraph
from langgraph_checkpoint_aws import ValkeySaver

# Using connection string
with ValkeySaver.from_conn_string(
"valkey://localhost:6379",
ttl_seconds=3600, # 1 hour TTL
pool_size=10
) as checkpointer:
# Create your graph
builder = StateGraph(int)
builder.add_node("add_one", lambda x: x + 1)
builder.set_entry_point("add_one")
builder.set_finish_point("add_one")

graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "session-1"}}
result = graph.invoke(1, config)
```

## Async Usage

All components support async operations:

```python
from langgraph_checkpoint_aws.async_saver import AsyncBedrockSessionSaver
from langgraph_checkpoint_aws.checkpoint.valkey import AsyncValkeySaver

# Async Bedrock usage
session_saver = AsyncBedrockSessionSaver(region_name="us-west-2")
session_id = (await session_saver.session_client.create_session()).session_id

# Async Valkey usage
async with AsyncValkeySaver.from_conn_string("valkey://localhost:6379") as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
result = await graph.ainvoke(1, {"configurable": {"thread_id": "session-1"}})
```

## Configuration Options

### Bedrock Session Saver

`BedrockSessionSaver` and `AsyncBedrockSessionSaver` accept the following parameters:

```python
def __init__(
client: Optional[Any] = None,
session: Optional[boto3.Session] = None,
region_name: Optional[str] = None,
credentials_profile_name: Optional[str] = None,
aws_access_key_id: Optional[SecretStr] = None,
aws_secret_access_key: Optional[SecretStr] = None,
aws_session_token: Optional[SecretStr] = None,
endpoint_url: Optional[str] = None,
config: Optional[Config] = None,
)
```

### Valkey Components

Valkey components support these common configuration options:

#### Connection Options
- **Connection String**: `valkey://localhost:6379` or `valkeys://localhost:6380` (SSL). Refer [connection examples](https://valkey-py.readthedocs.io/en/latest/examples/connection_examples.html).
- **Connection Pool**: Reusable connection pools for better performance
- **Pool Size**: Maximum number of connections (default: 10)
- **SSL Support**: Secure connections with certificate validation

#### Performance Options
- **TTL (Time-to-Live)**: Automatic expiration of stored data
- **Batch Operations**: Efficient bulk operations for better throughput
- **Async Support**: Non-blocking operations for high concurrency

#### ValkeySaver Options
```python
valkey_client = Valkey.from_url("valkey://localhost:6379")
ValkeySaver(
client: valkey_client,
ttl: float | None = None, # TTL in seconds
serde: SerializerProtocol | None = None # Custom serialization
)
```

## Development

Setting Up Development Environment
Expand Down Expand Up @@ -187,7 +283,9 @@ make spell_check # Check spelling
make clean # Remove all generated files
```

## AWS Configuration
## Infrastructure Setup

### AWS Configuration (for Bedrock components)

Ensure you have AWS credentials configured using one of these methods:

Expand All @@ -196,14 +294,14 @@ Ensure you have AWS credentials configured using one of these methods:
3. IAM roles
4. Direct credential injection via constructor parameters

## Required AWS permissions
Required AWS permissions for Bedrock Session Management:

```json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Statement1",
"Sid": "BedrockSessionManagement",
"Effect": "Allow",
"Action": [
"bedrock-agentcore:CreateEvent",
Expand Down Expand Up @@ -325,11 +423,10 @@ def __init__(
"bedrock:GetInvocationStep",
"bedrock:ListInvocationSteps"
],
"Resource": [
"*"
]
"Resource": ["*"]
},
{
"Sid": "KMSAccess",
"Effect": "Allow",
"Action": [
"kms:Decrypt",
Expand All @@ -338,20 +435,68 @@ def __init__(
"kms:DescribeKey"
],
"Resource": "arn:aws:kms:{region}:{account}:key/{kms-key-id}"
},
{
"Effect": "Allow",
"Action": [
"bedrock:TagResource",
"bedrock:UntagResource",
"bedrock:ListTagsForResource"
],
"Resource": "arn:aws:bedrock:{region}:{account}:session/*"
}
]
}
```

### Valkey Setup

#### Using AWS ElastiCache for Valkey (Recommended)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#### Using AWS ElastiCache for Valkey (Recommended)
#### Using AWS ElastiCache for Valkey

```python
# Connect to AWS ElastiCache from host running inside VPC with access to cache
from langgraph_checkpoint_aws.checkpoint.valkey import ValkeySaver

with ValkeySaver.from_conn_string(
"valkeys://your-elasticache-cluster.amazonaws.com:6379",
pool_size=20
) as checkpointer:
pass
```
If you want to connect to cache from a host outside of VPC, use ElastiCache console to setup a jump host so you could create SSH tunnel to access cache locally.

#### Using Docker
```bash
# Start Valkey with required modules
docker run --name valkey-bundle -p 6379:6379 -d valkey/valkey-bundle:latest

# Or with custom configuration
docker run --name valkey-custom \
-p 6379:6379 \
-v $(pwd)/valkey.conf:/etc/valkey/valkey.conf \
-d valkey/valkey-bundle:latest
```

## Performance and Best Practices

### Valkey Performance Optimization

#### Connection Pooling
```python
# Use connection pools for better performance
from valkey.connection import ConnectionPool

pool = ConnectionPool.from_url(
"valkey://localhost:6379",
max_connections=20,
retry_on_timeout=True
)

with ValkeySaver.from_pool(pool) as checkpointer:
# Reuse connections across operations
pass
```

#### TTL Strategy
```python
# Configure appropriate TTL values
with ValkeySaver.from_conn_string(
"valkey://localhost:6379",
ttl_seconds=3600 # 1 hour for active sessions
) as checkpointer:
pass
```

## Security Considerations

* Never commit AWS credentials
Expand All @@ -361,6 +506,37 @@ def __init__(
* Use IAM roles and temporary credentials when possible
* Implement proper access controls for session management

### Valkey Security
* Use SSL/TLS for production deployments (`valkeys://` protocol), refer [SSL connection examples](https://valkey-py.readthedocs.io/en/latest/examples/ssl_connection_examples.html#Connect-to-a-Valkey-instance-via-SSL,-and-validate-OCSP-stapled-certificates)
* Configure authentication with strong passwords
* Implement network security (VPC, security groups)
* Regular security updates and monitoring
* Use AWS ElastiCache for managed Valkey with encryption

```python
# Secure connection example
import os
import valkey

pki_dir = os.path.join("..", "..", "dockers", "stunnel", "keys")

valkey_client = valkey.Valkey(
host="localhost",
port=6666,
ssl=True,
ssl_certfile=os.path.join(pki_dir, "client-cert.pem"),
ssl_keyfile=os.path.join(pki_dir, "client-key.pem"),
ssl_cert_reqs="required",
ssl_ca_certs=os.path.join(pki_dir, "ca-cert.pem"),
)

checkpointer = ValkeySaver(valkey_client)
```

## Examples and Samples

Comprehensive examples are available in the `samples/memory/` directory:

## Contributing

* Fork the repository
Expand All @@ -377,5 +553,5 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file
## Acknowledgments

* LangChain team for the base LangGraph framework

* AWS Bedrock team for the session management service
* Valkey team for the Redis-compatible storage
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,47 @@
Bedrock Session Management Service.
"""

from importlib.metadata import version

from langgraph_checkpoint_aws.agentcore.saver import (
AgentCoreMemorySaver,
)
from langgraph_checkpoint_aws.agentcore.store import (
AgentCoreMemoryStore,
)

__version__ = "1.0.0"
# Conditional imports for checkpoint functionality
try:
from langgraph_checkpoint_aws.checkpoint import AsyncValkeySaver, ValkeySaver

valkey_available = True
except ImportError:
# If checkpoint dependencies are not available, create placeholder classes
from typing import Any

def _missing_checkpoint_dependencies_error(*args: Any, **kwargs: Any) -> Any:
raise ImportError(
"Valkey checkpoint functionality requires optional dependencies. "
"Install them with: pip install 'langgraph-checkpoint-aws[valkey]'"
)

# Create placeholder classes that raise helpful errors
AsyncValkeySaver: type[Any] = _missing_checkpoint_dependencies_error # type: ignore[assignment,no-redef]
ValkeySaver: type[Any] = _missing_checkpoint_dependencies_error # type: ignore[assignment,no-redef]
valkey_available = False

try:
__version__ = version("langgraph-checkpoint-aws")
except Exception:
# Fallback version if package is not installed
__version__ = "1.0.0a1"
SDK_USER_AGENT = f"LangGraphCheckpointAWS#{__version__}"

# Expose the saver class at the package level
__all__ = [
"AgentCoreMemorySaver",
"AgentCoreMemoryStore",
"AsyncValkeySaver",
"ValkeySaver",
"SDK_USER_AGENT",
]
Loading