Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,3 @@ make run-docker
```bash
make push-docker
```
s
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ requires-python = ">=3.11"
authors = [{ name = "Groundlight AI" }]
license = { text = "Apache-2.0" }
dependencies = [
"framegrab>=0.11.3",
"groundlight>=0.22.4",
"mcp[cli]>=1.6.0",
"pillow>=10.0.0",
Expand Down
189 changes: 187 additions & 2 deletions src/groundlight_mcp_server/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os
from contextlib import asynccontextmanager
from functools import cache
from typing import Annotated, Any, Dict, List, Literal, Optional
Expand All @@ -13,6 +14,19 @@
Rule,
VerbEnum,
)

import cv2
from framegrab import FrameGrabber
from framegrab.config import (
BaslerFrameGrabberConfig,
FileStreamFrameGrabberConfig,
GenericUSBFrameGrabberConfig,
HttpLiveStreamingFrameGrabberConfig,
RealSenseFrameGrabberConfig,
RTSPFrameGrabberConfig,
YouTubeLiveFrameGrabberConfig,
)

from mcp.server.fastmcp import FastMCP, Image
from mcp.server.fastmcp.resources import Resource
from pydantic import BaseModel, Field
Expand All @@ -21,6 +35,16 @@

logger = logging.getLogger(__name__)

ENABLE_FRAMEGRAB_AUTO_DISCOVERY = (
os.getenv("ENABLE_FRAMEGRAB_AUTO_DISCOVERY", "false").lower() == "true"
)
FRAMEGRAB_RTSP_AUTO_DISCOVERY_MODE = os.getenv(
"FRAMEGRAB_RTSP_AUTO_DISCOVERY_MODE",
"off", # "off", "ip_only", "light", "complete_fast", "complete_slow"
)

# Cache to store created FrameGrabbers, maps name to FrameGrabber
_grabber_cache = {}

@cache
def get_gl_client() -> Groundlight:
Expand Down Expand Up @@ -157,7 +181,6 @@ def list_detectors() -> list[Detector]:

return all_detectors


