Skip to content

Commit

Permalink
Converge stream av open methods, options, and error handling (home-as…
Browse files Browse the repository at this point in the history
…sistant#134020)

* Converge stream av open methods, options, and error handling

* Remove exception that is never thrown

* Update exceptions thrown in generic tests

* Increase stream test coverage
  • Loading branch information
allenporter authored Dec 28, 2024
1 parent 07ae9b1 commit 6edf06f
Show file tree
Hide file tree
Showing 13 changed files with 267 additions and 206 deletions.
6 changes: 0 additions & 6 deletions homeassistant/components/generic/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,6 @@ async def async_test_and_preview_stream(
"""
if not (stream_source := info.get(CONF_STREAM_SOURCE)):
return None
# Import from stream.worker as stream cannot reexport from worker
# without forcing the av dependency on default_config
# pylint: disable-next=import-outside-toplevel
from homeassistant.components.stream.worker import StreamWorkerError

if not isinstance(stream_source, template_helper.Template):
stream_source = template_helper.Template(stream_source, hass)
Expand Down Expand Up @@ -294,8 +290,6 @@ async def async_test_and_preview_stream(
f"{DOMAIN}.test_stream",
)
hls_provider = stream.add_provider(HLS_PROVIDER)
except StreamWorkerError as err:
raise InvalidStreamException("unknown_with_details", str(err)) from err
except PermissionError as err:
raise InvalidStreamException("stream_not_permitted") from err
except OSError as err:
Expand Down
178 changes: 54 additions & 124 deletions homeassistant/components/stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import asyncio
from collections.abc import Callable, Mapping
import copy
from enum import IntEnum
import logging
import secrets
import threading
Expand All @@ -41,12 +40,12 @@

from .const import (
ATTR_ENDPOINTS,
ATTR_PREFER_TCP,
ATTR_SETTINGS,
ATTR_STREAMS,
CONF_EXTRA_PART_WAIT_TIME,
CONF_LL_HLS,
CONF_PART_DURATION,
CONF_PREFER_TCP,
CONF_RTSP_TRANSPORT,
CONF_SEGMENT_DURATION,
CONF_USE_WALLCLOCK_AS_TIMESTAMPS,
Expand All @@ -62,6 +61,7 @@
SOURCE_TIMEOUT,
STREAM_RESTART_INCREMENT,
STREAM_RESTART_RESET_TIME,
StreamClientError,
)
from .core import (
PROVIDERS,
Expand All @@ -73,11 +73,10 @@
StreamSettings,
)
from .diagnostics import Diagnostics
from .exceptions import StreamOpenClientError, StreamWorkerError
from .hls import HlsStreamOutput, async_setup_hls

if TYPE_CHECKING:
from av.container import InputContainer, OutputContainer

from homeassistant.components.camera import DynamicStreamSettings

__all__ = [
Expand All @@ -92,98 +91,15 @@
"RTSP_TRANSPORTS",
"SOURCE_TIMEOUT",
"Stream",
"StreamClientError",
"StreamOpenClientError",
"create_stream",
"Orientation",
]

_LOGGER = logging.getLogger(__name__)


class StreamClientError(IntEnum):
"""Enum for stream client errors."""

BadRequest = 400
Unauthorized = 401
Forbidden = 403
NotFound = 404
Other = 4


class StreamOpenClientError(HomeAssistantError):
"""Raised when client error received when trying to open a stream.
:param stream_client_error: The type of client error
"""

def __init__(
self, *args: Any, stream_client_error: StreamClientError, **kwargs: Any
) -> None:
self.stream_client_error = stream_client_error
super().__init__(*args, **kwargs)


async def _async_try_open_stream(
hass: HomeAssistant, source: str, pyav_options: dict[str, str] | None = None
) -> InputContainer | OutputContainer:
"""Try to open a stream.
Will raise StreamOpenClientError if an http client error is encountered.
"""
return await hass.loop.run_in_executor(None, _try_open_stream, source, pyav_options)


def _try_open_stream(
source: str, pyav_options: dict[str, str] | None = None
) -> InputContainer | OutputContainer:
"""Try to open a stream.
Will raise StreamOpenClientError if an http client error is encountered.
"""
import av # pylint: disable=import-outside-toplevel

if pyav_options is None:
pyav_options = {}

default_pyav_options = {
"rtsp_flags": CONF_PREFER_TCP,
"timeout": str(SOURCE_TIMEOUT),
}

pyav_options = {
**default_pyav_options,
**pyav_options,
}

try:
container = av.open(source, options=pyav_options, timeout=5)

except av.HTTPBadRequestError as ex:
raise StreamOpenClientError(
stream_client_error=StreamClientError.BadRequest
) from ex

except av.HTTPUnauthorizedError as ex:
raise StreamOpenClientError(
stream_client_error=StreamClientError.Unauthorized
) from ex

except av.HTTPForbiddenError as ex:
raise StreamOpenClientError(
stream_client_error=StreamClientError.Forbidden
) from ex

except av.HTTPNotFoundError as ex:
raise StreamOpenClientError(
stream_client_error=StreamClientError.NotFound
) from ex

except av.HTTPOtherClientError as ex:
raise StreamOpenClientError(stream_client_error=StreamClientError.Other) from ex

else:
return container


async def async_check_stream_client_error(
hass: HomeAssistant, source: str, pyav_options: dict[str, str] | None = None
) -> None:
Expand All @@ -192,18 +108,24 @@ async def async_check_stream_client_error(
Raise StreamOpenClientError if an http client error is encountered.
"""
await hass.loop.run_in_executor(
None, _check_stream_client_error, source, pyav_options
None, _check_stream_client_error, hass, source, pyav_options
)


def _check_stream_client_error(
source: str, pyav_options: dict[str, str] | None = None
hass: HomeAssistant, source: str, options: dict[str, str] | None = None
) -> None:
"""Check if a stream can be successfully opened.
Raise StreamOpenClientError if an http client error is encountered.
"""
_try_open_stream(source, pyav_options).close()
from .worker import try_open_stream # pylint: disable=import-outside-toplevel

pyav_options, _ = _convert_stream_options(hass, source, options or {})
try:
try_open_stream(source, pyav_options).close()
except StreamWorkerError as err:
raise StreamOpenClientError(str(err), err.error_code) from err


def redact_credentials(url: str) -> str:
Expand All @@ -219,6 +141,42 @@ def redact_credentials(url: str) -> str:
return str(yurl.update_query(redacted_query_params))


def _convert_stream_options(
hass: HomeAssistant,
stream_source: str,
stream_options: Mapping[str, str | bool | float],
) -> tuple[dict[str, str], StreamSettings]:
"""Convert options from stream options into PyAV options and stream settings."""
if DOMAIN not in hass.data:
raise HomeAssistantError("Stream integration is not set up.")

stream_settings = copy.copy(hass.data[DOMAIN][ATTR_SETTINGS])
pyav_options: dict[str, str] = {}
try:
STREAM_OPTIONS_SCHEMA(stream_options)
except vol.Invalid as exc:
raise HomeAssistantError(f"Invalid stream options: {exc}") from exc

if extra_wait_time := stream_options.get(CONF_EXTRA_PART_WAIT_TIME):
stream_settings.hls_part_timeout += extra_wait_time
if rtsp_transport := stream_options.get(CONF_RTSP_TRANSPORT):
assert isinstance(rtsp_transport, str)
# The PyAV options currently match the stream CONF constants, but this
# will not necessarily always be the case, so they are hard coded here
pyav_options["rtsp_transport"] = rtsp_transport
if stream_options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS):
pyav_options["use_wallclock_as_timestamps"] = "1"

# For RTSP streams, prefer TCP
if isinstance(stream_source, str) and stream_source[:7] == "rtsp://":
pyav_options = {
"rtsp_flags": ATTR_PREFER_TCP,
"stimeout": "5000000",
**pyav_options,
}
return pyav_options, stream_settings


def create_stream(
hass: HomeAssistant,
stream_source: str,
Expand All @@ -234,41 +192,13 @@ def create_stream(
The stream_label is a string used as an additional message in logging.
"""

def convert_stream_options(
hass: HomeAssistant, stream_options: Mapping[str, str | bool | float]
) -> tuple[dict[str, str], StreamSettings]:
"""Convert options from stream options into PyAV options and stream settings."""
stream_settings = copy.copy(hass.data[DOMAIN][ATTR_SETTINGS])
pyav_options: dict[str, str] = {}
try:
STREAM_OPTIONS_SCHEMA(stream_options)
except vol.Invalid as exc:
raise HomeAssistantError("Invalid stream options") from exc

if extra_wait_time := stream_options.get(CONF_EXTRA_PART_WAIT_TIME):
stream_settings.hls_part_timeout += extra_wait_time
if rtsp_transport := stream_options.get(CONF_RTSP_TRANSPORT):
assert isinstance(rtsp_transport, str)
# The PyAV options currently match the stream CONF constants, but this
# will not necessarily always be the case, so they are hard coded here
pyav_options["rtsp_transport"] = rtsp_transport
if stream_options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS):
pyav_options["use_wallclock_as_timestamps"] = "1"

return pyav_options, stream_settings

if DOMAIN not in hass.config.components:
raise HomeAssistantError("Stream integration is not set up.")

# Convert extra stream options into PyAV options and stream settings
pyav_options, stream_settings = convert_stream_options(hass, options)
# For RTSP streams, prefer TCP
if isinstance(stream_source, str) and stream_source[:7] == "rtsp://":
pyav_options = {
"rtsp_flags": "prefer_tcp",
"stimeout": "5000000",
**pyav_options,
}
pyav_options, stream_settings = _convert_stream_options(
hass, stream_source, options
)

stream = Stream(
hass,
Expand Down Expand Up @@ -531,7 +461,7 @@ def _run_worker(self) -> None:
"""Handle consuming streams and restart keepalive streams."""
# Keep import here so that we can import stream integration without installing reqs
# pylint: disable-next=import-outside-toplevel
from .worker import StreamState, StreamWorkerError, stream_worker
from .worker import StreamState, stream_worker

stream_state = StreamState(self.hass, self.outputs, self._diagnostics)
wait_timeout = 0
Expand Down
18 changes: 17 additions & 1 deletion homeassistant/components/stream/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

from enum import IntEnum
from typing import Final

DOMAIN = "stream"
Expand Down Expand Up @@ -48,7 +49,7 @@
CONF_PART_DURATION = "part_duration"
CONF_SEGMENT_DURATION = "segment_duration"

CONF_PREFER_TCP = "prefer_tcp"
ATTR_PREFER_TCP = "prefer_tcp"
CONF_RTSP_TRANSPORT = "rtsp_transport"
# The first dict entry below may be used as the default when populating options
RTSP_TRANSPORTS = {
Expand All @@ -59,3 +60,18 @@
}
CONF_USE_WALLCLOCK_AS_TIMESTAMPS = "use_wallclock_as_timestamps"
CONF_EXTRA_PART_WAIT_TIME = "extra_part_wait_time"


class StreamClientError(IntEnum):
"""Enum for stream client errors.
These are errors that can be returned by the stream client when trying to
open a stream. The caller should not interpret the int values directly, but
should use the enum values instead.
"""

BadRequest = 400
Unauthorized = 401
Forbidden = 403
NotFound = 404
Other = 4
32 changes: 32 additions & 0 deletions homeassistant/components/stream/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Stream component exceptions."""

from homeassistant.exceptions import HomeAssistantError

from .const import StreamClientError


class StreamOpenClientError(HomeAssistantError):
"""Raised when client error received when trying to open a stream.
:param stream_client_error: The type of client error
"""

def __init__(self, message: str, error_code: StreamClientError) -> None:
"""Initialize a stream open client error."""
super().__init__(message)
self.error_code = error_code


class StreamWorkerError(Exception):
"""An exception thrown while processing a stream."""

def __init__(
self, message: str, error_code: StreamClientError = StreamClientError.Other
) -> None:
"""Initialize a stream worker error."""
super().__init__(message)
self.error_code = error_code


class StreamEndedError(StreamWorkerError):
"""Raised when the stream is complete, exposed for facilitating testing."""
Loading

0 comments on commit 6edf06f

Please sign in to comment.