Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VoIP audio queue #91577

Merged
merged 2 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions homeassistant/components/assist_pipeline/vad.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class VoiceCommandSegmenter:
reset_seconds: float = 1.0
"""Seconds before reset start/stop time counters."""

_in_command: bool = False
in_command: bool = False
"""True if inside voice command."""

_speech_seconds_left: float = 0.0
Expand Down Expand Up @@ -62,7 +62,7 @@ def reset(self) -> None:
self._silence_seconds_left = self.silence_seconds
self._timeout_seconds_left = self.timeout_seconds
self._reset_seconds_left = self.reset_seconds
self._in_command = False
self.in_command = False

def process(self, samples: bytes) -> bool:
"""Process a 16-bit 16Khz mono audio samples.
Expand Down Expand Up @@ -101,13 +101,13 @@ def _process_chunk(self, chunk: bytes) -> bool:
if self._timeout_seconds_left <= 0:
return False

if not self._in_command:
if not self.in_command:
if is_speech:
self._reset_seconds_left = self.reset_seconds
self._speech_seconds_left -= self._seconds_per_chunk
if self._speech_seconds_left <= 0:
# Inside voice command
self._in_command = True
self.in_command = True
else:
# Reset if enough silence
self._reset_seconds_left -= self._seconds_per_chunk
Expand Down
59 changes: 41 additions & 18 deletions homeassistant/components/voip/voip.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from __future__ import annotations

import asyncio
from collections import deque
from collections.abc import AsyncIterable
import logging
import time
from typing import TYPE_CHECKING
Expand All @@ -26,6 +28,7 @@
if TYPE_CHECKING:
from .devices import VoIPDevice, VoIPDevices

_BUFFERED_CHUNKS_BEFORE_SPEECH = 100 # ~2 seconds
_LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -95,9 +98,7 @@ def connection_lost(self, exc):
def on_chunk(self, audio_bytes: bytes) -> None:
"""Handle raw audio chunk."""
if self._pipeline_task is None:
# Clear audio queue
while not self._audio_queue.empty():
self._audio_queue.get_nowait()
self._clear_audio_queue()

# Run pipeline until voice command finishes, then start over
self._pipeline_task = self.hass.async_create_background_task(
Expand All @@ -114,30 +115,18 @@ async def _run_pipeline(
_LOGGER.debug("Starting pipeline")

async def stt_stream():
segmenter = VoiceCommandSegmenter()

try:
# Timeout if no audio comes in for a while.
# This means the caller hung up.
async with async_timeout.timeout(self.audio_timeout):
chunk = await self._audio_queue.get()

while chunk:
if not segmenter.process(chunk):
# Voice command is finished
break

async for chunk in self._segment_audio():
yield chunk

async with async_timeout.timeout(self.audio_timeout):
chunk = await self._audio_queue.get()
except asyncio.TimeoutError:
# Expected after caller hangs up
_LOGGER.debug("Audio timeout")

if self.transport is not None:
self.transport.close()
self.transport = None
finally:
self._clear_audio_queue()

try:
# Run pipeline with a timeout
Expand Down Expand Up @@ -172,6 +161,40 @@ async def stt_stream():
# Allow pipeline to run again
self._pipeline_task = None

async def _segment_audio(self) -> AsyncIterable[bytes]:
segmenter = VoiceCommandSegmenter()
chunk_buffer: deque[bytes] = deque(maxlen=_BUFFERED_CHUNKS_BEFORE_SPEECH)

# Timeout if no audio comes in for a while.
# This means the caller hung up.
async with async_timeout.timeout(self.audio_timeout):
chunk = await self._audio_queue.get()

while chunk:
if not segmenter.process(chunk):
# Voice command is finished
break

if segmenter.in_command:
if chunk_buffer:
# Release audio in buffer first
for buffered_chunk in chunk_buffer:
yield buffered_chunk

chunk_buffer.clear()

yield chunk
else:
# Buffer until command starts
chunk_buffer.append(chunk)

async with async_timeout.timeout(self.audio_timeout):
chunk = await self._audio_queue.get()

def _clear_audio_queue(self) -> None:
while not self._audio_queue.empty():
self._audio_queue.get_nowait()

def _event_callback(self, event: PipelineEvent):
if not event.data:
return
Expand Down