11import logging
22from typing import Sequence
3+ import concurrent .futures
4+ import atexit
35
46import timeplus_connect
57from timeplus_connect .driver .binding import quote_identifier , format_query_value
2022)
2123logger = logging .getLogger (MCP_SERVER_NAME )
2224
25+ QUERY_EXECUTOR = concurrent .futures .ThreadPoolExecutor (max_workers = 10 )
26+ atexit .register (lambda : QUERY_EXECUTOR .shutdown (wait = True ))
27+ SELECT_QUERY_TIMEOUT_SECS = 30
28+
2329load_dotenv ()
2430
2531deps = [
3541
3642@mcp .tool ()
3743def list_databases ():
44+ """List available Timeplus databases"""
3845 logger .info ("Listing all databases" )
3946 client = create_timeplus_client ()
4047 result = client .command ("SHOW DATABASES" )
@@ -44,6 +51,7 @@ def list_databases():
4451
4552@mcp .tool ()
4653def list_tables (database : str = 'default' , like : str = None ):
54+ """List available tables/streams in the given database"""
4755 logger .info (f"Listing tables in database '{ database } '" )
4856 client = create_timeplus_client ()
4957 query = f"SHOW STREAMS FROM { quote_identifier (database )} "
@@ -109,10 +117,7 @@ def get_table_info(table):
109117 logger .info (f"Found { len (tables )} tables" )
110118 return tables
111119
112-
113- @mcp .tool ()
114- def run_sql (query : str ):
115- logger .info (f"Executing query: { query } " )
120+ def execute_query (query : str ):
116121 client = create_timeplus_client ()
117122 try :
118123 readonly = 1 if config .readonly else 0
@@ -128,7 +133,36 @@ def run_sql(query: str):
128133 return rows
129134 except Exception as err :
130135 logger .error (f"Error executing query: { err } " )
131- return f"error running query: { err } "
136+ # Return a structured dictionary rather than a string to ensure proper serialization
137+ # by the MCP protocol. String responses for errors can cause BrokenResourceError.
138+ return {"error" : str (err )}
139+
140+ @mcp .tool ()
141+ def run_sql (query : str ):
142+ """Run a query in a Timeplus database"""
143+ logger .info (f"Executing query: { query } " )
144+ try :
145+ future = QUERY_EXECUTOR .submit (execute_query , query )
146+ try :
147+ result = future .result (timeout = SELECT_QUERY_TIMEOUT_SECS )
148+ # Check if we received an error structure from execute_query
149+ if isinstance (result , dict ) and "error" in result :
150+ logger .warning (f"Query failed: { result ['error' ]} " )
151+ # MCP requires structured responses; string error messages can cause
152+ # serialization issues leading to BrokenResourceError
153+ return {"status" : "error" , "message" : f"Query failed: { result ['error' ]} " }
154+ return result
155+ except concurrent .futures .TimeoutError :
156+ logger .warning (f"Query timed out after { SELECT_QUERY_TIMEOUT_SECS } seconds: { query } " )
157+ future .cancel ()
158+ # Return a properly structured response for timeout errors
159+ return {"status" : "error" , "message" : f"Query timed out after { SELECT_QUERY_TIMEOUT_SECS } seconds" }
160+ except Exception as e :
161+ logger .error (f"Unexpected error in run_select_query: { str (e )} " )
162+ # Catch all other exceptions and return them in a structured format
163+ # to prevent MCP serialization failures
164+ return {"status" : "error" , "message" : f"Unexpected error: { str (e )} " }
165+
132166
133167@mcp .prompt ()
134168def generate_sql (requirements : str ) -> str :
0 commit comments