Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions .mcp.json

This file was deleted.

16 changes: 15 additions & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
## Development

- Package manager: `uv`
- Testing framework: `pytest` (run with `uv run pytest`)
- Testing framework: `basedpyright` (run with `uvx basedpyright src`)
- Linter: `ruff` (run with `uv run ruff check .`)
- Formatter: `ruff` (run with `uv run ruff format .`)
- Type checks: `pyright` (run with `uv run pyright`)
- Python version: 3.14
- Strict typing is required

## Engineering Rules

- Organize code by functionality
- Organize related code/modules into packages
- Keep things DRY

## Issue Tracking with bd (beads)

**IMPORTANT**: This project uses **bd (beads)** for ALL issue tracking. Do NOT use markdown TODOs, task lists, or other tracking methods.
Expand All @@ -20,23 +28,27 @@
### Quick Start

**Check for ready work:**

```bash
bd ready --json
```

**Create new issues:**

```bash
bd create "Issue title" -t bug|feature|task -p 0-4 --json
bd create "Issue title" -p 1 --deps discovered-from:bd-123 --json
```

**Claim and update:**

```bash
bd update bd-42 --status in_progress --json
bd update bd-42 --priority 1 --json
```

**Complete work:**

```bash
bd close bd-42 --reason "Completed" --json
```
Expand Down Expand Up @@ -70,6 +82,7 @@ bd close bd-42 --reason "Completed" --json
### Auto-Sync

bd automatically syncs with git:

