2 changes: 1 addition & 1 deletion examples/voice_agents/email_example.py
@@ -55,7 +55,7 @@ async def register_for_event(self, context: RunContext):
async def entrypoint(ctx: JobContext):
session = AgentSession(
vad=silero.VAD.load(),
llm=inference.LLM("google/gemini-2.5-flash"),
llm=inference.LLM("openai/gpt-4.1-mini"),
stt=inference.STT("deepgram/nova-3"),
tts=inference.TTS("cartesia/sonic-3"),
)
15 changes: 15 additions & 0 deletions livekit-agents/livekit/agents/voice/agent.py
@@ -756,6 +756,18 @@ def _handle_task_done(_: asyncio.Task[Any]) -> None:
old_agent = old_activity.agent
session = old_activity.session

old_allow_interruptions = True
if speech_handle:
if speech_handle.interrupted:
raise RuntimeError(
f"{self.__class__.__name__} cannot be awaited inside a function tool that is already interrupted"
)

# lock the speech handle to prevent interruptions until the task is complete
# there should be no await before this line to avoid race conditions
old_allow_interruptions = speech_handle.allow_interruptions
speech_handle.allow_interruptions = False

blocked_tasks = [current_task]
if (
old_activity._on_enter_task
@@ -790,6 +802,9 @@ def _handle_task_done(_: asyncio.Task[Any]) -> None:
return await asyncio.shield(self.__fut)

finally:
if speech_handle:
speech_handle.allow_interruptions = old_allow_interruptions

# run_state could have changed after self.__fut
run_state = session._global_run_state

189 changes: 82 additions & 107 deletions livekit-agents/livekit/agents/voice/agent_activity.py
@@ -123,7 +123,8 @@ def __init__(self, agent: Agent, sess: AgentSession) -> None:
# for false interruption handling
self._paused_speech: SpeechHandle | None = None
self._false_interruption_timer: asyncio.TimerHandle | None = None
self._interrupt_paused_speech_task: asyncio.Task[None] | None = None
self._cancel_speech_pause_task: asyncio.Task[None] | None = None

self._stt_eos_received: bool = False

# fired when a speech_task finishes or when a new speech_handle is scheduled
@@ -754,8 +755,11 @@ async def _close_session(self) -> None:
*(mcp_server.aclose() for mcp_server in self.mcp_servers), return_exceptions=True
)

await self._interrupt_paused_speech(old_task=self._interrupt_paused_speech_task)
self._interrupt_paused_speech_task = None
await self._cancel_speech_pause(
old_task=self._cancel_speech_pause_task,
interrupt=False, # don't interrupt the paused speech, it's managed by _pause_scheduling_task
)
self._cancel_speech_pause_task = None

async def aclose(self) -> None:
# `aclose` must only be called by AgentSession
@@ -1371,8 +1375,8 @@ def on_final_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None = No
# schedule a resume timer if interrupted after end_of_speech
self._start_false_interruption_timer(timeout)

self._interrupt_paused_speech_task = asyncio.create_task(
self._interrupt_paused_speech(old_task=self._interrupt_paused_speech_task)
self._cancel_speech_pause_task = asyncio.create_task(
self._cancel_speech_pause(old_task=self._cancel_speech_pause_task)
)

def on_preemptive_generation(self, info: _PreemptiveGenerationInfo) -> None:
@@ -1490,7 +1494,7 @@ async def _user_turn_completed_task(
extra={"user_input": info.new_transcript},
)
return
await self._interrupt_paused_speech(self._interrupt_paused_speech_task)
await self._cancel_speech_pause(self._cancel_speech_pause_task)

await current_speech.interrupt()

@@ -2079,20 +2083,16 @@ def _tool_execution_completed_cb(out: ToolExecutionOutput) -> None:
)

current_span.set_attribute(trace_types.ATTR_SPEECH_INTERRUPTED, speech_handle.interrupted)
has_speech_message = False

# add the tools messages that triggers this reply to the chat context
if _previous_tools_messages:
self._agent._chat_ctx.insert(_previous_tools_messages)
self._session._tool_items_added(_previous_tools_messages)

forwarded_text = text_out.text if text_out else ""
if speech_handle.interrupted:
await utils.aio.cancel_and_wait(*tasks)
await text_tee.aclose()

forwarded_text = text_out.text if text_out else ""
if forwarded_text:
has_speech_message = True
# if the audio playout was enabled, clear the buffer
if audio_output is not None:
audio_output.clear_buffer()
@@ -2109,55 +2109,39 @@ def _tool_execution_completed_cb(out: ToolExecutionOutput) -> None:
else:
forwarded_text = ""

if forwarded_text:
msg = chat_ctx.add_message(
role="assistant",
content=forwarded_text,
id=llm_gen_data.id,
interrupted=True,
created_at=reply_started_at,
metrics=assistant_metrics,
)
self._agent._chat_ctx.insert(msg)
self._session._conversation_item_added(msg)
speech_handle._item_added([msg])
current_span.set_attribute(trace_types.ATTR_RESPONSE_TEXT, forwarded_text)

if self._session.agent_state == "speaking":
self._session._update_agent_state("listening")

speech_handle._mark_generation_done()
await utils.aio.cancel_and_wait(exe_task)
return
Comment on lines -2113 to -2131

Member: Was this some duplicated logic?

Contributor Author: yes, we have some duplicated code for interrupted and not interrupted. I merged them in this PR.
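As a generic illustration of that consolidation (not the actual agent code; `Message` and `record_reply` are made-up stand-ins), the values that differ between the two branches are computed first so the shared "record the message" step runs only once:

```python
from dataclasses import dataclass


@dataclass
class Message:
    content: str
    interrupted: bool


def record_reply(forwarded_text: str, interrupted: bool) -> Message | None:
    # single shared path: only the text and the `interrupted` flag differ
    # between the interrupted and non-interrupted branches
    if not forwarded_text:
        return None
    return Message(content=forwarded_text, interrupted=interrupted)


print(record_reply("hello th", interrupted=True))      # partial transcript after an interruption
print(record_reply("hello there", interrupted=False))  # full transcript
```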


if read_transcript_from_tts and text_out and not text_out.text:
elif read_transcript_from_tts and text_out and not text_out.text:
logger.warning(
"`use_tts_aligned_transcript` is enabled but no agent transcript was returned from tts"
)

if text_out and text_out.text:
has_speech_message = True
if forwarded_text:
msg = chat_ctx.add_message(
role="assistant",
content=text_out.text,
content=forwarded_text,
id=llm_gen_data.id,
interrupted=False,
interrupted=speech_handle.interrupted,
created_at=reply_started_at,
metrics=assistant_metrics,
)
self._agent._chat_ctx.insert(msg)
self._session._conversation_item_added(msg)
speech_handle._item_added([msg])
current_span.set_attribute(trace_types.ATTR_RESPONSE_TEXT, text_out.text)
current_span.set_attribute(trace_types.ATTR_RESPONSE_TEXT, forwarded_text)

if len(tool_output.output) > 0:
if not speech_handle.interrupted and len(tool_output.output) > 0:
self._session._update_agent_state("thinking")
elif self._session.agent_state == "speaking":
self._session._update_agent_state("listening")

await text_tee.aclose()

speech_handle._mark_generation_done() # mark the playout done before waiting for the tool execution # noqa: E501

if speech_handle.interrupted:
await utils.aio.cancel_and_wait(exe_task)
return
Comment on lines +2140 to +2142

Member: I think this should be removed?

Contributor Author: there is a guard for cancellation https://github.com/livekit/agents/blob/livekit-agents@1.3.12/livekit-agents/livekit/agents/voice/generation.py#L648-L658, we will cancel the tool execution task but not the user's function
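For context, a minimal, self-contained sketch of the kind of guard being referenced: the user's coroutine is awaited through `asyncio.shield`, so cancelling the outer execution task does not cancel the user's function. This only illustrates the pattern; the actual guard in generation.py may be structured differently, and `user_tool` is a made-up stand-in.

```python
import asyncio


async def user_tool() -> str:
    # stand-in for a user-defined function tool
    await asyncio.sleep(0.5)
    return "tool result"


async def main() -> None:
    inner = asyncio.create_task(user_tool(), name="func_exec_user_tool")

    async def execute() -> str:
        # the shield means cancelling `exe_task` does not cancel `inner`
        return await asyncio.shield(inner)

    exe_task = asyncio.create_task(execute())
    await asyncio.sleep(0.1)
    exe_task.cancel()  # e.g. the speech handle was interrupted
    await asyncio.gather(exe_task, return_exceptions=True)

    print(inner.cancelled())  # False: the user's function keeps running
    print(await inner)        # "tool result"


asyncio.run(main())
```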


# wait for the tool execution to complete
self._background_speeches.add(speech_handle)
try:
await exe_task
@@ -2229,7 +2213,7 @@ def _tool_execution_completed_cb(out: ToolExecutionOutput) -> None:
),
# in case the current reply only generated tools (no speech), re-use the current user_metrics for the next
# tool response generation
_previous_user_metrics=user_metrics if not has_speech_message else None,
_previous_user_metrics=user_metrics if not forwarded_text else None,
_previous_tools_messages=tool_messages,
),
speech_handle=speech_handle,
@@ -2580,83 +2564,66 @@ def _create_assistant_message(
msg.metrics = assistant_metrics
return msg

msg_gen, text_out, audio_out = (
message_outputs[0] if len(message_outputs) > 0 else (None, None, None)
) # there should be only one message

forwarded_text = text_out.text if text_out else ""
if speech_handle.interrupted:
await utils.aio.cancel_and_wait(*tasks)

if len(message_outputs) > 0:
# there should be only one message
msg_gen, text_out, audio_out = message_outputs[0]
forwarded_text = text_out.text if text_out else ""
if audio_output is not None:
audio_output.clear_buffer()
if msg_gen and audio_output is not None:
audio_output.clear_buffer()

playback_ev = await audio_output.wait_for_playout()
playback_position = playback_ev.playback_position
if (
audio_out is not None
and audio_out.first_frame_fut.done()
and not audio_out.first_frame_fut.cancelled()
):
# playback_ev is valid only if the first frame was already played
if playback_ev.synchronized_transcript is not None:
forwarded_text = playback_ev.synchronized_transcript
else:
forwarded_text = ""
playback_position = 0

# truncate server-side message (if supported)
if self.llm.capabilities.message_truncation:
msg_modalities = await msg_gen.modalities
self._rt_session.truncate(
message_id=msg_gen.message_id,
modalities=msg_modalities,
audio_end_ms=int(playback_position * 1000),
audio_transcript=forwarded_text,
)
playback_ev = await audio_output.wait_for_playout()
playback_position = playback_ev.playback_position
if (
audio_out is not None
and audio_out.first_frame_fut.done()
and not audio_out.first_frame_fut.cancelled()
):
# playback_ev is valid only if the first frame was already played
if playback_ev.synchronized_transcript is not None:
forwarded_text = playback_ev.synchronized_transcript
else:
forwarded_text = ""
playback_position = 0

msg: llm.ChatMessage | None = None
if forwarded_text:
msg = _create_assistant_message(
# truncate server-side message (if supported)
if self.llm.capabilities.message_truncation:
msg_modalities = await msg_gen.modalities
self._rt_session.truncate(
message_id=msg_gen.message_id,
forwarded_text=forwarded_text,
interrupted=True,
modalities=msg_modalities,
audio_end_ms=int(playback_position * 1000),
audio_transcript=forwarded_text,
)
self._agent._chat_ctx.items.append(msg)
speech_handle._item_added([msg])
self._session._conversation_item_added(msg)
current_span.set_attribute(trace_types.ATTR_RESPONSE_TEXT, forwarded_text)

speech_handle._mark_generation_done()
await utils.aio.cancel_and_wait(exe_task)

for tee in tees:
await tee.aclose()
return

if len(message_outputs) > 0:
# there should be only one message
msg_gen, text_out, _ = message_outputs[0]
forwarded_text = text_out.text if text_out else ""
if forwarded_text:
msg = _create_assistant_message(
message_id=msg_gen.message_id,
forwarded_text=forwarded_text,
interrupted=False,
)
self._agent._chat_ctx.items.append(msg)
speech_handle._item_added([msg])
self._session._conversation_item_added(msg)
current_span.set_attribute(trace_types.ATTR_RESPONSE_TEXT, forwarded_text)
elif read_transcript_from_tts and text_out and not text_out.text:
logger.warning(
"`use_tts_aligned_transcript` is enabled but no agent transcript was returned from tts"
)

elif read_transcript_from_tts and text_out is not None:
logger.warning(
"`use_tts_aligned_transcript` is enabled but no agent transcript was returned from tts"
)
if msg_gen and forwarded_text:
msg = _create_assistant_message(
message_id=msg_gen.message_id,
forwarded_text=forwarded_text,
interrupted=speech_handle.interrupted,
)
self._agent._chat_ctx.items.append(msg)
speech_handle._item_added([msg])
self._session._conversation_item_added(msg)
current_span.set_attribute(trace_types.ATTR_RESPONSE_TEXT, forwarded_text)

for tee in tees:
await tee.aclose()
speech_handle._mark_generation_done()

speech_handle._mark_generation_done() # mark the playout done before waiting for the tool execution # noqa: E501
if speech_handle.interrupted:
await utils.aio.cancel_and_wait(exe_task)
return

# wait for the tool execution to complete
tool_output.first_tool_started_fut.add_done_callback(
lambda _: self._session._update_agent_state("thinking")
)
@@ -2806,7 +2773,9 @@ def _on_false_interruption() -> None:
timeout, _on_false_interruption
)

async def _interrupt_paused_speech(self, old_task: asyncio.Task[None] | None = None) -> None:
async def _cancel_speech_pause(
self, old_task: asyncio.Task[None] | None = None, *, interrupt: bool = True
) -> None:
if old_task is not None:
await old_task

@@ -2817,8 +2786,14 @@ async def _interrupt_paused_speech(self, old_task: asyncio.Task[None] | None = N
if not self._paused_speech:
return

if not self._paused_speech.interrupted and self._paused_speech.allow_interruptions:
await self._paused_speech.interrupt() # ensure the speech is done
if (
interrupt
and not self._paused_speech.interrupted
and self._paused_speech.allow_interruptions
):
self._paused_speech.interrupt()
# ensure the generation is done
await self._paused_speech._wait_for_generation()
self._paused_speech = None

if self._session.options.resume_false_interruption and self._session.output.audio:
Comment on lines 2797 to 2799

🟡 Paused speech state cleared prematurely when allow_interruptions is False

When _cancel_speech_pause is called while an AgentTask has temporarily disabled interruptions (by setting speech_handle.allow_interruptions = False), the method still clears _paused_speech = None and calls resume() even though it skipped the interrupt logic.

Scenario

  1. Speech is playing with allow_interruptions=True
  2. User speaks, triggering _interrupt_by_audio_activity, which pauses the audio and sets _paused_speech = self._current_speech
  3. Tool execution calls await AgentTask(), which sets speech_handle.allow_interruptions = False (line 769 in agent.py)
  4. The user's final transcript triggers on_final_transcript, which creates a task to call _cancel_speech_pause
  5. In _cancel_speech_pause, the condition at lines 2789-2792 evaluates to False because allow_interruptions is now False
  6. The interrupt block is skipped, but _paused_speech = None is still executed (line 2797)
  7. Audio is resumed if the resume_false_interruption option is set (lines 2799-2800)

Impact

The paused speech reference is cleared prematurely while an AgentTask is running. When the AgentTask completes and restores allow_interruptions, the false-interruption handling state has already been cleared. This could cause:

  • Inconsistent state tracking where _paused_speech is None but the speech wasn't properly interrupted
  • The false-interruption detection logic won't work correctly after the AgentTask completes
  • Audio might resume unexpectedly during AgentTask execution

(Refers to lines 2797-2800)

Recommendation: Consider not clearing _paused_speech and not calling resume() when the speech's allow_interruptions is False due to an AgentTask lock. The cleanup should happen either when the speech is successfully interrupted or when the AgentTask completes and the original interruption handling can proceed.
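Below is a minimal, runnable sketch of the guard this review suggests, using made-up stand-ins (FakeSpeechHandle, FakeActivity); it is not part of the PR and only illustrates keeping the pause state intact while allow_interruptions is locked off.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeSpeechHandle:
    allow_interruptions: bool = True
    interrupted: bool = False

    def interrupt(self) -> None:
        self.interrupted = True


class FakeActivity:
    def __init__(self) -> None:
        self._paused_speech: FakeSpeechHandle | None = None

    async def _cancel_speech_pause(self, *, interrupt: bool = True) -> None:
        if not self._paused_speech:
            return
        # suggested guard: while an AgentTask has locked the handle
        # (allow_interruptions=False), keep the pause state intact
        if not self._paused_speech.allow_interruptions:
            return
        if interrupt and not self._paused_speech.interrupted:
            self._paused_speech.interrupt()
        self._paused_speech = None


async def main() -> None:
    activity = FakeActivity()
    speech = FakeSpeechHandle()
    activity._paused_speech = speech

    speech.allow_interruptions = False        # AgentTask locks the handle
    await activity._cancel_speech_pause()     # cleanup is skipped
    print(activity._paused_speech is speech)  # True: pause state preserved

    speech.allow_interruptions = True         # AgentTask completed, lock released
    await activity._cancel_speech_pause()     # now the pause is resolved
    print(activity._paused_speech is None)    # True


asyncio.run(main())
```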

5 changes: 4 additions & 1 deletion livekit-agents/livekit/agents/voice/generation.py
@@ -627,7 +627,10 @@ async def _traceable_fnc_tool(
# TODO(theomonnom): Add the agent handoff inside the current_span
_tool_completed(output)

task = asyncio.create_task(_traceable_fnc_tool(function_callable, fnc_call))
task = asyncio.create_task(
_traceable_fnc_tool(function_callable, fnc_call),
name=f"func_exec_{fnc_call.name}", # task name is used for logging when the task is cancelled
)
_set_activity_task_info(
task, speech_handle=speech_handle, function_call=fnc_call, inline_task=True
)