1- from typing import Any
1+ from contextlib import AsyncExitStack
2+ from typing import TYPE_CHECKING , Any
23
4+ import anyio
5+ import mcp .types as types
36from mcp .server .lowlevel .server import (
47 LifespanResultT ,
58 NotificationOptions ,
912 Server as _Server ,
1013)
1114from mcp .server .models import InitializationOptions
15+ from mcp .server .session import ServerSession
16+ from mcp .shared .session import RequestResponder
17+
18+ if TYPE_CHECKING :
19+ from fastmcp .server .middleware .middleware import MiddlewareContext
20+ from fastmcp .server .server import FastMCP
21+
22+
23+ class MiddlewareExposedServerSession (ServerSession ):
24+ """ServerSession that routes initialization requests through FastMCP middleware."""
25+
26+ def __init__ (self , fastmcp_server : "FastMCP" , * args , ** kwargs ):
27+ super ().__init__ (* args , ** kwargs )
28+ self .fastmcp_server = fastmcp_server
29+
30+ async def _received_request (
31+ self , responder : RequestResponder [types .ClientRequest , types .ServerResult ]
32+ ):
33+ # Check if this is an initialization request and if middleware should handle it
34+ if (
35+ isinstance (responder .request .root , types .InitializeRequest )
36+ and self .fastmcp_server
37+ and hasattr (self .fastmcp_server , "_apply_middleware" )
38+ ):
39+ # Import here to avoid circular imports
40+ from fastmcp .server .middleware .middleware import MiddlewareContext
41+
42+ # HACK: Pass session object directly to middleware context for proof-of-concept
43+ context = MiddlewareContext (
44+ message = responder .request .root .params ,
45+ method = "initialize" ,
46+ type = "request" ,
47+ source = "client" ,
48+ session = self , # Pass session so middleware can store data on it
49+ )
50+
51+ # Create a continuation that calls the original initialization handler
52+ async def call_original_handler (
53+ ctx : "MiddlewareContext" ,
54+ ) -> types .InitializeResult :
55+ # Call the original handler by continuing to the parent implementation
56+ await super (MiddlewareExposedServerSession , self )._received_request (
57+ responder
58+ )
59+ # The response will be handled by the parent, we just need to extract the result
60+ # This is a bit tricky since the parent handles the response internally
61+ # For now, we'll call the parent and assume it handles the response correctly
62+ return None # Parent handles the actual response
63+
64+ # Apply middleware chain, but still let parent handle the actual response
65+ try :
66+ await self .fastmcp_server ._apply_middleware (
67+ context , call_original_handler
68+ )
69+ except Exception :
70+ # If middleware fails, fall back to original handling
71+ await super ()._received_request (responder )
72+ else :
73+ # For non-initialization requests or when no middleware, use original handling
74+ await super ()._received_request (responder )
1275
1376
1477class LowLevelServer (_Server [LifespanResultT , RequestT ]):
@@ -20,6 +83,8 @@ def __init__(self, *args, **kwargs):
2083 resources_changed = True ,
2184 tools_changed = True ,
2285 )
86+ # Reference to FastMCP server for middleware integration
87+ self .fastmcp_server : FastMCP | None = None
2388
2489 def create_initialization_options (
2590 self ,
@@ -35,3 +100,46 @@ def create_initialization_options(
35100 experimental_capabilities = experimental_capabilities ,
36101 ** kwargs ,
37102 )
103+
104+ async def run (
105+ self ,
106+ read_stream ,
107+ write_stream ,
108+ initialization_options ,
109+ raise_exceptions = False ,
110+ stateless = False ,
111+ ):
112+ """Override run to use MiddlewareExposedServerSession when fastmcp_server is available."""
113+ async with AsyncExitStack () as stack :
114+ lifespan_context = await stack .enter_async_context (self .lifespan (self ))
115+
116+ # Use MiddlewareExposedServerSession if we have a FastMCP server, otherwise use default
117+ if self .fastmcp_server :
118+ session = await stack .enter_async_context (
119+ MiddlewareExposedServerSession (
120+ self .fastmcp_server ,
121+ read_stream ,
122+ write_stream ,
123+ initialization_options ,
124+ stateless = stateless ,
125+ )
126+ )
127+ else :
128+ session = await stack .enter_async_context (
129+ ServerSession (
130+ read_stream ,
131+ write_stream ,
132+ initialization_options ,
133+ stateless = stateless ,
134+ )
135+ )
136+
137+ async with anyio .create_task_group () as tg :
138+ async for message in session .incoming_messages :
139+ tg .start_soon (
140+ self ._handle_message ,
141+ message ,
142+ session ,
143+ lifespan_context ,
144+ raise_exceptions ,
145+ )
0 commit comments