Skip to content

Commit c5b8910

Browse files
committed
Add screenshot and screen record functionality
1 parent 638cdce commit c5b8910

File tree

7 files changed

+363
-78
lines changed

7 files changed

+363
-78
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,6 @@ wheels/
88

99
# Virtual environments
1010
.venv
11+
12+
# Vscode
13+
.vscode/

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,7 @@ readme = "README.md"
66
requires-python = ">=3.10"
77
dependencies = [
88
"fastmcp>=2.4.0",
9+
"mss>=10.0.0",
10+
"python-ffmpeg>=2.0.12",
11+
"screeninfo>=0.8.1",
912
]

src/devices/preview.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from fastmcp import FastMCP
2+
3+
4+
def register_tools(app: FastMCP) -> None:
5+
pass

src/devices/printer.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import subprocess
66
import re
77
import tempfile
8+
import os
89

910

1011
class Printer:
@@ -103,6 +104,42 @@ async def print_file(
103104
finally:
104105
temp_file.close()
105106

107+
@app.tool(
108+
name="print_as_pdf",
109+
description="Print a file as PDF to a specified device location",
110+
tags=["printer"],
111+
)
112+
async def print_as_pdf(
113+
file_data: Annotated[
114+
bytes, Field(description="Binary data of the PDF file to be saved")
115+
],
116+
file_format: Annotated[str, Field(description="File format (must be 'pdf')")],
117+
output_path: Annotated[
118+
str, Field(description="Full path where the PDF should be saved")
119+
],
120+
) -> Dict[str, Any]:
121+
if file_format.lower() != "pdf":
122+
return {"success": False, "error": "File format must be 'pdf'"}
123+
124+
try:
125+
output_dir = os.path.dirname(output_path)
126+
if output_dir and not os.path.exists(output_dir):
127+
os.makedirs(output_dir, exist_ok=True)
128+
129+
with open(output_path, "wb") as pdf_file:
130+
pdf_file.write(file_data)
131+
132+
return {
133+
"success": True,
134+
"output_path": output_path,
135+
"file_size": len(file_data),
136+
}
137+
138+
except OSError as e:
139+
return {"success": False, "error": f"Failed to save PDF: {str(e)}"}
140+
except Exception as e:
141+
return {"success": False, "error": f"Unexpected error: {str(e)}"}
142+
106143
@app.tool(
107144
name="get_print_job",
108145
description="Get information about a print job",

src/devices/screen.py

Lines changed: 196 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,209 @@
11
from typing import Dict, List, Optional, Any
22
from fastmcp import FastMCP
3-
4-
5-
class Screen:
6-
def __init__(self, device_id: str, name: str):
7-
self.device_id = device_id
8-
self.name = name
3+
from screeninfo import get_monitors
4+
from datetime import datetime
5+
from mss import mss
6+
import tempfile
7+
import os
8+
import time
9+
from typing import Annotated
10+
from pydantic import Field
11+
from ffmpeg.asyncio import FFmpeg
12+
import shutil
913

1014

1115
def register_tools(app: FastMCP) -> None:
1216
@app.tool(
13-
name="list_displays", description="List all displays connected to the system"
17+
name="list_displays",
18+
description="List all displays connected to the system",
19+
tags=["screen"],
1420
)
1521
async def list_displays() -> List[Dict[str, Any]]:
16-
displays = [
17-
{
18-
"device_id": "display0",
19-
"name": "Built-in Retina Display",
20-
"resolution": "2560x1600",
21-
"is_primary": True,
22-
},
23-
{
24-
"device_id": "display1",
25-
"name": "External HDMI Monitor",
26-
"resolution": "3840x2160",
27-
"is_primary": False,
28-
},
29-
]
22+
displays = []
23+
monitors = get_monitors()
24+
25+
for i, monitor in enumerate(monitors):
26+
displays.append(
27+
{
28+
"device_id": f"display{i}",
29+
"name": f"Display {i}",
30+
"resolution": f"{monitor.width}x{monitor.height}",
31+
"is_primary": monitor.is_primary,
32+
"x": monitor.x,
33+
"y": monitor.y,
34+
}
35+
)
36+
3037
return displays
3138

32-
@app.tool(name="capture_screen", description="Capture a screenshot from a display")
33-
async def capture_screen(
34-
device_id: Optional[str] = None,
35-
save_path: Optional[str] = None,
36-
region: Optional[str] = None,
39+
@app.tool(
40+
name="capture_screenshot",
41+
description="Capture a screenshot from a display",
42+
tags=["screen"],
43+
)
44+
async def capture_screenshot(
45+
device_id: Annotated[
46+
Optional[str],
47+
Field(
48+
description="The display identifier in format 'displayN' where N is the display index (e.g., 'display0', 'display1')",
49+
default="display0",
50+
),
51+
] = "display0",
52+
save_path: Annotated[
53+
Optional[str],
54+
Field(
55+
description="The file path with file name where the screenshot should be saved. If None, a temporary file with timestamp will be created automatically"
56+
),
57+
] = None,
3758
) -> Dict[str, Any]:
38-
return {
39-
"success": True,
40-
"file_path": save_path or "/tmp/mcp-peripherals/screenshot.png",
41-
"resolution": region or "Full Screen (2560x1600)",
42-
"timestamp": "2025-05-23T12:34:56",
43-
"device_id": device_id or "display0",
44-
}
45-
46-
@app.tool(name="record_screen", description="Start recording a screen")
59+
try:
60+
display_index = 0
61+
if device_id and device_id.startswith("display"):
62+
try:
63+
display_index = int(device_id.replace("display", ""))
64+
except ValueError:
65+
return {
66+
"success": False,
67+
"error": f"Invalid device_id format: {device_id}. Expected format: 'displayN'",
68+
}
69+
70+
monitors = get_monitors()
71+
if not monitors:
72+
return {"success": False, "error": "No displays found"}
73+
74+
if display_index >= len(monitors):
75+
return {
76+
"success": False,
77+
"error": f"Display {device_id} not found. Available displays: 0-{len(monitors) - 1}",
78+
}
79+
80+
with mss() as sct:
81+
if save_path is None:
82+
temp_dir = tempfile.gettempdir()
83+
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
84+
save_path = os.path.join(temp_dir, f"screenshot_{timestamp}.png")
85+
else:
86+
if (
87+
os.path.isdir(save_path)
88+
or save_path.endswith("/")
89+
or save_path.endswith("\\")
90+
):
91+
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
92+
save_path = os.path.join(
93+
save_path, f"screenshot_{timestamp}.png"
94+
)
95+
else:
96+
if not os.path.splitext(save_path)[1]:
97+
save_path = save_path + ".png"
98+
99+
os.makedirs(os.path.dirname(save_path), exist_ok=True)
100+
101+
sct.shot(mon=display_index + 1, output=save_path)
102+
return {
103+
"success": True,
104+
"file_path": save_path,
105+
}
106+
107+
except Exception as e:
108+
return {"success": False, "error": f"Screenshot capture failed: {str(e)}"}
109+
110+
@app.tool(
111+
name="record_screen", description="Start recording a screen", tags=["screen"]
112+
)
47113
async def record_screen(
48-
device_id: Optional[str] = None,
49-
save_path: Optional[str] = None,
50-
region: Optional[str] = None,
51-
fps: int = 30,
52-
audio: bool = False,
114+
device_id: Annotated[
115+
Optional[str],
116+
Field(
117+
description="The display identifier in format 'displayN' where N is the display index (e.g., 'display0', 'display1')",
118+
default="display0",
119+
),
120+
] = "display0",
121+
save_path: Annotated[
122+
Optional[str],
123+
Field(
124+
description="The file path with or without file name where the video should be saved. If None, a temporary file with timestamp will be created automatically"
125+
),
126+
] = None,
127+
duration: Annotated[
128+
Optional[int],
129+
Field(
130+
description="Duration of the recording in seconds. If None, defaults to 10 seconds",
131+
default=10,
132+
),
133+
] = 10,
134+
fps: Annotated[
135+
Optional[float],
136+
Field(
137+
description="Frames per second for the recording. If None, defaults to 15 fps",
138+
default=15.0,
139+
),
140+
] = 15.0,
53141
) -> Dict[str, Any]:
54-
return {
55-
"success": True,
56-
"recording_id": "screen123456",
57-
"file_path": save_path or "/tmp/mcp-peripherals/screen_recording.mp4",
58-
"start_time": "2025-05-23T12:34:56",
59-
"device_id": device_id or "display0",
60-
"region": region or "Full Screen",
61-
"fps": fps,
62-
"with_audio": audio,
63-
}
64-
65-
@app.tool(name="stop_screen_recording", description="Stop recording a screen")
66-
async def stop_screen_recording(recording_id: str) -> Dict[str, Any]:
67-
return {
68-
"success": True,
69-
"recording_id": recording_id,
70-
"file_path": "/tmp/mcp-peripherals/screen_recording.mp4",
71-
"duration": "00:02:34",
72-
"file_size": "42.7 MB",
73-
"resolution": "2560x1600",
74-
}
142+
try:
143+
display_index = 0
144+
if device_id and device_id.startswith("display"):
145+
try:
146+
display_index = int(device_id.replace("display", ""))
147+
except ValueError:
148+
return {
149+
"success": False,
150+
"error": f"Invalid device_id format: {device_id}. Expected format: 'displayN'",
151+
}
152+
153+
monitors = get_monitors()
154+
if not monitors:
155+
return {"success": False, "error": "No displays found"}
156+
157+
if display_index >= len(monitors):
158+
return {
159+
"success": False,
160+
"error": f"Display {device_id} not found. Available displays: 0-{len(monitors) - 1}",
161+
}
162+
163+
if save_path is None:
164+
temp_dir = tempfile.gettempdir()
165+
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
166+
save_path = os.path.join(temp_dir, f"screen_recording_{timestamp}.mp4")
167+
else:
168+
if (
169+
os.path.isdir(save_path)
170+
or save_path.endswith("/")
171+
or save_path.endswith("\\")
172+
):
173+
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
174+
save_path = os.path.join(
175+
save_path, f"screen_recording_{timestamp}.mp4"
176+
)
177+
else:
178+
if not os.path.splitext(save_path)[1]:
179+
save_path = save_path + ".mp4"
180+
181+
os.makedirs(os.path.dirname(save_path), exist_ok=True)
182+
183+
frames_dir = tempfile.mkdtemp()
184+
185+
with mss() as sct:
186+
start_time = time.time()
187+
frame_count = 0
188+
189+
while time.time() - start_time < duration:
190+
sct.shot(
191+
mon=display_index + 1,
192+
output=f"{frames_dir}/frame_{frame_count:06d}.png",
193+
)
194+
195+
frame_count += 1
196+
197+
ffmpeg = (
198+
FFmpeg()
199+
.option("y")
200+
.input(f"{frames_dir}/frame_%06d.png", framerate=fps)
201+
.output(save_path, vcodec="libx264", pix_fmt="yuv420p")
202+
)
203+
204+
await ffmpeg.execute()
205+
shutil.rmtree(frames_dir)
206+
return {"success": True, "file_path": save_path}
207+
208+
except Exception as e:
209+
return {"success": False, "error": f"Screen recording failed: {str(e)}"}

src/main.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from server import create_app
22
from config import get_settings
3+
import asyncio
34

45

56
def main():
@@ -9,4 +10,4 @@ def main():
910

1011

1112
if __name__ == "__main__":
12-
main()
13+
asyncio.run(main())

0 commit comments

Comments
 (0)