-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathwebcam.py
71 lines (59 loc) · 2.29 KB
/
webcam.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import socketio
from MicroPie import App
# Create the Socket.IO server
sio = socketio.AsyncServer(async_mode="asgi")
# Track active users and their watchers/streamers
active_users = set()
# MicroPie Server with integrated Socket.IO
class MyApp(App):
async def index(self):
return await self.render_template("index_stream.html")
async def submit(self, username: str, action: str):
if username:
active_users.add(username)
route = f"/stream/{username}" if action == "Start Streaming" else f"/watch/{username}"
return self.redirect(route)
return self.redirect("/")
async def stream(self, username: str):
return await self.render_template("stream.html", username=username) if username in active_users else self.redirect("/")
async def watch(self, username: str):
return await self.render_template("watch.html", username=username) if username in active_users else self.redirect("/")
# Socket.IO event handlers
@sio.event
async def connect(sid, environ):
print(f"Client connected: {sid}")
@sio.event
async def disconnect(sid):
print(f"Client disconnected: {sid}")
@sio.on("stream_frame")
async def handle_stream_frame(sid, data):
"""
Broadcast the streamed frame (binary blob) to all watchers.
data = { "username": <str>, "frame": <binary blob> }
"""
username = data.get("username")
frame = data.get("frame") # This is binary
if username in active_users:
# Emit the frame to watchers in username's room
await sio.emit(
"video_frame",
{"username": username, "frame": frame},
room=username,
)
@sio.on("join_room")
async def join_room(sid, data):
"""Add a client to a room (either as a streamer or watcher)."""
username = data.get("username")
if username in active_users:
await sio.enter_room(sid, username) # Await the method
print(f"{sid} joined room for {username}")
@sio.on("leave_room")
async def leave_room(sid, data):
"""Remove a client from a room."""
username = data.get("username")
if username in active_users:
sio.leave_room(sid, username)
print(f"{sid} left room for {username}")
# Attach the Socket.IO server to the ASGI app
asgi_app = MyApp()
app = socketio.ASGIApp(sio, asgi_app)