Support MCP #2665

Merged: 2 commits, Mar 24, 2025

3 changes: 2 additions & 1 deletion apps/application/flow/step_node/__init__.py
@@ -25,13 +25,14 @@
from .start_node import *
from .text_to_speech_step_node.impl.base_text_to_speech_node import BaseTextToSpeechNode
from .variable_assign_node import BaseVariableAssignNode
from .mcp_node import BaseMcpNode

node_list = [BaseStartStepNode, BaseChatNode, BaseSearchDatasetNode, BaseQuestionNode,
BaseConditionNode, BaseReplyNode,
BaseFunctionNodeNode, BaseFunctionLibNodeNode, BaseRerankerNode, BaseApplicationNode,
BaseDocumentExtractNode,
BaseImageUnderstandNode, BaseFormNode, BaseSpeechToTextNode, BaseTextToSpeechNode,
BaseImageGenerateNode, BaseVariableAssignNode]
BaseImageGenerateNode, BaseVariableAssignNode, BaseMcpNode]


def get_node(node_type):
apps/application/flow/step_node/ai_chat_step_node/i_chat_node.py
@@ -33,6 +33,9 @@ class ChatNodeSerializer(serializers.Serializer):
error_messages=ErrMessage.dict('Model settings'))
dialogue_type = serializers.CharField(required=False, allow_blank=True, allow_null=True,
error_messages=ErrMessage.char(_("Context Type")))
mcp_enable = serializers.BooleanField(required=False,
error_messages=ErrMessage.boolean(_("Whether to enable MCP")))
mcp_servers = serializers.JSONField(required=False, error_messages=ErrMessage.list(_("MCP Server")))


class IChatNode(INode):
@@ -49,5 +49,7 @@ def execute(self, model_id, system, prompt, dialogue_number, history_chat_record
model_params_setting=None,
dialogue_type=None,
model_setting=None,
mcp_enable=False,
mcp_servers=None,
**kwargs) -> NodeResult:
pass
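
For reference, the new mcp_servers field carries a JSON string that the chat node later passes through json.loads to MultiServerMCPClient. A minimal sketch of a plausible configuration is shown below; the server name, URL and transport values are illustrative assumptions, not part of this PR:

```python
import json

# Hypothetical MCP server configuration; the mapping follows the
# langchain-mcp-adapters convention of {server_name: connection_params}.
mcp_servers = json.dumps({
    "weather": {
        "url": "http://localhost:8000/sse",  # assumed local SSE endpoint
        "transport": "sse",
    }
})

# Values the ChatNodeSerializer would accept to enable MCP support:
node_params = {"mcp_enable": True, "mcp_servers": mcp_servers}
```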
apps/application/flow/step_node/ai_chat_step_node/impl/base_chat_node.py
@@ -6,14 +6,19 @@
@date:2024/6/4 14:30
@desc:
"""
import asyncio
import json
import re
import time
from functools import reduce
from types import AsyncGeneratorType
from typing import List, Dict

from django.db.models import QuerySet
from langchain.schema import HumanMessage, SystemMessage
from langchain_core.messages import BaseMessage, AIMessage
from langchain_core.messages import BaseMessage, AIMessage, AIMessageChunk, ToolMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent

from application.flow.i_step_node import NodeResult, INode
from application.flow.step_node.ai_chat_step_node.i_chat_node import IChatNode
@@ -56,6 +61,7 @@ def write_context_stream(node_variable: Dict, workflow_variable: Dict, node: INo
reasoning = Reasoning(model_setting.get('reasoning_content_start', '<think>'),
model_setting.get('reasoning_content_end', '</think>'))
response_reasoning_content = False

for chunk in response:
reasoning_chunk = reasoning.get_reasoning_content(chunk)
content_chunk = reasoning_chunk.get('content')
@@ -84,6 +90,47 @@ def write_context_stream(node_variable: Dict, workflow_variable: Dict, node: INo
_write_context(node_variable, workflow_variable, node, workflow, answer, reasoning_content)



async def _yield_mcp_response(chat_model, message_list, mcp_servers):
async with MultiServerMCPClient(json.loads(mcp_servers)) as client:
agent = create_react_agent(chat_model, client.get_tools())
response = agent.astream({"messages": message_list}, stream_mode='messages')
async for chunk in response:
# if isinstance(chunk[0], ToolMessage):
# print(chunk[0])
if isinstance(chunk[0], AIMessageChunk):
yield chunk[0]

def mcp_response_generator(chat_model, message_list, mcp_servers):
loop = asyncio.new_event_loop()
try:
async_gen = _yield_mcp_response(chat_model, message_list, mcp_servers)
while True:
try:
chunk = loop.run_until_complete(anext_async(async_gen))
yield chunk
except StopAsyncIteration:
break
except Exception as e:
print(f'exception: {e}')
finally:
loop.close()

async def anext_async(agen):
return await agen.__anext__()

async def _get_mcp_response(chat_model, message_list, mcp_servers):
async with MultiServerMCPClient(json.loads(mcp_servers)) as client:
agent = create_react_agent(chat_model, client.get_tools())
response = agent.astream({"messages": message_list}, stream_mode='messages')
result = []
async for chunk in response:
# if isinstance(chunk[0], ToolMessage):
# print(chunk[0].content)
if isinstance(chunk[0], AIMessageChunk):
result.append(chunk[0])
return result

def write_context(node_variable: Dict, workflow_variable: Dict, node: INode, workflow):
"""
写入上下文数据
@@ -142,6 +189,8 @@ def execute(self, model_id, system, prompt, dialogue_number, history_chat_record
model_params_setting=None,
dialogue_type=None,
model_setting=None,
mcp_enable=False,
mcp_servers=None,
**kwargs) -> NodeResult:
if dialogue_type is None:
dialogue_type = 'WORKFLOW'
@@ -163,6 +212,14 @@
self.context['system'] = system
message_list = self.generate_message_list(system, prompt, history_message)
self.context['message_list'] = message_list

if mcp_enable and mcp_servers is not None:
r = mcp_response_generator(chat_model, message_list, mcp_servers)
return NodeResult(
{'result': r, 'chat_model': chat_model, 'message_list': message_list,
'history_message': history_message, 'question': question.content}, {},
_write_context=write_context_stream)

if stream:
r = chat_model.stream(message_list)
return NodeResult({'result': r, 'chat_model': chat_model, 'message_list': message_list,
Contributor review comment:

Your updated code includes several improvements and additions:

1. Concurrency:
   - The mcp_response_generator function uses asynchronous programming (asyncio) to fetch responses from Model Context Protocol (MCP) servers.
   - An inner _yield_mcp_response coroutine handles streaming of AI message chunks.

2. Imports:
   - The langchain_core.messages import is extended with AIMessageChunk and ToolMessage to support streaming and tool-call handling.

3. Error handling:
   - Basic exception handling is added around the asynchronous generator loop, and the event loop is created with new_event_loop() and closed after use.

4. API surface:
   - Two ways of fetching a response are provided: a generator-based approach (mcp_response_generator) and an alternative that collects all chunks into a list (_get_mcp_response).

5. Functionality:
   - MCP integration is enabled through mcp_enable, with server configurations supplied via mcp_servers.

Potential considerations:

- Ensure proper error handling in production environments, especially around exceptions raised during asynchronous operations.
- Verify correctness by checking the generated context data and output streams against expected values.

These changes make the code more robust and better able to handle concurrent interactions with MCP servers.
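
As a side note, the generator-based approach boils down to driving an async generator from synchronous code with a dedicated event loop. A minimal, self-contained sketch of that pattern (the names and the stand-in generator are illustrative, not taken from the PR):

```python
import asyncio


async def _aiter_chunks():
    # Stand-in for agent.astream(...): yields chunks asynchronously.
    for i in range(3):
        await asyncio.sleep(0)
        yield f"chunk-{i}"


def sync_chunks():
    """Drive an async generator from synchronous code, mirroring mcp_response_generator."""
    loop = asyncio.new_event_loop()
    agen = _aiter_chunks()
    try:
        while True:
            try:
                # Equivalent to anext_async(agen) in the PR.
                yield loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.close()


if __name__ == "__main__":
    for chunk in sync_chunks():
        print(chunk)
```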

3 changes: 3 additions & 0 deletions apps/application/flow/step_node/mcp_node/__init__.py
@@ -0,0 +1,3 @@
# coding=utf-8

from .impl import *
35 changes: 35 additions & 0 deletions apps/application/flow/step_node/mcp_node/i_mcp_node.py
@@ -0,0 +1,35 @@
# coding=utf-8

from typing import Type

from rest_framework import serializers

from application.flow.i_step_node import INode, NodeResult
from common.util.field_message import ErrMessage
from django.utils.translation import gettext_lazy as _


class McpNodeSerializer(serializers.Serializer):
mcp_servers = serializers.JSONField(required=True,
error_messages=ErrMessage.char(_("Mcp servers")))

mcp_server = serializers.CharField(required=True,
error_messages=ErrMessage.char(_("Mcp server")))

mcp_tool = serializers.CharField(required=True, error_messages=ErrMessage.char(_("Mcp tool")))

tool_params = serializers.DictField(required=True,
error_messages=ErrMessage.char(_("Tool parameters")))


class IMcpNode(INode):
type = 'mcp-node'

def get_node_params_serializer_class(self) -> Type[serializers.Serializer]:
return McpNodeSerializer

def _run(self):
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)

def execute(self, mcp_servers, mcp_server, mcp_tool, tool_params, **kwargs) -> NodeResult:
pass
3 changes: 3 additions & 0 deletions apps/application/flow/step_node/mcp_node/impl/__init__.py
@@ -0,0 +1,3 @@
# coding=utf-8

from .base_mcp_node import BaseMcpNode
56 changes: 56 additions & 0 deletions apps/application/flow/step_node/mcp_node/impl/base_mcp_node.py
@@ -0,0 +1,56 @@
# coding=utf-8
import asyncio
import json
from typing import List

from langchain_mcp_adapters.client import MultiServerMCPClient

from application.flow.i_step_node import NodeResult
from application.flow.step_node.mcp_node.i_mcp_node import IMcpNode


class BaseMcpNode(IMcpNode):
def save_context(self, details, workflow_manage):
self.context['result'] = details.get('result')
self.context['tool_params'] = details.get('tool_params')
self.context['mcp_tool'] = details.get('mcp_tool')
self.answer_text = details.get('result')

def execute(self, mcp_servers, mcp_server, mcp_tool, tool_params, **kwargs) -> NodeResult:
servers = json.loads(mcp_servers)
params = self.handle_variables(tool_params)

async def call_tool(s, session, t, a):
async with MultiServerMCPClient(s) as client:
s = await client.sessions[session].call_tool(t, a)
return s

res = asyncio.run(call_tool(servers, mcp_server, mcp_tool, params))
return NodeResult({'result': [content.text for content in res.content], 'tool_params': params, 'mcp_tool': mcp_tool}, {})

def handle_variables(self, tool_params):
# 处理参数中的变量
for k, v in tool_params.items():
if type(v) == str:
tool_params[k] = self.workflow_manage.generate_prompt(tool_params[k])
if type(v) == dict:
self.handle_variables(v)
return tool_params

def get_reference_content(self, fields: List[str]):
return str(self.workflow_manage.get_reference_field(
fields[0],
fields[1:]))

def get_details(self, index: int, **kwargs):
return {
'name': self.node.properties.get('stepName'),
"index": index,
'run_time': self.context.get('run_time'),
'status': self.status,
'err_message': self.err_message,
'type': self.node.type,
'mcp_tool': self.context.get('mcp_tool'),
'tool_params': self.context.get('tool_params'),
'result': self.context.get('result'),
}
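
To illustrate what BaseMcpNode.execute() consumes, here is a hypothetical set of node parameters; the server name, tool name and placeholder syntax are assumptions for illustration only, and the actual variable substitution is delegated to workflow_manage.generate_prompt:

```python
# Illustrative parameters for an MCP node; values are assumptions.
node_params = {
    # JSON string parsed with json.loads and handed to MultiServerMCPClient
    "mcp_servers": '{"weather": {"url": "http://localhost:8000/sse", "transport": "sse"}}',
    "mcp_server": "weather",        # which client session to call
    "mcp_tool": "get_forecast",     # tool name exposed by that server
    "tool_params": {
        # string values may reference workflow variables; handle_variables()
        # resolves them via workflow_manage.generate_prompt before the call
        "city": "{{start-node.question}}",
    },
}
```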
28 changes: 28 additions & 0 deletions apps/application/serializers/application_serializers.py
@@ -6,6 +6,7 @@
@date:2023/11/7 10:02
@desc:
"""
import asyncio
import datetime
import hashlib
import json
@@ -23,6 +24,8 @@
from django.db.models.expressions import RawSQL
from django.http import HttpResponse
from django.template import Template, Context
from langchain_mcp_adapters.client import MultiServerMCPClient
from mcp.client.sse import sse_client
from rest_framework import serializers, status
from rest_framework.utils.formatting import lazy_format

@@ -1305,3 +1308,28 @@ def edit(self, instance, with_valid=True):
application_api_key.save()
# 写入缓存
get_application_api_key(application_api_key.secret_key, False)

class McpServers(serializers.Serializer):
mcp_servers = serializers.JSONField(required=True)

def get_mcp_servers(self, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
servers = json.loads(self.data.get('mcp_servers'))

async def get_mcp_tools(servers):
async with MultiServerMCPClient(servers) as client:
return client.get_tools()

tools = []
for server in servers:
tools += [
{
'server': server,
'name': tool.name,
'description': tool.description,
'args_schema': tool.args_schema,
}
for tool in asyncio.run(get_mcp_tools({server: servers[server]}))]
return tools

1 change: 1 addition & 0 deletions apps/application/urls.py
@@ -9,6 +9,7 @@
path('application/profile', views.Application.Profile.as_view(), name='application/profile'),
path('application/embed', views.Application.Embed.as_view()),
path('application/authentication', views.Application.Authentication.as_view()),
path('application/mcp_servers', views.Application.McpServers.as_view()),
path('application/<str:application_id>/publish', views.Application.Publish.as_view()),
path('application/<str:application_id>/edit_icon', views.Application.EditIcon.as_view()),
path('application/<str:application_id>/export', views.Application.Export.as_view()),
10 changes: 10 additions & 0 deletions apps/application/views/application_views.py
@@ -700,3 +700,13 @@ def post(self, request: Request, application_id: str):
data={'application_id': application_id, 'user_id': request.user.id}).play_demo_text(request.data)
return HttpResponse(byte_data, status=200, headers={'Content-Type': 'audio/mp3',
'Content-Disposition': 'attachment; filename="abc.mp3"'})

class McpServers(APIView):
authentication_classes = [TokenAuth]

@action(methods=['GET'], detail=False)
@has_permissions(PermissionConstants.APPLICATION_READ, compare=CompareConstants.AND)
@log(menu='Application', operate="Get the MCP server tools")
def get(self, request: Request):
return result.success(ApplicationSerializer.McpServers(
data={'mcp_servers': request.query_params.get('mcp_servers')}).get_mcp_servers())
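
A rough sketch of how a client might call the new endpoint; the base URL, /api prefix and AUTHORIZATION header value are assumptions about the deployment, since the PR only defines the relative path application/mcp_servers behind TokenAuth:

```python
import json

import requests  # any HTTP client works; requests assumed for brevity

BASE_URL = "http://localhost:8080/api"               # assumed deployment prefix
HEADERS = {"AUTHORIZATION": "application-xxxxxxxx"}  # placeholder API key

servers = json.dumps({
    "weather": {"url": "http://localhost:8000/sse", "transport": "sse"}
})

resp = requests.get(
    f"{BASE_URL}/application/mcp_servers",
    params={"mcp_servers": servers},
    headers=HEADERS,
)
# Expected shape: a list of {server, name, description, args_schema} entries
print(resp.json())
```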
10 changes: 9 additions & 1 deletion ui/src/api/application.ts
@@ -350,6 +350,13 @@ const getFunctionLib: (
return get(`${prefix}/${application_id}/function_lib/${function_lib_id}`, undefined, loading)
}

const getMcpTools: (
data: any,
loading?: Ref<boolean>
) => Promise<Result<any>> = (data, loading) => {
return get(`${prefix}/mcp_servers`, data, loading)
}

const getApplicationById: (
application_id: String,
app_id: String,
@@ -576,5 +583,6 @@
uploadFile,
exportApplication,
importApplication,
getApplicationById
getApplicationById,
getMcpTools
}
34 changes: 34 additions & 0 deletions ui/src/components/ai-chat/ExecutionDetailDialog.vue
@@ -639,6 +639,40 @@
</div>
</div>
</template>

<!-- MCP 节点 -->
<template v-if="item.type === WorkflowType.McpNode">
<div class="card-never border-r-4">
<h5 class="p-8-12">
{{ $t('views.applicationWorkflow.nodes.mcpNode.tool') }}
</h5>
<div class="p-8-12 border-t-dashed lighter">
<div class="mb-8">
<span class="color-secondary"> {{ $t('views.applicationWorkflow.nodes.mcpNode.tool') }}: </span> {{ item.mcp_tool }}
</div>
</div>
</div>
<div class="card-never border-r-4">
<h5 class="p-8-12">
{{ $t('views.applicationWorkflow.nodes.mcpNode.toolParam') }}
</h5>
<div class="p-8-12 border-t-dashed lighter">
<div v-for="(value, name) in item.tool_params" :key="name" class="mb-8">
<span class="color-secondary">{{ name }}:</span> {{ value }}
</div>
</div>
</div>
<div class="card-never border-r-4">
<h5 class="p-8-12">
{{ $t('common.param.outputParam') }}
</h5>
<div class="p-8-12 border-t-dashed lighter">
<div v-for="(f, i) in item.result" :key="i" class="mb-8">
<span class="color-secondary">result:</span> {{ f }}
</div>
</div>
</div>
</template>
</template>
<template v-else>
<div class="card-never border-r-4">
3 changes: 2 additions & 1 deletion ui/src/enums/workflow.ts
@@ -16,5 +16,6 @@ export enum WorkflowType {
FormNode = 'form-node',
TextToSpeechNode = 'text-to-speech-node',
SpeechToTextNode = 'speech-to-text-node',
ImageGenerateNode = 'image-generate-node'
ImageGenerateNode = 'image-generate-node',
McpNode = 'mcp-node',
}
8 changes: 8 additions & 0 deletions ui/src/locales/lang/en-US/views/application-workflow.ts
@@ -220,6 +220,14 @@ export default {
text: 'Update the value of the global variable',
assign: 'Set Value'
},
mcpNode: {
label: 'MCP Server',
text: 'Call MCP Tools',
getToolsSuccess: 'Get Tools Successfully',
getTool: 'Get Tools',
tool: 'Tool',
toolParam: 'Tool Params'
},
imageGenerateNode: {
label: 'Image Generation',
text: 'Generate images based on provided text content',