Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Converge stream av open methods, options, and error handling #134020

Merged
merged 4 commits into from
Dec 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions homeassistant/components/generic/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,6 @@ async def async_test_and_preview_stream(
"""
if not (stream_source := info.get(CONF_STREAM_SOURCE)):
return None
# Import from stream.worker as stream cannot reexport from worker
# without forcing the av dependency on default_config
# pylint: disable-next=import-outside-toplevel
from homeassistant.components.stream.worker import StreamWorkerError
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, this is removing rather than moving to new import location. This can never be thrown here since its caught by Stream and never exposed. We can fix exception handling here by exposing the error codes in a follow up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@allenporter
Ok this makes sense.
It looks like this change is what has lowered the coverage in #133927

Lines like

                    if err.details:
                        errors["error_details"] = err.details

...no longer have any way of being triggered.
I will add a test to raise StreamOpenClientError within the generic config flow to restore the test coverage.

Copy link
Contributor Author

@allenporter allenporter Dec 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok sorry for missing that -- btw there is still not yet a way that StreamOpenClientError is thrown here, but need to add in a follow up.


if not isinstance(stream_source, template_helper.Template):
stream_source = template_helper.Template(stream_source, hass)
Expand Down Expand Up @@ -294,8 +290,6 @@ async def async_test_and_preview_stream(
f"{DOMAIN}.test_stream",
)
hls_provider = stream.add_provider(HLS_PROVIDER)
except StreamWorkerError as err:
raise InvalidStreamException("unknown_with_details", str(err)) from err
except PermissionError as err:
raise InvalidStreamException("stream_not_permitted") from err
except OSError as err:
Expand Down
178 changes: 54 additions & 124 deletions homeassistant/components/stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import asyncio
from collections.abc import Callable, Mapping
import copy
from enum import IntEnum
import logging
import secrets
import threading
Expand All @@ -41,12 +40,12 @@

from .const import (
ATTR_ENDPOINTS,
ATTR_PREFER_TCP,
ATTR_SETTINGS,
ATTR_STREAMS,
CONF_EXTRA_PART_WAIT_TIME,
CONF_LL_HLS,
CONF_PART_DURATION,
CONF_PREFER_TCP,
CONF_RTSP_TRANSPORT,
CONF_SEGMENT_DURATION,
CONF_USE_WALLCLOCK_AS_TIMESTAMPS,
Expand All @@ -62,6 +61,7 @@
SOURCE_TIMEOUT,
STREAM_RESTART_INCREMENT,
STREAM_RESTART_RESET_TIME,
StreamClientError,
)
from .core import (
PROVIDERS,
Expand All @@ -73,11 +73,10 @@
StreamSettings,
)
from .diagnostics import Diagnostics
from .exceptions import StreamOpenClientError, StreamWorkerError
from .hls import HlsStreamOutput, async_setup_hls

if TYPE_CHECKING:
from av.container import InputContainer, OutputContainer

from homeassistant.components.camera import DynamicStreamSettings

__all__ = [
Expand All @@ -92,98 +91,15 @@
"RTSP_TRANSPORTS",
"SOURCE_TIMEOUT",
"Stream",
"StreamClientError",
"StreamOpenClientError",
"create_stream",
"Orientation",
]

_LOGGER = logging.getLogger(__name__)


class StreamClientError(IntEnum):
"""Enum for stream client errors."""

BadRequest = 400
Unauthorized = 401
Forbidden = 403
NotFound = 404
Other = 4


class StreamOpenClientError(HomeAssistantError):
"""Raised when client error received when trying to open a stream.

:param stream_client_error: The type of client error
"""

def __init__(
self, *args: Any, stream_client_error: StreamClientError, **kwargs: Any
) -> None:
self.stream_client_error = stream_client_error
super().__init__(*args, **kwargs)


async def _async_try_open_stream(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is removed since it is not used anywhere.

hass: HomeAssistant, source: str, pyav_options: dict[str, str] | None = None
) -> InputContainer | OutputContainer:
"""Try to open a stream.

Will raise StreamOpenClientError if an http client error is encountered.
"""
return await hass.loop.run_in_executor(None, _try_open_stream, source, pyav_options)


def _try_open_stream(
source: str, pyav_options: dict[str, str] | None = None
) -> InputContainer | OutputContainer:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is changed to only be InputContainer when moving since we're only using one of the open constructors for input.

"""Try to open a stream.

Will raise StreamOpenClientError if an http client error is encountered.
"""
import av # pylint: disable=import-outside-toplevel

if pyav_options is None:
pyav_options = {}

default_pyav_options = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These options are slightly different than the defaults used by the stream component. These have been removed and converted with the existing default.

"rtsp_flags": CONF_PREFER_TCP,
"timeout": str(SOURCE_TIMEOUT),
}

pyav_options = {
**default_pyav_options,
**pyav_options,
}

try:
container = av.open(source, options=pyav_options, timeout=5)

except av.HTTPBadRequestError as ex:
raise StreamOpenClientError(
stream_client_error=StreamClientError.BadRequest
) from ex

except av.HTTPUnauthorizedError as ex:
raise StreamOpenClientError(
stream_client_error=StreamClientError.Unauthorized
) from ex

except av.HTTPForbiddenError as ex:
raise StreamOpenClientError(
stream_client_error=StreamClientError.Forbidden
) from ex

except av.HTTPNotFoundError as ex:
raise StreamOpenClientError(
stream_client_error=StreamClientError.NotFound
) from ex

except av.HTTPOtherClientError as ex:
raise StreamOpenClientError(stream_client_error=StreamClientError.Other) from ex

else:
return container


async def async_check_stream_client_error(
hass: HomeAssistant, source: str, pyav_options: dict[str, str] | None = None
) -> None:
Expand All @@ -192,18 +108,24 @@ async def async_check_stream_client_error(
Raise StreamOpenClientError if an http client error is encountered.
"""
await hass.loop.run_in_executor(
None, _check_stream_client_error, source, pyav_options
None, _check_stream_client_error, hass, source, pyav_options
)


def _check_stream_client_error(
source: str, pyav_options: dict[str, str] | None = None
hass: HomeAssistant, source: str, options: dict[str, str] | None = None
) -> None:
"""Check if a stream can be successfully opened.

Raise StreamOpenClientError if an http client error is encountered.
"""
_try_open_stream(source, pyav_options).close()
from .worker import try_open_stream # pylint: disable=import-outside-toplevel

pyav_options, _ = _convert_stream_options(hass, source, options or {})
try:
try_open_stream(source, pyav_options).close()
except StreamWorkerError as err:
raise StreamOpenClientError(str(err), err.error_code) from err


def redact_credentials(url: str) -> str:
Expand All @@ -219,6 +141,42 @@ def redact_credentials(url: str) -> str:
return str(yurl.update_query(redacted_query_params))


def _convert_stream_options(
hass: HomeAssistant,
stream_source: str,
stream_options: Mapping[str, str | bool | float],
) -> tuple[dict[str, str], StreamSettings]:
"""Convert options from stream options into PyAV options and stream settings."""
if DOMAIN not in hass.data:
raise HomeAssistantError("Stream integration is not set up.")
allenporter marked this conversation as resolved.
Show resolved Hide resolved

stream_settings = copy.copy(hass.data[DOMAIN][ATTR_SETTINGS])
pyav_options: dict[str, str] = {}
try:
STREAM_OPTIONS_SCHEMA(stream_options)
except vol.Invalid as exc:
raise HomeAssistantError("Invalid stream options") from exc

if extra_wait_time := stream_options.get(CONF_EXTRA_PART_WAIT_TIME):
stream_settings.hls_part_timeout += extra_wait_time
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to add coverage for this line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is easy to get coverage for, but hard to test since stream settings are consumed deeper in HLS. I'm skipping this for now.

if rtsp_transport := stream_options.get(CONF_RTSP_TRANSPORT):
assert isinstance(rtsp_transport, str)
# The PyAV options currently match the stream CONF constants, but this
# will not necessarily always be the case, so they are hard coded here
pyav_options["rtsp_transport"] = rtsp_transport
if stream_options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS):
pyav_options["use_wallclock_as_timestamps"] = "1"
allenporter marked this conversation as resolved.
Show resolved Hide resolved

