Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reolink prevent ONVIF push being lost due to ConnectionResetError #91070

Merged
merged 23 commits into from
Apr 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 51 additions & 15 deletions homeassistant/components/reolink/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,34 +361,70 @@ def unregister_webhook(self):

async def handle_webhook(
self, hass: HomeAssistant, webhook_id: str, request: Request
):
) -> None:
"""Shield the incoming webhook callback from cancellation."""
await asyncio.shield(self.handle_webhook_shielded(hass, webhook_id, request))
bdraco marked this conversation as resolved.
Show resolved Hide resolved

async def handle_webhook_shielded(
self, hass: HomeAssistant, webhook_id: str, request: Request
):
"""Handle incoming webhook from Reolink for inbound messages and calls."""

shielded_future = asyncio.shield(
self._handle_webhook(hass, webhook_id, request)
)
_LOGGER.debug("Webhook '%s' called", webhook_id)
if not self._webhook_reachable.is_set():
self._webhook_reachable.set()
ir.async_delete_issue(self._hass, DOMAIN, "webhook_url")
await shielded_future

if not request.body_exists:
_LOGGER.debug("Webhook '%s' triggered without payload", webhook_id)
async def _handle_webhook(
self, hass: HomeAssistant, webhook_id: str, request: Request
) -> None:
"""Handle incoming webhook from Reolink for inbound messages and calls."""
try:
data = await request.text()
except ConnectionResetError:
# We lost the connection before reading the message, fallback to polling
# No need for a background task here as we already know the connection is lost
_LOGGER.debug(
"Webhook '%s' called, but lost connection before reading message, issuing poll",
webhook_id,
)
if not await self._api.get_motion_state_all_ch():
_LOGGER.error(
"Could not poll motion state after losing connection during receiving ONVIF event"
)
return
async_dispatcher_send(hass, f"{webhook_id}_all", {})
return

data = await request.text()
if not data:
_LOGGER.debug(
"Webhook '%s' triggered with unknown payload: %s", webhook_id, data
)
return

channels = await self._api.ONVIF_event_callback(data)
# We received the data but we want handle_webhook to return as soon as possible
# so we process the data in the background
hass.async_create_background_task(
self._process_webhook_data(hass, webhook_id, data),
"Process Reolink webhook",
)

async def _process_webhook_data(
self, hass: HomeAssistant, webhook_id: str, data: str
) -> None:
"""Process the data from the webhook."""
# This task is executed in the background so we need to catch exceptions
# and log them
try:
channels = await self._api.ONVIF_event_callback(data)
except Exception as ex: # pylint: disable=broad-except
_LOGGER.exception(
"Error processing ONVIF event for Reolink %s: %s",
self._api.nvr_name,
ex,
)
return

if channels is None:
async_dispatcher_send(hass, f"{webhook_id}_all", {})
else:
for channel in channels:
async_dispatcher_send(hass, f"{webhook_id}_{channel}", {})
return

for channel in channels:
async_dispatcher_send(hass, f"{webhook_id}_{channel}", {})