Merged
livekit-agents/livekit/agents/llm/realtime.py (4 changes: 0 additions & 4 deletions)

@@ -182,10 +182,6 @@ def generate_reply(
     @abstractmethod
     def commit_audio(self) -> None: ...
 
-    # commit the user turn to the server
-    @abstractmethod
-    def commit_user_turn(self) -> None: ...
-
     # clear the input audio buffer to the server
     @abstractmethod
     def clear_audio(self) -> None: ...
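With this removal, the abstract surface for manual audio control shrinks to commit_audio and clear_audio. For reference, a minimal sketch of what remains on the base class (the class name and abc scaffolding are assumptions for illustration; only the two signatures come from the diff above):

from abc import ABC, abstractmethod


class RealtimeSession(ABC):  # assumed name of the enclosing base class
    # commit the input audio buffer to the server
    @abstractmethod
    def commit_audio(self) -> None: ...

    # clear the input audio buffer to the server
    @abstractmethod
    def clear_audio(self) -> None: ...

Callers that previously relied on commit_user_turn now compose commit_audio with generate_reply at the agent layer, as the next file shows.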
livekit-agents/livekit/agents/voice/agent_activity.py (27 changes: 20 additions & 7 deletions)

@@ -1015,14 +1015,21 @@ def clear_user_turn(self) -> None:
             self._rt_session.clear_audio()
 
     def commit_user_turn(self, *, transcript_timeout: float, stt_flush_duration: float) -> None:
+        skip_reply: bool = False
         if self._rt_session is not None:
-            self._rt_session.commit_user_turn()
+            # commit audio buffer and trigger response generation
+            # `skip_reply` prevents duplicate reply from _on_user_turn_completed
+            # but keeps flushing STT transcript into the chat context
+            self._rt_session.commit_audio()
+            self._session.generate_reply()
+            skip_reply = True
 
         assert self._audio_recognition is not None
         self._audio_recognition.commit_user_turn(
             audio_detached=not self._session.input.audio_enabled,
             transcript_timeout=transcript_timeout,
             stt_flush_duration=stt_flush_duration,
+            skip_reply=skip_reply,
         )
 
     def _schedule_speech(self, speech: SpeechHandle, priority: int, force: bool = False) -> None:

@@ -1476,11 +1483,23 @@ async def _user_turn_completed_task(
         # interrupt all background speeches and wait for them to finish to update the chat context
         await asyncio.gather(*self._interrupt_background_speeches(force=False))
 
+        user_message = llm.ChatMessage(
+            role="user",
+            content=[info.new_transcript],
+            transcript_confidence=info.transcript_confidence,
+        )
+
         if isinstance(self.llm, llm.RealtimeModel):
             if self.llm.capabilities.turn_detection:
                 return
 
             if self._rt_session is not None:
+                if info.skip_reply:
+                    if info.new_transcript != "":
+                        # only add user message to chat context if reply should be skipped
+                        self._agent._chat_ctx.items.append(user_message)
+                        self._session._conversation_item_added(user_message)
+                    return
                 self._rt_session.commit_audio()
 
         if (current_speech := self._current_speech) is not None:

@@ -1497,12 +1516,6 @@
         if self._rt_session is not None:
             self._rt_session.interrupt()
 
-        user_message = llm.ChatMessage(
-            role="user",
-            content=[info.new_transcript],
-            transcript_confidence=info.transcript_confidence,
-        )
-
         if self._scheduling_paused:
             logger.warning(
                 "skipping on_user_turn_completed, speech scheduling is paused",
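The net effect: committing a user turn now triggers the reply immediately at the activity level, and skip_reply only suppresses the duplicate response in the end-of-turn hook while the STT transcript still lands in the chat context. A hedged usage sketch of the flow this serves, manual turn detection with a realtime model (the AgentSession constructor arguments and the session-level commit_user_turn wrapper are assumptions from the public livekit-agents API, not part of this diff):

from livekit.agents import AgentSession
from livekit.plugins import openai

# assumed setup: the application decides when the user's turn ends
session = AgentSession(
    llm=openai.realtime.RealtimeModel(),
    turn_detection="manual",
)

# ...later, once the app knows the user stopped speaking:
session.commit_user_turn()  # commits audio, generates exactly one reply,
                            # and still flushes the pending STT transcript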
livekit-agents/livekit/agents/voice/audio_recognition.py (8 changes: 6 additions & 2 deletions)

@@ -31,6 +31,8 @@
 
 @dataclass
 class _EndOfTurnInfo:
+    skip_reply: bool
+    """If True, a reply was already triggered and should be skipped after end of turn detection."""
     new_transcript: str
     transcript_confidence: float
 

@@ -253,6 +255,7 @@ def commit_user_turn(
         audio_detached: bool,
         transcript_timeout: float,
         stt_flush_duration: float = 2.0,
+        skip_reply: bool = False,
     ) -> None:
         if not self._stt or self._closing.is_set():
             return

@@ -313,7 +316,7 @@ async def _commit_user_turn() -> None:
 
            self._audio_interim_transcript = ""
            chat_ctx = self._hooks.retrieve_chat_ctx().copy()
-           self._run_eou_detection(chat_ctx)
+           self._run_eou_detection(chat_ctx, skip_reply=skip_reply)
            self._user_turn_committed = True
 
        if self._commit_user_turn_atask is not None:

@@ -516,7 +519,7 @@ async def _on_vad_event(self, ev: vad.VADEvent) -> None:
            chat_ctx = self._hooks.retrieve_chat_ctx().copy()
            self._run_eou_detection(chat_ctx)
 
-    def _run_eou_detection(self, chat_ctx: llm.ChatContext) -> None:
+    def _run_eou_detection(self, chat_ctx: llm.ChatContext, skip_reply: bool = False) -> None:
        if self._stt and not self._audio_transcript and self._turn_detection_mode != "manual":
            # stt enabled but no transcript yet
            return

@@ -622,6 +625,7 @@ async def _bounce_eou_task(
 
        committed = self._hooks.on_end_of_turn(
            _EndOfTurnInfo(
+               skip_reply=skip_reply,
                new_transcript=self._audio_transcript,
                transcript_confidence=confidence_avg,
                transcription_delay=transcription_delay or 0,
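To trace how the flag travels: commit_user_turn(skip_reply=True) forwards it through _run_eou_detection into _EndOfTurnInfo, and _user_turn_completed_task in agent_activity.py reads it back. A self-contained sketch of that producer/consumer contract (field list abridged; handle_end_of_turn is a placeholder standing in for the hook shown earlier):

from dataclasses import dataclass


@dataclass
class _EndOfTurnInfo:
    skip_reply: bool  # True when commit_user_turn already triggered generate_reply()
    new_transcript: str
    transcript_confidence: float
    # ...timing fields such as transcription_delay omitted


def handle_end_of_turn(info: _EndOfTurnInfo, chat_history: list[str]) -> bool:
    """Placeholder consumer mirroring _user_turn_completed_task; returns True if a reply starts."""
    if info.skip_reply:
        if info.new_transcript != "":
            chat_history.append(info.new_transcript)  # keep the transcript, skip the duplicate reply
        return False
    return True  # normal path: commit audio and let the model respond


history: list[str] = []
assert handle_end_of_turn(_EndOfTurnInfo(True, "hello", 0.9), history) is False
assert history == ["hello"]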
In the Nova Sonic plugin, the unsupported-method stub is removed:

@@ -2005,9 +2005,6 @@ def commit_audio(self) -> None:
     def clear_audio(self) -> None:
         logger.warning("clear_audio is not supported by Nova Sonic's Realtime API")
 
-    def commit_user_turn(self) -> None:
-        logger.warning("commit_user_turn is not supported by Nova Sonic's Realtime API")
-
     def push_video(self, frame: rtc.VideoFrame) -> None:
         logger.warning("video is not supported by Nova Sonic's Realtime API")
 
The Gemini plugin's stub goes the same way:

@@ -1312,9 +1312,6 @@ def commit_audio(self) -> None:
     def clear_audio(self) -> None:
         logger.warning("clear_audio is not supported by Gemini Realtime API.")
 
-    def commit_user_turn(self) -> None:
-        logger.warning("commit_user_turn is not supported by Gemini Realtime API.")
-
     def _resample_audio(self, frame: rtc.AudioFrame) -> Iterator[rtc.AudioFrame]:
         if self._input_resampler:
             if frame.sample_rate != self._input_resampler._input_rate:
The OpenAI realtime session drops its full implementation, which committed the audio buffer and manually created a response:

@@ -1311,23 +1311,6 @@ def clear_audio(self) -> None:
         self.send_event(InputAudioBufferClearEvent(type="input_audio_buffer.clear"))
         self._pushed_duration_s = 0
 
-    def commit_user_turn(self) -> None:
-        if self._realtime_model._opts.turn_detection is not None and (
-            self._realtime_model._opts.turn_detection.interrupt_response
-            or self._realtime_model._opts.turn_detection.create_response
-        ):
-            logger.warning(
-                "commit_user_turn is triggered when auto response is enabled. Model behavior may be unexpected."
-            )
-
-        self.commit_audio()
-        self.send_event(
-            ResponseCreateEvent(
-                type="response.create",
-                response=RealtimeResponseCreateParams(),
-            )
-        )
-
     def generate_reply(
         self, *, instructions: NotGivenOr[str] = NOT_GIVEN
     ) -> asyncio.Future[llm.GenerationCreatedEvent]:
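The removed implementation was commit_audio() followed by a hand-built response.create event, plus a warning when server-side auto response was enabled. That behavior is now expressed through the session's surviving methods, which is exactly what agent_activity.py does above; roughly (a sketch, with rt_session standing in for an active realtime session):

# equivalent of the deleted commit_user_turn, via the surviving API
rt_session.commit_audio()    # emits input_audio_buffer.commit
rt_session.generate_reply()  # emits response.create, returns a Future[GenerationCreatedEvent]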
A second OpenAI-compatible realtime session removes the same method, differing only in the response payload type:

@@ -1120,23 +1120,6 @@ def clear_audio(self) -> None:
         self.send_event(InputAudioBufferClearEvent(type="input_audio_buffer.clear"))
         self._pushed_duration_s = 0
 
-    def commit_user_turn(self) -> None:
-        if self._realtime_model._opts.turn_detection is not None and (
-            self._realtime_model._opts.turn_detection.interrupt_response
-            or self._realtime_model._opts.turn_detection.create_response
-        ):
-            logger.warning(
-                "commit_user_turn is triggered when auto response is enabled. Model behavior may be unexpected."
-            )
-
-        self.commit_audio()
-        self.send_event(
-            ResponseCreateEvent(
-                type="response.create",
-                response=Response(),
-            )
-        )
-
     def generate_reply(
         self, *, instructions: NotGivenOr[str] = NOT_GIVEN
     ) -> asyncio.Future[llm.GenerationCreatedEvent]:
The Ultravox plugin likewise drops its unsupported stub:

@@ -1138,9 +1138,6 @@ def commit_audio(self) -> None:
     def clear_audio(self) -> None:
         logger.warning("clear audio is not supported by Ultravox.")
 
-    def commit_user_turn(self) -> None:
-        logger.warning("commit_user_turn is not supported by Ultravox.")
-
     def _resample_audio(self, frame: rtc.AudioFrame) -> Iterator[rtc.AudioFrame]:
         """Resample audio frame to the required sample rate."""
         if self._input_resampler: