Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CNAME
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
hypmcp.talkincode.net
166 changes: 164 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

from dotenv import load_dotenv
from fastmcp import FastMCP
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, model_validator
from pydantic import ValidationError as PydanticValidationError

from services.hyperliquid_services import HyperliquidServices
from services.validators import ValidationError, validate_order_inputs
from services.validators import ValidationError, validate_coin, validate_order_inputs

# Load environment variables
load_dotenv()
Expand Down Expand Up @@ -84,6 +85,53 @@ def initialize_service():
logger.info(f"Service initialized for account: {account_info}")


class CandlesSnapshotParams(BaseModel):
"""Bulk candles snapshot request parameters"""

coins: list[str] = Field(..., min_length=1, description="List of trading pairs")
interval: str = Field(
..., description="Candlestick interval supported by HyperLiquid"
)
start_time: int | None = Field(
default=None,
description="Start timestamp in milliseconds",
)
end_time: int | None = Field(
default=None,
description="End timestamp in milliseconds",
)
days: int | None = Field(
default=None,
gt=0,
description="Fetch recent N days (mutually exclusive with start/end)",
)
limit: int | None = Field(
default=None,
gt=0,
le=5000,
description="Maximum number of candles per coin (latest N records)",
)

@model_validator(mode="after")
def validate_time_params(self):
if self.days is not None and (
self.start_time is not None or self.end_time is not None
):
raise ValueError("days cannot be used together with start_time or end_time")

if self.days is None and self.start_time is None:
raise ValueError("start_time is required when days is not provided")

if (
self.start_time is not None
and self.end_time is not None
and self.start_time >= self.end_time
):
raise ValueError("start_time must be less than end_time")

return self


# Account Management Tools


Expand Down Expand Up @@ -369,6 +417,103 @@ async def get_orderbook(coin: str, depth: int = 20) -> dict[str, Any]:
return await hyperliquid_service.get_orderbook(coin, depth)


@mcp.tool
async def get_candles_snapshot(
coins: list[str],
interval: str,
start_time: int | None = None,
end_time: int | None = None,
days: int | None = None,
limit: int | None = None,
) -> dict[str, Any]:
"""
Fetch candlestick (OHLCV) data for multiple coins in one request

Args:
coins: List of trading pairs (e.g., ["BTC", "ETH"])
interval: Candlestick interval supported by HyperLiquid (e.g., "1m", "1h")
start_time: Start timestamp in milliseconds (required when days not provided)
end_time: End timestamp in milliseconds (defaults to now when omitted)
days: Number of recent days to fetch (mutually exclusive with start/end)
limit: Optional max number of candles per coin (latest N samples)
"""

initialize_service()

try:
params = CandlesSnapshotParams(
coins=coins,
interval=interval,
start_time=start_time,
end_time=end_time,
days=days,
limit=limit,
)
except PydanticValidationError as validation_error:
return {
"success": False,
"error": f"Invalid input: {validation_error.errors()}",
"error_code": "VALIDATION_ERROR",
}
except ValueError as validation_error:
return {
"success": False,
"error": f"Invalid input: {str(validation_error)}",
"error_code": "VALIDATION_ERROR",
}

# Validate each coin using existing validator for consistency
for coin in params.coins:
try:
validate_coin(coin)
except ValidationError as validation_error:
return {
"success": False,
"error": f"Invalid input: {str(validation_error)}",
"error_code": "VALIDATION_ERROR",
}

service_result = await hyperliquid_service.get_candles_snapshot_bulk(
coins=params.coins,
interval=params.interval,
start_time=params.start_time,
end_time=params.end_time,
days=params.days,
)

if not service_result.get("success"):
return service_result

candles_data = service_result.get("data", {})
applied_limit = params.limit or None

if applied_limit is not None:
limited_data = {}
for coin, candles in candles_data.items():
if not isinstance(candles, list):
limited_data[coin] = candles
continue
limited_data[coin] = candles[-applied_limit:]
candles_data = limited_data

response: dict[str, Any] = {
"success": True,
"data": candles_data,
"interval": service_result.get("interval"),
"start_time": service_result.get("start_time"),
"end_time": service_result.get("end_time"),
"requested_coins": params.coins,
}

if applied_limit is not None:
response["limit_per_coin"] = applied_limit

if service_result.get("coin_errors"):
response["coin_errors"] = service_result["coin_errors"]

return response


