Skip to content

Commit 61e305f

Browse files
committed
"thread" support for parallel workloads
1 parent 617b220 commit 61e305f

File tree

14 files changed

+313
-32
lines changed

14 files changed

+313
-32
lines changed

examples/mcp_basic_agent/mcp_agent.config.yaml

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ execution_engine: asyncio
44
logger:
55
type: console
66
level: info
7-
path: "./mcp-agent.jsonl"
7+
path: "./mcp-agent-4o-mini.jsonl"
88
batch_size: 100
99
flush_interval: 2
1010
max_queue_size: 2048
@@ -23,4 +23,5 @@ mcp:
2323

2424
openai:
2525
# Secrets (API keys, etc.) are stored in an mcp_agent.secrets.yaml file which can be gitignored
26-
default_model: o3-mini
26+
# default_model: "o3-mini"
27+
default_model: "gpt-4o-mini"

examples/workflow_parallel/main.py

-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import asyncio
2-
import os
32

43
from mcp_agent.app import MCPApp
54
from mcp_agent.agents.agent import Agent
@@ -83,7 +82,6 @@ async def example_usage():
8382

8483
result = await parallel.generate_str(
8584
message=f"Student short story submission: {SHORT_STORY}",
86-
request_params=RequestParams(model="gpt-4o"),
8785
)
8886

8987
logger.info(f"{result}")

examples/workflow_parallel/mcp_agent.config.yaml

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ execution_engine: asyncio
55
logger:
66
type: console
77
level: info
8+
path: "./mcp-agent-o3-mini.jsonl"
89
batch_size: 100
910
flush_interval: 2
1011
max_queue_size: 2048
@@ -14,4 +15,4 @@ logger:
1415

1516
openai:
1617
# Secrets (API keys, etc.) are stored in an mcp_agent.secrets.yaml file which can be gitignored
17-
default_model: gpt-4o-mini
18+
default_model: "gpt-4o-mini"

examples/workflow_router/main.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ async def example_usage():
8282
logger.info("Tools available:", data=result.model_dump())
8383

