From 55967c22cffa823ab8a207b891b8acd0937fc7f6 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 20 Oct 2024 19:20:58 -1000 Subject: [PATCH] Move URL parsing functions to their own module (#1360) --- yarl/_parse.py | 188 +++++++++++++++++++++++++++++++++++++++++ yarl/_url.py | 225 +++++-------------------------------------------- 2 files changed, 208 insertions(+), 205 deletions(-) create mode 100644 yarl/_parse.py diff --git a/yarl/_parse.py b/yarl/_parse.py new file mode 100644 index 00000000..67e57481 --- /dev/null +++ b/yarl/_parse.py @@ -0,0 +1,188 @@ +"""URL parsing utilities.""" + +import re +import unicodedata +from functools import lru_cache +from typing import Union +from urllib.parse import SplitResult, scheme_chars, uses_netloc + +from ._quoters import QUOTER + +# Leading and trailing C0 control and space to be stripped per WHATWG spec. +# == "".join([chr(i) for i in range(0, 0x20 + 1)]) +WHATWG_C0_CONTROL_OR_SPACE = ( + "\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10" + "\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f " +) + +# Unsafe bytes to be removed per WHATWG spec +UNSAFE_URL_BYTES_TO_REMOVE = ["\t", "\r", "\n"] +USES_AUTHORITY = frozenset(uses_netloc) + + +@lru_cache +def split_url(url: str) -> SplitResult: + """Split URL into parts.""" + # Adapted from urllib.parse.urlsplit + # Only lstrip url as some applications rely on preserving trailing space. + # (https://url.spec.whatwg.org/#concept-basic-url-parser would strip both) + url = url.lstrip(WHATWG_C0_CONTROL_OR_SPACE) + for b in UNSAFE_URL_BYTES_TO_REMOVE: + if b in url: + url = url.replace(b, "") + + scheme = netloc = query = fragment = "" + i = url.find(":") + if i > 0 and url[0] in scheme_chars: + for c in url[1:i]: + if c not in scheme_chars: + break + else: + scheme, url = url[:i].lower(), url[i + 1 :] + has_hash = "#" in url + has_question_mark = "?" in url + if url[:2] == "//": + delim = len(url) # position of end of domain part of url, default is end + if has_hash and has_question_mark: + delim_chars = "/?#" + elif has_question_mark: + delim_chars = "/?" + elif has_hash: + delim_chars = "/#" + else: + delim_chars = "/" + for c in delim_chars: # look for delimiters; the order is NOT important + wdelim = url.find(c, 2) # find first of this delim + if wdelim >= 0 and wdelim < delim: # if found + delim = wdelim # use earliest delim position + netloc = url[2:delim] + url = url[delim:] + has_left_bracket = "[" in netloc + has_right_bracket = "]" in netloc + if (has_left_bracket and not has_right_bracket) or ( + has_right_bracket and not has_left_bracket + ): + raise ValueError("Invalid IPv6 URL") + if has_left_bracket: + bracketed_host = netloc.partition("[")[2].partition("]")[0] + # Valid bracketed hosts are defined in + # https://www.rfc-editor.org/rfc/rfc3986#page-49 + # https://url.spec.whatwg.org/ + if bracketed_host[0] == "v": + if not re.match(r"\Av[a-fA-F0-9]+\..+\Z", bracketed_host): + raise ValueError("IPvFuture address is invalid") + elif ":" not in bracketed_host: + raise ValueError("An IPv4 address cannot be in brackets") + if has_hash: + url, _, fragment = url.partition("#") + if has_question_mark: + url, _, query = url.partition("?") + if netloc and not netloc.isascii(): + _check_netloc(netloc) + return tuple.__new__(SplitResult, (scheme, netloc, url, query, fragment)) + + +def _check_netloc(netloc: str) -> None: + # Adapted from urllib.parse._checknetloc + # looking for characters like \u2100 that expand to 'a/c' + # IDNA uses NFKC equivalence, so normalize for this check + + # ignore characters already included + # but not the surrounding text + n = netloc.replace("@", "").replace(":", "").replace("#", "").replace("?", "") + normalized_netloc = unicodedata.normalize("NFKC", n) + if n == normalized_netloc: + return + # Note that there are no unicode decompositions for the character '@' so + # its currently impossible to have test coverage for this branch, however if the + # one should be added in the future we want to make sure its still checked. + for c in "/?#@:": # pragma: no branch + if c in normalized_netloc: + raise ValueError( + f"netloc '{netloc}' contains invalid " + "characters under NFKC normalization" + ) + + +@lru_cache # match the same size as urlsplit +def split_netloc( + netloc: str, +) -> tuple[Union[str, None], Union[str, None], Union[str, None], Union[int, None]]: + """Split netloc into username, password, host and port.""" + if "@" not in netloc: + username: Union[str, None] = None + password: Union[str, None] = None + hostinfo = netloc + else: + userinfo, _, hostinfo = netloc.rpartition("@") + username, have_password, password = userinfo.partition(":") + if not have_password: + password = None + + if "[" in hostinfo: + _, _, bracketed = hostinfo.partition("[") + hostname, _, port_str = bracketed.partition("]") + _, _, port_str = port_str.partition(":") + else: + hostname, _, port_str = hostinfo.partition(":") + + if not port_str: + return username or None, password, hostname or None, None + + try: + port = int(port_str) + except ValueError: + raise ValueError("Invalid URL: port can't be converted to integer") + if not (0 <= port <= 65535): + raise ValueError("Port out of range 0-65535") + return username or None, password, hostname or None, port + + +def unsplit_result( + scheme: str, netloc: str, url: str, query: str, fragment: str +) -> str: + """Unsplit a URL without any normalization.""" + if netloc or (scheme and scheme in USES_AUTHORITY) or url[:2] == "//": + if url and url[:1] != "/": + url = f"//{netloc or ''}/{url}" + else: + url = f"//{netloc or ''}{url}" + if scheme: + url = f"{scheme}:{url}" + if query: + url = f"{url}?{query}" + return f"{url}#{fragment}" if fragment else url + + +@lru_cache # match the same size as urlsplit +def make_netloc( + user: Union[str, None], + password: Union[str, None], + host: Union[str, None], + port: Union[int, None], + encode: bool = False, +) -> str: + """Make netloc from parts. + + The user and password are encoded if encode is True. + + The host must already be encoded with _encode_host. + """ + if host is None: + return "" + ret = host + if port is not None: + ret = f"{ret}:{port}" + if user is None and password is None: + return ret + if password is not None: + if not user: + user = "" + elif encode: + user = QUOTER(user) + if encode: + password = QUOTER(password) + user = f"{user}:{password}" + elif user and encode: + user = QUOTER(user) + return f"{user}@{ret}" if user else ret diff --git a/yarl/_url.py b/yarl/_url.py index 0a3b9c25..4589d801 100644 --- a/yarl/_url.py +++ b/yarl/_url.py @@ -1,25 +1,18 @@ import re import sys -import unicodedata import warnings from collections.abc import Mapping, Sequence from contextlib import suppress from functools import _CacheInfo, lru_cache from ipaddress import ip_address from typing import TYPE_CHECKING, Any, TypedDict, TypeVar, Union, overload -from urllib.parse import ( - SplitResult, - parse_qsl, - quote, - scheme_chars, - uses_netloc, - uses_relative, -) +from urllib.parse import SplitResult, parse_qsl, quote, uses_relative import idna from multidict import MultiDict, MultiDictProxy from propcache.api import under_cached_property as cached_property +from ._parse import USES_AUTHORITY, make_netloc, split_netloc, split_url, unsplit_result from ._query import ( Query, QueryVariable, @@ -44,22 +37,12 @@ ) DEFAULT_PORTS = {"http": 80, "https": 443, "ws": 80, "wss": 443, "ftp": 21} -USES_AUTHORITY = frozenset(uses_netloc) USES_RELATIVE = frozenset(uses_relative) # Special schemes https://url.spec.whatwg.org/#special-scheme # are not allowed to have an empty host https://url.spec.whatwg.org/#url-representation SCHEME_REQUIRES_HOST = frozenset(("http", "https", "ws", "wss", "ftp")) -# Leading and trailing C0 control and space to be stripped per WHATWG spec. -# == "".join([chr(i) for i in range(0, 0x20 + 1)]) -WHATWG_C0_CONTROL_OR_SPACE = ( - "\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10" - "\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f " -) - -# Unsafe bytes to be removed per WHATWG spec -UNSAFE_URL_BYTES_TO_REMOVE = ["\t", "\r", "\n"] # reg-name: unreserved / pct-encoded / sub-delims # this pattern matches anything that is *not* in those classes. and is only used @@ -174,174 +157,6 @@ def _normalize_path(path: str) -> str: return prefix + "/".join(_normalize_path_segments(segments)) -@lru_cache -def _split_url(url: str) -> SplitResult: - """Split URL into parts.""" - # Adapted from urllib.parse.urlsplit - # Only lstrip url as some applications rely on preserving trailing space. - # (https://url.spec.whatwg.org/#concept-basic-url-parser would strip both) - url = url.lstrip(WHATWG_C0_CONTROL_OR_SPACE) - for b in UNSAFE_URL_BYTES_TO_REMOVE: - if b in url: - url = url.replace(b, "") - - scheme = netloc = query = fragment = "" - i = url.find(":") - if i > 0 and url[0] in scheme_chars: - for c in url[1:i]: - if c not in scheme_chars: - break - else: - scheme, url = url[:i].lower(), url[i + 1 :] - has_hash = "#" in url - has_question_mark = "?" in url - if url[:2] == "//": - delim = len(url) # position of end of domain part of url, default is end - if has_hash and has_question_mark: - delim_chars = "/?#" - elif has_question_mark: - delim_chars = "/?" - elif has_hash: - delim_chars = "/#" - else: - delim_chars = "/" - for c in delim_chars: # look for delimiters; the order is NOT important - wdelim = url.find(c, 2) # find first of this delim - if wdelim >= 0 and wdelim < delim: # if found - delim = wdelim # use earliest delim position - netloc = url[2:delim] - url = url[delim:] - has_left_bracket = "[" in netloc - has_right_bracket = "]" in netloc - if (has_left_bracket and not has_right_bracket) or ( - has_right_bracket and not has_left_bracket - ): - raise ValueError("Invalid IPv6 URL") - if has_left_bracket: - bracketed_host = netloc.partition("[")[2].partition("]")[0] - # Valid bracketed hosts are defined in - # https://www.rfc-editor.org/rfc/rfc3986#page-49 - # https://url.spec.whatwg.org/ - if bracketed_host[0] == "v": - if not re.match(r"\Av[a-fA-F0-9]+\..+\Z", bracketed_host): - raise ValueError("IPvFuture address is invalid") - elif ":" not in bracketed_host: - raise ValueError("An IPv4 address cannot be in brackets") - if has_hash: - url, _, fragment = url.partition("#") - if has_question_mark: - url, _, query = url.partition("?") - if netloc and not netloc.isascii(): - _check_netloc(netloc) - return tuple.__new__(SplitResult, (scheme, netloc, url, query, fragment)) - - -def _check_netloc(netloc: str) -> None: - # Adapted from urllib.parse._checknetloc - # looking for characters like \u2100 that expand to 'a/c' - # IDNA uses NFKC equivalence, so normalize for this check - - # ignore characters already included - # but not the surrounding text - n = netloc.replace("@", "").replace(":", "").replace("#", "").replace("?", "") - normalized_netloc = unicodedata.normalize("NFKC", n) - if n == normalized_netloc: - return - # Note that there are no unicode decompositions for the character '@' so - # its currently impossible to have test coverage for this branch, however if the - # one should be added in the future we want to make sure its still checked. - for c in "/?#@:": # pragma: no branch - if c in normalized_netloc: - raise ValueError( - f"netloc '{netloc}' contains invalid " - "characters under NFKC normalization" - ) - - -@lru_cache # match the same size as urlsplit -def _split_netloc( - netloc: str, -) -> tuple[Union[str, None], Union[str, None], Union[str, None], Union[int, None]]: - """Split netloc into username, password, host and port.""" - if "@" not in netloc: - username: Union[str, None] = None - password: Union[str, None] = None - hostinfo = netloc - else: - userinfo, _, hostinfo = netloc.rpartition("@") - username, have_password, password = userinfo.partition(":") - if not have_password: - password = None - - if "[" in hostinfo: - _, _, bracketed = hostinfo.partition("[") - hostname, _, port_str = bracketed.partition("]") - _, _, port_str = port_str.partition(":") - else: - hostname, _, port_str = hostinfo.partition(":") - - if not port_str: - return username or None, password, hostname or None, None - - try: - port = int(port_str) - except ValueError: - raise ValueError("Invalid URL: port can't be converted to integer") - if not (0 <= port <= 65535): - raise ValueError("Port out of range 0-65535") - return username or None, password, hostname or None, port - - -def _unsplit_result( - scheme: str, netloc: str, url: str, query: str, fragment: str -) -> str: - """Unsplit a URL without any normalization.""" - if netloc or (scheme and scheme in USES_AUTHORITY) or url[:2] == "//": - if url and url[:1] != "/": - url = f"//{netloc or ''}/{url}" - else: - url = f"//{netloc or ''}{url}" - if scheme: - url = f"{scheme}:{url}" - if query: - url = f"{url}?{query}" - return f"{url}#{fragment}" if fragment else url - - -@lru_cache # match the same size as urlsplit -def _make_netloc( - user: Union[str, None], - password: Union[str, None], - host: Union[str, None], - port: Union[int, None], - encode: bool = False, -) -> str: - """Make netloc from parts. - - The user and password are encoded if encode is True. - - The host must already be encoded with _encode_host. - """ - if host is None: - return "" - ret = host - if port is not None: - ret = f"{ret}:{port}" - if user is None and password is None: - return ret - if password is not None: - if not user: - user = "" - elif encode: - user = QUOTER(user) - if encode: - password = QUOTER(password) - user = f"{user}:{password}" - elif user and encode: - user = QUOTER(user) - return f"{user}@{ret}" if user else ret - - def _raise_for_authority_missing_abs_path() -> None: """Raise when he path in URL with authority starts lacks a leading slash.""" msg = "Path in a URL with authority should start with a slash ('/') if set" @@ -433,14 +248,14 @@ def __new__( if strict is not None: # pragma: no cover warnings.warn("strict parameter is ignored") if type(val) is str: - val = _split_url(val) + val = split_url(val) elif type(val) is cls: return val elif type(val) is SplitResult: if not encoded: raise ValueError("Cannot apply decoding to SplitResult") elif isinstance(val, str): - val = _split_url(str(val)) + val = split_url(str(val)) else: raise TypeError("Constructor parameter should be str") @@ -457,7 +272,7 @@ def __new__( else: if ":" in netloc or "@" in netloc or "[" in netloc: # Complex netloc - username, password, host, port = _split_netloc(netloc) + username, password, host, port = split_netloc(netloc) else: username = password = port = None host = netloc @@ -482,7 +297,7 @@ def __new__( else: raw_user = REQUOTER(username) if username else username raw_password = REQUOTER(password) if password else password - netloc = _make_netloc(raw_user, raw_password, host, port) + netloc = make_netloc(raw_user, raw_password, host, port) cache["raw_user"] = raw_user cache["raw_password"] = raw_password @@ -571,13 +386,13 @@ def build( if user is None and password is None: netloc = host if port is None else f"{host}:{port}" else: - netloc = _make_netloc(user, password, host, port) + netloc = make_netloc(user, password, host, port) else: netloc = "" else: # not encoded _host: Union[str, None] = None if authority: - user, password, _host, port = _split_netloc(authority) + user, password, _host, port = split_netloc(authority) _host = _encode_host(_host, validate_host=False) if _host else "" elif host: _host = _encode_host(host, validate_host=True) @@ -590,7 +405,7 @@ def build( if user is None and password is None: netloc = _host if port is None else f"{_host}:{port}" else: - netloc = _make_netloc(user, password, _host, port, True) + netloc = make_netloc(user, password, _host, port, True) path = PATH_QUOTER(path) if path else path if path and netloc: @@ -641,8 +456,8 @@ def __str__(self) -> str: # port normalization - using None for default ports to remove from rendering # https://datatracker.ietf.org/doc/html/rfc3986.html#section-6.2.3 host = self.host_subcomponent - netloc = _make_netloc(self.raw_user, self.raw_password, host, None) - return _unsplit_result(scheme, netloc, path, query, fragment) + netloc = make_netloc(self.raw_user, self.raw_password, host, None) + return unsplit_result(scheme, netloc, path, query, fragment) def __repr__(self) -> str: return f"{self.__class__.__name__}('{str(self)}')" @@ -721,7 +536,7 @@ def __setstate__(self, state): def _cache_netloc(self) -> None: """Cache the netloc parts of the URL.""" c = self._cache - split_loc = _split_netloc(self._val.netloc) + split_loc = split_netloc(self._val.netloc) c["raw_user"], c["raw_password"], c["raw_host"], c["explicit_port"] = split_loc def is_absolute(self) -> bool: @@ -774,7 +589,7 @@ def _origin(self) -> "URL": raise ValueError("URL should have scheme") if "@" in netloc: encoded_host = self.host_subcomponent - netloc = _make_netloc(None, None, encoded_host, self.explicit_port) + netloc = make_netloc(None, None, encoded_host, self.explicit_port) elif not path and not query and not fragment: return self return self._from_tup((scheme, netloc, "", "", "")) @@ -834,7 +649,7 @@ def authority(self) -> str: Empty string for relative URLs. """ - return _make_netloc(self.user, self.password, self.host, self.port) + return make_netloc(self.user, self.password, self.host, self.port) @cached_property def raw_user(self) -> Union[str, None]: @@ -1193,7 +1008,7 @@ def with_user(self, user: Union[str, None]) -> "URL": if not netloc: raise ValueError("user replacement is not allowed for relative URLs") encoded_host = self.host_subcomponent or "" - netloc = _make_netloc(user, password, encoded_host, self.explicit_port) + netloc = make_netloc(user, password, encoded_host, self.explicit_port) return self._from_tup((scheme, netloc, path, query, fragment)) def with_password(self, password: Union[str, None]) -> "URL": @@ -1216,7 +1031,7 @@ def with_password(self, password: Union[str, None]) -> "URL": raise ValueError("password replacement is not allowed for relative URLs") encoded_host = self.host_subcomponent or "" port = self.explicit_port - netloc = _make_netloc(self.raw_user, password, encoded_host, port) + netloc = make_netloc(self.raw_user, password, encoded_host, port) return self._from_tup((scheme, netloc, path, query, fragment)) def with_host(self, host: str) -> "URL": @@ -1238,7 +1053,7 @@ def with_host(self, host: str) -> "URL": raise ValueError("host removing is not allowed") encoded_host = _encode_host(host, validate_host=True) if host else "" port = self.explicit_port - netloc = _make_netloc(self.raw_user, self.raw_password, encoded_host, port) + netloc = make_netloc(self.raw_user, self.raw_password, encoded_host, port) return self._from_tup((scheme, netloc, path, query, fragment)) def with_port(self, port: Union[int, None]) -> "URL": @@ -1257,7 +1072,7 @@ def with_port(self, port: Union[int, None]) -> "URL": if not netloc: raise ValueError("port replacement is not allowed for relative URLs") encoded_host = self.host_subcomponent or "" - netloc = _make_netloc(self.raw_user, self.raw_password, encoded_host, port) + netloc = make_netloc(self.raw_user, self.raw_password, encoded_host, port) return self._from_tup((scheme, netloc, path, query, fragment)) def with_path(self, path: str, *, encoded: bool = False) -> "URL": @@ -1533,9 +1348,9 @@ def human_repr(self) -> str: fragment = _human_quote(self.fragment, "") if TYPE_CHECKING: assert fragment is not None - netloc = _make_netloc(user, password, host, self.explicit_port) + netloc = make_netloc(user, password, host, self.explicit_port) scheme = self._val.scheme - return _unsplit_result(scheme, netloc, path, query_string, fragment) + return unsplit_result(scheme, netloc, path, query_string, fragment) def _human_quote(s: Union[str, None], unsafe: str) -> Union[str, None]: