Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 118 additions & 22 deletions server/api_server/api_server_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from flask import Flask, request, jsonify, Response
from models.device_instance import DeviceInstance # noqa: E402
from flask_cors import CORS
from werkzeug.exceptions import HTTPException

# Register NetAlertX directories
INSTALL_PATH = os.getenv("NETALERTX_APP", "/app")
Expand Down Expand Up @@ -59,7 +60,8 @@
mcp_sse,
mcp_messages,
openapi_spec,
) # noqa: E402 [flake8 lint suppression]
get_openapi_spec,
)
# validation and schemas for MCP v2
from .openapi.validation import validate_request # noqa: E402 [flake8 lint suppression]
from .openapi.schemas import ( # noqa: E402 [flake8 lint suppression]
Expand Down Expand Up @@ -100,6 +102,20 @@
app = Flask(__name__)


@app.errorhandler(500)
@app.errorhandler(Exception)
def handle_500_error(e):
"""Global error handler for uncaught exceptions."""
if isinstance(e, HTTPException):
return e
mylog("none", [f"[API] Uncaught exception: {e}"])
return jsonify({
"success": False,
"error": "Internal Server Error",
"message": "Something went wrong on the server"
}), 500


# Parse CORS origins from environment or use safe defaults
_cors_origins_env = os.environ.get("CORS_ORIGINS", "")
_cors_origins = [
Expand Down Expand Up @@ -599,7 +615,7 @@ def api_device_open_ports(payload=None):
@validate_request(
operation_id="get_all_devices",
summary="Get All Devices",
description="Retrieve a list of all devices in the system.",
description="Retrieve a list of all devices in the system. Returns all records. No pagination supported.",
response_model=DeviceListWrapperResponse,
tags=["devices"],
auth_callable=is_authorized
Expand Down Expand Up @@ -662,7 +678,7 @@ def api_delete_unknown_devices(payload=None):
@app.route("/devices/export", methods=["GET"])
@app.route("/devices/export/<format>", methods=["GET"])
@validate_request(
operation_id="export_devices",
operation_id="export_devices_all",
summary="Export Devices",
description="Export all devices in CSV or JSON format.",
query_params=[{
Expand All @@ -679,7 +695,8 @@ def api_delete_unknown_devices(payload=None):
}],
response_model=DeviceExportResponse,
tags=["devices"],
auth_callable=is_authorized
auth_callable=is_authorized,
response_content_types=["application/json", "text/csv"]
)
def api_export_devices(format=None, payload=None):
export_format = (format or request.args.get("format", "csv")).lower()
Expand Down Expand Up @@ -747,7 +764,7 @@ def api_devices_totals(payload=None):
@app.route('/mcp/sse/devices/by-status', methods=['GET', 'POST'])
@app.route("/devices/by-status", methods=["GET", "POST"])
@validate_request(
operation_id="list_devices_by_status",
operation_id="list_devices_by_status_api",
summary="List Devices by Status",
description="List devices filtered by their online/offline status.",
request_model=DeviceListRequest,
Expand All @@ -763,7 +780,30 @@ def api_devices_totals(payload=None):
"connected", "down", "favorites", "new", "archived", "all", "my",
"offline"
]}
}]
}],
links={
"GetOpenPorts": {
"operationId": "get_open_ports",
"parameters": {
"target": "$response.body#/0/devLastIP"
},
"description": "The `target` parameter for `get_open_ports` requires an IP address. Use the `devLastIP` from the first device in the list."
},
"WakeOnLan": {
"operationId": "wake_on_lan",
"parameters": {
"devMac": "$response.body#/0/devMac"
},
"description": "The `devMac` parameter for `wake_on_lan` requires a MAC address. Use the `devMac` from the first device in the list."
},
"UpdateDevice": {
"operationId": "update_device",
"parameters": {
"mac": "$response.body#/0/devMac"
},
"description": "The `mac` parameter for `update_device` is a path parameter. Use the `devMac` from the first device in the list."
}
}
)
def api_devices_by_status(payload: DeviceListRequest = None):
status = payload.status if payload else request.args.get("status")
Expand All @@ -774,13 +814,43 @@ def api_devices_by_status(payload: DeviceListRequest = None):
@app.route('/mcp/sse/devices/search', methods=['POST'])
@app.route('/devices/search', methods=['POST'])
@validate_request(
operation_id="search_devices",
operation_id="search_devices_api",
summary="Search Devices",
description="Search for devices based on various criteria like name, IP, MAC, or vendor.",
request_model=DeviceSearchRequest,
response_model=DeviceSearchResponse,
tags=["devices"],
auth_callable=is_authorized
auth_callable=is_authorized,
links={
"GetOpenPorts": {
"operationId": "get_open_ports",
"parameters": {
"target": "$response.body#/devices/0/devLastIP"
},
"description": "The `target` parameter for `get_open_ports` requires an IP address. Use the `devLastIP` from the first device in the search results."
},
"WakeOnLan": {
"operationId": "wake_on_lan",
"parameters": {
"devMac": "$response.body#/devices/0/devMac"
},
"description": "The `devMac` parameter for `wake_on_lan` requires a MAC address. Use the `devMac` from the first device in the search results."
},
"NmapScan": {
"operationId": "run_nmap_scan",
"parameters": {
"scan": "$response.body#/devices/0/devLastIP"
},
"description": "The `scan` parameter for `run_nmap_scan` requires an IP or range. Use the `devLastIP` from the first device in the search results."
},
"UpdateDevice": {
"operationId": "update_device",
"parameters": {
"mac": "$response.body#/devices/0/devMac"
},
"description": "The `mac` parameter for `update_device` is a path parameter. Use the `devMac` from the first device in the search results."
}
}
)
def api_devices_search(payload=None):
"""Device search: accepts 'query' in JSON and maps to device info/search."""
Expand Down Expand Up @@ -884,9 +954,13 @@ def api_devices_network_topology(payload=None):
auth_callable=is_authorized
)
def api_wakeonlan(payload=None):
data = request.get_json(silent=True) or {}
mac = data.get("devMac")
ip = data.get("devLastIP") or data.get('ip')
if payload:
mac = payload.mac
ip = payload.devLastIP
else:
data = request.get_json(silent=True) or {}
mac = data.get("mac") or data.get("devMac")
ip = data.get("devLastIP") or data.get('ip')