@mcp.tool(
name="submit_image_query",
description=(
Expand All @@ -171,6 +194,26 @@ def submit_image_query(detector_id: str, image: str | bytes) -> ImageQuery:
iq = gl.submit_image_query(detector=detector_id, image=img)
return iq

@mcp.tool(
name="capture_and_submit_image_query",
description=(
"Captures an image from a specified framegrabber and then submits that to the specified detector."
"The detector will return a response with a label and confidence score."
),
)
def capture_and_submit_image_query(detector_id: str, framegrabber_name: str) -> ImageQuery:
gl = get_gl_client()

grabber: FrameGrabber = _grabber_cache.get(framegrabber_name)
if not grabber:
raise ValueError(
f"Framegrabber with name {framegrabber_name} not found. Options are: {list(_grabber_cache.keys())}."
)

frame = grabber.grab()

iq = gl.submit_image_query(detector=detector_id, image=frame)
return iq

@mcp.tool(
name="get_image_query",
Expand All @@ -180,7 +223,6 @@ def get_image_query(image_query_id: str) -> ImageQuery:
gl = get_gl_client()
return gl.get_image_query(id=image_query_id)


@mcp.tool(
name="list_image_queries",
description="List all image queries associated with the specified detector. Note that this may return a large number of results.",
Expand Down Expand Up @@ -459,3 +501,146 @@ def read_documentation(path: str) -> str:
else:
raise ValueError(f"Documentation not found: {path}")



@mcp.tool(
name="create_framegrabber",
description="""Create a new framegrabber from a configuration object.
Framegrabbers can be used to capture images from a webcam, a USB camera, an RTSP stream, a youtube live stream, or any other video source supported by the framegrab library.
Returns the name of the created framegrabber.""",
)
def create_framegrabber(
config: YouTubeLiveFrameGrabberConfig
| RTSPFrameGrabberConfig
| GenericUSBFrameGrabberConfig
| FileStreamFrameGrabberConfig
| HttpLiveStreamingFrameGrabberConfig
| RealSenseFrameGrabberConfig
| BaslerFrameGrabberConfig,
) -> str:
try:
# Create the new grabber
grabber = FrameGrabber.create_grabber(config)
_grabber_cache[config.name] = grabber
logger.info(f"Created new framegrabber: {config.name}")
return config.name
except Exception as e:
logger.error(f"Error creating framegrabber: {e}")
raise ValueError(f"Failed to create framegrabber: {str(e)}")


@mcp.tool(
name="grab_frame",
description="Grab a frame from the specified framegrabber and return it as an image in the specified format.",
)
def grab_frame(
framegrabber_name: str, format: Literal["png", "jpg", "webp"] = "webp"
) -> Image:
grabber: FrameGrabber = _grabber_cache.get(framegrabber_name)
if not grabber:
raise ValueError(
f"Framegrabber with name {framegrabber_name} not found. Options are: {list(_grabber_cache.keys())}."
)

frame = grabber.grab()
if format not in ["png", "jpg", "webp"]:
raise ValueError("Format must be one of: png, jpg, webp")

# Convert ndarray to bytes in specified format
if format == "jpg":
encode_params = [cv2.IMWRITE_JPEG_QUALITY, 80]
success, buffer = cv2.imencode(f".{format}", frame, encode_params)
elif format == "png":
# Use compression level 9 (highest) for PNG to reduce size
encode_params = [cv2.IMWRITE_PNG_COMPRESSION, 9]
success, buffer = cv2.imencode(f".{format}", frame, encode_params)
elif format == "webp":
# Use quality 80 for WebP to balance size and quality
encode_params = [cv2.IMWRITE_WEBP_QUALITY, 80]
success, buffer = cv2.imencode(f".{format}", frame, encode_params)
else:
success, buffer = cv2.imencode(f".{format}", frame)

if not success:
raise RuntimeError(f"Failed to encode image as {format.upper()}.")

# Create MCP Image object from the encoded bytes
return Image(data=buffer.tobytes(), format=format)


@mcp.tool(
name="list_framegrabbers",
description="List all available framegrabbers by name, sorted alphanumerically.",
)
def list_framegrabbers() -> list[str]:
return sorted(list(_grabber_cache.keys()))


@mcp.tool(
name="get_framegrabber_config",
description="Retrieve the configuration of a specific framegrabber.",
)
def get_framegrabber_config(framegrabber_name: str) -> dict:
grabber: FrameGrabber = _grabber_cache.get(framegrabber_name)
if not grabber:
raise ValueError(
f"Framegrabber with name {framegrabber_name} not found. Options are: {list(_grabber_cache.keys())}."
)
return grabber.config


@mcp.tool(
name="set_config",
description="Update the configuration options for a specific framegrabber.",
)
def set_framegrabber_config(framegrabber_name: str, options: dict) -> dict:
grabber: FrameGrabber = _grabber_cache.get(framegrabber_name)
if not grabber:
raise ValueError(
f"Framegrabber with name {framegrabber_name} not found. Options are: {list(_grabber_cache.keys())}."
)

try:
# Update the framegrabber's configuration with the new options
grabber.apply_options(options)
logger.info(f"Updated configuration for framegrabber '{framegrabber_name}'")
return grabber.config
except Exception as e:
logger.error(f"Error applying options to {framegrabber_name}: {e}")
raise ValueError(f"Failed to apply options to framegrabber: {str(e)}")


@mcp.tool(
name="release_grabber",
description="Release a framegrabber and remove it from the available grabbers.",
)
def release_framegrabber(framegrabber_name: str) -> bool:
"""
Release a framegrabber's resources and remove it from the available grabbers.

Returns True if successful, raises an exception otherwise.
"""
grabber: FrameGrabber = _grabber_cache.get(framegrabber_name)
if not grabber:
raise ValueError(
f"Framegrabber with name {framegrabber_name} not found. Options are: {list(_grabber_cache.keys())}."
)

try:
grabber.release()
del _grabber_cache[framegrabber_name]
logger.info(f"Released framegrabber: {framegrabber_name}")
return True
except Exception as e:
logger.error(f"Error releasing framegrabber {framegrabber_name}: {e}")
raise ValueError(f"Failed to release framegrabber: {str(e)}")


@mcp.resource(
uri="fg://framegrabbers",
name="framegrabbers",
description="Lists all available framegrabbers by name, sorted alphanumerically.",
mime_type="application/json",
)
def framegrabbers() -> list[str]:
return list_framegrabbers()
Loading