Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
462 changes: 239 additions & 223 deletions app/core/config.py

Large diffs are not rendered by default.

1,172 changes: 449 additions & 723 deletions app/core/storage.py

Large diffs are not rendered by default.

767 changes: 767 additions & 0 deletions app/services/grok/token.py

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions app/services/mcp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""MCP模块初始化"""

from app.services.mcp.server import mcp

__all__ = ["mcp"]
49 changes: 49 additions & 0 deletions app/services/mcp/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
"""FastMCP Server"""

from typing import Optional
from fastmcp import FastMCP
from fastmcp.server.auth.providers.jwt import StaticTokenVerifier
from app.services.mcp.tools import (
ask_grok_impl, generate_image_impl, edit_image_impl,
generate_video_impl, list_models_impl
)
from app.core.config import setting


def create_mcp_server() -> FastMCP:
api_key = setting.grok_config.get("api_key")
auth = None
if api_key:
auth = StaticTokenVerifier(tokens={api_key: {"client_id": "grok2api", "scopes": ["read"]}}, required_scopes=["read"])
return FastMCP(name="Grok2API-MCP", instructions="MCP server for Grok AI", auth=auth)


mcp = create_mcp_server()


@mcp.tool
async def ask_grok(query: str, model: str = "grok-4", system_prompt: str = None) -> str:
return await ask_grok_impl(query, model, system_prompt)


@mcp.tool
async def generate_image(prompt: str, n: int = None, size: str = None, model: str = "grok-imagine-1.0") -> str:
return await generate_image_impl(prompt, n, size, model)


@mcp.tool
async def edit_image(prompt: str, image_source: str, n: int = None, size: str = None,
model: str = "grok-imagine-1.0-edit") -> str:
return await edit_image_impl(prompt, image_source, n, size, model)


@mcp.tool
async def generate_video(prompt: str, image_url: str, aspect_ratio: str = None, video_length: int = None,
resolution: str = None, preset: str = None, model: str = "grok-imagine-1.0-video") -> str:
return await generate_video_impl(prompt, image_url, aspect_ratio, video_length, resolution, preset, model)


@mcp.tool
async def list_models() -> str:
return await list_models_impl()
112 changes: 112 additions & 0 deletions app/services/mcp/tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
"""MCP Tools - Grok AI"""

import json
from typing import Optional
from app.services.grok.client import GrokClient
from app.core.logger import logger


async def ask_grok_impl(query: str, model: str = "grok-4", system_prompt: Optional[str] = None) -> str:
try:
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": query})
logger.info(f"[MCP] ask_grok, model={model}")
request_data = {"model": model, "messages": messages, "stream": False}
result = await GrokClient.openai_to_grok(request_data)
if isinstance(result, dict):
choices = result.get("choices", [])
if choices:
return choices[0].get("message", {}).get("content", "")
return str(result)
except Exception as e:
logger.error(f"[MCP] ask_grok error: {e}")
raise Exception(f"处理请求失败: {e}")


async def generate_image_impl(prompt: str, n: Optional[int] = None, size: Optional[str] = None,
model: str = "grok-imagine-1.0") -> str:
try:
logger.info(f"[MCP] generate_image, model={model}, n={n}, size={size}")
request_data = {"model": model, "messages": [{"role": "user", "content": prompt}], "stream": False}
if n: request_data["n"] = n
if size: request_data["size"] = size
result = await GrokClient.openai_to_grok(request_data)
if isinstance(result, dict):
choices = result.get("choices", [])
if choices:
return choices[0].get("message", {}).get("content", "")
return str(result)
except Exception as e:
logger.error(f"[MCP] generate_image error: {e}")
raise Exception(f"图片生成失败: {e}")


async def edit_image_impl(prompt: str, image_source: str, n: Optional[int] = None,
size: Optional[str] = None, model: str = "grok-imagine-1.0-edit") -> str:
try:
logger.info(f"[MCP] edit_image, model={model}, n={n}, size={size}")
messages = [{"role": "user", "content": [
{"type": "text", "text": prompt},
{"type": "image_url", "image_url": {"url": image_source}},
]}]
request_data = {"model": model, "messages": messages, "stream": False}
if n: request_data["n"] = n
if size: request_data["size"] = size
result = await GrokClient.openai_to_grok(request_data)
if isinstance(result, dict):
choices = result.get("choices", [])
if choices:
return choices[0].get("message", {}).get("content", "")
return str(result)
except Exception as e:
logger.error(f"[MCP] edit_image error: {e}")
raise Exception(f"图片编辑失败: {e}")


async def generate_video_impl(prompt: str, image_url: str, aspect_ratio: Optional[str] = None,
video_length: Optional[int] = None, resolution: Optional[str] = None,
preset: Optional[str] = None, model: str = "grok-imagine-1.0-video") -> str:
try:
logger.info(f"[MCP] generate_video, model={model}, ratio={aspect_ratio}, length={video_length}, res={resolution}, preset={preset}")
messages = [{"role": "user", "content": [
{"type": "text", "text": prompt},
{"type": "image_url", "image_url": {"url": image_url}},
]}]
video_config = {}
if aspect_ratio: video_config["aspect_ratio"] = aspect_ratio
if video_length: video_config["video_length"] = video_length
if resolution: video_config["resolution_name"] = resolution
if preset: video_config["preset"] = preset
request_data = {"model": model, "messages": messages, "stream": False}
if video_config:
request_data["video_config"] = video_config
result = await GrokClient.openai_to_grok(request_data)
if isinstance(result, dict):
choices = result.get("choices", [])
if choices:
return choices[0].get("message", {}).get("content", "")
return str(result)
except Exception as e:
logger.error(f"[MCP] generate_video error: {e}")
raise Exception(f"视频生成失败: {e}")


async def list_models_impl() -> str:
try:
from app.models.grok_models import Models
model_names = Models.get_all_model_names()
result = []
for name in model_names:
info = Models.get_model_info(name)
result.append({
"id": name,
"name": info.get("display_name", name),
"description": info.get("description", "")
})
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"[MCP] list_models error: {e}")
raise Exception(f"获取模型列表失败: {e}")
4 changes: 4 additions & 0 deletions app/static/config/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ const LOCALE_MAP = {
"image_format": { title: "图片格式", desc: "生成的图片格式(url 或 base64)。" },
"video_format": { title: "视频格式", desc: "生成的视频格式(html 或 url,url 为处理后的链接)。" }
},
"mcp": {
"label": "MCP 服务",
"enabled": { title: "启用 MCP", desc: "是否启用 Model Context Protocol (MCP) 服务,支持 AI 工具调用。" }
},
"network": {
"label": "网络配置",
"timeout": { title: "请求超时", desc: "请求 Grok 服务的超时时间(秒)。" },
Expand Down
3 changes: 3 additions & 0 deletions data/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,6 @@ usage_max_tokens = 1000
nsfw_max_concurrent = 10
nsfw_batch_size = 50
nsfw_max_tokens = 1000

[mcp]
enabled = true
Loading
Loading