Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional accept functionality #2663

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
296 changes: 137 additions & 159 deletions sanic/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,141 +35,96 @@

def parse_arg_as_accept(f):
def func(self, other, *args, **kwargs):
if not isinstance(other, Accept) and other:
other = Accept.parse(other)
if not isinstance(other, MediaType) and other:
other = MediaType._parse(other)
return f(self, other, *args, **kwargs)

return func


class MediaType(str):
def __new__(cls, value: str):
return str.__new__(cls, value)

def __init__(self, value: str) -> None:
self.value = value
self.is_wildcard = self.check_if_wildcard(value)

def __eq__(self, other):
if self.is_wildcard:
return True

if self.match(other):
return True

other_is_wildcard = (
other.is_wildcard
if isinstance(other, MediaType)
else self.check_if_wildcard(other)
)

return other_is_wildcard

def match(self, other):
other_value = other.value if isinstance(other, MediaType) else other
return self.value == other_value

@staticmethod
def check_if_wildcard(value):
return value == "*"


class Accept(str):
def __new__(cls, value: str, *args, **kwargs):
return str.__new__(cls, value)
class MediaType:
"""A media type, as used in the Accept header."""

def __init__(
self,
value: str,
type_: MediaType,
subtype: MediaType,
*,
q: str = "1.0",
**kwargs: str,
type_: str,
subtype: str,
**params: str,
):
qvalue = float(q)
if qvalue > 1 or qvalue < 0:
raise InvalidHeader(
f"Accept header qvalue must be between 0 and 1, not: {qvalue}"
)
self.value = value
self.type_ = type_
self.subtype = subtype
self.qvalue = qvalue
self.params = kwargs

def _compare(self, other, method):
try:
return method(self.qvalue, other.qvalue)
except (AttributeError, TypeError):
return NotImplemented

@parse_arg_as_accept
def __lt__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s < o)

@parse_arg_as_accept
def __le__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s <= o)
self.q = float(params.get("q", "1.0"))
self.params = params
self.mime = f"{type_}/{subtype}"

@parse_arg_as_accept
def __eq__(self, other: Union[str, Accept]): # type: ignore
return self._compare(other, lambda s, o: s == o)
def __repr__(self):
return self.mime + "".join(f";{k}={v}" for k, v in self.params.items())

@parse_arg_as_accept
def __ge__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s >= o)

@parse_arg_as_accept
def __gt__(self, other: Union[str, Accept]):
return self._compare(other, lambda s, o: s > o)

@parse_arg_as_accept
def __ne__(self, other: Union[str, Accept]): # type: ignore
return self._compare(other, lambda s, o: s != o)
def __eq__(self, other):
"""Check for mime (str or MediaType) identical type/subtype."""
if isinstance(other, str):
return self.mime == other
if isinstance(other, MediaType):
return self.mime == other.mime
return NotImplemented

