Skip to content

Add MCP examples with FastMCP implementation for Cube D3 APIs integration #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ Cube D3 is a powerful agent platform that can be integrated with other agent sys
## Protocols

1. A2A (Agent-to-Agent)
2. MCP (Multi-Agent Communication Protocol) (Coming Soon)
2. MCP (Model Context Protocol)


## Examples

### A2A Examples
1. [A2A - JSON-RPC](a2a/examples/jsonrpc)
2. [A2A - CrewAI](a2a/examples/crewai)
3. [A2A - LangGraph](a2a/examples/langgraph)

### MCP Examples
1. [MCP - FastMCP](mcp/examples/fastmcp) - Demonstrates integrating Cube D3 APIs with the Model Context Protocol using FastMCP
17 changes: 17 additions & 0 deletions mcp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Cube D3 MCP Examples

This directory contains examples of integrating Cube D3 with the Model Context Protocol (MCP).

## Examples

- [FastMCP](./examples/fastmcp/): Example demonstrating how to use FastMCP to create an MCP server that exposes resources, tools, and prompts.

## Installation

Each example may have its own dependencies. Please refer to the README.md in each example directory for specific installation instructions.

Generally, you will need to install the MCP SDK and FastMCP:

```bash
pip install mcp-sdk fastmcp python-dotenv
```
11 changes: 11 additions & 0 deletions mcp/common/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from .client import D3ApiClient, D3ApiError, D3HttpError
from .config import load_d3_config, generate_auth_token, get_d3_headers

__all__ = [
"D3ApiClient",
"D3ApiError",
"D3HttpError",
"load_d3_config",
"generate_auth_token",
"get_d3_headers",
]
152 changes: 152 additions & 0 deletions mcp/common/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import httpx
import json
from typing import Dict, Any, Optional

from .config import get_d3_headers

# --- Exceptions ---
class D3ApiError(Exception):
"""Base exception for D3 API client errors."""
pass

class D3HttpError(D3ApiError):
"""Exception for HTTP errors during D3 API communication."""
def __init__(self, status_code: int, response_text: str):
self.status_code = status_code
self.response_text = response_text
super().__init__(f"HTTP Error {status_code}: {response_text}")

# --- API Client ---
class D3ApiClient:
"""Client for interacting with D3 APIs"""

def __init__(self, api_url: str, api_secret: str, user_context: str = "mcp-d3-example"):
if not api_url:
raise ValueError("api_url cannot be empty.")
if not api_secret:
raise ValueError("api_secret cannot be empty.")

self.api_url = api_url
self.api_secret = api_secret
self.user_context = user_context
self.headers = get_d3_headers(self.api_secret, self.user_context)

def list_agents(self) -> Dict[str, Any]:
"""Fetch list of agents from D3 API"""
try:
with httpx.Client(headers=self.headers, timeout=30.0) as client:
response = client.get(f"{self.api_url}/agents")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise D3HttpError(e.response.status_code, e.response.text)
except Exception as e:
raise D3ApiError(f"Failed to fetch agents: {str(e)}")

def get_agent(self, agent_id: str) -> Dict[str, Any]:
"""Get information about a specific agent"""
try:
with httpx.Client(headers=self.headers, timeout=30.0) as client:
response = client.get(f"{self.api_url}/agents/{agent_id}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise D3HttpError(e.response.status_code, e.response.text)
except Exception as e:
raise D3ApiError(f"Failed to fetch agent {agent_id}: {str(e)}")

def list_conversations(self, limit: int = 10) -> Dict[str, Any]:
"""Fetch list of recent conversations"""
try:
with httpx.Client(headers=self.headers, timeout=30.0) as client:
response = client.get(f"{self.api_url}/conversations?limit={limit}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise D3HttpError(e.response.status_code, e.response.text)
except Exception as e:
raise D3ApiError(f"Failed to fetch conversations: {str(e)}")

def get_conversation(self, conversation_id: str) -> Dict[str, Any]:
"""Get details about a specific conversation"""
try:
with httpx.Client(headers=self.headers, timeout=30.0) as client:
response = client.get(f"{self.api_url}/conversations/{conversation_id}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise D3HttpError(e.response.status_code, e.response.text)
except Exception as e:
raise D3ApiError(f"Failed to fetch conversation {conversation_id}: {str(e)}")

def send_message(self, agent_id: str, message: str, conversation_id: Optional[str] = None) -> Dict[str, Any]:
"""Send a message to an agent"""
try:
payload = {
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}]
}
}

if conversation_id:
url = f"{self.api_url}/agents/{agent_id}/conversations/{conversation_id}/messages"
else:
url = f"{self.api_url}/agents/{agent_id}/messages"

with httpx.Client(headers=self.headers, timeout=30.0) as client:
response = client.post(url, json=payload)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise D3HttpError(e.response.status_code, e.response.text)
except Exception as e:
raise D3ApiError(f"Failed to send message: {str(e)}")

async def list_agents_async(self) -> Dict[str, Any]:
"""Asynchronously fetch list of agents from D3 API"""
try:
async with httpx.AsyncClient(headers=self.headers, timeout=30.0) as client:
response = await client.get(f"{self.api_url}/agents")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise D3HttpError(e.response.status_code, e.response.text)
except Exception as e:
raise D3ApiError(f"Failed to fetch agents: {str(e)}")

