Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reolink prevent ONVIF push being lost due to ConnectionResetError #91070

Merged
merged 23 commits into from
Apr 15, 2023
Merged
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 41 additions & 20 deletions homeassistant/components/reolink/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(
self._base_url: str = ""
self._webhook_url: str = ""
self._webhook_reachable: asyncio.Event = asyncio.Event()
self._webhook_read_done: asyncio.Event = asyncio.Event()
self._lost_subscription: bool = False

@property
Expand Down Expand Up @@ -363,32 +364,52 @@ async def handle_webhook(
self, hass: HomeAssistant, webhook_id: str, request: Request
):
"""Shield the incoming webhook callback from cancellation."""
await asyncio.shield(self.handle_webhook_shielded(hass, webhook_id, request))
bdraco marked this conversation as resolved.
Show resolved Hide resolved
self._webhook_read_done.clear()
shielded_task = hass.async_create_task(
self._handle_webhook(hass, webhook_id, request)
)
asyncio.shield(shielded_task)
bdraco marked this conversation as resolved.
Show resolved Hide resolved
_LOGGER.debug("Webhook '%s' called", webhook_id)
await self._webhook_read_done.wait()

async def handle_webhook_shielded(
async def _handle_webhook(
self, hass: HomeAssistant, webhook_id: str, request: Request
):
"""Handle incoming webhook from Reolink for inbound messages and calls."""
try:
try:
data = await request.text()
except ConnectionResetError:
_LOGGER.debug(
"Webhook '%s' called, but lost connection before reading message, issuing poll",
webhook_id,
)
if await self._api.get_motion_state_all_ch():
async_dispatcher_send(hass, f"{webhook_id}_all", {})
return
_LOGGER.error(
"Could not poll motion state after losing connection during receiving ONVIF event"
)
return
finally:
self._webhook_read_done.set()
if not self._webhook_reachable.is_set():
self._webhook_reachable.set()
ir.async_delete_issue(self._hass, DOMAIN, "webhook_url")

_LOGGER.debug("Webhook '%s' called", webhook_id)
if not self._webhook_reachable.is_set():
self._webhook_reachable.set()

if not request.body_exists:
_LOGGER.debug("Webhook '%s' triggered without payload", webhook_id)
return

data = await request.text()
if not data:
_LOGGER.debug(
"Webhook '%s' triggered with unknown payload: %s", webhook_id, data
)
return
if not data:
_LOGGER.debug(
"Webhook '%s' triggered with unknown payload: %s", webhook_id, data
)
return

channels = await self._api.ONVIF_event_callback(data)
if (channels := await self._api.ONVIF_event_callback(data)) is None:
async_dispatcher_send(hass, f"{webhook_id}_all", {})
return

if channels is None:
async_dispatcher_send(hass, f"{webhook_id}_all", {})
else:
for channel in channels:
async_dispatcher_send(hass, f"{webhook_id}_{channel}", {})
except Exception as err: # pylint: disable=broad-except
# Make sure unexpected error in "_handle_webhook" are logged,
# since nothing is awaiting this function
_LOGGER.exception(err)