Spaces:
Running
Running
from __future__ import annotations | |
import io | |
import typing | |
from base64 import b64encode | |
from enum import Enum | |
from ..exceptions import UnrewindableBodyError | |
from .util import to_bytes | |
if typing.TYPE_CHECKING: | |
from typing import Final | |
# Pass as a value within ``headers`` to skip | |
# emitting some HTTP headers that are added automatically. | |
# The only headers that are supported are ``Accept-Encoding``, | |
# ``Host``, and ``User-Agent``. | |
SKIP_HEADER = "@@@SKIP_HEADER@@@" | |
SKIPPABLE_HEADERS = frozenset(["accept-encoding", "host", "user-agent"]) | |
ACCEPT_ENCODING = "gzip,deflate" | |
try: | |
try: | |
import brotlicffi as _unused_module_brotli # type: ignore[import-not-found] # noqa: F401 | |
except ImportError: | |
import brotli as _unused_module_brotli # type: ignore[import-not-found] # noqa: F401 | |
except ImportError: | |
pass | |
else: | |
ACCEPT_ENCODING += ",br" | |
try: | |
import zstandard as _unused_module_zstd # noqa: F401 | |
except ImportError: | |
pass | |
else: | |
ACCEPT_ENCODING += ",zstd" | |
class _TYPE_FAILEDTELL(Enum): | |
token = 0 | |
_FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token | |
_TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL] | |
# When sending a request with these methods we aren't expecting | |
# a body so don't need to set an explicit 'Content-Length: 0' | |
# The reason we do this in the negative instead of tracking methods | |
# which 'should' have a body is because unknown methods should be | |
# treated as if they were 'POST' which *does* expect a body. | |
_METHODS_NOT_EXPECTING_BODY = {"GET", "HEAD", "DELETE", "TRACE", "OPTIONS", "CONNECT"} | |
def make_headers( | |
keep_alive: bool | None = None, | |
accept_encoding: bool | list[str] | str | None = None, | |
user_agent: str | None = None, | |
basic_auth: str | None = None, | |
proxy_basic_auth: str | None = None, | |
disable_cache: bool | None = None, | |
) -> dict[str, str]: | |
""" | |
Shortcuts for generating request headers. | |
:param keep_alive: | |
If ``True``, adds 'connection: keep-alive' header. | |
:param accept_encoding: | |
Can be a boolean, list, or string. | |
``True`` translates to 'gzip,deflate'. If the dependencies for | |
Brotli (either the ``brotli`` or ``brotlicffi`` package) and/or Zstandard | |
(the ``zstandard`` package) algorithms are installed, then their encodings are | |
included in the string ('br' and 'zstd', respectively). | |
List will get joined by comma. | |
String will be used as provided. | |
:param user_agent: | |
String representing the user-agent you want, such as | |
"python-urllib3/0.6" | |
:param basic_auth: | |
Colon-separated username:password string for 'authorization: basic ...' | |
auth header. | |
:param proxy_basic_auth: | |
Colon-separated username:password string for 'proxy-authorization: basic ...' | |
auth header. | |
:param disable_cache: | |
If ``True``, adds 'cache-control: no-cache' header. | |
Example: | |
.. code-block:: python | |
import urllib3 | |
print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0")) | |
# {'connection': 'keep-alive', 'user-agent': 'Batman/1.0'} | |
print(urllib3.util.make_headers(accept_encoding=True)) | |
# {'accept-encoding': 'gzip,deflate'} | |
""" | |
headers: dict[str, str] = {} | |
if accept_encoding: | |
if isinstance(accept_encoding, str): | |
pass | |
elif isinstance(accept_encoding, list): | |
accept_encoding = ",".join(accept_encoding) | |
else: | |
accept_encoding = ACCEPT_ENCODING | |
headers["accept-encoding"] = accept_encoding | |
if user_agent: | |
headers["user-agent"] = user_agent | |
if keep_alive: | |
headers["connection"] = "keep-alive" | |
if basic_auth: | |
headers["authorization"] = ( | |
f"Basic {b64encode(basic_auth.encode('latin-1')).decode()}" | |
) | |
if proxy_basic_auth: | |
headers["proxy-authorization"] = ( | |
f"Basic {b64encode(proxy_basic_auth.encode('latin-1')).decode()}" | |
) | |
if disable_cache: | |
headers["cache-control"] = "no-cache" | |
return headers | |
def set_file_position( | |
body: typing.Any, pos: _TYPE_BODY_POSITION | None | |
) -> _TYPE_BODY_POSITION | None: | |
""" | |
If a position is provided, move file to that point. | |
Otherwise, we'll attempt to record a position for future use. | |
""" | |
if pos is not None: | |
rewind_body(body, pos) | |
elif getattr(body, "tell", None) is not None: | |
try: | |
pos = body.tell() | |
except OSError: | |
# This differentiates from None, allowing us to catch | |
# a failed `tell()` later when trying to rewind the body. | |
pos = _FAILEDTELL | |
return pos | |
def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None: | |
""" | |
Attempt to rewind body to a certain position. | |
Primarily used for request redirects and retries. | |
:param body: | |
File-like object that supports seek. | |
:param int pos: | |
Position to seek to in file. | |
""" | |
body_seek = getattr(body, "seek", None) | |
if body_seek is not None and isinstance(body_pos, int): | |
try: | |
body_seek(body_pos) | |
except OSError as e: | |
raise UnrewindableBodyError( | |
"An error occurred when rewinding request body for redirect/retry." | |
) from e | |
elif body_pos is _FAILEDTELL: | |
raise UnrewindableBodyError( | |
"Unable to record file position for rewinding " | |
"request body during a redirect/retry." | |
) | |
else: | |
raise ValueError( | |
f"body_pos must be of type integer, instead it was {type(body_pos)}." | |
) | |
class ChunksAndContentLength(typing.NamedTuple): | |
chunks: typing.Iterable[bytes] | None | |
content_length: int | None | |
def body_to_chunks( | |
body: typing.Any | None, method: str, blocksize: int | |
) -> ChunksAndContentLength: | |
"""Takes the HTTP request method, body, and blocksize and | |
transforms them into an iterable of chunks to pass to | |
socket.sendall() and an optional 'Content-Length' header. | |
A 'Content-Length' of 'None' indicates the length of the body | |
can't be determined so should use 'Transfer-Encoding: chunked' | |
for framing instead. | |
""" | |
chunks: typing.Iterable[bytes] | None | |
content_length: int | None | |
# No body, we need to make a recommendation on 'Content-Length' | |
# based on whether that request method is expected to have | |
# a body or not. | |
if body is None: | |
chunks = None | |
if method.upper() not in _METHODS_NOT_EXPECTING_BODY: | |
content_length = 0 | |
else: | |
content_length = None | |
# Bytes or strings become bytes | |
elif isinstance(body, (str, bytes)): | |
chunks = (to_bytes(body),) | |
content_length = len(chunks[0]) | |
# File-like object, TODO: use seek() and tell() for length? | |
elif hasattr(body, "read"): | |
def chunk_readable() -> typing.Iterable[bytes]: | |
nonlocal body, blocksize | |
encode = isinstance(body, io.TextIOBase) | |
while True: | |
datablock = body.read(blocksize) | |
if not datablock: | |
break | |
if encode: | |
datablock = datablock.encode("utf-8") | |
yield datablock | |
chunks = chunk_readable() | |
content_length = None | |
# Otherwise we need to start checking via duck-typing. | |
else: | |
try: | |
# Check if the body implements the buffer API. | |
mv = memoryview(body) | |
except TypeError: | |
try: | |
# Check if the body is an iterable | |
chunks = iter(body) | |
content_length = None | |
except TypeError: | |
raise TypeError( | |
f"'body' must be a bytes-like object, file-like " | |
f"object, or iterable. Instead was {body!r}" | |
) from None | |
else: | |
# Since it implements the buffer API can be passed directly to socket.sendall() | |
chunks = (body,) | |
content_length = mv.nbytes | |
return ChunksAndContentLength(chunks=chunks, content_length=content_length) | |