Skip to content

fix(daily): queue outbound messages until transport joins#3615

Open
omChauhanDev wants to merge 3 commits intopipecat-ai:mainfrom
omChauhanDev:fix/daily-transport-message-queue
Open

fix(daily): queue outbound messages until transport joins#3615
omChauhanDev wants to merge 3 commits intopipecat-ai:mainfrom
omChauhanDev:fix/daily-transport-message-queue

Conversation

@omChauhanDev
Copy link
Contributor

@omChauhanDev omChauhanDev commented Feb 1, 2026

Please describe the changes in your PR. If it is addressing an issue, please reference that as well.

Fixes #3578

Problem

A race condition exists during pipeline startup when using RTVIObserver with DailyTransport. The observer emits messages (e.g., bot-ready, metrics) immediately upon receiving the StartFrame, but DailyTransport.join() is asynchronous. This causes the transport to reject messages with ERROR | Unable to send message: Unable to send messages before joining.

Solution

Buffer outbound messages in _join_message_queue and flush them once join succeeds.

Testing

Before :
image

After :
image

Also verified(at client side) that initial messages are now successfully received after the transport joins.

@codecov
Copy link

codecov bot commented Feb 1, 2026

Codecov Report

❌ Patch coverage is 0% with 8 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
src/pipecat/transports/daily/transport.py 0.00% 8 Missing ⚠️
Files with missing lines Coverage Δ
src/pipecat/transports/daily/transport.py 0.00% <0.00%> (ø)
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

self._audio_task = None
self._video_task = None
self._message_queue: Optional[asyncio.Queue] = None
self._message_task = None
Copy link
Contributor

@aconchillo aconchillo Feb 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need an additional task. We can just queue the messages and flush them in join() when we know we are joined.

Also, maybe rename _message_queue to _join_message_queue so it's a bit more clear what it is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd just add a small function self._flush_join_messages().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this sounds much cleaner....done thanks

@omChauhanDev omChauhanDev force-pushed the fix/daily-transport-message-queue branch from 9525584 to a4e187e Compare February 9, 2026 00:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

RTVIObserver sends message before DailyTransport join completes, causing "Unable to send messages before joining" error

2 participants