- FastAPI Integration: Native support for FastAPI WebSocket connections
- WebSockets Library: Support for the standard websockets library
- Event-Driven: Register callbacks/hooks for different events
- Easy Media Handling: Simple methods to send and receive media
- Type-Safe: Full type hints and Pydantic models for better IDE support
- Modular Design: Built to support multiple frameworks
pip install plivo-stream-sdk

For development (includes uvicorn for running examples):

pip install -e ".[dev]"

from fastapi import FastAPI, WebSocket
from plivo_stream import PlivoFastAPIStreamingHandler, MediaEvent

app = FastAPI()

@app.websocket("/stream")
async def websocket_endpoint(websocket: WebSocket):
    handler = PlivoFastAPIStreamingHandler(websocket)

    @handler.on_connected
    async def on_connect():
        print("Client connected!")

    @handler.on_media
    async def on_media(event: MediaEvent):
        audio_bytes = event.get_raw_media()  # raw decoded bytes
        print(f"Received {len(audio_bytes)} bytes at chunk {event.media.chunk}")

    @handler.on_disconnected
    async def on_disconnect():
        print("Client disconnected!")

    # Run the listener loop until the connection closes
    await handler.start()

import asyncio
import websockets
from plivo_stream import MediaEvent, PlivoWebsocketStreamingHandler

async def create_handler(websocket):
    handler = PlivoWebsocketStreamingHandler()

    @handler.on_connected
    async def on_connect():
        print("Client connected!")

    @handler.on_media
    async def on_media(event: MediaEvent):
        audio_bytes = event.get_raw_media()
        print(f"Received {len(audio_bytes)} bytes")

    @handler.on_disconnected
    async def on_disconnect():
        print("Client disconnected!")

    await handler.handle(websocket)

async def main():
    async with websockets.serve(create_handler, "0.0.0.0", 8000):
        await asyncio.Future()  # run forever

if __name__ == "__main__":
    asyncio.run(main())

The SDK acts as a bridge between Plivo's server and your application, handling bidirectional audio streaming.
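
To make the inbound half of that bridge concrete, here is a minimal sketch of what turning a media message into raw audio bytes might involve. The JSON layout (an `event` field plus a base64 `payload` under `media`) is an assumption inferred from the `MediaEvent` fields used in the examples, not taken from the SDK's source, and `decode_media_message` is a hypothetical helper. In practice `event.get_raw_media()` does this for you.

```python
import base64
import json


def decode_media_message(raw: str) -> bytes:
    """Return the raw audio bytes carried by a JSON media message.

    Hypothetical helper: the message layout below is assumed.
    """
    message = json.loads(raw)
    if message.get("event") != "media":
        raise ValueError(f"not a media event: {message.get('event')!r}")
    # The audio travels as base64 text inside the JSON frame.
    return base64.b64decode(message["media"]["payload"])


# Build a sample message (assumed layout) and decode it again.
sample = json.dumps({
    "event": "media",
    "media": {
        "track": "inbound",
        "chunk": 1,
        "timestamp": "0",
        "payload": base64.b64encode(b"\x00\x01\x02\x03").decode("ascii"),
    },
})
audio = decode_media_message(sample)
print(len(audio))
```

Keeping the decoding in one place means callbacks only ever deal with bytes, which is the same convenience the SDK's `get_raw_media()` offers.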
- Minimal Websocket example
- Minimal FastAPI example
- Demo using FastAPI, Deepgram STT, ElevenLabs TTS, OpenAI LLM
Main class for handling FastAPI WebSocket connections.
handler = PlivoFastAPIStreamingHandler(websocket)

Register callbacks using decorators:

@handler.on_connected
Called when the WebSocket connection is established.

@handler.on_connected
async def on_connect():
    print("Connected!")

@handler.on_disconnected
Called when the WebSocket connection is closed.

@handler.on_disconnected
async def on_disconnect():
    print("Disconnected!")

@handler.on_media
Called when media (audio) data is received.

@handler.on_media
async def on_media(event: MediaEvent):
    # Access Pydantic fields
    print(event.media.track, event.media.timestamp, event.media.chunk)
    # Get raw decoded audio bytes
    audio_bytes = event.get_raw_media()

@handler.on_start
Called when the stream starts; includes stream and call identifiers.

@handler.on_start
async def on_start(event: StartEvent):
    print(f"Stream started: streamId={event.start.stream_id}, callId={event.start.call_id}")

@handler.on_dtmf
Called when a DTMF tone is detected.

@handler.on_dtmf
async def on_dtmf(event: DtmfEvent):
    print(f"DTMF received: {event.dtmf.digit}")

@handler.on_played_stream
Called when buffered audio before a checkpoint has finished playing.

@handler.on_played_stream
async def on_played_stream(event: PlayedStreamEvent):
    print(f"Checkpoint played: {event.name} on stream {event.stream_id}")

@handler.on_cleared_audio
Called when the audio buffer is cleared.

@handler.on_cleared_audio
async def on_cleared_audio(event: ClearedAudioEvent):
    print(f"Cleared audio on stream {event.stream_id}")

@handler.on_event(event_type)
Called for a specific Plivo event type, selected by name.

@handler.on_event("start")
async def on_start(event: StreamEvent):
    print(f"Stream started: {event.data}")

@handler.on_error
Called when an error occurs.

@handler.on_error
async def on_error(error):
    print(f"Error: {error}")

send_media(media_data)
Send media data through the WebSocket.

# raw PCM/mulaw bytes (SDK will base64-encode for you)
await handler.send_media(audio_bytes)
# optionally specify content type and sample rate
await handler.send_media(audio_bytes, content_type="audio/x-l16", sample_rate=16000)

send_checkpoint(checkpoint_name)
Send a named checkpoint event. Once the audio buffered before the checkpoint has finished playing, the on_played_stream callback fires.

await handler.send_checkpoint("processing_complete")

send_clear_audio()
Clear the audio buffer on the stream.

await handler.send_clear_audio()

send_json(data)
Send arbitrary JSON data.

await handler.send_json({"event": "custom", "data": "value"})

send_text(message)
Send a text message.

await handler.send_text("Hello")

start()
Start the WebSocket listener loop. This should be awaited in your endpoint.

await handler.start()

stop()
Stop the WebSocket listener and close the connection.

await handler.stop()

Main class for handling plain WebSocket connections using the websockets library.
handler = PlivoWebsocketStreamingHandler()

Same decorator-based event hooks as PlivoFastAPIStreamingHandler:
- @handler.on_connected
- @handler.on_disconnected
- @handler.on_media
- @handler.on_start
- @handler.on_dtmf
- @handler.on_played_stream
- @handler.on_cleared_audio
- @handler.on_event(event_type)
- @handler.on_error

Same methods as PlivoFastAPIStreamingHandler:
- send_media(media_data)
- send_checkpoint(checkpoint_name)
- send_clear_audio()
- send_json(data)
- send_text(message)
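
Since send_media accepts raw bytes and base64-encodes them for you, the outgoing frame could plausibly be assembled along these lines. This is only a sketch: the `playAudio` event name, the `contentType`/`sampleRate`/`payload` keys, and the 8000 Hz default are assumptions rather than details confirmed by this README, and `build_play_audio` is a hypothetical helper, not an SDK function.

```python
import base64
import json


def build_play_audio(audio: bytes,
                     content_type: str = "audio/x-l16",
                     sample_rate: int = 8000) -> str:
    """Serialize raw audio bytes into a JSON text frame (assumed layout)."""
    return json.dumps({
        "event": "playAudio",  # assumed event name
        "media": {
            "contentType": content_type,
            "sampleRate": sample_rate,
            # JSON cannot carry raw bytes, so the audio is base64 text.
            "payload": base64.b64encode(audio).decode("ascii"),
        },
    })


# 160 two-byte samples, sent with an explicit sample rate.
frame = build_play_audio(b"\x00\x01" * 160, sample_rate=16000)
decoded = json.loads(frame)
print(decoded["media"]["sampleRate"],
      len(base64.b64decode(decoded["media"]["payload"])))
```

A frame built this way could be handed to send_text(message), although send_media is the intended path and hides these details.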
handle(websocket)
Handle a WebSocket connection. This should be called from your websockets.serve handler.
async def connection_handler(websocket):
    await handler.handle(websocket)

stop()
Stop the WebSocket listener and close the connection.

await handler.stop()

The SDK recognizes these Plivo Streaming API events:
- connected: WebSocket connection established
- disconnected: WebSocket connection closed
- media: Audio data received
- start: Stream started
- error: Error occurred
- playedStream: Audio events buffered before the checkpoint were successfully played out to the end user
- clearedAudio: All buffered media events were cleared
- dtmf: Sent when someone presses a touch-tone number key in the inbound stream
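
The decorator hooks and the event names above fit together as a registry keyed by event type. The sketch below shows that general pattern with a toy `EventRouter`; it is not the SDK's implementation, and the class, its `dispatch` method, and the sample message shapes are all hypothetical.

```python
import asyncio
import json
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, Dict, List

Callback = Callable[[Dict[str, Any]], Awaitable[None]]


class EventRouter:
    """Toy router mapping event names to async callbacks."""

    def __init__(self) -> None:
        self._callbacks: DefaultDict[str, List[Callback]] = defaultdict(list)

    def on_event(self, event_type: str) -> Callable[[Callback], Callback]:
        """Decorator factory: register a callback for one event type."""
        def register(callback: Callback) -> Callback:
            self._callbacks[event_type].append(callback)
            return callback
        return register

    async def dispatch(self, raw: str) -> int:
        """Parse a JSON frame, run matching callbacks, return how many ran."""
        message = json.loads(raw)
        callbacks = self._callbacks.get(message.get("event"), [])
        for callback in callbacks:
            await callback(message)
        return len(callbacks)


router = EventRouter()
seen: List[str] = []


@router.on_event("dtmf")
async def record_digit(message: Dict[str, Any]) -> None:
    seen.append(message["dtmf"]["digit"])


# One event with a registered callback, one without.
handled = asyncio.run(router.dispatch('{"event": "dtmf", "dtmf": {"digit": "5"}}'))
ignored = asyncio.run(router.dispatch('{"event": "clearedAudio"}'))
print(seen, handled, ignored)
```

Events with no registered callback are silently skipped here; a real handler might log them or route them to an error hook instead.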