async def get_agent_async(self, agent_id: str) -> Dict[str, Any]:
"""Asynchronously get information about a specific agent"""
try:
async with httpx.AsyncClient(headers=self.headers, timeout=30.0) as client:
response = await client.get(f"{self.api_url}/agents/{agent_id}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise D3HttpError(e.response.status_code, e.response.text)
except Exception as e:
raise D3ApiError(f"Failed to fetch agent {agent_id}: {str(e)}")

async def send_message_async(self, agent_id: str, message: str, conversation_id: Optional[str] = None) -> Dict[str, Any]:
"""Asynchronously send a message to an agent"""
try:
payload = {
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}]
}
}

if conversation_id:
url = f"{self.api_url}/agents/{agent_id}/conversations/{conversation_id}/messages"
else:
url = f"{self.api_url}/agents/{agent_id}/messages"

async with httpx.AsyncClient(headers=self.headers, timeout=30.0) as client:
response = await client.post(url, json=payload)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise D3HttpError(e.response.status_code, e.response.text)
except Exception as e:
raise D3ApiError(f"Failed to send message: {str(e)}")
39 changes: 39 additions & 0 deletions mcp/common/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import os
import datetime
import jwt
from typing import Optional, Tuple

D3_API_URL: Optional[str] = os.getenv("D3_API_URL")
D3_API_SECRET: Optional[str] = os.getenv("D3_API_SECRET")

def load_d3_config() -> Tuple[str, str]:
"""Loads D3 API URL and Secret from environment variables, exiting if not found."""
api_url = os.getenv("D3_API_URL")
secret = os.getenv("D3_API_SECRET")

if not all([api_url, secret]):
print("Error: Required environment variables missing.")
print("Please set: D3_API_URL, D3_API_SECRET")
exit(1)

# Assertions to help the type checker
assert api_url is not None
assert secret is not None

return api_url, secret

def generate_auth_token(secret: str, user_context: str = "mcp-d3-example") -> str:
"""Generates a JWT authentication token for D3 API requests."""
payload = {
'context': {'user': user_context},
'exp': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
}
return jwt.encode(payload, secret, algorithm='HS256')

def get_d3_headers(secret: str, user_context: str = "mcp-d3-example") -> dict[str, str]:
"""Generates the standard headers for D3 API requests."""
auth_token = generate_auth_token(secret, user_context)
return {
"Authorization": f"Bearer {auth_token}",
"Content-Type": "application/json",
}
124 changes: 124 additions & 0 deletions mcp/examples/fastmcp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# FastMCP Example for Cube D3

This example demonstrates how to integrate Cube D3 with the Model Context Protocol (MCP) using FastMCP.

## Overview

The FastMCP example provides a simple MCP server that exposes Cube D3 APIs through the MCP protocol:
- Resources for accessing D3 agents and conversations
- Tools for sending messages to agents and fetching more data
- Custom prompts for analyzing agents and conversations

## Installation

Ensure you have the required dependencies:

```bash
pip install mcp-sdk fastmcp python-dotenv httpx pyjwt
```

## Configuration

This example requires the following environment variables:

```
D3_API_URL=https://your-d3-api-endpoint
D3_API_SECRET=your-d3-api-secret
```

You can set these in a `.env` file in the project directory.

## Usage

### Running the Server

To run the MCP server:

```bash
python -m mcp.examples.fastmcp
```

Or with custom host and port:

```bash
python -m mcp.examples.fastmcp --host 0.0.0.0 --port 8888
```

For debug mode:

```bash
python -m mcp.examples.fastmcp --debug
```

### Connecting to the Server

You can connect to this server using any MCP client, including:

1. The MCP Inspector:
```bash
mcp dev http://localhost:8080
```

2. Claude Desktop:
```bash
mcp install http://localhost:8080
```

3. Any other MCP-compatible client

## Example Interactions

### Accessing Resources

To get all D3 agents:
```
mcp://localhost:8080/agents
```

To get details about a specific agent:
```
mcp://localhost:8080/agents/agent-id-123
```

To get recent conversations:
```
mcp://localhost:8080/conversations
```

### Using Tools

To send a message to an agent:
```json
{
"agent_id": "agent-id-123",
"message": "Hello, what can you help me with today?"
}
```

To fetch more conversations:
```json
{
"limit": 50
}
```

### Using Prompts

To generate an agent analysis:
```
mcp://localhost:8080/prompts/agent_analysis?agent_id=agent-id-123
```

To generate a conversation analysis:
```
mcp://localhost:8080/prompts/conversation_analysis?conversation_id=conv-id-456
```

## Integration with Cube D3

This example demonstrates how MCP can be used to expose and interact with the Cube D3 platform's APIs, allowing LLM agents to:

1. Discover available D3 agents
2. Access conversation history
3. Send messages to D3 agents
4. Generate analyses of D3 agents and conversations
1 change: 1 addition & 0 deletions mcp/examples/fastmcp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

4 changes: 4 additions & 0 deletions mcp/examples/fastmcp/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .agent import main

if __name__ == "__main__":
main()
Loading