-from typing import Dict, List, Optional, Any
 from fastmcp import FastMCP
-
-
-class AudioDevice:
-    def __init__(self, device_id: str, name: str, is_input: bool):
-        self.device_id = device_id
-        self.name = name
-        self.is_input = is_input
+import pyaudio
+import wave
+from typing import Annotated, Any, Dict, List, Optional
+from pydantic import Field
+import tempfile
+import datetime
+import os
 
 
 def register_tools(app: FastMCP) -> None:
     @app.tool(
         name="list_audio_devices",
-        description="List all audio devices connected to the system",
+        description="List all available audio input and output devices",
+        tags=["audio"],
     )
-    async def list_audio_devices(
-        input_only: bool = False, output_only: bool = False
-    ) -> List[Dict[str, Any]]:
-        devices = []
-
-        if not output_only:
-            devices.extend(
-                [
-                    {
-                        "device_id": "mic0",
-                        "name": "Built-in Microphone",
-                        "type": "input",
-                    },
-                    {"device_id": "mic1", "name": "USB Microphone", "type": "input"},
-                ]
-            )
+    def list_audio_devices() -> Dict[str, List[Dict[str, Any]]]:
+        p = pyaudio.PyAudio()
+
+        try:
+            input_devices = []
+            output_devices = []
+
+            for i in range(p.get_device_count()):
+                device_info = p.get_device_info_by_index(i)
+                device_data = {
+                    "index": i,
+                    "name": device_info["name"],
+                    "max_input_channels": device_info["maxInputChannels"],
+                    "max_output_channels": device_info["maxOutputChannels"],
+                    "default_sample_rate": device_info["defaultSampleRate"],
+                    "host_api": p.get_host_api_info_by_index(device_info["hostApi"])[
+                        "name"
+                    ],
+                }
+
+                if device_info["maxInputChannels"] > 0:
+                    input_devices.append(device_data)
+
+                if device_info["maxOutputChannels"] > 0:
+                    output_devices.append(device_data)
+
+            return {"input_devices": input_devices, "output_devices": output_devices}
+        finally:
+            p.terminate()
+
+    @app.tool(
+        name="record_audio",
+        description="Record audio from the microphone and save to a file",
+        tags=["audio"],
+    )
+    def record_audio(
+        duration: Annotated[
+            float, Field(default=5.0, description="Recording duration in seconds")
+        ] = 5.0,
+        sample_rate: Annotated[
+            Optional[int], Field(default=44100, description="Sample rate in Hz")
+        ] = 44100,
+        channels: Annotated[
+            Optional[int], Field(default=1, description="Number of audio channels")
+        ] = 1,
+        output_file: Annotated[
+            Optional[str], Field(description="Output file path for the recorded audio")
+        ] = None,
+        device_index: Annotated[
+            Optional[int],
+            Field(
+                default=None, description="Audio input device index (None for default)"
+            ),
+        ] = None,
+    ) -> Dict[str, Any]:
+        chunk = 1024
+        format = pyaudio.paInt16
+
+        if output_file is None:
+            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
+            filename = f"recording_{timestamp}.wav"
+            output_file = os.path.join(tempfile.gettempdir(), filename)
+
+        p = pyaudio.PyAudio()
+
+        try:
+            device_info = None
+            if device_index is not None:
+                device_info = p.get_device_info_by_index(device_index)
+                if device_info["maxInputChannels"] == 0:
+                    return {
+                        "success": False,
+                        "error": f"Device {device_index} is not an input device",
+                    }
 
-        if not input_only:
-            devices.extend(
-                [
-                    {
-                        "device_id": "spk0",
-                        "name": "Built-in Speakers",
-                        "type": "output",
-                    },
-                    {
-                        "device_id": "spk1",
-                        "name": "HDMI Audio Output",
-                        "type": "output",
-                    },
-                ]
+            stream = p.open(
+                format=format,
+                channels=channels,
+                rate=sample_rate,
+                input=True,
+                frames_per_buffer=chunk,
+                input_device_index=device_index,
             )
 
-        return devices
-
-    @app.tool(name="record_audio", description="Record audio from an input device")
-    async def record_audio(
-        device_id: str,
-        duration: Optional[int] = None,
-        save_path: Optional[str] = None,
-        format: str = "mp3",
-        quality: str = "medium",
-    ) -> Dict[str, Any]:
-        return {
-            "success": True,
-            "recording_id": "audio123456",
-            "device_id": device_id,
-            "file_path": save_path or f"/tmp/mcp-peripherals/audio_recording.{format}",
-            "start_time": "2025-05-23T12:34:56",
-            "format": format,
-            "quality": quality,
-            "max_duration": duration or "unlimited",
-        }
-
-    @app.tool(name="stop_audio_recording", description="Stop recording audio")
-    async def stop_audio_recording(recording_id: str) -> Dict[str, Any]:
-        return {
-            "success": True,
-            "recording_id": recording_id,
-            "file_path": "/tmp/mcp-peripherals/audio_recording.mp3",
-            "duration": "00:02:34",
-            "file_size": "3.2 MB",
-        }
-
-    @app.tool(name="play_audio", description="Play audio through an output device")
-    async def play_audio(
-        device_id: str, file_path: str, volume: Optional[int] = 100, loop: bool = False
-    ) -> Dict[str, Any]:
-        return {
-            "success": True,
-            "playback_id": "play123456",
-            "device_id": device_id,
-            "file_path": file_path,
-            "duration": "00:03:45",
-            "volume": volume,
-            "loop": loop,
-        }
-
-    @app.tool(name="stop_audio_playback", description="Stop playing audio")
-    async def stop_audio_playback(playback_id: str) -> Dict[str, Any]:
-        return {"success": True, "playback_id": playback_id, "status": "stopped"}
+            frames = []
+            total_frames = int(sample_rate / chunk * duration)
+
+            # Read the stream in fixed-size chunks until the requested duration elapses
+            for i in range(total_frames):
+                data = stream.read(chunk)
+                frames.append(data)
+
+            stream.stop_stream()
+            stream.close()
+
+            with wave.open(output_file, "wb") as wf:
+                wf.setnchannels(channels)
+                wf.setsampwidth(p.get_sample_size(format))
+                wf.setframerate(sample_rate)
+                wf.writeframes(b"".join(frames))
+
+            return {
+                "success": True,
+                "output_file": output_file,
+                "duration": duration,
+                "sample_rate": sample_rate,
+                "channels": channels,
+                "device_used": device_info["name"] if device_info else "Default device",
+            }
+        except Exception as e:
+            return {"success": False, "error": str(e)}
+        finally:
+            p.terminate()
+
+    @app.tool(
+        name="play_audio",
+        description="Play an audio file through the specified output device",
+        tags=["audio"],
+    )
+    def play_audio(
+        file_path: Annotated[str, Field(description="Path to the audio file to play")],
+        device_index: Annotated[
+            Optional[int],
+            Field(
+                default=None, description="Audio output device index (None for default)"
+            ),
+        ] = None,
+    ) -> Dict[str, Any]:
+        try:
+            with wave.open(file_path, "rb") as wf:
+                channels = wf.getnchannels()
+                sample_width = wf.getsampwidth()
+                sample_rate = wf.getframerate()
+                frames = wf.getnframes()
+                duration = frames / sample_rate
+
+                p = pyaudio.PyAudio()
+
+                try:
+                    device_info = None
+                    if device_index is not None:
+                        device_info = p.get_device_info_by_index(device_index)
+                        if device_info["maxOutputChannels"] == 0:
+                            return {
+                                "success": False,
+                                "error": f"Device {device_index} is not an output device",
+                            }
+
+                    stream = p.open(
+                        format=p.get_format_from_width(sample_width),
+                        channels=channels,
+                        rate=sample_rate,
+                        output=True,
+                        output_device_index=device_index,
+                    )
+
+                    chunk = 1024
+                    data = wf.readframes(chunk)
+
+                    # Stream the file to the output device chunk by chunk
+                    while data:
+                        stream.write(data)
+                        data = wf.readframes(chunk)
+
+                    stream.stop_stream()
+                    stream.close()
+
+                    return {
+                        "success": True,
+                        "file_played": file_path,
+                        "duration": duration,
+                        "sample_rate": sample_rate,
+                        "channels": channels,
+                        "device_used": device_info["name"]
+                        if device_info
+                        else "Default device",
+                    }
+                finally:
+                    p.terminate()
+        except FileNotFoundError:
+            return {"success": False, "error": f"Audio file not found: {file_path}"}
+        except Exception as e:
+            return {"success": False, "error": str(e)}
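
For context, a minimal sketch of how these tools might be wired into a server entry point. The file name server.py, the module path tools.audio, and the server name are assumptions for illustration and are not taken from this diff; only FastMCP(...), register_tools(app), and app.run() come from the code above or the FastMCP API.

    # server.py -- hypothetical entry point; module path and server name are assumptions
    from fastmcp import FastMCP

    from tools.audio import register_tools  # the audio tools module shown in this diff

    app = FastMCP("peripherals")  # assumed server name
    register_tools(app)

    if __name__ == "__main__":
        app.run()  # FastMCP's default (stdio) transport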