Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion acp/acp_weather_service/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependencies = [
"langchain-openai>=0.3.7",
"openinference-instrumentation-langchain>=0.1.36",
"pydantic-settings>=2.8.1",
"langchain-mcp-adapters>=0.0.10",
"langchain-mcp-adapters>=0.1.0",
"python-keycloak>=5.5.1",
]

Expand Down
46 changes: 31 additions & 15 deletions acp/acp_weather_service/src/acp_weather_service/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def get_token() -> str:
{"name": "LLM_API_BASE", "description": "Base URL for OpenAI-compatible API endpoint"},
{"name": "LLM_API_KEY", "description": "API key for OpenAI-compatible API endpoint"},
{"name": "MCP_URL", "description": "MCP Server URL for the weather tool"},
{"name": "ACP_MCP_TRANSPORT", "description": "MCP transport type: sse, stdio, streamable_http, websocket (defaults to 'sse')"},
],
ui={"type": "hands-off", "user_greeting": "Ask me about the weather"},
examples={
Expand Down Expand Up @@ -120,22 +121,37 @@ async def acp_weather_service(input: list[Message]) -> AsyncIterator:

try:
output = None
async with get_mcpclient() as mcpclient:
graph = await get_graph(mcpclient)
async for event in graph.astream(input, stream_mode="updates"):
yield {
"message": "\n".join(
f"🚶‍♂️{key}: {str(value)[:100] + '...' if len(str(value)) > 100 else str(value)}"
for key, value in event.items()
)
+ "\n"
}
output = event
logger.info(f'event: {event}')
output = output.get("assistant", {}).get("final_answer")
yield MessagePart(content=str(output))
# Test MCP connection first
logger.info(f'Attempting to connect to MCP server at: {os.getenv("MCP_URL", "http://localhost:8000/sse")}')

mcpclient = get_mcpclient()

# Try to get tools to verify connection
try:
tools = await mcpclient.get_tools()
logger.info(f'Successfully connected to MCP server. Available tools: {[tool.name for tool in tools]}')
except Exception as tool_error:
logger.error(f'Failed to connect to MCP server: {tool_error}')
yield MessagePart(content=f"Error: Cannot connect to MCP weather service at {os.getenv('MCP_URL', 'http://localhost:8000/sse')}. Please ensure the weather MCP server is running. Error: {tool_error}")
return

graph = await get_graph(mcpclient)
async for event in graph.astream(input, stream_mode="updates"):
yield {
"message": "\n".join(
f"🚶‍♂️{key}: {str(value)[:100] + '...' if len(str(value)) > 100 else str(value)}"
for key, value in event.items()
)
+ "\n"
}
output = event
logger.info(f'event: {event}')
output = output.get("assistant", {}).get("final_answer")
yield MessagePart(content=str(output))
except Exception as e:
raise Exception(f"An error occurred while running the graph: {e}")
logger.error(f'Graph execution error: {e}')
yield MessagePart(content=f"Error: Failed to process weather request. {str(e)}")
raise ACPError(Error(code=ErrorCode.SERVER_ERROR, message=str(e)))


def run():
Expand Down
27 changes: 15 additions & 12 deletions acp/acp_weather_service/src/acp_weather_service/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def get_mcpclient():
return MultiServerMCPClient({
"math": {
"url": os.getenv("MCP_URL", "http://localhost:8000/sse"),
"transport": "sse",
"transport": os.getenv("ACP_MCP_TRANSPORT", "sse"),
}
})

Expand All @@ -27,7 +27,10 @@ async def get_graph(client) -> StateGraph:
openai_api_base=config.llm_api_base,
temperature=0,
)
llm_with_tools = llm.bind_tools(client.get_tools())

# Get tools asynchronously
tools = await client.get_tools()
llm_with_tools = llm.bind_tools(tools)

# System message
sys_msg = SystemMessage(content="You are a helpful assistant tasked with providing weather information. You must use the provided tools to complete your task.")
Expand All @@ -46,7 +49,7 @@ def assistant(state: ExtendedMessagesState) -> ExtendedMessagesState:
# Build graph
builder = StateGraph(ExtendedMessagesState)
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(client.get_tools()))
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
"assistant",
Expand All @@ -60,15 +63,15 @@ def assistant(state: ExtendedMessagesState) -> ExtendedMessagesState:

# async def main():
# from langchain_core.messages import HumanMessage
# async with get_mcpclient() as client:
# graph = await get_graph(client)
# messages = [HumanMessage(content="how is the weather in NY today?")]
# async for event in graph.astream_events({"messages": messages}, stream_mode="updates"):
# print(event)
# output = event
# output = output.get("data",{}).get("output",{}).get("final_answer")
# print(f">>> {output}")
# client = get_mcpclient()
# graph = await get_graph(client)
# messages = [HumanMessage(content="how is the weather in NY today?")]
# async for event in graph.astream({"messages": messages}, stream_mode="updates"):
# print(event)
# output = event
# output = output.get("assistant", {}).get("final_answer")
# print(f">>> {output}")

# if __name__ == "__main__":
# import asyncio
# asyncio.run(main())
# asyncio.run(main())
Loading