-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstart.py
229 lines (184 loc) · 7.74 KB
/
start.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import os
import sys
import signal
import time
import threading
import pygame
import ctypes
from pydispatch import dispatcher
from web_io import WebServer
from system_info import SystemInfo
from gpio import GPIO
from animatronic_movements import Movement
from gamepad_input import USBGamepadReader
from show_player import ShowPlayer
from voice_input_processor import VoiceInputProcessor
from voice_event_handler import VoiceEventHandler
from wifi_management import WifiManagement
class Pasqually:
def __init__(self):
self.isRunning = True
# Initialize pygame for managing audio playback
pygame.mixer.init(frequency=44100, size=-16, channels=2, buffer=2048)
pygame.display.init()
pygame.display.set_mode((1, 1))
self.voiceEvent = {
'id': None,
'value': None,
}
self.wifiAccessPoints = None
# Initialize components
self.gpio = GPIO()
self.movements = Movement(self.gpio)
self.webServer = WebServer()
self.wifiManagement = WifiManagement()
self.systemInfo = SystemInfo()
self.gamepad = USBGamepadReader(self.movements, self.webServer)
self.showPlayer = ShowPlayer(pygame)
self.voiceInputProcessor = VoiceInputProcessor(pygame)
self.voiceEventHandler = VoiceEventHandler(pygame, self.voiceInputProcessor)
self.setDispatchEvents()
# Handle SIGINT and SIGTERM for graceful shutdown
signal.signal(signal.SIGINT, self.shutdown)
signal.signal(signal.SIGTERM, self.shutdown)
self.movements.setDefaultAnimation(True)
def setDispatchEvents(self):
dispatcher.connect(self.onKeyEvent, signal='keyEvent', sender=dispatcher.Any)
dispatcher.connect(self.onSystemInfoUpdate, signal='systemInfoUpdate', sender=dispatcher.Any)
dispatcher.connect(self.onVoiceInputEvent, signal='voiceInputEvent', sender=dispatcher.Any)
dispatcher.connect(self.onMirroredModeToggle, signal='mirrorModeToggle', sender=dispatcher.Any)
dispatcher.connect(self.onConnectEvent, signal='connectEvent', sender=dispatcher.Any)
dispatcher.connect(self.onShowListLoad, signal='showListLoad', sender=dispatcher.Any)
dispatcher.connect(self.onShowPlay, signal='showPlay', sender=dispatcher.Any)
dispatcher.connect(self.onShowPause, signal='showPause', sender=dispatcher.Any)
dispatcher.connect(self.onShowStop, signal='showStop', sender=dispatcher.Any)
dispatcher.connect(self.onShowEnd, signal='showEnd', sender=dispatcher.Any)
dispatcher.connect(self.onMirroredMode, signal='onMirroredMode', sender=dispatcher.Any)
dispatcher.connect(self.onRetroMode, signal='onRetroMode', sender=dispatcher.Any)
dispatcher.connect(self.onHeadNodInverted, signal='onHeadNodInverted', sender=dispatcher.Any)
dispatcher.connect(self.onShowPlaybackMidiEvent, signal='showPlaybackMidiEvent', sender=dispatcher.Any)
dispatcher.connect(self.onActivateWifiHotspot, signal='activateWifiHotspot', sender=dispatcher.Any)
dispatcher.connect(self.onConnectToWifiNetwork, signal='connectToWifi', sender=dispatcher.Any)
dispatcher.connect(self.onWebTTSEvent, signal='webTTSEvent', sender=dispatcher.Any)
def run(self):
try:
while self.isRunning:
if self.voiceEvent['id'] is not None:
self.webServer.broadcast('voiceCommandUpdate', self.voiceEvent)
self.voiceEvent['id'] = None
self.voiceEvent['value'] = None
# Broadcast a new wifi scan result if it has changed.
if self.wifiManagement.get_wifi_access_points() != self.wifiAccessPoints:
self.wifiAccessPoints = self.wifiManagement.get_wifi_access_points()
self.webServer.broadcast('wifiScan', self.wifiAccessPoints)
time.sleep(0.005)
except Exception as e:
print(f"Error in main loop: {e}")
finally:
print("Main loop exiting, calling shutdown...")
self.shutdown()
def shutdown(self, *args):
try:
self.isRunning = False # Signal all loops to stop
# Stop all dependent components
if self.voiceInputProcessor:
self.voiceInputProcessor.shutdown()
if self.webServer:
self.webServer.shutdown()
if self.showPlayer:
self.showPlayer.stopShow()
# Ensure all non-main threads exit before quitting pygame
for thread in threading.enumerate():
if thread is not threading.main_thread():
# If thread is still alive, force kill it
if thread.is_alive():
try:
ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread.ident), ctypes.py_object(SystemExit)
)
except Exception as e:
print(f"Error stopping thread {thread.name}: {e}")
pygame.mixer.quit()
pygame.display.quit()
pygame.quit()
print("Shutdown complete. Exiting.")
sys.exit(0)
except Exception as e:
print(f"Error during shutdown: {e}")
sys.exit(1)
def onSystemInfoUpdate(self):
self.webServer.broadcast('systemInfo', self.systemInfo.get())
# Event handling methods
def onVoiceInputEvent(self, id, value=None):
self.voiceEvent['id'] = id
self.voiceEvent['value'] = value
# Play various animations to show Pasqually is listening and processing voice commands.
if id == "idle" or id == "ttsComplete":
# Don't do any animations while he's not doing any voice processing.
self.movements.stopAllAnimationThreads()
elif id == "wakeWord":
# Twirls his mustache a bit to demonstrate wakeword acknowledgement.
self.movements.playWakewordAcknowledgement()
self.showPlayer.stopShow() # Stop any playing shows
elif id == "transcribing":
# Start random blinking animation.
self.movements.playBlinkAnimation()
elif id == "command" or id == "ttsSubmitted":
# Add some eye left/right movement animation.
self.movements.playEyeLeftRightAnimation()
self.movements.playBlinkAnimation()
self.voiceEventHandler.triggerEvent(id, value)
def onShowListLoad(self, showList):
self.webServer.broadcast('showListLoaded', showList)
def onShowPlay(self, showName):
self.showPlayer.loadShow(showName)
self.movements.setDefaultAnimation(False)
def onShowStop(self):
self.showPlayer.stopShow()
self.movements.setDefaultAnimation(True)
def onShowEnd(self):
self.movements.setDefaultAnimation(True)
def onShowPause(self):
self.showPlayer.togglePause()
def onShowPlaybackMidiEvent(self, midiNote, value):
self.movements.executeMidiNote(midiNote, value)
def onConnectEvent(self, client_ip):
print(f"Web client connected from IP: {client_ip}")
# Tell the web frontend what the current voice command status is.
command = self.voiceInputProcessor.getLastVoiceCommand()
self.voiceEvent['id'] = command['id']
self.voiceEvent['value'] = command['value']
self.onSystemInfoUpdate()
self.showPlayer.getShowList()
self.webServer.broadcast('movementInfo', self.movements.getAllMovementInfo())
self.webServer.broadcast('wifiScan', self.wifiAccessPoints)
self.wifiManagement.scan_wifi_access_points()
def onKeyEvent(self, key, val):
# Receive key events from the HTML front end and execute any specified movement
try:
self.movements.executeMovement(str(key).lower(), val)
except Exception as e:
print(f"Invalid key: {e}")
def onRetroMode(self, val):
self.movements.setRetroMode(val)
def onHeadNodInverted(self, val):
self.gamepad.headNodInverted = val
def onMirroredMode(self, val):
self.movements.setMirrored(val)
def onMirroredModeToggle(self):
# Toggle animation mirrored mode (swapping left and right movements)
bNewMirrorMode = not self.movements.bMirrored
self.movements.setMirrored(bNewMirrorMode)
def onActivateWifiHotspot(self, bActivate):
if bActivate:
self.wifiManagement.activate_hotspot()
elif bActivate == False and self.wifiManagement.is_hotspot_active():
self.wifiManagement.deactivate_hotspot_and_reconnect()
def onConnectToWifiNetwork(self, ssid, password=None):
self.wifiManagement.connect_to_wifi(ssid, password)
def onWebTTSEvent(self, val):
print(val)
self.voiceInputProcessor.generate_and_play_tts(val)
if __name__ == "__main__":
animatronic = Pasqually()
animatronic.run()