| 1 | +"""Minimal FastAPI server for handling OpenAI Realtime SIP calls with Twilio.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +import asyncio |
| 6 | +import logging |
| 7 | +import os |
| 8 | + |
| 9 | +import websockets |
| 10 | +from fastapi import FastAPI, HTTPException, Request, Response |
| 11 | +from openai import APIStatusError, AsyncOpenAI, InvalidWebhookSignatureError |
| 12 | + |
| 13 | +from agents.realtime.items import ( |
| 14 | + AssistantAudio, |
| 15 | + AssistantMessageItem, |
| 16 | + AssistantText, |
| 17 | + InputText, |
| 18 | + UserMessageItem, |
| 19 | +) |
| 20 | +from agents.realtime.model_inputs import RealtimeModelSendRawMessage |
| 21 | +from agents.realtime.openai_realtime import OpenAIRealtimeSIPModel |
| 22 | +from agents.realtime.runner import RealtimeRunner |
| 23 | + |
| 24 | +from .agents import WELCOME_MESSAGE, get_starting_agent |
| 25 | + |
| 26 | +logging.basicConfig(level=logging.INFO) |
| 27 | + |
| 28 | +logger = logging.getLogger("twilio_sip_example") |
| 29 | + |
| 30 | + |
| 31 | +def _get_env(name: str) -> str: |
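    """Return a required environment variable, failing fast if it is unset."""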
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing environment variable: {name}")
    return value


OPENAI_API_KEY = _get_env("OPENAI_API_KEY")
OPENAI_WEBHOOK_SECRET = _get_env("OPENAI_WEBHOOK_SECRET")

client = AsyncOpenAI(api_key=OPENAI_API_KEY, webhook_secret=OPENAI_WEBHOOK_SECRET)

# Build the multi-agent graph (triage + specialist agents) from agents.py.
assistant_agent = get_starting_agent()

app = FastAPI()

# Track background tasks so repeated webhooks do not spawn duplicates.
active_call_tasks: dict[str, asyncio.Task[None]] = {}


async def accept_call(call_id: str) -> None:
    """Accept the incoming SIP call and configure the realtime session."""

    # The starting agent uses static instructions, so we can include them directly in the accept
    # payload. If someone swaps in a dynamic prompt, fall back to a sensible default.
    instructions_payload = (
        assistant_agent.instructions
        if isinstance(assistant_agent.instructions, str)
        else "You are a helpful triage agent for ABC customer service."
    )

    try:
        # AsyncOpenAI does not yet expose high-level helpers like client.realtime.calls.accept, so
        # we call the REST endpoint directly via client.post(). Keep this until the SDK grows an
        # async helper.
        await client.post(
            f"/realtime/calls/{call_id}/accept",
            body={
                "type": "realtime",
                "model": "gpt-realtime",
                "instructions": instructions_payload,
            },
            cast_to=dict,
        )
    except APIStatusError as exc:
        if exc.status_code == 404:
            # The incoming-call webhook may be retried after the caller has already hung up;
            # treat the missing call as a no-op so the webhook still returns 200.
            logger.warning(
                "Call %s no longer exists when attempting accept (404). Skipping.", call_id
            )
            return

        detail = exc.message
        if exc.response is not None:
            try:
                detail = exc.response.text
            except Exception:  # noqa: BLE001
                detail = str(exc.response)

        logger.error(
            "Failed to accept call %s: %s %s", call_id, exc.status_code, detail
        )
        raise HTTPException(status_code=500, detail="Failed to accept call") from exc

    logger.info("Accepted call %s", call_id)


async def observe_call(call_id: str) -> None:
    """Attach to the realtime session and log conversation events."""

    runner = RealtimeRunner(assistant_agent, model=OpenAIRealtimeSIPModel())

    try:
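        # Semantic VAD handles end-of-turn detection; interrupt_response lets a caller barge in
        # while the assistant is still speaking.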
        initial_settings = {
            "turn_detection": {
                "type": "semantic_vad",
                "interrupt_response": True,
            }
        }

        async with await runner.run(
            model_config={
                "call_id": call_id,
                "initial_model_settings": initial_settings,
            }
        ) as session:
            # Trigger an initial greeting so callers hear the agent right away: issue a
            # response.create as soon as the WebSocket attaches, before the caller says anything.
            # Sending the raw client message keeps the greeting out of the conversation history.
            await session.model.send_event(
                RealtimeModelSendRawMessage(
                    message={
                        "type": "response.create",
                        "response": {
                            "instructions": (
                                "Say exactly '"
                                f"{WELCOME_MESSAGE}"
                                "' now before continuing the conversation."
                            )
                        },
                    }
                )
            )

            async for event in session:
                if event.type == "history_added":
                    item = event.item
                    if isinstance(item, UserMessageItem):
                        for content in item.content:
                            if isinstance(content, InputText) and content.text:
                                logger.info("Caller: %s", content.text)
                    elif isinstance(item, AssistantMessageItem):
                        for content in item.content:
                            if isinstance(content, AssistantText) and content.text:
                                logger.info("Assistant (text): %s", content.text)
                            elif isinstance(content, AssistantAudio) and content.transcript:
                                logger.info("Assistant (audio transcript): %s", content.transcript)
                elif event.type == "error":
                    logger.error("Realtime session error: %s", event.error)

    except websockets.exceptions.ConnectionClosedError:
        # A caller hanging up closes the WebSocket without a close frame; log at info level so it
        # does not surface as an error.
        logger.info("Realtime WebSocket closed for call %s", call_id)
    except Exception as exc:  # noqa: BLE001 - demo logging only
        logger.exception("Error while observing call %s", call_id, exc_info=exc)
    finally:
        logger.info("Call %s ended", call_id)
        active_call_tasks.pop(call_id, None)


def _track_call_task(call_id: str) -> None:
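    """Start (or restart) the background task that observes a call."""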
    existing = active_call_tasks.get(call_id)
    if existing and not existing.done():
        existing.cancel()

    task = asyncio.create_task(observe_call(call_id))
    active_call_tasks[call_id] = task


@app.post("/openai/webhook")
async def openai_webhook(request: Request) -> Response:
    body = await request.body()

    try:
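        # webhooks.unwrap verifies the signature against OPENAI_WEBHOOK_SECRET before parsing.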
        event = client.webhooks.unwrap(body, request.headers)
    except InvalidWebhookSignatureError as exc:
        raise HTTPException(status_code=400, detail="Invalid webhook signature") from exc

    if event.type == "realtime.call.incoming":
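        # Accept the call before returning so the caller connects promptly, then watch the
        # session from a background task.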
        call_id = event.data.call_id
        await accept_call(call_id)
        _track_call_task(call_id)
        return Response(status_code=200)

    # Ignore other webhook event types for brevity.
    return Response(status_code=200)


@app.get("/")
async def healthcheck() -> dict[str, str]:
    return {"status": "ok"}