- Exports to `.beads/issues.jsonl` after changes (5s debounce)
- Imports from JSONL when newer (e.g., after `git pull`)
- No manual export/import needed!
Expand All @@ -83,6 +96,7 @@ pip install beads-mcp
```

Add to MCP config (e.g., `~/.config/claude/config.json`):

```json
{
"beads": {
Expand Down
153 changes: 125 additions & 28 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Agent state automatically persists to Redis, surviving restarts without manual s
```python
agent = Agent("my_agent", capabilities=["chat"])
await agent.start()
await agent.send("other_agent", {"hello": "world"})
await agent.send("other_agent", "greeting.message", {"hello": "world"})
```

### 5. Redis as Single Dependency
Expand Down Expand Up @@ -118,8 +118,20 @@ stop()
async def _message_loop(self):
async for message in self._pubsub.listen():
if message["type"] == "message":
msg = AgentMessage.parse(message["data"])
await self.on_message(msg) # User hook
msg = AgentMessage.model_validate_json(message["data"])
msg.attach_agent(self) # For reply() convenience

# Check if reply to pending request
if msg.meta.is_reply:
correlation_id = msg.meta.correlation_id
if correlation_id in self._pending_requests:
future = self._pending_requests.pop(correlation_id)
future.set_result(msg)
continue

# Try decorator-based dispatch first
await self._dispatch_typed(msg)
# Falls back to on_message() if no handler registered
```

**Heartbeat Loop** (`_heartbeat_loop`):
Expand Down Expand Up @@ -241,47 +253,87 @@ await redis.delete(f"agent.state:{agent_id}")

### 5. Protocol (`src/mas/protocol.py`)

**Purpose**: Define message formats.
**Purpose**: Define strongly-typed message formats.

**AgentMessage**:
**EnvelopeMessage** (aliased as `AgentMessage`):
```python
class AgentMessage(BaseModel):
class EnvelopeMessage(BaseModel):
sender_id: str
target_id: str
payload: dict
message_type: str # Message type identifier
data: dict[str, Any] # Business payload
meta: MessageMeta # Transport metadata
timestamp: float = time.time()
message_id: str = str(time.time_ns())

# Convenience properties
payload: dict # Alias for data (backward compatibility)
expects_reply: bool # Convenience for meta.expects_reply
is_reply: bool # Convenience for meta.is_reply

async def reply(message_type: str, payload: dict) -> None:
"""Reply to this message using correlation metadata"""
```

**Message**: Generic envelope (less commonly used):
**MessageMeta**:
```python
class Message(BaseModel):
sender_id: str
target_id: str
message_type: MessageType
payload: dict
class MessageMeta(BaseModel):
version: int = 1
correlation_id: Optional[str] = None # For request-response
expects_reply: bool = False
is_reply: bool = False
```

**MessageType**: Type alias for string identifiers
```python
MessageType: TypeAlias = str
```

**Decorator-Based Handlers**:
```python
class MyAgent(Agent):
@Agent.on("task.process", model=TaskRequest)
async def handle_task(self, message: AgentMessage, payload: TaskRequest):
"""Handler registered for 'task.process' message type"""
# payload is validated TaskRequest Pydantic model
await message.reply("task.complete", {"status": "done"})

@Agent.on("status.check")
async def handle_status(self, message: AgentMessage, payload: None):
"""Handler without payload model"""
await message.reply("status.response", {"status": "healthy"})

async def on_message(self, message: AgentMessage):
"""Fallback for unhandled message types"""
pass
```

## Message Flow

### Peer-to-Peer Send

```
1. Agent A calls send("agent_b", payload)
1. Agent A calls send("agent_b", "task.message", {"data": "hello"})
2. Create AgentMessage
2. Create EnvelopeMessage
sender_id: "agent_a"
target_id: "agent_b"
payload: {...}
message_type: "task.message"
data: {"data": "hello"}
meta: MessageMeta(...)
3. Publish to Redis channel: agent.agent_b
await redis.publish("agent.agent_b", message.json())
await redis.publish("agent.agent_b", message.model_dump_json())
4. Agent B's pubsub listener receives
5. Agent B's _message_loop parses message
6. Agent B's on_message(message) called
6. Agent B's _dispatch_typed() checks for registered handlers
7a. If handler found: Call @Agent.on() handler with validated payload
OR
7b. If no handler: Call on_message() fallback
```

### System Messages (Registration)
Expand Down Expand Up @@ -321,6 +373,7 @@ class Message(BaseModel):
]
5. Agent A can now send() to discovered agents
await agent.send("agent_b", "task.message", {"task": "analyze"})
```

## State Management
Expand Down Expand Up @@ -514,7 +567,7 @@ HGETALL agent.state:my_agent
**Agent Channel** (`agent.{id}`):
- Each agent subscribes to its own channel
- Other agents publish messages here
- Format: JSON-encoded AgentMessage
- Format: JSON-encoded EnvelopeMessage with message_type, data, and meta fields

**System Channel** (`mas.system`):
- MAS Service subscribes
Expand Down Expand Up @@ -657,7 +710,7 @@ All I/O operations are async:
await agent.start()

# Message send
await agent.send(target_id, payload)
await agent.send(target_id, "task.message", {"task": "process"})

# State update
await agent.update_state(changes)
Expand All @@ -680,8 +733,20 @@ self._tasks = [
```python
async for message in self._pubsub.listen():
try:
msg = AgentMessage.parse(message["data"])
await self.on_message(msg)
msg = AgentMessage.model_validate_json(message["data"])
msg.attach_agent(self)

# Handle replies to pending requests
if msg.meta.is_reply:
correlation_id = msg.meta.correlation_id
if correlation_id in self._pending_requests:
future = self._pending_requests.pop(correlation_id)
future.set_result(msg)
continue

# Try decorator-based dispatch
await self._dispatch_typed(msg)
# Falls back to on_message() if no handler
except Exception as e:
logger.error("Failed to handle message", exc_info=e)
# Continue processing other messages
Expand Down Expand Up @@ -727,9 +792,11 @@ class ReceiverAgent(Agent):
# In test
agent = ReceiverAgent("test_agent")
await agent.start()
await other_agent.send("test_agent", {"data": "test"})
await other_agent.send("test_agent", "test.message", {"data": "test"})
await asyncio.wait_for(agent.message_event.wait(), timeout=2.0)
assert len(agent.messages) == 1
assert agent.messages[0].message_type == "test.message"
assert agent.messages[0].data["data"] == "test"
```

### Logging
Expand Down Expand Up @@ -760,17 +827,26 @@ logger.error(
### Type Safety

**Pydantic Models**:
- `AgentMessage`: Message validation
- `Message`: Generic message envelope
- `BaseModel`: State models (optional)
- `EnvelopeMessage` (aliased as `AgentMessage`): Strongly-typed message envelope with message_type and data separation
- `MessageMeta`: Transport metadata (correlation ID, reply flags, version)
- `BaseModel`: State models (optional) and message payload models for decorator handlers

**Type Hints**:
```python
async def send(self, target_id: str, payload: dict) -> None:
async def send(self, target_id: str, message_type: str, data: dict) -> None:
async def request(self, target_id: str, message_type: str, data: dict, timeout: float = 30.0) -> AgentMessage:
async def discover(self, capabilities: list[str] | None = None) -> list[dict]:
async def update_state(self, updates: dict) -> None:
```

**Decorator-Based Handlers**:
```python
@Agent.on("message.type", model=PayloadModel)
async def handler(self, message: AgentMessage, payload: PayloadModel) -> None:
# Type-safe handler with validated payload
pass
```

**Pyright Configuration**:
```toml
[tool.pyright]
Expand All @@ -788,7 +864,28 @@ from mas import Agent

agent = Agent("my_agent", capabilities=["nlp"])
await agent.start()
await agent.send("target", payload)
await agent.send("target", "task.message", {"task": "process"})
```

### Decorator-Based Message Handling

```python
from pydantic import BaseModel
from mas import Agent, AgentMessage

class TaskRequest(BaseModel):
task_id: str
priority: int = 1

class MyAgent(Agent):
@Agent.on("task.process", model=TaskRequest)
async def handle_task(self, message: AgentMessage, payload: TaskRequest):
"""Type-safe handler with validated payload"""
await message.reply("task.complete", {"status": "done"})

async def on_message(self, message: AgentMessage):
"""Fallback for unhandled message types"""
pass
```

### Agent Characteristics
Expand Down
12 changes: 10 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,17 @@


## Development

- Package manager: `uv`
- Testing framework: `pytest` (run with `uv run pytest`)
- Testing framework: `basedpyright` (run with `uvx basedpyright src`)
- Linter: `ruff` (run with `uv run ruff check .`)
- Formatter: `ruff` (run with `uv run ruff format .`)
- Type checks: `pyright` (run with `uv run pyright`)
- Python version: 3.14
- Strict typing is required
- Strict typing is required

## Engineering Rules

- Organize code by functionality
- Organize related code/modules into packages
- Keep things DRY
Loading