# For RTSP streams, prefer TCP
if isinstance(stream_source, str) and stream_source[:7] == "rtsp://":
pyav_options = {
"rtsp_flags": ATTR_PREFER_TCP,
"stimeout": "5000000",
**pyav_options,
}
return pyav_options, stream_settings


def create_stream(
hass: HomeAssistant,
stream_source: str,
Expand All @@ -234,41 +192,13 @@ def create_stream(
The stream_label is a string used as an additional message in logging.
"""

def convert_stream_options(
hass: HomeAssistant, stream_options: Mapping[str, str | bool | float]
) -> tuple[dict[str, str], StreamSettings]:
"""Convert options from stream options into PyAV options and stream settings."""
stream_settings = copy.copy(hass.data[DOMAIN][ATTR_SETTINGS])
pyav_options: dict[str, str] = {}
try:
STREAM_OPTIONS_SCHEMA(stream_options)
except vol.Invalid as exc:
raise HomeAssistantError("Invalid stream options") from exc

if extra_wait_time := stream_options.get(CONF_EXTRA_PART_WAIT_TIME):
stream_settings.hls_part_timeout += extra_wait_time
if rtsp_transport := stream_options.get(CONF_RTSP_TRANSPORT):
assert isinstance(rtsp_transport, str)
# The PyAV options currently match the stream CONF constants, but this
# will not necessarily always be the case, so they are hard coded here
pyav_options["rtsp_transport"] = rtsp_transport
if stream_options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS):
pyav_options["use_wallclock_as_timestamps"] = "1"

return pyav_options, stream_settings

if DOMAIN not in hass.config.components:
raise HomeAssistantError("Stream integration is not set up.")

# Convert extra stream options into PyAV options and stream settings
pyav_options, stream_settings = convert_stream_options(hass, options)
# For RTSP streams, prefer TCP
if isinstance(stream_source, str) and stream_source[:7] == "rtsp://":
pyav_options = {
"rtsp_flags": "prefer_tcp",
"stimeout": "5000000",
**pyav_options,
}
pyav_options, stream_settings = _convert_stream_options(
hass, stream_source, options
)

stream = Stream(
hass,
Expand Down Expand Up @@ -531,7 +461,7 @@ def _run_worker(self) -> None:
"""Handle consuming streams and restart keepalive streams."""
# Keep import here so that we can import stream integration without installing reqs
# pylint: disable-next=import-outside-toplevel
from .worker import StreamState, StreamWorkerError, stream_worker
from .worker import StreamState, stream_worker

stream_state = StreamState(self.hass, self.outputs, self._diagnostics)
wait_timeout = 0
Expand Down
18 changes: 17 additions & 1 deletion homeassistant/components/stream/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

from enum import IntEnum
from typing import Final

DOMAIN = "stream"
Expand Down Expand Up @@ -48,7 +49,7 @@
CONF_PART_DURATION = "part_duration"
CONF_SEGMENT_DURATION = "segment_duration"

CONF_PREFER_TCP = "prefer_tcp"
ATTR_PREFER_TCP = "prefer_tcp"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reaming this since its not a configuration option. This is not exposed in __all__ today.

CONF_RTSP_TRANSPORT = "rtsp_transport"
# The first dict entry below may be used as the default when populating options
RTSP_TRANSPORTS = {
Expand All @@ -59,3 +60,18 @@
}
CONF_USE_WALLCLOCK_AS_TIMESTAMPS = "use_wallclock_as_timestamps"
CONF_EXTRA_PART_WAIT_TIME = "extra_part_wait_time"


class StreamClientError(IntEnum):
"""Enum for stream client errors.

These are errors that can be returned by the stream client when trying to
open a stream. The caller should not interpret the int values directly, but
should use the enum values instead.
"""

BadRequest = 400
Unauthorized = 401
Forbidden = 403
NotFound = 404
Other = 4
32 changes: 32 additions & 0 deletions homeassistant/components/stream/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Stream component exceptions."""

from homeassistant.exceptions import HomeAssistantError

from .const import StreamClientError


class StreamOpenClientError(HomeAssistantError):
"""Raised when client error received when trying to open a stream.

:param stream_client_error: The type of client error
"""

def __init__(self, message: str, error_code: StreamClientError) -> None:
"""Initialize a stream open client error."""
super().__init__(message)
self.error_code = error_code


class StreamWorkerError(Exception):
"""An exception thrown while processing a stream."""

def __init__(
self, message: str, error_code: StreamClientError = StreamClientError.Other
) -> None:
"""Initialize a stream worker error."""
super().__init__(message)
self.error_code = error_code


class StreamEndedError(StreamWorkerError):
"""Raised when the stream is complete, exposed for facilitating testing."""
Loading
Loading