Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/warm-seahorses-hunt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-agents": patch
---

support agent.say inside the before_llm_cb
84 changes: 54 additions & 30 deletions livekit-agents/livekit/agents/pipeline/pipeline_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,8 @@ async def say(

if self._playing_speech and not self._playing_speech.nested_speech_done:
self._playing_speech.add_nested_speech(new_handle)
elif self._speech_q:
self._speech_q[0].add_nested_speech(new_handle)
else:
self._add_speech_for_playout(new_handle)

Expand Down Expand Up @@ -776,6 +778,47 @@ async def _synthesize_answer_task(
SpeechDataContextVar.reset(tk)

async def _play_speech(self, speech_handle: SpeechHandle) -> None:
fnc_done_fut = asyncio.Future[None]()
playing_lock = asyncio.Lock()
nested_speech_played = asyncio.Event()

async def _play_nested_speech():
speech_handle._nested_speech_done_fut = asyncio.Future[None]()
while not speech_handle.nested_speech_done:
nesting_changed = asyncio.create_task(
speech_handle.nested_speech_changed.wait()
)
nesting_done_fut: asyncio.Future = speech_handle._nested_speech_done_fut
await asyncio.wait(
[nesting_changed, nesting_done_fut, fnc_done_fut],
return_when=asyncio.FIRST_COMPLETED,
)
if not nesting_changed.done():
nesting_changed.cancel()

while speech_handle.nested_speech_handles:
nested_speech_played.clear()
speech = speech_handle.nested_speech_handles[0]
if speech_handle.nested_speech_done:
# in case tool speech is added after nested speech done
speech.cancel(cancel_nested=True)
speech_handle.nested_speech_handles.pop(0)
continue

async with playing_lock:
self._playing_speech = speech
await self._play_speech(speech)
speech_handle.nested_speech_handles.pop(0)
self._playing_speech = speech_handle

nested_speech_played.set()
speech_handle.nested_speech_changed.clear()
# break if the function calls task is done
if fnc_done_fut.done():
speech_handle.mark_nested_speech_done()

nested_speech_task = asyncio.create_task(_play_nested_speech())

try:
await speech_handle.wait_for_initialization()
except asyncio.CancelledError:
Expand All @@ -789,6 +832,11 @@ async def _play_speech(self, speech_handle: SpeechHandle) -> None:

user_question = speech_handle.user_question

# wait for all pre-added nested speech to be played
while speech_handle.nested_speech_handles:
await nested_speech_played.wait()

await playing_lock.acquire()
play_handle = synthesis_handle.play()
join_fut = play_handle.join()

Expand Down Expand Up @@ -884,6 +932,7 @@ def _commit_user_question_if_needed() -> None:
"speech_id": speech_handle.id,
},
)
playing_lock.release()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we wrap playing_lock.release() in finally block?
something like

await playing_lock.acquire()
try:
    ...(existing logic)
finally:
    playing_lock.release()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be okay as if the main speech failed we should stop the whole speech handle and its nested speeches.


@utils.log_exceptions(logger=logger)
async def _execute_function_calls() -> None:
Expand Down Expand Up @@ -1010,40 +1059,15 @@ async def _execute_function_calls() -> None:
_CallContextVar.reset(tk)

if not is_using_tools:
# skip the function calls execution
fnc_done_fut.set_result(None)
await nested_speech_task
speech_handle._set_done()
return

speech_handle._nested_speech_done_fut = asyncio.Future[None]()
fnc_task = asyncio.create_task(_execute_function_calls())
while not speech_handle.nested_speech_done:
nesting_changed = asyncio.create_task(
speech_handle.nested_speech_changed.wait()
)
nesting_done_fut: asyncio.Future = speech_handle._nested_speech_done_fut
await asyncio.wait(
[nesting_changed, fnc_task, nesting_done_fut],
return_when=asyncio.FIRST_COMPLETED,
)
if not nesting_changed.done():
nesting_changed.cancel()

while speech_handle.nested_speech_handles:
speech = speech_handle.nested_speech_handles[0]
if speech_handle.nested_speech_done:
# in case tool speech is added after nested speech done
speech.cancel(cancel_nested=True)
speech_handle.nested_speech_handles.pop(0)
continue

self._playing_speech = speech
await self._play_speech(speech)
speech_handle.nested_speech_handles.pop(0)
self._playing_speech = speech_handle

speech_handle.nested_speech_changed.clear()
# break if the function calls task is done
if fnc_task.done():
speech_handle.mark_nested_speech_done()
fnc_task.add_done_callback(lambda _: fnc_done_fut.set_result(None))
await nested_speech_task

if not fnc_task.done():
logger.debug(
Expand Down
Loading