Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ An MCP server for Timeplus.

### Tools

* `run_select_query`
* `run_sql`
- Execute SQL queries on your Timeplus cluster.
- Input: `sql` (string): The SQL query to execute.
- By default, all Timeplus queries are run with `readonly = 1` to ensure they are safe. If you want to run DDL or DML queries, you can set the environment variable `TIMEPLUS_READ_ONLY` to `false`.
Expand All @@ -21,6 +21,17 @@ An MCP server for Timeplus.
- List all tables in a database.
- Input: `database` (string): The name of the database.

* `list_kafka_topics`
- List all topics in a Kafka cluster

* `explore_kafka_topic`
- Show some messages in the Kafka topic
- Input: `topic` (string): The name of the topic. `message_count` (int): The number of messages to show, default to 1.

* `create_kafka_stream`
- Setup a streaming ETL in Timeplus to save the Kafka messages locally
- Input: `topic` (string): The name of the topic.

## Configuration

First, ensure you have the `uv` executable installed. If not, you can install it by following the instructions [here](https://docs.astral.sh/uv/).
Expand All @@ -46,7 +57,8 @@ First, ensure you have the `uv` executable installed. If not, you can install it
"TIMEPLUS_VERIFY": "true",
"TIMEPLUS_CONNECT_TIMEOUT": "30",
"TIMEPLUS_SEND_RECEIVE_TIMEOUT": "30",
"TIMEPLUS_READ_ONLY": "true"
"TIMEPLUS_READ_ONLY": "false",
"TIMEPLUS_KAFKA_CONFIG": "{\"bootstrap.servers\":\"a.aivencloud.com:28864\", \"sasl.mechanism\":\"SCRAM-SHA-256\",\"sasl.username\":\"avnadmin\", \"sasl.password\":\"thePassword\",\"security.protocol\":\"SASL_SSL\",\"enable.ssl.certificate.verification\":\"false\"}"
}
}
}
Expand Down Expand Up @@ -74,12 +86,13 @@ TIMEPLUS_SECURE=false
TIMEPLUS_VERIFY=true
TIMEPLUS_CONNECT_TIMEOUT=30
TIMEPLUS_SEND_RECEIVE_TIMEOUT=30
TIMEPLUS_READ_ONLY=true
TIMEPLUS_READ_ONLY=false
TIMEPLUS_KAFKA_CONFIG={"bootstrap.servers":"a.aivencloud.com:28864", "sasl.mechanism":"SCRAM-SHA-256","sasl.username":"avnadmin", "sasl.password":"thePassword","security.protocol":"SASL_SSL","enable.ssl.certificate.verification":"false"}
```

3. Run `uv sync` to install the dependencies. Then do `source .venv/bin/activate`.

4. For easy testing, you can run `fastmcp dev mcp_timeplus/mcp_server.py` to start the MCP server. Click the "Connect" button to connect the UI with the MCP server, then switch to the "Tools" tab to run the available tools: list_databases, list_tables, run_selected_query.
4. For easy testing, you can run `fastmcp dev mcp_timeplus/mcp_server.py` to start the MCP server. Click the "Connect" button to connect the UI with the MCP server, then switch to the "Tools" tab to run the available tools.

### Environment Variables

Expand Down Expand Up @@ -112,3 +125,4 @@ The following environment variables are used to configure the Timeplus connectio
* `TIMEPLUS_READ_ONLY`: Enable/disable read-only mode
- Default: `"true"`
- Set to `"false"` to enable DDL/DML
* `TIMEPLUS_KAFKA_CONFIG`: A JSON string for the Kafka configuration. Please refer to [librdkafka configuration](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) or take the above example as a reference.
10 changes: 8 additions & 2 deletions mcp_timeplus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@
create_timeplus_client,
list_databases,
list_tables,
run_select_query,
run_sql,
list_kafka_topics,
explore_kafka_topic,
create_kafka_stream,
)

__all__ = [
"list_databases",
"list_tables",
"run_select_query",
"run_sql",
"create_timeplus_client",
"list_kafka_topics",
"explore_kafka_topic",
"create_kafka_stream",
]
61 changes: 58 additions & 3 deletions mcp_timeplus/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
MCP_SERVER_NAME = "mcp-timeplus"
from mcp_timeplus.mcp_env import config

import json, os, time
from confluent_kafka.admin import (AdminClient)
from confluent_kafka import Consumer

# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
Expand All @@ -21,6 +25,7 @@
"timeplus-connect",
"python-dotenv",
"uvicorn",
"confluent-kafka",
]

mcp = FastMCP(MCP_SERVER_NAME, dependencies=deps)
Expand All @@ -36,7 +41,7 @@ def list_databases():


@mcp.tool()
def list_tables(database: str, like: str = None):
def list_tables(database: str = 'default', like: str = None):
logger.info(f"Listing tables in database '{database}'")
client = create_timeplus_client()
query = f"SHOW STREAMS FROM {quote_identifier(database)}"
Expand Down Expand Up @@ -104,8 +109,8 @@ def get_table_info(table):


@mcp.tool()
def run_select_query(query: str):
logger.info(f"Executing SELECT query: {query}")
def run_sql(query: str):
logger.info(f"Executing query: {query}")
client = create_timeplus_client()
try:
readonly = 1 if config.readonly else 0
Expand All @@ -123,6 +128,56 @@ def run_select_query(query: str):
logger.error(f"Error executing query: {err}")
return f"error running query: {err}"

@mcp.tool()
def list_kafka_topics():
logger.info("Listing all topics in the Kafka cluster")
admin_client = AdminClient(json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG']))
topics = admin_client.list_topics(timeout=10).topics
topics_array = []
for topic, detail in topics.items():
topic_info = {"topic": topic, "partitions": len(detail.partitions)}
topics_array.append(topic_info)
return topics_array

@mcp.tool()
def explore_kafka_topic(topic: str, message_count: int = 1):
logger.info(f"Consuming topic {topic}")
conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG'])
conf['group.id'] = f"mcp-{time.time()}"
client = Consumer(conf)
client.subscribe([topic])
messages = []
for i in range(message_count):
logger.info(f"Consuming message {i+1}")
message = client.poll()
if message is None:
logger.info("No message received")
continue
if message.error():
logger.error(f"Error consuming message: {message.error()}")
continue
else:
logger.info(f"Received message {i+1}")
messages.append(json.loads(message.value()))
client.close()
return messages

@mcp.tool()
def create_kafka_stream(topic: str):
logger.info(f"Creating Kafka externalstream for topic {topic}")
conf = json.loads(os.environ['TIMEPLUS_KAFKA_CONFIG'])
ext_stream=f"ext_stream_{topic}"
sql=f"""CREATE EXTERNAL STREAM {ext_stream} (raw string)
SETTINGS type='kafka',brokers='{conf['bootstrap.servers']}',topic='{topic}',security_protocol='{conf['security.protocol']}',sasl_mechanism='{conf['sasl.mechanism']}',username='{conf['sasl.username']}',password='{conf['sasl.password']}',skip_ssl_cert_check=true
"""
run_sql(sql)
logger.info("External Stream created")

sql=f"CREATE MATERIALIZED VIEW {topic} AS SELECT raw from {ext_stream}"
run_sql(sql)
logger.info("MATERIALIZED VIEW created")

return f"Materialized the Kafka data as {topic}"

def create_timeplus_client():
client_config = config.get_client_config()
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "mcp-timeplus"
version = "0.1.2"
version = "0.1.3"
description = "An MCP server for Timeplus."
readme = "README.md"
license = "Apache-2.0"
Expand All @@ -12,6 +12,7 @@ dependencies = [
"fastmcp>=0.4.0",
"uvicorn>=0.34.0",
"timeplus-connect>=0.8.14",
"confluent-kafka",
]

[project.scripts]
Expand Down
6 changes: 3 additions & 3 deletions tests/test_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from dotenv import load_dotenv

from mcp_timeplus import create_timeplus_client, list_databases, list_tables, run_select_query
from mcp_timeplus import create_timeplus_client, list_databases, list_tables, run_sql

load_dotenv()

Expand Down Expand Up @@ -60,7 +60,7 @@ def test_list_tables_with_like(self):
def test_run_select_query_success(self):
"""Test running a SELECT query successfully."""
query = f"SELECT * FROM {self.test_db}.{self.test_table}"
result = run_select_query(query)
result = run_sql(query)
self.assertIsInstance(result, list)
self.assertEqual(len(result), 2)
self.assertEqual(result[0]["id"], 1)
Expand All @@ -69,7 +69,7 @@ def test_run_select_query_success(self):
def test_run_select_query_failure(self):
"""Test running a SELECT query with an error."""
query = f"SELECT * FROM {self.test_db}.non_existent_table"
result = run_select_query(query)
result = run_sql(query)
self.assertIsInstance(result, str)
self.assertIn("error running query", result)

Expand Down
17 changes: 16 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading