57 changes: 1 addition & 56 deletions examples/realtime/audio_util.py
@@ -1,18 +1,13 @@
from __future__ import annotations

import io
import base64
import asyncio
import threading
from typing import Callable, Awaitable

import numpy as np
import pyaudio
import sounddevice as sd
from pydub import AudioSegment

from openai.resources.realtime.realtime import AsyncRealtimeConnection

CHUNK_LENGTH_S = 0.05  # 50 ms
SAMPLE_RATE = 24000
FORMAT = pyaudio.paInt16
@@ -89,54 +84,4 @@ def stop(self):
self.queue = []

def terminate(self):
self.stream.close()


async def send_audio_worker_sounddevice(
connection: AsyncRealtimeConnection,
should_send: Callable[[], bool] | None = None,
start_send: Callable[[], Awaitable[None]] | None = None,
):
sent_audio = False

device_info = sd.query_devices()
print(device_info)

read_size = int(SAMPLE_RATE * 0.02)

stream = sd.InputStream(
channels=CHANNELS,
samplerate=SAMPLE_RATE,
dtype="int16",
)
stream.start()

try:
while True:
if stream.read_available < read_size:
await asyncio.sleep(0)
continue

data, _ = stream.read(read_size)

if should_send() if should_send else True:
if not sent_audio and start_send:
await start_send()
await connection.send(
{"type": "input_audio_buffer.append", "audio": base64.b64encode(data).decode("utf-8")}
)
sent_audio = True

elif sent_audio:
print("Done, triggering inference")
await connection.send({"type": "input_audio_buffer.commit"})
await connection.send({"type": "response.create", "response": {}})
sent_audio = False

await asyncio.sleep(0)

except KeyboardInterrupt:
pass
finally:
stream.stop()
stream.close()
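Note: the deleted `send_audio_worker_sounddevice` helper was the capture side of the realtime audio flow: it read PCM16 frames from the microphone via sounddevice and streamed them as base64-encoded `input_audio_buffer.append` events. For reference, a minimal standalone sketch of that pattern (assuming an already-connected `AsyncRealtimeConnection`; the `stream_mic` name is illustrative):

```python
import asyncio
import base64

import sounddevice as sd

SAMPLE_RATE = 24000  # the Realtime API expects 24 kHz, 16-bit mono PCM
READ_SIZE = int(SAMPLE_RATE * 0.02)  # read ~20 ms of samples per iteration


async def stream_mic(connection) -> None:
    stream = sd.InputStream(channels=1, samplerate=SAMPLE_RATE, dtype="int16")
    stream.start()
    try:
        while True:
            if stream.read_available < READ_SIZE:
                await asyncio.sleep(0)  # yield to the event loop while the buffer fills
                continue
            data, _ = stream.read(READ_SIZE)
            await connection.send(
                {"type": "input_audio_buffer.append", "audio": base64.b64encode(data).decode("utf-8")}
            )
    finally:
        stream.stop()
        stream.close()
```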
92 changes: 80 additions & 12 deletions examples/realtime/azure_realtime.py
File mode changed: 100644 → 100755
@@ -1,4 +1,31 @@
#!/usr/bin/env uv run
#
# /// script
# requires-python = ">=3.9"
# dependencies = [
# "textual",
# "numpy",
# "pyaudio",
# "pydub",
# "sounddevice",
# "openai[realtime]",
# "azure-identity",
# "aiohttp",
# "python-dotenv",
# ]
#
# [tool.uv.sources]
# openai = { path = "../../", editable = true }
# ///

import logging
from dotenv import load_dotenv
import httpx

load_dotenv()

import os
import base64
import asyncio

from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
@@ -11,6 +38,14 @@
# Supported models and API versions: https://learn.microsoft.com/azure/ai-services/openai/how-to/realtime-audio#supported-models
# Entra ID auth: https://learn.microsoft.com/azure/ai-services/openai/how-to/managed-identity

logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger("websockets").setLevel(logging.DEBUG)

logging.basicConfig(
format="%(asctime)s %(message)s",
level=logging.DEBUG,
)


async def main() -> None:
"""The following example demonstrates how to configure Azure OpenAI to use the Realtime API.
@@ -21,21 +56,40 @@ async def main() -> None:
"""

credential = DefaultAzureCredential()

if not (api_key := os.environ.get("AZURE_OPENAI_API_KEY")):
token_provider = get_bearer_token_provider(credential, "https://cognitiveservices.azure.com/.default")
else:
token_provider = None

endpoint = httpx.URL(os.environ["AZURE_OPENAI_ENDPOINT"])
if endpoint.scheme in ("ws", "wss"):
websocket_base_url, azure_endpoint = f"{endpoint}/openai", None
else:
websocket_base_url, azure_endpoint = None, endpoint

print(f"{websocket_base_url=}, {azure_endpoint=}")

-    client = AsyncAzureOpenAI(
-        azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
-        azure_ad_token_provider=get_bearer_token_provider(credential, "https://cognitiveservices.azure.com/.default"),
-        api_version="2024-10-01-preview",
-    )
-    async with client.realtime.connect(
-        model="gpt-realtime",
-    ) as connection:
+    client = AsyncAzureOpenAI(
+        azure_deployment="gpt-realtime",
+        azure_endpoint=str(azure_endpoint),
+        websocket_base_url=websocket_base_url,
+        azure_ad_token_provider=token_provider,
+        api_key=api_key,
+        api_version="2025-04-01-preview",
+    )  # type: ignore
+
+    async with client.beta.realtime.connect(
+        model="gpt-realtime",  # deployment name for your model
+    ) as connection:
         await connection.session.update(
             session={
-                "output_modalities": ["text"],
-                "model": "gpt-realtime",
-                "type": "realtime",
+                # "output_modalities": ["text"],
+                # "model": "gpt-realtime",
+                # "type": "realtime",
             }
         )

while True:
user_input = input("Enter a message: ")
if user_input == "q":
@@ -48,14 +102,28 @@ async def main() -> None:
"content": [{"type": "input_text", "text": user_input}],
}
)

await connection.response.create()
async for event in connection:
-            if event.type == "response.output_text.delta":
-                print(event.delta, flush=True, end="")
-            elif event.type == "response.output_text.done":
-                print()
-            elif event.type == "response.done":
-                break
+            print(f"Event: {event.type}")
+
+            if event.type == "error":
+                print(f"ERROR: {event}")
+
+            if event.type == "response.text.delta":
+                print(event.delta, flush=True, end="")
+
+            if event.type == "response.text.done":
+                print()
+
+            if event.type == "response.done":
+                print(f"final response: {event.response.output[0].content[0].transcript}")
+                print(f"usage: {event.response.usage}")
+                break
+
+            if event.type == "response.audio.delta":
+                audio_data = base64.b64decode(event.delta)
+                print(f"Received {len(audio_data)} bytes of audio data.")
+
+            if event.type == "response.audio_transcript.delta":
+                print(f"Received text delta: {event.delta}")

await credential.close()

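Note on the configuration above: the example now supports both API-key and Entra ID auth, and both `wss://` and `https://` endpoints. A condensed sketch of how the selection logic fits together, mirroring the example's variable names (illustrative, not a canonical setup):

```python
import os

import httpx
from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
from openai import AsyncAzureOpenAI

api_key = os.environ.get("AZURE_OPENAI_API_KEY")
token_provider = None
if not api_key:
    # No key configured: fall back to Entra ID (token-based) auth.
    credential = DefaultAzureCredential()
    token_provider = get_bearer_token_provider(credential, "https://cognitiveservices.azure.com/.default")

endpoint = httpx.URL(os.environ["AZURE_OPENAI_ENDPOINT"])
if endpoint.scheme in ("ws", "wss"):
    # A websocket URL is passed through directly as the websocket base URL.
    websocket_base_url, azure_endpoint = f"{endpoint}/openai", None
else:
    # An http(s) URL is used as the regular Azure endpoint.
    websocket_base_url, azure_endpoint = None, str(endpoint)

client = AsyncAzureOpenAI(
    azure_deployment="gpt-realtime",
    azure_endpoint=azure_endpoint,  # the diff passes str(azure_endpoint) and silences the type checker
    websocket_base_url=websocket_base_url,
    azure_ad_token_provider=token_provider,
    api_key=api_key,
    api_version="2025-04-01-preview",
)
```

Setting `websocket_base_url` explicitly is what lets a `ws`/`wss` endpoint bypass the SDK's usual HTTP-to-WebSocket URL derivation.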
71 changes: 50 additions & 21 deletions examples/realtime/push_to_talk_app.py
@@ -18,13 +18,22 @@
# "pydub",
# "sounddevice",
# "openai[realtime]",
# "azure-identity",
# "aiohttp",
# "python-dotenv",
# ]
#
# [tool.uv.sources]
# openai = { path = "../../", editable = true }
# ///
from __future__ import annotations

from dotenv import load_dotenv
import httpx

load_dotenv()

import os
import base64
import asyncio
from typing import Any, cast
@@ -33,13 +42,14 @@
from textual import events
from audio_util import CHANNELS, SAMPLE_RATE, AudioPlayerAsync
from textual.app import App, ComposeResult
from textual.widgets import Button, Static, RichLog
from textual.widgets import Static, RichLog
from textual.reactive import reactive
from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
from textual.containers import Container

from openai import AsyncOpenAI
from openai.types.realtime.session import Session
from openai import AsyncAzureOpenAI
from openai.resources.realtime.realtime import AsyncRealtimeConnection
from openai.types.realtime.session_update_event import Session


class SessionDisplay(Static):
@@ -123,7 +133,7 @@ class RealtimeApp(App[None]):
}
"""

client: AsyncOpenAI
client: AsyncAzureOpenAI
should_send_audio: asyncio.Event
audio_player: AudioPlayerAsync
last_audio_item_id: str | None
@@ -135,7 +145,30 @@ def __init__(self) -> None:
super().__init__()
self.connection = None
self.session = None
self.client = AsyncOpenAI()

if not (api_key := os.environ.get("AZURE_OPENAI_API_KEY")):
credential = DefaultAzureCredential()
token_provider = get_bearer_token_provider(credential, "https://cognitiveservices.azure.com/.default")
else:
token_provider = None

endpoint = httpx.URL(os.environ["AZURE_OPENAI_ENDPOINT"])
if endpoint.scheme in ("ws", "wss"):
websocket_base_url, azure_endpoint = f"{endpoint}/openai", None
else:
websocket_base_url, azure_endpoint = None, endpoint

print(f"{websocket_base_url=}, {azure_endpoint=}")

self.client = AsyncAzureOpenAI(
azure_deployment="gpt-realtime",
azure_endpoint=str(azure_endpoint),
websocket_base_url=websocket_base_url,
azure_ad_token_provider=token_provider,
api_key=api_key,
api_version="2025-04-01-preview"
) # type: ignore

self.audio_player = AudioPlayerAsync()
self.last_audio_item_id = None
self.should_send_audio = asyncio.Event()
@@ -154,21 +187,21 @@ async def on_mount(self) -> None:
self.run_worker(self.send_mic_audio())

async def handle_realtime_connection(self) -> None:
async with self.client.realtime.connect(model="gpt-realtime") as conn:
async with self.client.beta.realtime.connect(model="gpt-realtime") as conn:
self.connection = conn
self.connected.set()

# note: this is the default and can be omitted
# if you want to manually handle VAD yourself, then set `'turn_detection': None`
await conn.session.update(
session={
"audio": {
"input": {"turn_detection": {"type": "server_vad"}},
},
"model": "gpt-realtime",
"type": "realtime",
}
)
# await conn.session.update(
# session={
# "audio": {
# "input": {"turn_detection": {"type": "server_vad"}},
# },
# "model": "gpt-realtime",
# "type": "realtime",
# }
# )

acc_items: dict[str, Any] = {}

@@ -184,7 +217,7 @@ async def handle_realtime_connection(self) -> None:
self.session = event.session
continue

if event.type == "response.output_audio.delta":
if event.type == "response.audio.delta":
if event.item_id != self.last_audio_item_id:
self.audio_player.reset_frame_count()
self.last_audio_item_id = event.item_id
@@ -193,7 +226,7 @@ async def handle_realtime_connection(self) -> None:
self.audio_player.add_data(bytes_data)
continue

if event.type == "response.output_audio_transcript.delta":
if event.type == "response.audio_transcript.delta":
try:
text = acc_items[event.item_id]
except KeyError:
@@ -258,10 +291,6 @@ async def send_mic_audio(self) -> None:

async def on_key(self, event: events.Key) -> None:
"""Handle key press events."""
if event.key == "enter":
self.query_one(Button).press()
return

if event.key == "q":
self.exit()
return
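Note: the transcript-accumulation branch in `handle_realtime_connection` is truncated by the diff view. The pattern it implements collects `response.audio_transcript.delta` text per `item_id`, so each assistant turn renders as one growing string. A minimal sketch under that assumption (`TranscriptDelta` is a stand-in for the SDK's event type):

```python
from dataclasses import dataclass


@dataclass
class TranscriptDelta:
    item_id: str
    delta: str


acc_items: dict[str, str] = {}  # item_id -> accumulated transcript text


def accumulate_transcript(event: TranscriptDelta) -> str:
    # Append this delta to the running transcript for its output item.
    acc_items[event.item_id] = acc_items.get(event.item_id, "") + event.delta
    return acc_items[event.item_id]
```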