-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathmain.py
81 lines (60 loc) · 2.14 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import asyncio
from random import choices
from typing import Any
from fastapi import FastAPI, Depends, HTTPException
from fastapi.websockets import WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from manager import WebSocketManager
app = FastAPI()
manager = WebSocketManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket, client_id)
try:
while True:
data = await websocket.receive_json()
await manager.send_response(client_id, data)
except WebSocketDisconnect:
manager.disconnect(client_id)
await manager.send_one(client_id, f"Client #{client_id} left the chat")
async def client():
if not manager.clients:
raise HTTPException(status_code=400, detail="No clients connected")
client_id = choices(list(manager.clients.keys()))[0]
if not client_id:
raise HTTPException(status_code=400, detail="No clients connected")
return client_id
class InvokeParams(BaseModel):
action: str
param: Any = None
async def send_request(client_id: str, params: dict):
response = await manager.send_one(client_id, params)
if not response:
raise HTTPException(status_code=400, detail=f"{client_id} offline")
try:
result = await asyncio.wait_for(manager.get_response(client_id), timeout=10) # Set timeout
except asyncio.TimeoutError:
raise HTTPException(status_code=400, detail="Timeout waiting for client response")
return result
@app.post("/invoke")
async def handle_invoke(
params: InvokeParams,
client_id: str = Depends(client)
):
return await send_request(client_id, {"action": params.action, "param": params.param})
@app.get("/invoke")
async def handle_invoke_get(
action: str,
param: Any = None,
client_id: str = Depends(client)
):
return await send_request(client_id, {"action": action, "param": param})
if __name__ == "__main__":
import uvicorn
uvicorn.run(
app="main:app",
log_level="info",
port=8000,
# loop="asyncio",
workers=1,
)