from __future__ import annotations | |
import email.utils | |
import re | |
import typing as t | |
import warnings | |
from datetime import date | |
from datetime import datetime | |
from datetime import time | |
from datetime import timedelta | |
from datetime import timezone | |
from enum import Enum | |
from hashlib import sha1 | |
from time import mktime | |
from time import struct_time | |
from urllib.parse import quote | |
from urllib.parse import unquote | |
from urllib.request import parse_http_list as _parse_list_header | |
from ._internal import _dt_as_utc | |
from ._internal import _plain_int | |
from _typeshed.wsgi import WSGIEnvironment | |
_token_chars = frozenset( | |
"!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~" | |
) | |
_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)') | |
_entity_headers = frozenset( | |
[ | |
"allow", | |
"content-encoding", | |
"content-language", | |
"content-length", | |
"content-location", | |
"content-md5", | |
"content-range", | |
"content-type", | |
"expires", | |
"last-modified", | |
] | |
) | |
_hop_by_hop_headers = frozenset( | |
[ | |
"connection", | |
"keep-alive", | |
"proxy-authenticate", | |
"proxy-authorization", | |
"te", | |
"trailer", | |
"transfer-encoding", | |
"upgrade", | |
] | |
) | |
100: "Continue", | |
101: "Switching Protocols", | |
102: "Processing", | |
103: "Early Hints", # see RFC 8297 | |
200: "OK", | |
201: "Created", | |
202: "Accepted", | |
203: "Non Authoritative Information", | |
204: "No Content", | |
205: "Reset Content", | |
206: "Partial Content", | |
207: "Multi Status", | |
208: "Already Reported", # see RFC 5842 | |
226: "IM Used", # see RFC 3229 | |
300: "Multiple Choices", | |
301: "Moved Permanently", | |
302: "Found", | |
303: "See Other", | |
304: "Not Modified", | |
305: "Use Proxy", | |
306: "Switch Proxy", # unused | |
307: "Temporary Redirect", | |
308: "Permanent Redirect", | |
400: "Bad Request", | |
401: "Unauthorized", | |
402: "Payment Required", # unused | |
403: "Forbidden", | |
404: "Not Found", | |
405: "Method Not Allowed", | |
406: "Not Acceptable", | |
407: "Proxy Authentication Required", | |
408: "Request Timeout", | |
409: "Conflict", | |
410: "Gone", | |
411: "Length Required", | |
412: "Precondition Failed", | |
413: "Request Entity Too Large", | |
414: "Request URI Too Long", | |
415: "Unsupported Media Type", | |
416: "Requested Range Not Satisfiable", | |
417: "Expectation Failed", | |
418: "I'm a teapot", # see RFC 2324 | |
421: "Misdirected Request", # see RFC 7540 | |
422: "Unprocessable Entity", | |
423: "Locked", | |
424: "Failed Dependency", | |
425: "Too Early", # see RFC 8470 | |
426: "Upgrade Required", | |
428: "Precondition Required", # see RFC 6585 | |
429: "Too Many Requests", | |
431: "Request Header Fields Too Large", | |
449: "Retry With", # proprietary MS extension | |
451: "Unavailable For Legal Reasons", | |
500: "Internal Server Error", | |
501: "Not Implemented", | |
502: "Bad Gateway", | |
503: "Service Unavailable", | |
504: "Gateway Timeout", | |
505: "HTTP Version Not Supported", | |
506: "Variant Also Negotiates", # see RFC 2295 | |
507: "Insufficient Storage", | |
508: "Loop Detected", # see RFC 5842 | |
510: "Not Extended", | |
511: "Network Authentication Failed", | |
} | |
class COEP(Enum): | |
"""Cross Origin Embedder Policies""" | |
UNSAFE_NONE = "unsafe-none" | |
REQUIRE_CORP = "require-corp" | |
class COOP(Enum): | |
"""Cross Origin Opener Policies""" | |
UNSAFE_NONE = "unsafe-none" | |
SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups" | |
SAME_ORIGIN = "same-origin" | |
def quote_header_value(value: t.Any, allow_token: bool = True) -> str: | |
"""Add double quotes around a header value. If the header contains only ASCII token | |
characters, it will be returned unchanged. If the header contains ``"`` or ``\\`` | |
characters, they will be escaped with an additional ``\\`` character. | |
This is the reverse of :func:`unquote_header_value`. | |
:param value: The value to quote. Will be converted to a string. | |
:param allow_token: Disable to quote the value even if it only has token characters. | |
.. versionchanged:: 3.0 | |
Passing bytes is not supported. | |
.. versionchanged:: 3.0 | |
The ``extra_chars`` parameter is removed. | |
.. versionchanged:: 2.3 | |
The value is quoted if it is the empty string. | |
.. versionadded:: 0.5 | |
""" | |
value_str = str(value) | |
if not value_str: | |
return '""' | |
if allow_token: | |
token_chars = _token_chars | |
if token_chars.issuperset(value_str): | |
return value_str | |
value_str = value_str.replace("\\", "\\\\").replace('"', '\\"') | |
return f'"{value_str}"' | |
def unquote_header_value(value: str) -> str: | |
"""Remove double quotes and decode slash-escaped ``"`` and ``\\`` characters in a | |
header value. | |
This is the reverse of :func:`quote_header_value`. | |
:param value: The header value to unquote. | |
.. versionchanged:: 3.0 | |
The ``is_filename`` parameter is removed. | |
""" | |
if len(value) >= 2 and value[0] == value[-1] == '"': | |
value = value[1:-1] | |
return value.replace("\\\\", "\\").replace('\\"', '"') | |
return value | |
def dump_options_header(header: str | None, options: t.Mapping[str, t.Any]) -> str: | |
"""Produce a header value and ``key=value`` parameters separated by semicolons | |
``;``. For example, the ``Content-Type`` header. | |
.. code-block:: python | |
dump_options_header("text/html", {"charset": "UTF-8"}) | |
'text/html; charset=UTF-8' | |
This is the reverse of :func:`parse_options_header`. | |
If a value contains non-token characters, it will be quoted. | |
If a value is ``None``, the parameter is skipped. | |
In some keys for some headers, a UTF-8 value can be encoded using a special | |
``key*=UTF-8''value`` form, where ``value`` is percent encoded. This function will | |
not produce that format automatically, but if a given key ends with an asterisk | |
``*``, the value is assumed to have that form and will not be quoted further. | |
:param header: The primary header value. | |
:param options: Parameters to encode as ``key=value`` pairs. | |
.. versionchanged:: 2.3 | |
Keys with ``None`` values are skipped rather than treated as a bare key. | |
.. versionchanged:: 2.2.3 | |
If a key ends with ``*``, its value will not be quoted. | |
""" | |
segments = [] | |
if header is not None: | |
segments.append(header) | |
for key, value in options.items(): | |
if value is None: | |
continue | |
if key[-1] == "*": | |
segments.append(f"{key}={value}") | |
else: | |
segments.append(f"{key}={quote_header_value(value)}") | |
return "; ".join(segments) | |
def dump_header(iterable: dict[str, t.Any] | t.Iterable[t.Any]) -> str: | |
"""Produce a header value from a list of items or ``key=value`` pairs, separated by | |
commas ``,``. | |
This is the reverse of :func:`parse_list_header`, :func:`parse_dict_header`, and | |
:func:`parse_set_header`. | |
If a value contains non-token characters, it will be quoted. | |
If a value is ``None``, the key is output alone. | |
In some keys for some headers, a UTF-8 value can be encoded using a special | |
``key*=UTF-8''value`` form, where ``value`` is percent encoded. This function will | |
not produce that format automatically, but if a given key ends with an asterisk | |
``*``, the value is assumed to have that form and will not be quoted further. | |
.. code-block:: python | |
dump_header(["foo", "bar baz"]) | |
'foo, "bar baz"' | |
dump_header({"foo": "bar baz"}) | |
'foo="bar baz"' | |
:param iterable: The items to create a header from. | |
.. versionchanged:: 3.0 | |
The ``allow_token`` parameter is removed. | |
.. versionchanged:: 2.2.3 | |
If a key ends with ``*``, its value will not be quoted. | |
""" | |
if isinstance(iterable, dict): | |
items = [] | |
for key, value in iterable.items(): | |
if value is None: | |
items.append(key) | |
elif key[-1] == "*": | |
items.append(f"{key}={value}") | |
else: | |
items.append(f"{key}={quote_header_value(value)}") | |
else: | |
items = [quote_header_value(x) for x in iterable] | |
return ", ".join(items) | |
def dump_csp_header(header: ds.ContentSecurityPolicy) -> str: | |
"""Dump a Content Security Policy header. | |
These are structured into policies such as "default-src 'self'; | |
script-src 'self'". | |
.. versionadded:: 1.0.0 | |
Support for Content Security Policy headers was added. | |
""" | |
return "; ".join(f"{key} {value}" for key, value in header.items()) | |
def parse_list_header(value: str) -> list[str]: | |
"""Parse a header value that consists of a list of comma separated items according | |
to `RFC 9110 <>`__. | |
This extends :func:`urllib.request.parse_http_list` to remove surrounding quotes | |
from values. | |
.. code-block:: python | |
parse_list_header('token, "quoted value"') | |
['token', 'quoted value'] | |
This is the reverse of :func:`dump_header`. | |
:param value: The header value to parse. | |
""" | |
result = [] | |
for item in _parse_list_header(value): | |
if len(item) >= 2 and item[0] == item[-1] == '"': | |
item = item[1:-1] | |
result.append(item) | |
return result | |
def parse_dict_header(value: str) -> dict[str, str | None]: | |
"""Parse a list header using :func:`parse_list_header`, then parse each item as a | |
``key=value`` pair. | |
.. code-block:: python | |
parse_dict_header('a=b, c="d, e", f') | |
{"a": "b", "c": "d, e", "f": None} | |
This is the reverse of :func:`dump_header`. | |
If a key does not have a value, it is ``None``. | |
This handles charsets for values as described in | |
`RFC 2231 <>`__. Only ASCII, UTF-8, | |
and ISO-8859-1 charsets are accepted, otherwise the value remains quoted. | |
:param value: The header value to parse. | |
.. versionchanged:: 3.0 | |
Passing bytes is not supported. | |
.. versionchanged:: 3.0 | |
The ``cls`` argument is removed. | |
.. versionchanged:: 2.3 | |
Added support for ``key*=charset''value`` encoded items. | |
.. versionchanged:: 0.9 | |
The ``cls`` argument was added. | |
""" | |
result: dict[str, str | None] = {} | |
for item in parse_list_header(value): | |
key, has_value, value = item.partition("=") | |
key = key.strip() | |
if not has_value: | |
result[key] = None | |
continue | |
value = value.strip() | |
encoding: str | None = None | |
if key[-1] == "*": | |
# key*=charset''value becomes key=value, where value is percent encoded | |
# adapted from parse_options_header, without the continuation handling | |
key = key[:-1] | |
match = _charset_value_re.match(value) | |
if match: | |
# If there is a charset marker in the value, split it off. | |
encoding, value = match.groups() | |
encoding = encoding.lower() | |
# A safe list of encodings. Modern clients should only send ASCII or UTF-8. | |
# This list will not be extended further. An invalid encoding will leave the | |
# value quoted. | |
if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}: | |
# invalid bytes are replaced during unquoting | |
value = unquote(value, encoding=encoding) | |
if len(value) >= 2 and value[0] == value[-1] == '"': | |
value = value[1:-1] | |
result[key] = value | |
return result | |
# | |
_parameter_re = re.compile( | |
r""" | |
# don't match multiple empty parts, that causes backtracking | |
\s*;\s* # find the part delimiter | |
(?: | |
([\w!#$%&'*+\-.^`|~]+) # key, one or more token chars | |
= # equals, with no space on either side | |
( # value, token or quoted string | |
[\w!#$%&'*+\-.^`|~]+ # one or more token chars | |
| | |
"(?:\\\\|\\"|.)*?" # quoted string, consuming slash escapes | |
) | |
)? # optionally match key=value, to account for empty parts | |
""", | |
re.ASCII | re.VERBOSE, | |
) | |
# | |
_charset_value_re = re.compile( | |
r""" | |
([\w!#$%&*+\-.^`|~]*)' # charset part, could be empty | |
[\w!#$%&*+\-.^`|~]*' # don't care about language part, usually empty | |
([\w!#$%&'*+\-.^`|~]+) # one or more token chars with percent encoding | |
""", | |
re.ASCII | re.VERBOSE, | |
) | |
# | |
_continuation_re = re.compile(r"\*(\d+)$", re.ASCII) | |
def parse_options_header(value: str | None) -> tuple[str, dict[str, str]]: | |
"""Parse a header that consists of a value with ``key=value`` parameters separated | |
by semicolons ``;``. For example, the ``Content-Type`` header. | |
.. code-block:: python | |
parse_options_header("text/html; charset=UTF-8") | |
('text/html', {'charset': 'UTF-8'}) | |
parse_options_header("") | |
("", {}) | |
This is the reverse of :func:`dump_options_header`. | |
This parses valid parameter parts as described in | |
`RFC 9110 <>`__. Invalid parts are | |
skipped. | |
This handles continuations and charsets as described in | |
`RFC 2231 <>`__, although not as | |
strictly as the RFC. Only ASCII, UTF-8, and ISO-8859-1 charsets are accepted, | |
otherwise the value remains quoted. | |
Clients may not be consistent in how they handle a quote character within a quoted | |
value. The `HTML Standard <>`__ | |
replaces it with ``%22`` in multipart form data. | |
`RFC 9110 <>`__ uses backslash | |
escapes in HTTP headers. Both are decoded to the ``"`` character. | |
Clients may not be consistent in how they handle non-ASCII characters. HTML | |
documents must declare ``<meta charset=UTF-8>``, otherwise browsers may replace with | |
HTML character references, which can be decoded using :func:`html.unescape`. | |
:param value: The header value to parse. | |
:return: ``(value, options)``, where ``options`` is a dict | |
.. versionchanged:: 2.3 | |
Invalid parts, such as keys with no value, quoted keys, and incorrectly quoted | |
values, are discarded instead of treating as ``None``. | |
.. versionchanged:: 2.3 | |
Only ASCII, UTF-8, and ISO-8859-1 are accepted for charset values. | |
.. versionchanged:: 2.3 | |
Escaped quotes in quoted values, like ``%22`` and ``\\"``, are handled. | |
.. versionchanged:: 2.2 | |
Option names are always converted to lowercase. | |
.. versionchanged:: 2.2 | |
The ``multiple`` parameter was removed. | |
.. versionchanged:: 0.15 | |
:rfc:`2231` parameter continuations are handled. | |
.. versionadded:: 0.5 | |
""" | |
if value is None: | |
return "", {} | |
value, _, rest = value.partition(";") | |
value = value.strip() | |
rest = rest.strip() | |
if not value or not rest: | |
# empty (invalid) value, or value without options | |
return value, {} | |
rest = f";{rest}" | |
options: dict[str, str] = {} | |
encoding: str | None = None | |
continued_encoding: str | None = None | |
for pk, pv in _parameter_re.findall(rest): | |
if not pk: | |
# empty or invalid part | |
continue | |
pk = pk.lower() | |
if pk[-1] == "*": | |
# key*=charset''value becomes key=value, where value is percent encoded | |
pk = pk[:-1] | |
match = _charset_value_re.match(pv) | |
if match: | |
# If there is a valid charset marker in the value, split it off. | |
encoding, pv = match.groups() | |
# This might be the empty string, handled next. | |
encoding = encoding.lower() | |
# No charset marker, or marker with empty charset value. | |
if not encoding: | |
encoding = continued_encoding | |
# A safe list of encodings. Modern clients should only send ASCII or UTF-8. | |
# This list will not be extended further. An invalid encoding will leave the | |
# value quoted. | |
if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}: | |
# Continuation parts don't require their own charset marker. This is | |
# looser than the RFC, it will persist across different keys and allows | |
# changing the charset during a continuation. But this implementation is | |
# much simpler than tracking the full state. | |
continued_encoding = encoding | |
# invalid bytes are replaced during unquoting | |
pv = unquote(pv, encoding=encoding) | |
# Remove quotes. At this point the value cannot be empty or a single quote. | |
if pv[0] == pv[-1] == '"': | |
# HTTP headers use slash, multipart form data uses percent | |
pv = pv[1:-1].replace("\\\\", "\\").replace('\\"', '"').replace("%22", '"') | |
match = | |
if match: | |
# key*0=a; key*1=b becomes key=ab | |
pk = pk[: match.start()] | |
options[pk] = options.get(pk, "") + pv | |
else: | |
options[pk] = pv | |
return value, options | |
_q_value_re = re.compile(r"-?\d+(\.\d+)?", re.ASCII) | |
_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept") | |
def parse_accept_header(value: str | None) -> ds.Accept: ... | |
def parse_accept_header(value: str | None, cls: type[_TAnyAccept]) -> _TAnyAccept: ... | |
def parse_accept_header( | |
value: str | None, cls: type[_TAnyAccept] | None = None | |
) -> _TAnyAccept: | |
"""Parse an ``Accept`` header according to | |
`RFC 9110 <>`__. | |
Returns an :class:`.Accept` instance, which can sort and inspect items based on | |
their quality parameter. When parsing ``Accept-Charset``, ``Accept-Encoding``, or | |
``Accept-Language``, pass the appropriate :class:`.Accept` subclass. | |
:param value: The header value to parse. | |
:param cls: The :class:`.Accept` class to wrap the result in. | |
:return: An instance of ``cls``. | |
.. versionchanged:: 2.3 | |
Parse according to RFC 9110. Items with invalid ``q`` values are skipped. | |
""" | |
if cls is None: | |
cls = t.cast(t.Type[_TAnyAccept], ds.Accept) | |
if not value: | |
return cls(None) | |
result = [] | |
for item in parse_list_header(value): | |
item, options = parse_options_header(item) | |
if "q" in options: | |
# pop q, remaining options are reconstructed | |
q_str = options.pop("q").strip() | |
if _q_value_re.fullmatch(q_str) is None: | |
# ignore an invalid q | |
continue | |
q = float(q_str) | |
if q < 0 or q > 1: | |
# ignore an invalid q | |
continue | |
else: | |
q = 1 | |
if options: | |
# reconstruct the media type with any options | |
item = dump_options_header(item, options) | |
result.append((item, q)) | |
return cls(result) | |
_TAnyCC = t.TypeVar("_TAnyCC", bound="ds.cache_control._CacheControl") | |
def parse_cache_control_header( | |
value: str | None, | |
on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None, | |
) -> ds.RequestCacheControl: ... | |
def parse_cache_control_header( | |
value: str | None, | |
on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None, | |
cls: type[_TAnyCC] = ..., | |
) -> _TAnyCC: ... | |
def parse_cache_control_header( | |
value: str | None, | |
on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None, | |
cls: type[_TAnyCC] | None = None, | |
) -> _TAnyCC: | |
"""Parse a cache control header. The RFC differs between response and | |
request cache control, this method does not. It's your responsibility | |
to not use the wrong control statements. | |
.. versionadded:: 0.5 | |
The `cls` was added. If not specified an immutable | |
:class:`~werkzeug.datastructures.RequestCacheControl` is returned. | |
:param value: a cache control header to be parsed. | |
:param on_update: an optional callable that is called every time a value | |
on the :class:`~werkzeug.datastructures.CacheControl` | |
object is changed. | |
:param cls: the class for the returned object. By default | |
:class:`~werkzeug.datastructures.RequestCacheControl` is used. | |
:return: a `cls` object. | |
""" | |
if cls is None: | |
cls = t.cast("type[_TAnyCC]", ds.RequestCacheControl) | |
if not value: | |
return cls((), on_update) | |
return cls(parse_dict_header(value), on_update) | |
_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy") | |
def parse_csp_header( | |
value: str | None, | |
on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None, | |
) -> ds.ContentSecurityPolicy: ... | |
def parse_csp_header( | |
value: str | None, | |
on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None, | |
cls: type[_TAnyCSP] = ..., | |
) -> _TAnyCSP: ... | |
def parse_csp_header( | |
value: str | None, | |
on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None, | |
cls: type[_TAnyCSP] | None = None, | |
) -> _TAnyCSP: | |
"""Parse a Content Security Policy header. | |
.. versionadded:: 1.0.0 | |
Support for Content Security Policy headers was added. | |
:param value: a csp header to be parsed. | |
:param on_update: an optional callable that is called every time a value | |
on the object is changed. | |
:param cls: the class for the returned object. By default | |
:class:`~werkzeug.datastructures.ContentSecurityPolicy` is used. | |
:return: a `cls` object. | |
""" | |
if cls is None: | |
cls = t.cast("type[_TAnyCSP]", ds.ContentSecurityPolicy) | |
if value is None: | |
return cls((), on_update) | |
items = [] | |
for policy in value.split(";"): | |
policy = policy.strip() | |
# Ignore badly formatted policies (no space) | |
if " " in policy: | |
directive, value = policy.strip().split(" ", 1) | |
items.append((directive.strip(), value.strip())) | |
return cls(items, on_update) | |
def parse_set_header( | |
value: str | None, | |
on_update: t.Callable[[ds.HeaderSet], None] | None = None, | |
) -> ds.HeaderSet: | |
"""Parse a set-like header and return a | |
:class:`~werkzeug.datastructures.HeaderSet` object: | |
>>> hs = parse_set_header('token, "quoted value"') | |
The return value is an object that treats the items case-insensitively | |
and keeps the order of the items: | |
>>> 'TOKEN' in hs | |
True | |
>>> hs.index('quoted value') | |
1 | |
>>> hs | |
HeaderSet(['token', 'quoted value']) | |
To create a header from the :class:`HeaderSet` again, use the | |
:func:`dump_header` function. | |
:param value: a set header to be parsed. | |
:param on_update: an optional callable that is called every time a | |
value on the :class:`~werkzeug.datastructures.HeaderSet` | |
object is changed. | |
:return: a :class:`~werkzeug.datastructures.HeaderSet` | |
""" | |
if not value: | |
return ds.HeaderSet(None, on_update) | |
return ds.HeaderSet(parse_list_header(value), on_update) | |
def parse_if_range_header(value: str | None) -> ds.IfRange: | |
"""Parses an if-range header which can be an etag or a date. Returns | |
a :class:`~werkzeug.datastructures.IfRange` object. | |
.. versionchanged:: 2.0 | |
If the value represents a datetime, it is timezone-aware. | |
.. versionadded:: 0.7 | |
""" | |
if not value: | |
return ds.IfRange() | |
date = parse_date(value) | |
if date is not None: | |
return ds.IfRange(date=date) | |
# drop weakness information | |
return ds.IfRange(unquote_etag(value)[0]) | |
def parse_range_header( | |
value: str | None, make_inclusive: bool = True | |
) -> ds.Range | None: | |
"""Parses a range header into a :class:`~werkzeug.datastructures.Range` | |
object. If the header is missing or malformed `None` is returned. | |
`ranges` is a list of ``(start, stop)`` tuples where the ranges are | |
non-inclusive. | |
.. versionadded:: 0.7 | |
""" | |
if not value or "=" not in value: | |
return None | |
ranges = [] | |
last_end = 0 | |
units, rng = value.split("=", 1) | |
units = units.strip().lower() | |
for item in rng.split(","): | |
item = item.strip() | |
if "-" not in item: | |
return None | |
if item.startswith("-"): | |
if last_end < 0: | |
return None | |
try: | |
begin = _plain_int(item) | |
except ValueError: | |
return None | |
end = None | |
last_end = -1 | |
elif "-" in item: | |
begin_str, end_str = item.split("-", 1) | |
begin_str = begin_str.strip() | |
end_str = end_str.strip() | |
try: | |
begin = _plain_int(begin_str) | |
except ValueError: | |
return None | |
if begin < last_end or last_end < 0: | |
return None | |
if end_str: | |
try: | |
end = _plain_int(end_str) + 1 | |
except ValueError: | |
return None | |
if begin >= end: | |
return None | |
else: | |
end = None | |
last_end = end if end is not None else -1 | |
ranges.append((begin, end)) | |
return ds.Range(units, ranges) | |
def parse_content_range_header( | |
value: str | None, | |
on_update: t.Callable[[ds.ContentRange], None] | None = None, | |
) -> ds.ContentRange | None: | |
"""Parses a range header into a | |
:class:`~werkzeug.datastructures.ContentRange` object or `None` if | |
parsing is not possible. | |
.. versionadded:: 0.7 | |
:param value: a content range header to be parsed. | |
:param on_update: an optional callable that is called every time a value | |
on the :class:`~werkzeug.datastructures.ContentRange` | |
object is changed. | |
""" | |
if value is None: | |
return None | |
try: | |
units, rangedef = (value or "").strip().split(None, 1) | |
except ValueError: | |
return None | |
if "/" not in rangedef: | |
return None | |
rng, length_str = rangedef.split("/", 1) | |
if length_str == "*": | |
length = None | |
else: | |
try: | |
length = _plain_int(length_str) | |
except ValueError: | |
return None | |
if rng == "*": | |
if not is_byte_range_valid(None, None, length): | |
return None | |
return ds.ContentRange(units, None, None, length, on_update=on_update) | |
elif "-" not in rng: | |
return None | |
start_str, stop_str = rng.split("-", 1) | |
try: | |
start = _plain_int(start_str) | |
stop = _plain_int(stop_str) + 1 | |
except ValueError: | |
return None | |
if is_byte_range_valid(start, stop, length): | |
return ds.ContentRange(units, start, stop, length, on_update=on_update) | |
return None | |
def quote_etag(etag: str, weak: bool = False) -> str: | |
"""Quote an etag. | |
:param etag: the etag to quote. | |
:param weak: set to `True` to tag it "weak". | |
""" | |
if '"' in etag: | |
raise ValueError("invalid etag") | |
etag = f'"{etag}"' | |
if weak: | |
etag = f"W/{etag}" | |
return etag | |
def unquote_etag( | |
etag: str | None, | |
) -> tuple[str, bool] | tuple[None, None]: | |
"""Unquote a single etag: | |
>>> unquote_etag('W/"bar"') | |
('bar', True) | |
>>> unquote_etag('"bar"') | |
('bar', False) | |
:param etag: the etag identifier to unquote. | |
:return: a ``(etag, weak)`` tuple. | |
""" | |
if not etag: | |
return None, None | |
etag = etag.strip() | |
weak = False | |
if etag.startswith(("W/", "w/")): | |
weak = True | |
etag = etag[2:] | |
if etag[:1] == etag[-1:] == '"': | |
etag = etag[1:-1] | |
return etag, weak | |
def parse_etags(value: str | None) -> ds.ETags: | |
"""Parse an etag header. | |
:param value: the tag header to parse | |
:return: an :class:`~werkzeug.datastructures.ETags` object. | |
""" | |
if not value: | |
return ds.ETags() | |
strong = [] | |
weak = [] | |
end = len(value) | |
pos = 0 | |
while pos < end: | |
match = _etag_re.match(value, pos) | |
if match is None: | |
break | |
is_weak, quoted, raw = match.groups() | |
if raw == "*": | |
return ds.ETags(star_tag=True) | |
elif quoted: | |
raw = quoted | |
if is_weak: | |
weak.append(raw) | |
else: | |
strong.append(raw) | |
pos = match.end() | |
return ds.ETags(strong, weak) | |
def generate_etag(data: bytes) -> str: | |
"""Generate an etag for some data. | |
.. versionchanged:: 2.0 | |
Use SHA-1. MD5 may not be available in some environments. | |
""" | |
return sha1(data).hexdigest() | |
def parse_date(value: str | None) -> datetime | None: | |
"""Parse an :rfc:`2822` date into a timezone-aware | |
:class:`datetime.datetime` object, or ``None`` if parsing fails. | |
This is a wrapper for :func:`email.utils.parsedate_to_datetime`. It | |
returns ``None`` if parsing fails instead of raising an exception, | |
and always returns a timezone-aware datetime object. If the string | |
doesn't have timezone information, it is assumed to be UTC. | |
:param value: A string with a supported date format. | |
.. versionchanged:: 2.0 | |
Return a timezone-aware datetime object. Use | |
``email.utils.parsedate_to_datetime``. | |
""" | |
if value is None: | |
return None | |
try: | |
dt = email.utils.parsedate_to_datetime(value) | |
except (TypeError, ValueError): | |
return None | |
if dt.tzinfo is None: | |
return dt.replace(tzinfo=timezone.utc) | |
return dt | |
def http_date( | |
timestamp: datetime | date | int | float | struct_time | None = None, | |
) -> str: | |
"""Format a datetime object or timestamp into an :rfc:`2822` date | |
string. | |
This is a wrapper for :func:`email.utils.format_datetime`. It | |
assumes naive datetime objects are in UTC instead of raising an | |
exception. | |
:param timestamp: The datetime or timestamp to format. Defaults to | |
the current time. | |
.. versionchanged:: 2.0 | |
Use ``email.utils.format_datetime``. Accept ``date`` objects. | |
""" | |
if isinstance(timestamp, date): | |
if not isinstance(timestamp, datetime): | |
# Assume plain date is midnight UTC. | |
timestamp = datetime.combine(timestamp, time(), tzinfo=timezone.utc) | |
else: | |
# Ensure datetime is timezone-aware. | |
timestamp = _dt_as_utc(timestamp) | |
return email.utils.format_datetime(timestamp, usegmt=True) | |
if isinstance(timestamp, struct_time): | |
timestamp = mktime(timestamp) | |
return email.utils.formatdate(timestamp, usegmt=True) | |
def parse_age(value: str | None = None) -> timedelta | None: | |
"""Parses a base-10 integer count of seconds into a timedelta. | |
If parsing fails, the return value is `None`. | |
:param value: a string consisting of an integer represented in base-10 | |
:return: a :class:`datetime.timedelta` object or `None`. | |
""" | |
if not value: | |
return None | |
try: | |
seconds = int(value) | |
except ValueError: | |
return None | |
if seconds < 0: | |
return None | |
try: | |
return timedelta(seconds=seconds) | |
except OverflowError: | |
return None | |
def dump_age(age: timedelta | int | None = None) -> str | None: | |
"""Formats the duration as a base-10 integer. | |
:param age: should be an integer number of seconds, | |
a :class:`datetime.timedelta` object, or, | |
if the age is unknown, `None` (default). | |
""" | |
if age is None: | |
return None | |
if isinstance(age, timedelta): | |
age = int(age.total_seconds()) | |
else: | |
age = int(age) | |
if age < 0: | |
raise ValueError("age cannot be negative") | |
return str(age) | |
def is_resource_modified( | |
environ: WSGIEnvironment, | |
etag: str | None = None, | |
data: bytes | None = None, | |
last_modified: datetime | str | None = None, | |
ignore_if_range: bool = True, | |
) -> bool: | |
"""Convenience method for conditional requests. | |
:param environ: the WSGI environment of the request to be checked. | |
:param etag: the etag for the response for comparison. | |
:param data: or alternatively the data of the response to automatically | |
generate an etag using :func:`generate_etag`. | |
:param last_modified: an optional date of the last modification. | |
:param ignore_if_range: If `False`, `If-Range` header will be taken into | |
account. | |
:return: `True` if the resource was modified, otherwise `False`. | |
.. versionchanged:: 2.0 | |
SHA-1 is used to generate an etag value for the data. MD5 may | |
not be available in some environments. | |
.. versionchanged:: 1.0.0 | |
The check is run for methods other than ``GET`` and ``HEAD``. | |
""" | |
return _sansio_http.is_resource_modified( | |
http_range=environ.get("HTTP_RANGE"), | |
http_if_range=environ.get("HTTP_IF_RANGE"), | |
http_if_modified_since=environ.get("HTTP_IF_MODIFIED_SINCE"), | |
http_if_none_match=environ.get("HTTP_IF_NONE_MATCH"), | |
http_if_match=environ.get("HTTP_IF_MATCH"), | |
etag=etag, | |
data=data, | |
last_modified=last_modified, | |
ignore_if_range=ignore_if_range, | |
) | |
def remove_entity_headers( | |
headers: ds.Headers | list[tuple[str, str]], | |
allowed: t.Iterable[str] = ("expires", "content-location"), | |
) -> None: | |
"""Remove all entity headers from a list or :class:`Headers` object. This | |
operation works in-place. `Expires` and `Content-Location` headers are | |
by default not removed. The reason for this is :rfc:`2616` section | |
10.3.5 which specifies some entity headers that should be sent. | |
.. versionchanged:: 0.5 | |
added `allowed` parameter. | |
:param headers: a list or :class:`Headers` object. | |
:param allowed: a list of headers that should still be allowed even though | |
they are entity headers. | |
""" | |
allowed = {x.lower() for x in allowed} | |
headers[:] = [ | |
(key, value) | |
for key, value in headers | |
if not is_entity_header(key) or key.lower() in allowed | |
] | |
def remove_hop_by_hop_headers(headers: ds.Headers | list[tuple[str, str]]) -> None: | |
"""Remove all HTTP/1.1 "Hop-by-Hop" headers from a list or | |
:class:`Headers` object. This operation works in-place. | |
.. versionadded:: 0.5 | |
:param headers: a list or :class:`Headers` object. | |
""" | |
headers[:] = [ | |
(key, value) for key, value in headers if not is_hop_by_hop_header(key) | |
] | |
def is_entity_header(header: str) -> bool: | |
"""Check if a header is an entity header. | |
.. versionadded:: 0.5 | |
:param header: the header to test. | |
:return: `True` if it's an entity header, `False` otherwise. | |
""" | |
return header.lower() in _entity_headers | |
def is_hop_by_hop_header(header: str) -> bool: | |
"""Check if a header is an HTTP/1.1 "Hop-by-Hop" header. | |
.. versionadded:: 0.5 | |
:param header: the header to test. | |
:return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise. | |
""" | |
return header.lower() in _hop_by_hop_headers | |
def parse_cookie( | |
header: WSGIEnvironment | str | None, | |
cls: type[ds.MultiDict[str, str]] | None = None, | |
) -> ds.MultiDict[str, str]: | |
"""Parse a cookie from a string or WSGI environ. | |
The same key can be provided multiple times, the values are stored | |
in-order. The default :class:`MultiDict` will have the first value | |
first, and all values can be retrieved with | |
:meth:`MultiDict.getlist`. | |
:param header: The cookie header as a string, or a WSGI environ dict | |
with a ``HTTP_COOKIE`` key. | |
:param cls: A dict-like class to store the parsed cookies in. | |
Defaults to :class:`MultiDict`. | |
.. versionchanged:: 3.0 | |
Passing bytes, and the ``charset`` and ``errors`` parameters, were removed. | |
.. versionchanged:: 1.0 | |
Returns a :class:`MultiDict` instead of a ``TypeConversionDict``. | |
.. versionchanged:: 0.5 | |
Returns a :class:`TypeConversionDict` instead of a regular dict. The ``cls`` | |
parameter was added. | |
""" | |
if isinstance(header, dict): | |
cookie = header.get("HTTP_COOKIE") | |
else: | |
cookie = header | |
if cookie: | |
cookie = cookie.encode("latin1").decode() | |
return _sansio_http.parse_cookie(cookie=cookie, cls=cls) | |
_cookie_no_quote_re = re.compile(r"[\w!#$%&'()*+\-./:<=>?@\[\]^`{|}~]*", re.A) | |
_cookie_slash_re = re.compile(rb"[\x00-\x19\",;\\\x7f-\xff]", re.A) | |
_cookie_slash_map = {b'"': b'\\"', b"\\": b"\\\\"} | |
_cookie_slash_map.update( | |
(v.to_bytes(1, "big"), b"\\%03o" % v) | |
for v in [*range(0x20), *b",;", *range(0x7F, 256)] | |
) | |
def dump_cookie( | |
key: str, | |
value: str = "", | |
max_age: timedelta | int | None = None, | |
expires: str | datetime | int | float | None = None, | |
path: str | None = "/", | |
domain: str | None = None, | |
secure: bool = False, | |
httponly: bool = False, | |
sync_expires: bool = True, | |
max_size: int = 4093, | |
samesite: str | None = None, | |
) -> str: | |
"""Create a Set-Cookie header without the ``Set-Cookie`` prefix. | |
The return value is usually restricted to ascii as the vast majority | |
of values are properly escaped, but that is no guarantee. It's | |
tunneled through latin1 as required by :pep:`3333`. | |
The return value is not ASCII safe if the key contains unicode | |
characters. This is technically against the specification but | |
happens in the wild. It's strongly recommended to not use | |
non-ASCII values for the keys. | |
:param max_age: should be a number of seconds, or `None` (default) if | |
the cookie should last only as long as the client's | |
browser session. Additionally `timedelta` objects | |
are accepted, too. | |
:param expires: should be a `datetime` object or unix timestamp. | |
:param path: limits the cookie to a given path, per default it will | |
span the whole domain. | |
:param domain: Use this if you want to set a cross-domain cookie. For | |
example, ``domain=""`` will set a cookie | |
that is readable by the domain ````, | |
```` etc. Otherwise, a cookie will only | |
be readable by the domain that set it. | |
:param secure: The cookie will only be available via HTTPS | |
:param httponly: disallow JavaScript to access the cookie. This is an | |
extension to the cookie standard and probably not | |
supported by all browsers. | |
:param charset: the encoding for string values. | |
:param sync_expires: automatically set expires if max_age is defined | |
but expires not. | |
:param max_size: Warn if the final header value exceeds this size. The | |
default, 4093, should be safely `supported by most browsers | |
<cookie_>`_. Set to 0 to disable this check. | |
:param samesite: Limits the scope of the cookie such that it will | |
only be attached to requests if those requests are same-site. | |
.. _`cookie`: | |
.. versionchanged:: 3.0 | |
Passing bytes, and the ``charset`` parameter, were removed. | |
.. versionchanged:: 2.3.3 | |
The ``path`` parameter is ``/`` by default. | |
.. versionchanged:: 2.3.1 | |
The value allows more characters without quoting. | |
.. versionchanged:: 2.3 | |
``localhost`` and other names without a dot are allowed for the domain. A | |
leading dot is ignored. | |
.. versionchanged:: 2.3 | |
The ``path`` parameter is ``None`` by default. | |
.. versionchanged:: 1.0.0 | |
The string ``'None'`` is accepted for ``samesite``. | |
""" | |
if path is not None: | |
# safe = | |
# as well as percent for things that are already quoted | |
# excluding semicolon since it's part of the header syntax | |
path = quote(path, safe="%!$&'()*+,/:=@") | |
if domain: | |
domain = domain.partition(":")[0].lstrip(".").encode("idna").decode("ascii") | |
if isinstance(max_age, timedelta): | |
max_age = int(max_age.total_seconds()) | |
if expires is not None: | |
if not isinstance(expires, str): | |
expires = http_date(expires) | |
elif max_age is not None and sync_expires: | |
expires = http_date( + max_age) | |
if samesite is not None: | |
samesite = samesite.title() | |
if samesite not in {"Strict", "Lax", "None"}: | |
raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.") | |
# Quote value if it contains characters not allowed by RFC 6265. Slash-escape with | |
# three octal digits, which matches http.cookies, although the RFC suggests base64. | |
if not _cookie_no_quote_re.fullmatch(value): | |
# Work with bytes here, since a UTF-8 character could be multiple bytes. | |
value = _cookie_slash_re.sub( | |
lambda m: _cookie_slash_map[], value.encode() | |
).decode("ascii") | |
value = f'"{value}"' | |
# Send a non-ASCII key as mojibake. Everything else should already be ASCII. | |
# TODO Remove encoding dance, it seems like clients accept UTF-8 keys | |
buf = [f"{key.encode().decode('latin1')}={value}"] | |
for k, v in ( | |
("Domain", domain), | |
("Expires", expires), | |
("Max-Age", max_age), | |
("Secure", secure), | |
("HttpOnly", httponly), | |
("Path", path), | |
("SameSite", samesite), | |
): | |
if v is None or v is False: | |
continue | |
if v is True: | |
buf.append(k) | |
continue | |
buf.append(f"{k}={v}") | |
rv = "; ".join(buf) | |
# Warn if the final value of the cookie is larger than the limit. If the cookie is | |
# too large, then it may be silently ignored by the browser, which can be quite hard | |
# to debug. | |
cookie_size = len(rv) | |
if max_size and cookie_size > max_size: | |
value_size = len(value) | |
warnings.warn( | |
f"The '{key}' cookie is too large: the value was {value_size} bytes but the" | |
f" header required {cookie_size - value_size} extra bytes. The final size" | |
f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may" | |
" silently ignore cookies larger than this.", | |
stacklevel=2, | |
) | |
return rv | |
def is_byte_range_valid( | |
start: int | None, stop: int | None, length: int | None | |
) -> bool: | |
"""Checks if a given byte content range is valid for the given length. | |
.. versionadded:: 0.7 | |
""" | |
if (start is None) != (stop is None): | |
return False | |
elif start is None: | |
return length is None or length >= 0 | |
elif length is None: | |
return 0 <= start < stop # type: ignore | |
elif start >= stop: # type: ignore | |
return False | |
return 0 <= start < length | |
# circular dependencies | |
from . import datastructures as ds | |
from .sansio import http as _sansio_http | |