-
-
Notifications
You must be signed in to change notification settings - Fork 31.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Converge stream av open methods, options, and error handling #134020
Changes from 2 commits
e0cbd06
a6d30cd
2544397
2138e4a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,6 @@ | |
import asyncio | ||
from collections.abc import Callable, Mapping | ||
import copy | ||
from enum import IntEnum | ||
import logging | ||
import secrets | ||
import threading | ||
|
@@ -41,12 +40,12 @@ | |
|
||
from .const import ( | ||
ATTR_ENDPOINTS, | ||
ATTR_PREFER_TCP, | ||
ATTR_SETTINGS, | ||
ATTR_STREAMS, | ||
CONF_EXTRA_PART_WAIT_TIME, | ||
CONF_LL_HLS, | ||
CONF_PART_DURATION, | ||
CONF_PREFER_TCP, | ||
CONF_RTSP_TRANSPORT, | ||
CONF_SEGMENT_DURATION, | ||
CONF_USE_WALLCLOCK_AS_TIMESTAMPS, | ||
|
@@ -62,6 +61,7 @@ | |
SOURCE_TIMEOUT, | ||
STREAM_RESTART_INCREMENT, | ||
STREAM_RESTART_RESET_TIME, | ||
StreamClientError, | ||
) | ||
from .core import ( | ||
PROVIDERS, | ||
|
@@ -73,11 +73,10 @@ | |
StreamSettings, | ||
) | ||
from .diagnostics import Diagnostics | ||
from .exceptions import StreamOpenClientError, StreamWorkerError | ||
from .hls import HlsStreamOutput, async_setup_hls | ||
|
||
if TYPE_CHECKING: | ||
from av.container import InputContainer, OutputContainer | ||
|
||
from homeassistant.components.camera import DynamicStreamSettings | ||
|
||
__all__ = [ | ||
|
@@ -92,98 +91,15 @@ | |
"RTSP_TRANSPORTS", | ||
"SOURCE_TIMEOUT", | ||
"Stream", | ||
"StreamClientError", | ||
"StreamOpenClientError", | ||
"create_stream", | ||
"Orientation", | ||
] | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class StreamClientError(IntEnum): | ||
"""Enum for stream client errors.""" | ||
|
||
BadRequest = 400 | ||
Unauthorized = 401 | ||
Forbidden = 403 | ||
NotFound = 404 | ||
Other = 4 | ||
|
||
|
||
class StreamOpenClientError(HomeAssistantError): | ||
"""Raised when client error received when trying to open a stream. | ||
|
||
:param stream_client_error: The type of client error | ||
""" | ||
|
||
def __init__( | ||
self, *args: Any, stream_client_error: StreamClientError, **kwargs: Any | ||
) -> None: | ||
self.stream_client_error = stream_client_error | ||
super().__init__(*args, **kwargs) | ||
|
||
|
||
async def _async_try_open_stream( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is removed since it is not used anywhere. |
||
hass: HomeAssistant, source: str, pyav_options: dict[str, str] | None = None | ||
) -> InputContainer | OutputContainer: | ||
"""Try to open a stream. | ||
|
||
Will raise StreamOpenClientError if an http client error is encountered. | ||
""" | ||
return await hass.loop.run_in_executor(None, _try_open_stream, source, pyav_options) | ||
|
||
|
||
def _try_open_stream( | ||
source: str, pyav_options: dict[str, str] | None = None | ||
) -> InputContainer | OutputContainer: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is changed to only be |
||
"""Try to open a stream. | ||
|
||
Will raise StreamOpenClientError if an http client error is encountered. | ||
""" | ||
import av # pylint: disable=import-outside-toplevel | ||
|
||
if pyav_options is None: | ||
pyav_options = {} | ||
|
||
default_pyav_options = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These options are slightly different than the defaults used by the stream component. These have been removed and converted with the existing default. |
||
"rtsp_flags": CONF_PREFER_TCP, | ||
"timeout": str(SOURCE_TIMEOUT), | ||
} | ||
|
||
pyav_options = { | ||
**default_pyav_options, | ||
**pyav_options, | ||
} | ||
|
||
try: | ||
container = av.open(source, options=pyav_options, timeout=5) | ||
|
||
except av.HTTPBadRequestError as ex: | ||
raise StreamOpenClientError( | ||
stream_client_error=StreamClientError.BadRequest | ||
) from ex | ||
|
||
except av.HTTPUnauthorizedError as ex: | ||
raise StreamOpenClientError( | ||
stream_client_error=StreamClientError.Unauthorized | ||
) from ex | ||
|
||
except av.HTTPForbiddenError as ex: | ||
raise StreamOpenClientError( | ||
stream_client_error=StreamClientError.Forbidden | ||
) from ex | ||
|
||
except av.HTTPNotFoundError as ex: | ||
raise StreamOpenClientError( | ||
stream_client_error=StreamClientError.NotFound | ||
) from ex | ||
|
||
except av.HTTPOtherClientError as ex: | ||
raise StreamOpenClientError(stream_client_error=StreamClientError.Other) from ex | ||
|
||
else: | ||
return container | ||
|
||
|
||
async def async_check_stream_client_error( | ||
hass: HomeAssistant, source: str, pyav_options: dict[str, str] | None = None | ||
) -> None: | ||
|
@@ -192,18 +108,24 @@ async def async_check_stream_client_error( | |
Raise StreamOpenClientError if an http client error is encountered. | ||
""" | ||
await hass.loop.run_in_executor( | ||
None, _check_stream_client_error, source, pyav_options | ||
None, _check_stream_client_error, hass, source, pyav_options | ||
) | ||
|
||
|
||
def _check_stream_client_error( | ||
source: str, pyav_options: dict[str, str] | None = None | ||
hass: HomeAssistant, source: str, options: dict[str, str] | None = None | ||
) -> None: | ||
"""Check if a stream can be successfully opened. | ||
|
||
Raise StreamOpenClientError if an http client error is encountered. | ||
""" | ||
_try_open_stream(source, pyav_options).close() | ||
from .worker import try_open_stream # pylint: disable=import-outside-toplevel | ||
|
||
pyav_options, _ = _convert_stream_options(hass, source, options or {}) | ||
try: | ||
try_open_stream(source, pyav_options).close() | ||
except StreamWorkerError as err: | ||
raise StreamOpenClientError(str(err), err.error_code) from err | ||
|
||
|
||
def redact_credentials(url: str) -> str: | ||
|
@@ -219,6 +141,42 @@ def redact_credentials(url: str) -> str: | |
return str(yurl.update_query(redacted_query_params)) | ||
|
||
|
||
def _convert_stream_options( | ||
hass: HomeAssistant, | ||
stream_source: str, | ||
stream_options: Mapping[str, str | bool | float], | ||
) -> tuple[dict[str, str], StreamSettings]: | ||
"""Convert options from stream options into PyAV options and stream settings.""" | ||
if DOMAIN not in hass.data: | ||
raise HomeAssistantError("Stream integration is not set up.") | ||
allenporter marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
stream_settings = copy.copy(hass.data[DOMAIN][ATTR_SETTINGS]) | ||
pyav_options: dict[str, str] = {} | ||
try: | ||
STREAM_OPTIONS_SCHEMA(stream_options) | ||
except vol.Invalid as exc: | ||
raise HomeAssistantError("Invalid stream options") from exc | ||
|
||
if extra_wait_time := stream_options.get(CONF_EXTRA_PART_WAIT_TIME): | ||
stream_settings.hls_part_timeout += extra_wait_time | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be nice to add coverage for this line There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one is easy to get coverage for, but hard to test since stream settings are consumed deeper in HLS. I'm skipping this for now. |
||
if rtsp_transport := stream_options.get(CONF_RTSP_TRANSPORT): | ||
assert isinstance(rtsp_transport, str) | ||
# The PyAV options currently match the stream CONF constants, but this | ||
# will not necessarily always be the case, so they are hard coded here | ||
pyav_options["rtsp_transport"] = rtsp_transport | ||
if stream_options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS): | ||
pyav_options["use_wallclock_as_timestamps"] = "1" | ||
allenporter marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# For RTSP streams, prefer TCP | ||
if isinstance(stream_source, str) and stream_source[:7] == "rtsp://": | ||
pyav_options = { | ||
"rtsp_flags": ATTR_PREFER_TCP, | ||
"stimeout": "5000000", | ||
**pyav_options, | ||
} | ||
return pyav_options, stream_settings | ||
|
||
|
||
def create_stream( | ||
hass: HomeAssistant, | ||
stream_source: str, | ||
|
@@ -234,41 +192,13 @@ def create_stream( | |
The stream_label is a string used as an additional message in logging. | ||
""" | ||
|
||
def convert_stream_options( | ||
hass: HomeAssistant, stream_options: Mapping[str, str | bool | float] | ||
) -> tuple[dict[str, str], StreamSettings]: | ||
"""Convert options from stream options into PyAV options and stream settings.""" | ||
stream_settings = copy.copy(hass.data[DOMAIN][ATTR_SETTINGS]) | ||
pyav_options: dict[str, str] = {} | ||
try: | ||
STREAM_OPTIONS_SCHEMA(stream_options) | ||
except vol.Invalid as exc: | ||
raise HomeAssistantError("Invalid stream options") from exc | ||
|
||
if extra_wait_time := stream_options.get(CONF_EXTRA_PART_WAIT_TIME): | ||
stream_settings.hls_part_timeout += extra_wait_time | ||
if rtsp_transport := stream_options.get(CONF_RTSP_TRANSPORT): | ||
assert isinstance(rtsp_transport, str) | ||
# The PyAV options currently match the stream CONF constants, but this | ||
# will not necessarily always be the case, so they are hard coded here | ||
pyav_options["rtsp_transport"] = rtsp_transport | ||
if stream_options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS): | ||
pyav_options["use_wallclock_as_timestamps"] = "1" | ||
|
||
return pyav_options, stream_settings | ||
|
||
if DOMAIN not in hass.config.components: | ||
raise HomeAssistantError("Stream integration is not set up.") | ||
|
||
# Convert extra stream options into PyAV options and stream settings | ||
pyav_options, stream_settings = convert_stream_options(hass, options) | ||
# For RTSP streams, prefer TCP | ||
if isinstance(stream_source, str) and stream_source[:7] == "rtsp://": | ||
pyav_options = { | ||
"rtsp_flags": "prefer_tcp", | ||
"stimeout": "5000000", | ||
**pyav_options, | ||
} | ||
pyav_options, stream_settings = _convert_stream_options( | ||
hass, stream_source, options | ||
) | ||
|
||
stream = Stream( | ||
hass, | ||
|
@@ -531,7 +461,7 @@ def _run_worker(self) -> None: | |
"""Handle consuming streams and restart keepalive streams.""" | ||
# Keep import here so that we can import stream integration without installing reqs | ||
# pylint: disable-next=import-outside-toplevel | ||
from .worker import StreamState, StreamWorkerError, stream_worker | ||
from .worker import StreamState, stream_worker | ||
|
||
stream_state = StreamState(self.hass, self.outputs, self._diagnostics) | ||
wait_timeout = 0 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
|
||
from __future__ import annotations | ||
|
||
from enum import IntEnum | ||
from typing import Final | ||
|
||
DOMAIN = "stream" | ||
|
@@ -48,7 +49,7 @@ | |
CONF_PART_DURATION = "part_duration" | ||
CONF_SEGMENT_DURATION = "segment_duration" | ||
|
||
CONF_PREFER_TCP = "prefer_tcp" | ||
ATTR_PREFER_TCP = "prefer_tcp" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reaming this since its not a configuration option. This is not exposed in |
||
CONF_RTSP_TRANSPORT = "rtsp_transport" | ||
# The first dict entry below may be used as the default when populating options | ||
RTSP_TRANSPORTS = { | ||
|
@@ -59,3 +60,18 @@ | |
} | ||
CONF_USE_WALLCLOCK_AS_TIMESTAMPS = "use_wallclock_as_timestamps" | ||
CONF_EXTRA_PART_WAIT_TIME = "extra_part_wait_time" | ||
|
||
|
||
class StreamClientError(IntEnum): | ||
"""Enum for stream client errors. | ||
|
||
These are errors that can be returned by the stream client when trying to | ||
open a stream. The caller should not interpret the int values directly, but | ||
should use the enum values instead. | ||
""" | ||
|
||
BadRequest = 400 | ||
Unauthorized = 401 | ||
Forbidden = 403 | ||
NotFound = 404 | ||
Other = 4 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
"""Stream component exceptions.""" | ||
|
||
from homeassistant.exceptions import HomeAssistantError | ||
|
||
from .const import StreamClientError | ||
|
||
|
||
class StreamOpenClientError(HomeAssistantError): | ||
"""Raised when client error received when trying to open a stream. | ||
|
||
:param stream_client_error: The type of client error | ||
""" | ||
|
||
def __init__(self, message: str, error_code: StreamClientError) -> None: | ||
"""Initialize a stream open client error.""" | ||
super().__init__(message) | ||
self.error_code = error_code | ||
|
||
|
||
class StreamWorkerError(Exception): | ||
"""An exception thrown while processing a stream.""" | ||
|
||
def __init__( | ||
self, message: str, error_code: StreamClientError = StreamClientError.Other | ||
) -> None: | ||
"""Initialize a stream worker error.""" | ||
super().__init__(message) | ||
self.error_code = error_code | ||
|
||
|
||
class StreamEndedError(StreamWorkerError): | ||
"""Raised when the stream is complete, exposed for facilitating testing.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI, this is removing rather than moving to new import location. This can never be thrown here since its caught by
Stream
and never exposed. We can fix exception handling here by exposing the error codes in a follow up.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@allenporter
Ok this makes sense.
It looks like this change is what has lowered the coverage in #133927
Lines like
...no longer have any way of being triggered.
I will add a test to raise
StreamOpenClientError
within the generic config flow to restore the test coverage.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok sorry for missing that -- btw there is still not yet a way that
StreamOpenClientError
is thrown here, but need to add in a follow up.