conversation.item.done event implementation #451
Walkthrough

The change updates OpenAI Realtime event handling and multimodal content assembly in src/draive/openai/realtime.py. It adds a handler for "conversation.item.done" that processes only items with an id and type "message", and triggers retrieval for assistant messages. The existing "conversation.item.retrieved" path is retained and clarified to operate only on messages, with logic to request completion when an item is not completed. Memory updates for user and assistant items now use multimodal builders; _assistant_content_to_multimodal was extended to include audio (ResourceContent with mime_type) and transcript (TextContent with transcript meta) parts alongside text.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

The diff is limited to a single file but contains multiple related changes: an added event branch, an adjusted retrieval/retry flow, and expanded multimodal content assembly. Review requires checking conditional flows, memory updates, and new content part construction.

Possibly related PRs
Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/draive/openai/realtime.py (1)
344-344: Consider optimizing retrieve calls for user items only.

Since conversation.item.retrieved now only handles user messages (line 398) and assistant messages use conversation.item.done instead, calling retrieve for assistant items at line 344 generates events that are immediately discarded at lines 417-418.

Optimization:
```diff
-        # request the full item to be stored in the memory
-        await connection.conversation.item.retrieve(item_id=event.item.id)
+        # request the full item to be stored in the memory (user items only)
+        if event.item.role == "user":
+            await connection.conversation.item.retrieve(item_id=event.item.id)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
src/draive/openai/realtime.py (3 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Use Python 3.12+ features and syntax across the codebase
Format code exclusively with Ruff (make format); do not use other formatters
Skip module-level docstrings
Files:
src/draive/openai/realtime.py
src/draive/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/draive/**/*.py: Import Haiway symbols directly (from haiway import State, ctx)
Use ctx.scope(...) to bind scoped Disposables and active State; avoid global state
Route all logs through ctx.log_debug/info/warn/error; do not use print
Use latest, most strict typing syntax (Python 3.12+), with strict typing only for public APIs
Avoid loose Any except at explicit third‑party boundaries
Prefer explicit attribute access with static types; avoid dynamic getattr except at narrow boundaries
Prefer Mapping/Sequence/Iterable in public types over dict/list/set
Use final where applicable; avoid inheritance and prefer composition
Use precise unions (|) and narrow with match/isinstance; avoid cast unless provably safe and localized
Model immutable data/config and facades with haiway.State; provide ergonomic classmethods like .of(...)
Avoid in-place mutation; use State.updated(...) or functional builders to produce new instances
Access active state via haiway.ctx inside async scopes (ctx.scope(...))
Use @statemethod for public state methods that dispatch on the active instance
Log around generation calls, tool dispatch, and provider requests/responses without leaking secrets; prefer structured/concise messages
Add metrics via ctx.record where applicable
All I/O is async; keep boundaries async and use ctx.spawn for detached tasks
Use structured concurrency and valid coroutine usage; rely on haiway/asyncio; avoid custom threading
Construct multimodal content with MultimodalContent.of(...) and compose blocks explicitly
Use ResourceContent/ResourceReference for media/data blobs
Wrap custom types/data within ArtifactContent; use hidden when needed
Add NumPy-style docstrings for public symbols with Parameters/Returns/Raises and rationale when non-obvious
Avoid docstrings on internal helpers; keep names self-explanatory
Keep docstrings high-quality; mkdocstrings pulls them into API reference
Never log secrets or full request bodies containing keys/tokens
Files:
src/draive/openai/realtime.py
src/draive/{openai,anthropic,mistral,gemini,vllm,ollama,bedrock,cohere}/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/draive/{openai,anthropic,mistral,gemini,vllm,ollama,bedrock,cohere}/**/*.py: Provider-specific feature modules live under their respective provider directories
Translate provider/SDK errors into typed exceptions; do not raise bare Exception and preserve context
Use environment variables for credentials and resolve via helper functions like getenv_str
Files:
src/draive/openai/realtime.py
🧬 Code graph analysis (1)
src/draive/openai/realtime.py (4)
src/draive/models/types.py (14)
- content (474-478), content (618-622), ModelOutput (573-672), of (272-294), of (315-347), of (375-411), of (444-467), of (534-558), of (587-611), of (735-761), of (790-816), of (843-870), of (935-963), ModelInput (429-501)

src/draive/multimodal/content.py (1)
- audio (161-176)

src/draive/resources/types.py (1)
- ResourceContent (126-212)

src/draive/multimodal/text.py (1)
- TextContent (11-82)
🔇 Additional comments (3)
src/draive/openai/realtime.py (3)
362-379: LGTM! Assistant message handling is correct.

The switch to ModelOutput.of() with _assistant_content_to_multimodal properly captures assistant completions as outputs. Error handling preserves context by re-raising after logging.
384-418: LGTM! User message retrieval flow is correct.

The new handler properly uses ModelInput.of() with _user_content_to_multimodal for user messages, and the retry logic at lines 392-394 ensures incomplete items are re-fetched.
862-876: LGTM! Audio and transcript handling implemented correctly.

The new audio and transcript processing correctly mirrors the pattern in _user_content_to_multimodal (lines 833-847), using ResourceContent.of() for audio blobs with proper MIME types and TextContent.of() for transcripts with metadata tagging.
```python
# Only record completed items, otherwise request once more
if event.item.status != "completed":
    await connection.conversation.item.retrieve(item_id=event.item.id)
```
shouldn't we request retrieve on incomplete? also, is it possible to receive incomplete on "done" event?
Force-pushed from 4a254a8 to 0d36d6e (Compare)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/draive/openai/realtime.py (3)
827-833: Apply the same audio fix to user content builder for consistency.

User audio is handled the same way; avoid double base64 here too.

```diff
-            if part.audio:
-                parts.append(
-                    ResourceContent.of(
-                        part.audio,
-                        mime_type=audio_format,
-                    )
-                )
+            if part.audio:
+                audio_bytes: bytes
+                if isinstance(part.audio, str):
+                    try:
+                        audio_bytes = urlsafe_b64decode(part.audio)
+                    except Exception:
+                        audio_bytes = b64decode(part.audio)
+                else:
+                    audio_bytes = part.audio
+                parts.append(ResourceContent.of(audio_bytes, mime_type=audio_format))
```
585-589: Dict mutated during iteration; iterate over a snapshot to avoid RuntimeError.

Current code deletes keys while iterating current_items.keys().

```diff
-        for item_id in current_items.keys():
+        for item_id in list(current_items):
             try:
                 await connection.conversation.item.delete(item_id=item_id)
                 del current_items[item_id]
```
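A minimal, framework-free sketch of the failure mode and the suggested fix (generic dict names; not the actual draive code):

```python
items = {"item_1": "a", "item_2": "b", "item_3": "c"}


def delete_all_unsafe(d: dict[str, str]) -> str:
    # Deleting keys while iterating the live view raises RuntimeError in CPython
    try:
        for key in d.keys():
            del d[key]
    except RuntimeError as error:
        return f"failed: {error}"
    return "ok"


def delete_all_safe(d: dict[str, str]) -> str:
    # Iterating over a snapshot (list of keys) makes mutation safe
    for key in list(d):
        del d[key]
    return "ok"


print(delete_all_unsafe(dict(items)))
print(delete_all_safe(dict(items)))
```

The snapshot copy costs O(n) extra memory but removes the ordering hazard entirely, which is why `list(d)` is the idiomatic fix here.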
293-296: Nit: fix comment typo. "speach" → "speech".

```diff
-            # send event that VAD detected input speach
+            # send event that VAD detected input speech
```
♻️ Duplicate comments (1)
src/draive/openai/realtime.py (1)
371-371: Replace assertion with graceful handling (duplicate of prior review).

Skip/log empty content instead of raising AssertionError in production.

```diff
-                    assert event.item.content  # nosec: B101
+                    if not event.item.content:
+                        ctx.log_warn(
+                            "Skipping completed item without content",
+                            item_id=event.item.id,
+                        )
+                        continue
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
src/draive/openai/realtime.py (2 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Use Python 3.12+ features and syntax across the codebase
Format code exclusively with Ruff (make format); do not use other formatters
Skip module-level docstrings
Files:
src/draive/openai/realtime.py
src/draive/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/draive/**/*.py: Import Haiway symbols directly (from haiway import State, ctx)
Use ctx.scope(...) to bind scoped Disposables and active State; avoid global state
Route all logs through ctx.log_debug/info/warn/error; do not use print
Use latest, most strict typing syntax (Python 3.12+), with strict typing only for public APIs
Avoid loose Any except at explicit third‑party boundaries
Prefer explicit attribute access with static types; avoid dynamic getattr except at narrow boundaries
Prefer Mapping/Sequence/Iterable in public types over dict/list/set
Use final where applicable; avoid inheritance and prefer composition
Use precise unions (|) and narrow with match/isinstance; avoid cast unless provably safe and localized
Model immutable data/config and facades with haiway.State; provide ergonomic classmethods like .of(...)
Avoid in-place mutation; use State.updated(...) or functional builders to produce new instances
Access active state via haiway.ctx inside async scopes (ctx.scope(...))
Use @statemethod for public state methods that dispatch on the active instance
Log around generation calls, tool dispatch, and provider requests/responses without leaking secrets; prefer structured/concise messages
Add metrics via ctx.record where applicable
All I/O is async; keep boundaries async and use ctx.spawn for detached tasks
Use structured concurrency and valid coroutine usage; rely on haiway/asyncio; avoid custom threading
Construct multimodal content with MultimodalContent.of(...) and compose blocks explicitly
Use ResourceContent/ResourceReference for media/data blobs
Wrap custom types/data within ArtifactContent; use hidden when needed
Add NumPy-style docstrings for public symbols with Parameters/Returns/Raises and rationale when non-obvious
Avoid docstrings on internal helpers; keep names self-explanatory
Keep docstrings high-quality; mkdocstrings pulls them into API reference
Never log secrets or full request bodies containing keys/tokens
Files:
src/draive/openai/realtime.py
src/draive/{openai,anthropic,mistral,gemini,vllm,ollama,bedrock,cohere}/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/draive/{openai,anthropic,mistral,gemini,vllm,ollama,bedrock,cohere}/**/*.py: Provider-specific feature modules live under their respective provider directories
Translate provider/SDK errors into typed exceptions; do not raise bare Exception and preserve context
Use environment variables for credentials and resolve via helper functions like getenv_str
Files:
src/draive/openai/realtime.py
🧬 Code graph analysis (1)
src/draive/openai/realtime.py (3)
src/draive/multimodal/content.py (1)
- audio (161-176)

src/draive/resources/types.py (1)
- ResourceContent (126-212)

src/draive/multimodal/text.py (1)
- TextContent (11-82)
🔇 Additional comments (1)
src/draive/openai/realtime.py (1)
349-358: Remove the defensive check; the code is correct as-is.

OpenAI Realtime documentation confirms that conversation.item.done is emitted after an item reaches "completed" status and signals the item finished processing. The unconditional retrieve() call is the correct behavior and does not require a status guard. The original review suggestion was overly cautious.
```python
if part.audio:
    parts.append(
        ResourceContent.of(
            part.audio,
            mime_type=audio_format,
        )
    )

if part.transcript:
    parts.append(
        TextContent.of(
            part.transcript,
            meta={"transcript": True},
        )
    )
```
There was a problem hiding this comment.
Assistant audio persistence likely double-encodes; decode to bytes before ResourceContent.of.
ResourceContent.of(str) base64‑encodes the string; if part.audio is base64, this corrupts audio. Decode to bytes first.
```diff
-            if part.audio:
-                parts.append(
-                    ResourceContent.of(
-                        part.audio,
-                        mime_type=audio_format,
-                    )
-                )
+            if part.audio:
+                # Normalize to raw bytes (handle base64 str or bytes)
+                audio_bytes: bytes
+                if isinstance(part.audio, str):
+                    try:
+                        audio_bytes = urlsafe_b64decode(part.audio)
+                    except Exception:
+                        audio_bytes = b64decode(part.audio)
+                else:
+                    audio_bytes = part.audio
+                parts.append(ResourceContent.of(audio_bytes, mime_type=audio_format))
-            if part.transcript:
+            if part.transcript:
                 parts.append(
                     TextContent.of(
                         part.transcript,
                         meta={"transcript": True},
                     )
                 )
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/draive/openai/realtime.py around lines 856 to 871, the code passes
part.audio (likely a base64 string) directly to ResourceContent.of which will
Base64-encode a string and corrupt the audio; update the code to ensure
part.audio is bytes before calling ResourceContent.of by detecting if part.audio
is a str and, if so, decoding it with base64.b64decode (otherwise pass bytes
through), then call ResourceContent.of with the resulting bytes and the same
mime_type.
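Assuming ResourceContent.of base64-encodes str inputs as the review states, a stdlib-only sketch of why passing an already-base64 string corrupts the payload, and why normalizing to bytes first round-trips correctly:

```python
import base64

# Pretend this is a raw PCM audio payload
raw_audio = b"\x00\x01\x02example-pcm-bytes"

# The Realtime API delivers audio as a base64 string
b64_str = base64.b64encode(raw_audio).decode("ascii")

# Encoding the base64 *string* again (the reported ResourceContent.of behavior
# for str inputs) stores bytes that no longer decode back to the audio
double_encoded = base64.b64encode(b64_str.encode("ascii"))
corrupted = base64.b64decode(double_encoded)  # yields the base64 text, not audio

# Normalizing to bytes first, as the suggested fix does, preserves the payload
audio_bytes = base64.b64decode(b64_str) if isinstance(b64_str, str) else b64_str
```

Here `corrupted != raw_audio` while `audio_bytes == raw_audio`, which is exactly the difference the fix targets.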
Description
Aligned realtime memory updates with OpenAI’s event flow so we only store finalized assistant outputs and capture richer payloads.
- Use conversation.item.done as the authoritative signal for assistant completions; skip incomplete items instead of re-fetching prematurely.
- Keep conversation.item.retrieved for user messages and continue refetching until completion.
- Extend _assistant_content_to_multimodal to append audio blobs (with the correct MIME type) and transcript text tagged via metadata.