@mcp.tool
async def get_funding_history(coin: str, days: int = 7) -> dict[str, Any]:
"""
Expand Down Expand Up @@ -636,6 +781,23 @@ def start_server():
)
logger.info(f"Logs will be written to: {log_path}")

# Log all registered tools BEFORE starting server
if hasattr(mcp, "_tool_manager") and hasattr(mcp._tool_manager, "_tools"):
tools_dict = mcp._tool_manager._tools
tool_names = sorted(tools_dict.keys())

print("\n" + "=" * 60)
print(f"✅ {len(tool_names)} MCP Tools Registered:")
print("=" * 60)

for i, tool_name in enumerate(tool_names, 1):
marker = "🆕" if tool_name == "get_candles_snapshot" else " "
print(f"{marker} {i:2d}. {tool_name}")

print("=" * 60 + "\n")
else:
print("\n⚠️ Cannot verify tool registration\n")

asyncio.run(run_as_server())
except Exception as e:
logger.error(f"Failed to start server: {e}")
Expand Down
131 changes: 131 additions & 0 deletions services/hyperliquid_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,137 @@ async def get_orderbook(self, coin: str, depth: int = 20) -> dict[str, Any]:
)
return {"success": False, "error": str(e)}

async def get_candles_snapshot_bulk(
self,
coins: list[str],
interval: str,
start_time: int | None = None,
end_time: int | None = None,
days: int | None = None,
) -> dict[str, Any]:
"""
Retrieve candlestick data for multiple coins in a single call

Args:
coins: List of trading pairs (e.g., ["BTC", "ETH"])
interval: Candlestick interval string accepted by HyperLiquid
start_time: Optional start timestamp (ms). Required when days is None.
end_time: Optional end timestamp (ms). Defaults to current time when omitted.
days: Optional number of recent days to fetch. Mutually exclusive with start/end.
"""

try:
if not isinstance(coins, list) or not coins:
raise ValueError("coins must be a non-empty list of strings")

normalized_coins: list[str] = []
for coin in coins:
if not coin or not isinstance(coin, str):
raise ValueError("each coin must be a non-empty string")
coin_clean = coin.strip()
if not coin_clean:
raise ValueError("each coin must be a non-empty string")
if coin_clean not in normalized_coins:
normalized_coins.append(coin_clean)

if not interval or not isinstance(interval, str):
raise ValueError("interval must be a non-empty string")
interval = interval.strip()
if not interval:
raise ValueError("interval must be a non-empty string")

if days is not None and (start_time is not None or end_time is not None):
raise ValueError(
"days cannot be used together with start_time or end_time"
)

current_time_ms = int(time.time() * 1000)

if days is not None:
if not isinstance(days, int) or days <= 0:
raise ValueError("days must be a positive integer")
effective_end = current_time_ms if end_time is None else int(end_time)
effective_start = effective_end - (days * 24 * 60 * 60 * 1000)
else:
if start_time is None:
raise ValueError("start_time is required when days is not provided")
effective_start = int(start_time)
effective_end = (
int(end_time) if end_time is not None else current_time_ms
)

if effective_start >= effective_end:
raise ValueError("start_time must be less than end_time")

candles_by_coin: dict[str, list[dict[str, Any]]] = {}
coin_errors: dict[str, str] = {}

for coin in normalized_coins:
try:
raw_candles = self.info.candles_snapshot(
coin,
interval,
effective_start,
effective_end,
)

formatted_candles: list[dict[str, Any]] = []
for candle in raw_candles or []:
timestamp = candle.get("t") or candle.get("T")
if timestamp is None:
# Skip malformed entries without timestamp
continue

try:
formatted_candles.append(
{
"timestamp": int(timestamp),
"open": float(candle["o"]),
"high": float(candle["h"]),
"low": float(candle["l"]),
"close": float(candle["c"]),
"volume": float(candle["v"]),
"trade_count": int(candle.get("n", 0)),
}
)
except (KeyError, TypeError, ValueError) as format_error:
self.logger.warning(
"Skipping malformed candle for %s: %s",
coin,
format_error,
)

formatted_candles.sort(key=lambda item: item["timestamp"])
candles_by_coin[coin] = formatted_candles
except Exception as coin_error:
coin_errors[coin] = str(coin_error)
self.logger.error(
"Failed to fetch candles for %s: %s", coin, coin_error
)

if not candles_by_coin:
return {
"success": False,
"error": "Failed to fetch candle data for requested coins",
"coin_errors": coin_errors,
}

response: dict[str, Any] = {
"success": True,
"data": candles_by_coin,
"interval": interval,
"start_time": effective_start,
"end_time": effective_end,
}

if coin_errors:
response["coin_errors"] = coin_errors

return response
except Exception as e:
self.logger.error("Failed to get candles snapshot bulk: %s", e)
return {"success": False, "error": str(e)}

async def update_leverage(
self, coin: str, leverage: int, is_cross: bool = True
) -> dict[str, Any]:
Expand Down
Loading