Skip to content

Try fetching the snapshot URI without user/pass if it fails #88

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions onvif/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import os.path
from typing import Any
from collections.abc import Callable

import httpx
from httpx import AsyncClient, BasicAuth, DigestAuth
from zeep.cache import SqliteCache
Expand All @@ -27,7 +26,14 @@
from .settings import DEFAULT_SETTINGS
from .transport import ASYNC_TRANSPORT
from .types import FastDateTime, ForgivingTime
from .util import create_no_verify_ssl_context, normalize_url, path_isfile, utcnow
from .util import (
create_no_verify_ssl_context,
normalize_url,
path_isfile,
utcnow,
strip_user_pass_url,
obscure_user_pass_url,
)
from .wrappers import retry_connection_error # noqa: F401
from .wsa import WsAddressingIfMissingPlugin

Expand Down Expand Up @@ -573,12 +579,16 @@
else:
auth = DigestAuth(self.user, self.passwd)

try:
response = await self._snapshot_client.get(uri, auth=auth)
except httpx.TimeoutException as error:
raise ONVIFTimeoutError(f"Timed out fetching {uri}: {error}") from error
except httpx.RequestError as error:
raise ONVIFError(f"Error fetching {uri}: {error}") from error
response = await self._try_snapshot_uri(uri, auth)

Check warning on line 582 in onvif/client.py

View check run for this annotation

Codecov / codecov/patch

onvif/client.py#L582

Added line #L582 was not covered by tests

# If the request fails with a 401, make sure to strip any
# sample user/pass from the URL and try again
if (

Check warning on line 586 in onvif/client.py

View check run for this annotation

Codecov / codecov/patch

onvif/client.py#L586

Added line #L586 was not covered by tests
response.status_code == 401
and (stripped_uri := strip_user_pass_url(uri))
and stripped_uri != uri
):
response = await self._try_snapshot_uri(stripped_uri, auth)

Check warning on line 591 in onvif/client.py

View check run for this annotation

Codecov / codecov/patch

onvif/client.py#L591

Added line #L591 was not covered by tests

if response.status_code == 401:
raise ONVIFAuthError(f"Failed to authenticate to {uri}")
Expand All @@ -588,6 +598,20 @@

return None

async def _try_snapshot_uri(
self, uri: str, auth: BasicAuth | DigestAuth | None
) -> httpx.Response:
try:
return await self._snapshot_client.get(uri, auth=auth)
except httpx.TimeoutException as error:
raise ONVIFTimeoutError(

Check warning on line 607 in onvif/client.py

View check run for this annotation

Codecov / codecov/patch

onvif/client.py#L604-L607

Added lines #L604 - L607 were not covered by tests
f"Timed out fetching {obscure_user_pass_url(uri)}: {error}"
) from error
except httpx.RequestError as error:
raise ONVIFError(

Check warning on line 611 in onvif/client.py

View check run for this annotation

Codecov / codecov/patch

onvif/client.py#L610-L611

Added lines #L610 - L611 were not covered by tests
f"Error fetching {obscure_user_pass_url(uri)}: {error}"
) from error

def get_definition(
self, name: str, port_type: str | None = None
) -> tuple[str, str, str]:
Expand Down
36 changes: 35 additions & 1 deletion onvif/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
import ssl
from typing import Any
from urllib.parse import ParseResultBytes, urlparse, urlunparse

from yarl import URL
from multidict import CIMultiDict
from zeep.exceptions import Fault

utcnow: partial[dt.datetime] = partial(dt.datetime.now, dt.timezone.utc)
Expand All @@ -18,6 +19,8 @@
# to minimize the impact of the blocking I/O.
path_isfile = lru_cache(maxsize=128)(os.path.isfile)

_CREDENTIAL_KEYS = ("username", "password", "user", "pass")


def normalize_url(url: bytes | str | None) -> str | None:
"""Normalize URL.
Expand Down Expand Up @@ -105,3 +108,34 @@ def create_no_verify_ssl_context() -> ssl.SSLContext:
# ssl.OP_LEGACY_SERVER_CONNECT is only available in Python 3.12a4+
sslcontext.options |= getattr(ssl, "OP_LEGACY_SERVER_CONNECT", 0x4)
return sslcontext


def strip_user_pass_url(url: str) -> str:
"""Strip password from URL."""
parsed_url = URL(url)
query = parsed_url.query
new_query: CIMultiDict | None = None
for key in _CREDENTIAL_KEYS:
if key in query:
if new_query is None:
new_query = CIMultiDict(parsed_url.query)
new_query.popall(key)
if new_query is not None:
return str(parsed_url.with_query(new_query))
return url


def obscure_user_pass_url(url: str) -> str:
"""Obscure user and password from URL."""
parsed_url = URL(url)
query = parsed_url.query
new_query: CIMultiDict | None = None
for key in _CREDENTIAL_KEYS:
if key in query:
if new_query is None:
new_query = CIMultiDict(parsed_url.query)
new_query.popall(key)
new_query[key] = "********"
if new_query is not None:
return str(parsed_url.with_query(new_query))
return url
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
ciso8601==2.3.2
httpx==0.28.1
zeep[async]==4.3.1
yarl>=1.10.0
14 changes: 14 additions & 0 deletions tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pytest
from zeep.loader import parse_xml
from onvif.util import strip_user_pass_url, obscure_user_pass_url

from onvif.client import ONVIFCamera
from onvif.settings import DEFAULT_SETTINGS
Expand Down Expand Up @@ -40,3 +41,16 @@ async def test_normalize_url_with_missing_url():
)
result = operation.process_reply(envelope)
assert normalize_url(result.SubscriptionReference.Address._value_1) is None


def test_strip_user_pass_url():
assert strip_user_pass_url("http://1.2.3.4/?user=foo&pass=bar") == "http://1.2.3.4/"
assert strip_user_pass_url("http://1.2.3.4/") == "http://1.2.3.4/"


def test_obscure_user_pass_url():
assert (
obscure_user_pass_url("http://1.2.3.4/?user=foo&pass=bar")
== "http://1.2.3.4/?user=********&pass=********"
)
assert obscure_user_pass_url("http://1.2.3.4/") == "http://1.2.3.4/"
Loading