@parse_arg_as_accept
def match(
self,
other,
*,
allow_type_wildcard: bool = True,
allow_subtype_wildcard: bool = True,
) -> bool:
type_match = (
self.type_ == other.type_
if allow_type_wildcard
else (
self.type_.match(other.type_)
and not self.type_.is_wildcard
and not other.type_.is_wildcard
)
)
subtype_match = (
self.subtype == other.subtype
if allow_subtype_wildcard
else (
self.subtype.match(other.subtype)
and not self.subtype.is_wildcard
and not other.subtype.is_wildcard
mime: str,
allow_type_wildcard=True,
allow_subtype_wildcard=True,
) -> Optional[MediaType]:
"""Check if this media type matches the given mime type/subtype.

Wildcards are supported both ways on both type and subtype.

Note: Use the `==` operator instead to check for literal matches
without expanding wildcards.

@param media_type: A type/subtype string to match.
@return `self` if the media types are compatible, else `None`
"""
mt = MediaType._parse(mime)
return (
self
if (
# Subtype match
(self.subtype in (mt.subtype, "*") or mt.subtype == "*")
# Type match
and (self.type_ in (mt.type_, "*") or mt.type_ == "*")
# Allow disabling wildcards (backwards compatibility with tests)
and (
allow_type_wildcard
or self.type_ != "*"
and mt.type_ != "*"
)
and (
allow_subtype_wildcard
or self.subtype != "*"
and mt.subtype != "*"
)
)
else None
)

return type_match and subtype_match
@property
def has_wildcard(self) -> bool:
"""Return True if this media type has a wildcard in it."""
return "*" in (self.subtype, self.type_)

@classmethod
def parse(cls, raw: str) -> Accept:
invalid = False
mtype = raw.strip()
@property
def is_wildcard(self) -> bool:
"""Return True if this is the wildcard `*/*`"""
return self.type_ == "*" and self.subtype == "*"

try:
media, *raw_params = mtype.split(";")
type_, subtype = media.split("/")
except ValueError:
invalid = True
@classmethod
def _parse(cls, mime_with_params: str) -> MediaType:
mtype = mime_with_params.strip()

if invalid or not type_ or not subtype:
raise InvalidHeader(f"Header contains invalid Accept value: {raw}")
media, *raw_params = mtype.split(";")
type_, subtype = media.split("/", 1)
if not type_ or not subtype:
raise ValueError(f"Invalid media type: {mtype}")

params = dict(
[
Expand All @@ -178,28 +133,79 @@ def parse(cls, raw: str) -> Accept:
]
)

return cls(mtype, MediaType(type_), MediaType(subtype), **params)
return cls(type_.lstrip(), subtype.rstrip(), **params)


class AcceptContainer(list):
def __contains__(self, o: object) -> bool:
return any(item.match(o) for item in self)
class Matched(str):
"""A matching result of a MIME string against a MediaType."""

def match(
self,
o: object,
*,
allow_type_wildcard: bool = True,
allow_subtype_wildcard: bool = True,
) -> bool:
return any(
item.match(
o,
allow_type_wildcard=allow_type_wildcard,
allow_subtype_wildcard=allow_subtype_wildcard,
)
for item in self
def __new__(cls, mime: str, m: Optional[MediaType]):
return super().__new__(cls, mime)

def __init__(self, mime: str, m: Optional[MediaType]):
self.m = m

def __repr__(self):
return f"<{self} matched {self.m}>" if self else "<no match>"


class AcceptList(list):
"""A list of media types, as used in the Accept header.

The Accept header entries are listed in order of preference, starting
with the most preferred. This class is a list of `MediaType` objects,
that encapsulate also the q value or any other parameters.

Two separate methods are provided for searching the list:
- 'match' for finding the most preferred match (wildcards supported)
- operator 'in' for checking explicit matches (wildcards as literals)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this being done? Is it ignoring q values?

Copy link
Member

@Tronic Tronic Feb 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the class that this docstring is for? Operator in is the one from list. Check the docs of AcceptList.match for how the order is determined (it considers q and order of both lists).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't check how the API docs render, but the class tries to give an overview, with the match function going to specifics of that rather complex matching logic. Hopefully that shows up decently.

"""

def match(self, *mimes: str) -> Matched:
"""Find a media type accepted by the client.

This method can be used to find which of the media types requested by
the client is most preferred against the ones given as arguments.

The ordering of preference is set by:
1. The q values on the Accept header, and those being equal,
2. The order of the arguments (first is most preferred), and
3. The first matching entry on the Accept header.

Wildcards are matched both ways. A match is usually found, as the
Accept headers typically include `*/*`, in particular if the header
is missing, is not manually set, or if the client is a browser.

Note: the returned object behaves as a string of the mime argument
that matched, and is empty/falsy if no match was found. The matched
header entry `MediaType` or `None` is available as the `m` attribute.

@param mimes: Any MIME types to search for in order of preference.
@return A match object with the mime string and the MediaType object.
"""
l = sorted(
[
(-acc.q, i, j, mime, acc) # Sort by -q, i, j
for j, acc in enumerate(self)
for i, mime in enumerate(mimes)
if acc.match(mime)
]
)
return Matched(*(l[0][3:] if l else ("", None)))


def parse_accept(accept: str) -> AcceptList:
"""Parse an Accept header and order the acceptable media types in
accorsing to RFC 7231, s. 5.3.2
https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2
"""
if not accept:
return AcceptList()
try:
a = [MediaType._parse(mtype) for mtype in accept.split(",")]
return AcceptList(sorted(a, key=lambda mtype: -mtype.q))
except ValueError:
raise InvalidHeader(f"Invalid header value in Accept: {accept}")


def parse_content_header(value: str) -> Tuple[str, Options]:
Expand Down Expand Up @@ -368,34 +374,6 @@ def format_http1_response(status: int, headers: HeaderBytesIterable) -> bytes:
return ret


def _sort_accept_value(accept: Accept):
return (
accept.qvalue,
len(accept.params),
accept.subtype != "*",
accept.type_ != "*",
)


def parse_accept(accept: str) -> AcceptContainer:
"""Parse an Accept header and order the acceptable media types in
accorsing to RFC 7231, s. 5.3.2
https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2
"""
media_types = accept.split(",")
accept_list: List[Accept] = []

for mtype in media_types:
if not mtype:
continue

accept_list.append(Accept.parse(mtype))

return AcceptContainer(
sorted(accept_list, key=_sort_accept_value, reverse=True)
)


def parse_credentials(
header: Optional[str],
prefixes: Union[List, Tuple, Set] = None,
Expand Down
6 changes: 3 additions & 3 deletions sanic/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
)
from sanic.exceptions import BadRequest, BadURL, ServerError
from sanic.headers import (
AcceptContainer,
AcceptList,
Options,
parse_accept,
parse_content_header,
Expand Down Expand Up @@ -167,7 +167,7 @@ def __init__(
self.conn_info: Optional[ConnInfo] = None
self.ctx = SimpleNamespace()
self.parsed_forwarded: Optional[Options] = None
self.parsed_accept: Optional[AcceptContainer] = None
self.parsed_accept: Optional[AcceptList] = None
self.parsed_credentials: Optional[Credentials] = None
self.parsed_json = None
self.parsed_form: Optional[RequestParameters] = None
Expand Down Expand Up @@ -499,7 +499,7 @@ def load_json(self, loads=None):
return self.parsed_json

@property
def accept(self) -> AcceptContainer:
def accept(self) -> AcceptList:
"""
:return: The ``Accept`` header parsed
:rtype: AcceptContainer
Expand Down
Loading