File size: 8,047 Bytes
4ae0b03 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
from __future__ import annotations
import inspect
import warnings
from json import dumps as json_dumps
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Iterable,
Iterator,
Mapping,
)
from urllib.parse import urlencode
from ._exceptions import StreamClosed, StreamConsumed
from ._multipart import MultipartStream
from ._types import (
AsyncByteStream,
RequestContent,
RequestData,
RequestFiles,
ResponseContent,
SyncByteStream,
)
from ._utils import peek_filelike_length, primitive_value_to_str
class ByteStream(AsyncByteStream, SyncByteStream):
def __init__(self, stream: bytes) -> None:
self._stream = stream
def __iter__(self) -> Iterator[bytes]:
yield self._stream
async def __aiter__(self) -> AsyncIterator[bytes]:
yield self._stream
class IteratorByteStream(SyncByteStream):
CHUNK_SIZE = 65_536
def __init__(self, stream: Iterable[bytes]) -> None:
self._stream = stream
self._is_stream_consumed = False
self._is_generator = inspect.isgenerator(stream)
def __iter__(self) -> Iterator[bytes]:
if self._is_stream_consumed and self._is_generator:
raise StreamConsumed()
self._is_stream_consumed = True
if hasattr(self._stream, "read"):
# File-like interfaces should use 'read' directly.
chunk = self._stream.read(self.CHUNK_SIZE)
while chunk:
yield chunk
chunk = self._stream.read(self.CHUNK_SIZE)
else:
# Otherwise iterate.
for part in self._stream:
yield part
class AsyncIteratorByteStream(AsyncByteStream):
CHUNK_SIZE = 65_536
def __init__(self, stream: AsyncIterable[bytes]) -> None:
self._stream = stream
self._is_stream_consumed = False
self._is_generator = inspect.isasyncgen(stream)
async def __aiter__(self) -> AsyncIterator[bytes]:
if self._is_stream_consumed and self._is_generator:
raise StreamConsumed()
self._is_stream_consumed = True
if hasattr(self._stream, "aread"):
# File-like interfaces should use 'aread' directly.
chunk = await self._stream.aread(self.CHUNK_SIZE)
while chunk:
yield chunk
chunk = await self._stream.aread(self.CHUNK_SIZE)
else:
# Otherwise iterate.
async for part in self._stream:
yield part
class UnattachedStream(AsyncByteStream, SyncByteStream):
"""
If a request or response is serialized using pickle, then it is no longer
attached to a stream for I/O purposes. Any stream operations should result
in `httpx.StreamClosed`.
"""
def __iter__(self) -> Iterator[bytes]:
raise StreamClosed()
async def __aiter__(self) -> AsyncIterator[bytes]:
raise StreamClosed()
yield b"" # pragma: no cover
def encode_content(
content: str | bytes | Iterable[bytes] | AsyncIterable[bytes],
) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]:
if isinstance(content, (bytes, str)):
body = content.encode("utf-8") if isinstance(content, str) else content
content_length = len(body)
headers = {"Content-Length": str(content_length)} if body else {}
return headers, ByteStream(body)
elif isinstance(content, Iterable) and not isinstance(content, dict):
# `not isinstance(content, dict)` is a bit oddly specific, but it
# catches a case that's easy for users to make in error, and would
# otherwise pass through here, like any other bytes-iterable,
# because `dict` happens to be iterable. See issue #2491.
content_length_or_none = peek_filelike_length(content)
if content_length_or_none is None:
headers = {"Transfer-Encoding": "chunked"}
else:
headers = {"Content-Length": str(content_length_or_none)}
return headers, IteratorByteStream(content) # type: ignore
elif isinstance(content, AsyncIterable):
headers = {"Transfer-Encoding": "chunked"}
return headers, AsyncIteratorByteStream(content)
raise TypeError(f"Unexpected type for 'content', {type(content)!r}")
def encode_urlencoded_data(
data: RequestData,
) -> tuple[dict[str, str], ByteStream]:
plain_data = []
for key, value in data.items():
if isinstance(value, (list, tuple)):
plain_data.extend([(key, primitive_value_to_str(item)) for item in value])
else:
plain_data.append((key, primitive_value_to_str(value)))
body = urlencode(plain_data, doseq=True).encode("utf-8")
content_length = str(len(body))
content_type = "application/x-www-form-urlencoded"
headers = {"Content-Length": content_length, "Content-Type": content_type}
return headers, ByteStream(body)
def encode_multipart_data(
data: RequestData, files: RequestFiles, boundary: bytes | None
) -> tuple[dict[str, str], MultipartStream]:
multipart = MultipartStream(data=data, files=files, boundary=boundary)
headers = multipart.get_headers()
return headers, multipart
def encode_text(text: str) -> tuple[dict[str, str], ByteStream]:
body = text.encode("utf-8")
content_length = str(len(body))
content_type = "text/plain; charset=utf-8"
headers = {"Content-Length": content_length, "Content-Type": content_type}
return headers, ByteStream(body)
def encode_html(html: str) -> tuple[dict[str, str], ByteStream]:
body = html.encode("utf-8")
content_length = str(len(body))
content_type = "text/html; charset=utf-8"
headers = {"Content-Length": content_length, "Content-Type": content_type}
return headers, ByteStream(body)
def encode_json(json: Any) -> tuple[dict[str, str], ByteStream]:
body = json_dumps(json).encode("utf-8")
content_length = str(len(body))
content_type = "application/json"
headers = {"Content-Length": content_length, "Content-Type": content_type}
return headers, ByteStream(body)
def encode_request(
content: RequestContent | None = None,
data: RequestData | None = None,
files: RequestFiles | None = None,
json: Any | None = None,
boundary: bytes | None = None,
) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]:
"""
Handles encoding the given `content`, `data`, `files`, and `json`,
returning a two-tuple of (<headers>, <stream>).
"""
if data is not None and not isinstance(data, Mapping):
# We prefer to separate `content=<bytes|str|byte iterator|bytes aiterator>`
# for raw request content, and `data=<form data>` for url encoded or
# multipart form content.
#
# However for compat with requests, we *do* still support
# `data=<bytes...>` usages. We deal with that case here, treating it
# as if `content=<...>` had been supplied instead.
message = "Use 'content=<...>' to upload raw bytes/text content."
warnings.warn(message, DeprecationWarning)
return encode_content(data)
if content is not None:
return encode_content(content)
elif files:
return encode_multipart_data(data or {}, files, boundary)
elif data:
return encode_urlencoded_data(data)
elif json is not None:
return encode_json(json)
return {}, ByteStream(b"")
def encode_response(
content: ResponseContent | None = None,
text: str | None = None,
html: str | None = None,
json: Any | None = None,
) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]:
"""
Handles encoding the given `content`, returning a two-tuple of
(<headers>, <stream>).
"""
if content is not None:
return encode_content(content)
elif text is not None:
return encode_text(text)
elif html is not None:
return encode_html(html)
elif json is not None:
return encode_json(json)
return {}, ByteStream(b"")
|