8484
result = await agent.call_tool(
85-
name="read_file",
85+
tool_name="read_file",
8686
arguments={
8787
"path": str(os.path.join(os.getcwd(), "mcp_agent.config.yaml"))
8888
},

examples/workflow_router/mcp_agent.config.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,4 @@ mcp:
2828

2929
openai:
3030
# Secrets (API keys, etc.) are stored in an mcp_agent.secrets.yaml file which can be gitignored
31-
default_model: gpt-4o
31+
default_model: "gpt-4o-mini"

scripts/event_replay.py

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#!/usr/bin/env python3
2+
"""Event Replay Script
3+
4+
Replays events from a JSONL log file using rich_progress display.
5+
"""
6+
7+
import json
8+
import time
9+
from pathlib import Path
10+
11+
import typer
12+
from mcp_agent.event_progress import convert_log_event
13+
from mcp_agent.logging.rich_progress import RichProgressDisplay
14+
15+
16+
def load_events(path: Path) -> list:
    """Load log events from a JSONL file.

    Args:
        path: Path to a JSONL file containing one JSON object per line.

    Returns:
        A list of parsed event dicts, in file order; blank lines are skipped.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        OSError: If the file cannot be opened or read.
    """
    events = []
    # JSON text is UTF-8 by spec (RFC 8259); be explicit so the read does
    # not depend on the platform's locale default encoding.
    with open(path, encoding="utf-8") as f:
        for line in f:
            if line.strip():
                events.append(json.loads(line))
    return events
24+
25+
26+
def main(log_file: str):
    """Replay MCP Agent events from a log file with progress display."""
    # Read every event up front so the replay loop is pure display work.
    replay_events = load_events(Path(log_file))

    display = RichProgressDisplay()
    display.start()

    try:
        for raw_event in replay_events:
            converted = convert_log_event(raw_event)
            # Skip log records that do not map to a progress event.
            if not converted:
                continue
            display.update(converted)
            # Pause briefly so the replay is visible to a human watcher.
            time.sleep(0.3)
    except KeyboardInterrupt:
        # Let Ctrl-C end the replay without a traceback.
        pass
    finally:
        display.stop()
47+
48+
49+
# CLI entry point: Typer parses `log_file` from argv and invokes main().
if __name__ == "__main__":
    typer.run(main)

scripts/sample_logs/basic-agent-4o-mini.jsonl

+63
Large diffs are not rendered by default.

scripts/sample_logs/basic-agent-o3-mini.jsonl

+101
Large diffs are not rendered by default.

scripts/sample_logs/workflow_parallel-o3-mini.jsonl

+47
Large diffs are not rendered by default.

src/mcp_agent/agents/agent.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class Agent(MCPAggregator):
3939

4040
def __init__(
4141
self,
42-
name: str,
42+
name: str, # agent name
4343
instruction: str | Callable[[Dict], str] = "You are a helpful agent.",
4444
server_names: List[str] = None,
4545
functions: List[Callable] = None,
@@ -52,6 +52,7 @@ def __init__(
5252
context=context,
5353
server_names=server_names or [],
5454
connection_persistence=connection_persistence,
55+
name=name,
5556
**kwargs,
5657
)
5758

@@ -188,6 +189,7 @@ async def list_tools(self) -> ListToolsResult:
188189

189190
return result
190191

192+
# TODO: prefer a `tool_name` parameter to disambiguate from the agent name
191193
async def call_tool(
192194
self, name: str, arguments: dict | None = None
193195
) -> CallToolResult:

src/mcp_agent/event_progress.py

+25-12
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from dataclasses import dataclass
44
from enum import Enum
5-
from typing import Any, Dict, Optional
5+
from typing import Any, Dict, Optional, Union
66
from mcp_agent.workflows.llm.llm_constants import FINAL_RESPONSE_LOG_MESSAGE
77

88

@@ -51,18 +51,30 @@ def convert_log_event(event: Dict[str, Any]) -> Optional[ProgressEvent]:
5151
"""Convert a log event to a progress event if applicable."""
5252
namespace = event.get("namespace", "")
5353
message = event.get("message", "")
54+
5455
# Extract agent name from namespace if present
5556
agent_name = None
56-
parts = namespace.split(".")
57-
if len(parts) > 3:
58-
if parts[0:3] == ["mcp_agent", "agents", "agent"]:
59-
agent_name = parts[3]
60-
elif (
61-
len(parts) > 4
62-
and parts[0:3] == ["mcp_agent", "workflows", "llm"]
63-
and parts[3].startswith("augmented_llm")
64-
):
65-
agent_name = parts[4]
57+
58+
# Known class namespaces that may have agent names
59+
CLASS_NAMESPACES = [
60+
"mcp_agent.agents.agent", # TODO: Use Agent.__module__
61+
("mcp_agent.workflows.llm", "augmented_llm"), # Matches augmented_llm_* classes
62+
"mcp_agent.mcp.mcp_aggregator", # TODO: Use Finder.__module__
63+
]
64+
65+
# Check if namespace starts with any of our class prefixes and has an additional part
66+
for class_ns in CLASS_NAMESPACES:
67+
if isinstance(class_ns, tuple):
68+
# Special case for augmented_llm_* classes
69+
base_ns, class_prefix = class_ns
70+
parts = namespace[len(base_ns) + 1 :].split(".") # +1 for the dot
71+
if len(parts) >= 2 and parts[0].startswith(class_prefix):
72+
agent_name = parts[1]
73+
break
74+
elif namespace.startswith(class_ns + "."):
75+
# Regular case - agent name is after the class namespace
76+
agent_name = namespace.split(".")[-1]
77+
break
6678

6779
# Handle MCP connection events
6880
if "mcp_connection_manager" in namespace:
@@ -90,9 +102,10 @@ def convert_log_event(event: Dict[str, Any]) -> Optional[ProgressEvent]:
90102
# Handle MCPServerAggregator tool calls
91103
if "mcp_aggregator" in namespace:
92104
tool_name = extract_from_aggregator(message)
105+
target = f"{agent_name} ({tool_name})" if agent_name else tool_name
93106
if tool_name:
94107
return ProgressEvent(
95-
ProgressAction.CALLING_TOOL, tool_name, agent_name=agent_name
108+
ProgressAction.CALLING_TOOL, target, agent_name=agent_name
96109
)
97110

98111
# Handle LLM events

src/mcp_agent/logging/rich_progress.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class RichProgressDisplay:
1313
def __init__(self, console: Optional[Console] = None):
1414
"""Initialize the progress display."""
1515
self.console = console or Console()
16-
self._taskmap = {"default": None}
16+
self._taskmap = {}
1717
self._progress = None
1818

1919
def start(self):
@@ -27,9 +27,10 @@ def start(self):
2727
console=self.console,
2828
transient=False,
2929
) as self._progress:
30-
self._taskmap["default"] = self._progress.add_task(
30+
task_id = self._progress.add_task(
3131
"mcp-agent......", total=None, target="", details=""
3232
)
33+
self._taskmap["default"] = task_id
3334

3435
self._progress.start()
3536

src/mcp_agent/mcp/mcp_aggregator.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from mcp_agent.context import Context
2323

2424

25-
logger = get_logger(__name__)
25+
logger = get_logger(__name__)  # NOTE(review): __init__ reassigns this module-level logger per instance, which affects all instances sharing the module — consider a per-instance logger instead
2626

2727
SEP = "-"
2828

@@ -77,6 +77,7 @@ def __init__(
7777
server_names: List[str],
7878
connection_persistence: bool = False,
7979
context: Optional["Context"] = None,
80+
name: str = None,
8081
**kwargs,
8182
):
8283
"""
@@ -90,9 +91,14 @@ def __init__(
9091

9192
self.server_names = server_names
9293
self.connection_persistence = connection_persistence
93-
94+
self.agent_name = name
9495
self._persistent_connection_manager: MCPConnectionManager = None
9596

97+
# Set up logger with agent name in namespace if available
98+
global logger
99+
logger_name = f"{__name__}.{name}" if name else __name__
100+
logger = get_logger(logger_name)
101+
96102
# Maps namespaced_tool_name -> namespaced tool info
97103
self._namespaced_tool_map: Dict[str, NamespacedTool] = {}
98104
# Maps server_name -> list of tools

src/mcp_agent/workflows/llm/augmented_llm_openai.py

+6-8
Original file line numberDiff line numberDiff line change
@@ -61,26 +61,24 @@ def __init__(self, *args, **kwargs):
6161
intelligencePriority=0.3,
6262
)
6363
# Get default model from config if available
64-
default_model = "gpt-4o" # Fallback default
64+
chosen_model = "gpt-4o" # Fallback default
65+
6566
self._reasoning_effort = "medium"
6667
if self.context and self.context.config and self.context.config.openai:
6768
if hasattr(self.context.config.openai, "default_model"):
68-
default_model = self.context.config.openai.default_model
69+
chosen_model = self.context.config.openai.default_model
6970
if hasattr(self.context.config.openai, "reasoning_effort"):
7071
self._reasoning_effort = self.context.config.openai.reasoning_effort
7172

7273
# o1 does not have tool support
73-
self._reasoning = default_model.startswith("o3")
74+
self._reasoning = chosen_model.startswith("o3")
7475
if self._reasoning:
7576
self.logger.info(
76-
f"Using reasoning model '{default_model}' with '{self._reasoning_effort}' reasoning effort"
77-
)
78-
print(
79-
f"\nUsing reasoning model [white on dark_blue]{default_model}[/white on dark_blue] with [white on dark_green]{self._reasoning_effort}[/white on dark_green] reasoning effort"
77+
f"Using reasoning model '{chosen_model}' with '{self._reasoning_effort}' reasoning effort"
8078
)
8179

8280
self.default_request_params = self.default_request_params or RequestParams(
83-
model=default_model,
81+
model=chosen_model,
8482
modelPreferences=self.model_preferences,
8583
maxTokens=4096,
8684
systemPrompt=self.instruction,

0 commit comments

Comments
 (0)