if not mac and ip:

Expand Down Expand Up @@ -1011,7 +1085,7 @@ def api_network_interfaces(payload=None):


@app.route('/mcp/sse/nettools/trigger-scan', methods=['POST'])
@app.route("/nettools/trigger-scan", methods=["GET"])
@app.route("/nettools/trigger-scan", methods=["GET", "POST"])
@validate_request(
operation_id="trigger_network_scan",
summary="Trigger Network Scan",
Expand Down Expand Up @@ -1300,13 +1374,25 @@ def api_create_event(mac, payload=None):

@app.route("/events/<mac>", methods=["DELETE"])
@validate_request(
operation_id="delete_events_by_mac",
summary="Delete Events by MAC",
description="Delete all events for a specific device MAC address.",
operation_id="delete_events",
summary="Delete Events",
description="Delete events by device MAC address or older than a specified number of days.",
path_params=[{
"name": "mac",
"description": "Device MAC address",
"schema": {"type": "string"}
"description": "Device MAC address or number of days",
"schema": {
"oneOf": [
{
"type": "integer",
"description": "Number of days (e.g., 30) to delete events older than this value."
},
{
"type": "string",
"pattern": "^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$",
"description": "Device MAC address to delete all events for a specific device."
}
]
}
}],
response_model=BaseResponse,
tags=["events"],
Expand All @@ -1315,6 +1401,7 @@ def api_create_event(mac, payload=None):
def api_events_by_mac(mac, payload=None):
"""Delete events for a specific device MAC; string converter keeps this distinct from /events/<int:days>."""
device_handler = DeviceInstance()

result = device_handler.deleteDeviceEvents(mac)
return jsonify(result)

Expand All @@ -1338,7 +1425,7 @@ def api_delete_all_events(payload=None):
@validate_request(
operation_id="get_all_events",
summary="Get Events",
description="Retrieve a list of events, optionally filtered by MAC.",
description="Retrieve a list of events, optionally filtered by MAC. Returns all matching records. No pagination supported.",
query_params=[{
"name": "mac",
"description": "Filter by Device MAC",
Expand Down Expand Up @@ -1372,7 +1459,8 @@ def api_get_events(payload=None):
}],
response_model=BaseResponse,
tags=["events"],
auth_callable=is_authorized
auth_callable=is_authorized,
exclude_from_spec=True
)
def api_delete_old_events(days: int, payload=None):
"""
Expand Down Expand Up @@ -1406,7 +1494,7 @@ def api_get_events_totals(payload=None):


@app.route('/mcp/sse/events/recent', methods=['GET', 'POST'])
@app.route('/events/recent', methods=['GET'])
@app.route('/events/recent', methods=['GET', 'POST'])
@validate_request(
operation_id="get_recent_events",
summary="Get Recent Events",
Expand All @@ -1426,7 +1514,7 @@ def api_events_default_24h(payload=None):


@app.route('/mcp/sse/events/last', methods=['GET', 'POST'])
@app.route('/events/last', methods=['GET'])
@app.route('/events/last', methods=['GET', 'POST'])
@validate_request(
operation_id="get_last_events",
summary="Get Last Events",
Expand Down Expand Up @@ -1763,7 +1851,7 @@ def sync_endpoint_post(payload=None):
@validate_request(
operation_id="check_auth",
summary="Check Authentication",
description="Check if the current API token is valid.",
description="Check if the current API token is valid. Note: tokens must be generated externally via the UI or CLI.",
response_model=BaseResponse,
tags=["auth"],
auth_callable=is_authorized
Expand All @@ -1778,6 +1866,14 @@ def check_auth(payload=None):
# Mount SSE endpoints after is_authorized is defined (avoid circular import)
create_sse_endpoint(app, is_authorized)

# Apply environment-driven MCP disablement by regenerating the OpenAPI spec.
# This populates the registry and applies any operation IDs listed in MCP_DISABLED_TOOLS.
try:
get_openapi_spec(force_refresh=True, flask_app=app)
mylog("verbose", [f"[MCP] Applied MCP_DISABLED_TOOLS: {os.environ.get('MCP_DISABLED_TOOLS', '')}"])
except Exception as e:
mylog("none", [f"[MCP] Error applying MCP_DISABLED_TOOLS: {e}"])


def start_server(graphql_port, app_state):
"""Start the GraphQL server in a background thread."""
Expand Down
71 changes: 63 additions & 8 deletions server/api_server/mcp_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,24 +309,28 @@ def map_openapi_to_mcp_tools(spec: Dict[str, Any]) -> List[Dict[str, Any]]:

This function transforms OpenAPI operations into MCP-compatible tool schemas,
ensuring proper inputSchema derivation from request bodies and parameters.
It deduplicates tools by their original operationId, preferring /mcp/ routes.

Args:
spec: OpenAPI specification dictionary

Returns:
List of MCP tool definitions with name, description, and inputSchema
"""
tools = []
tools_map = {}

if not spec or "paths" not in spec:
return tools
return []

for path, methods in spec["paths"].items():
for method, details in methods.items():
if "operationId" not in details:
continue

operation_id = details["operationId"]
# Deduplicate using the original operationId (before suffixing)
# or the unique operationId as fallback.
original_op_id = details.get("x-original-operationId", operation_id)

# Build inputSchema from requestBody and parameters
input_schema = {
Expand Down Expand Up @@ -382,31 +386,82 @@ def map_openapi_to_mcp_tools(spec: Dict[str, Any]) -> List[Dict[str, Any]]:
tool = {
"name": operation_id,
"description": details.get("description", details.get("summary", "")),
"inputSchema": input_schema
"inputSchema": input_schema,
"_original_op_id": original_op_id,
"_is_mcp": path.startswith("/mcp/"),
"_is_post": method.upper() == "POST"
}

tools.append(tool)

return tools
# Preference logic for deduplication:
# 1. Prefer /mcp/ routes over standard ones.
# 2. Prefer POST methods over GET for the same logic (usually more robust body validation).
existing = tools_map.get(original_op_id)
if not existing:
tools_map[original_op_id] = tool
else:
# Upgrade if current is MCP and existing is not
mcp_upgrade = tool["_is_mcp"] and not existing["_is_mcp"]
# Upgrade if same route type but current is POST and existing is GET
method_upgrade = (tool["_is_mcp"] == existing["_is_mcp"]) and tool["_is_post"] and not existing["_is_post"]

if mcp_upgrade or method_upgrade:
tools_map[original_op_id] = tool

# Final cleanup: remove internal preference flags and ensure tools have the original names
# unless we explicitly want the suffixed ones.
# The user said "Eliminate Duplicate Tool Names", so we should use original_op_id as the tool name.
final_tools = []
_tool_name_to_operation_id: Dict[str, str] = {}
for tool in tools_map.values():
actual_operation_id = tool["name"] # Save before overwriting
tool["name"] = tool["_original_op_id"]
_tool_name_to_operation_id[tool["name"]] = actual_operation_id
del tool["_original_op_id"]
del tool["_is_mcp"]
del tool["_is_post"]
final_tools.append(tool)

return final_tools


def find_route_for_tool(tool_name: str) -> Optional[Dict[str, Any]]:
"""
Find the registered route for a given tool name (operationId).
Handles exact matches and deduplicated original IDs.

Args:
tool_name: The operationId to look up
tool_name: The operationId or original_operation_id to look up

Returns:
Route dictionary with path, method, and models, or None if not found
"""
registry = get_registry()
candidates = []

for entry in registry:
# Exact match (priority) - if the client passed the specific suffixed ID
if entry["operation_id"] == tool_name:
return entry
if entry.get("original_operation_id") == tool_name:
candidates.append(entry)

return None
if not candidates:
return None

# Apply same preference logic as map_openapi_to_mcp_tools to ensure we pick the
# same route definition that generated the tool schema.

# Priority 1: MCP routes (they have specialized paths/behavior)
mcp_candidates = [c for c in candidates if c["path"].startswith("/mcp/")]
pool = mcp_candidates if mcp_candidates else candidates

# Priority 2: POST methods (usually preferred for tools)
post_candidates = [c for c in pool if c["method"].upper() == "POST"]
if post_candidates:
return post_candidates[0]

# Fallback: return the first from the best pool available
return pool[0]


# =============================================================================
Expand Down
Loading