| | import asyncio |
| | import collections.abc |
| | import datetime |
| | import enum |
| | import json |
| | import math |
| | import time |
| | import warnings |
| | import zlib |
| | from concurrent.futures import Executor |
| | from http import HTTPStatus |
| | from http.cookies import SimpleCookie |
| | from typing import ( |
| | TYPE_CHECKING, |
| | Any, |
| | Dict, |
| | Iterator, |
| | MutableMapping, |
| | Optional, |
| | Union, |
| | cast, |
| | ) |
| |
|
| | from multidict import CIMultiDict, istr |
| |
|
| | from . import hdrs, payload |
| | from .abc import AbstractStreamWriter |
| | from .compression_utils import ZLibCompressor |
| | from .helpers import ( |
| | ETAG_ANY, |
| | QUOTED_ETAG_RE, |
| | ETag, |
| | HeadersMixin, |
| | must_be_empty_body, |
| | parse_http_date, |
| | rfc822_formatted_time, |
| | sentinel, |
| | should_remove_content_length, |
| | validate_etag_value, |
| | ) |
| | from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11 |
| | from .payload import Payload |
| | from .typedefs import JSONEncoder, LooseHeaders |
| |
|
| | REASON_PHRASES = {http_status.value: http_status.phrase for http_status in HTTPStatus} |
| | LARGE_BODY_SIZE = 1024**2 |
| |
|
| | __all__ = ("ContentCoding", "StreamResponse", "Response", "json_response") |
| |
|
| |
|
| | if TYPE_CHECKING: |
| | from .web_request import BaseRequest |
| |
|
| | BaseClass = MutableMapping[str, Any] |
| | else: |
| | BaseClass = collections.abc.MutableMapping |
| |
|
| |
|
| | |
| | class ContentCoding(enum.Enum): |
| | |
| | |
| | |
| | |
| | deflate = "deflate" |
| | gzip = "gzip" |
| | identity = "identity" |
| |
|
| |
|
| | CONTENT_CODINGS = {coding.value: coding for coding in ContentCoding} |
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | class StreamResponse(BaseClass, HeadersMixin): |
| |
|
| | _body: Union[None, bytes, bytearray, Payload] |
| | _length_check = True |
| | _body = None |
| | _keep_alive: Optional[bool] = None |
| | _chunked: bool = False |
| | _compression: bool = False |
| | _compression_strategy: int = zlib.Z_DEFAULT_STRATEGY |
| | _compression_force: Optional[ContentCoding] = None |
| | _req: Optional["BaseRequest"] = None |
| | _payload_writer: Optional[AbstractStreamWriter] = None |
| | _eof_sent: bool = False |
| | _must_be_empty_body: Optional[bool] = None |
| | _body_length = 0 |
| | _cookies: Optional[SimpleCookie] = None |
| |
|
| | def __init__( |
| | self, |
| | *, |
| | status: int = 200, |
| | reason: Optional[str] = None, |
| | headers: Optional[LooseHeaders] = None, |
| | _real_headers: Optional[CIMultiDict[str]] = None, |
| | ) -> None: |
| | """Initialize a new stream response object. |
| | |
| | _real_headers is an internal parameter used to pass a pre-populated |
| | headers object. It is used by the `Response` class to avoid copying |
| | the headers when creating a new response object. It is not intended |
| | to be used by external code. |
| | """ |
| | self._state: Dict[str, Any] = {} |
| |
|
| | if _real_headers is not None: |
| | self._headers = _real_headers |
| | elif headers is not None: |
| | self._headers: CIMultiDict[str] = CIMultiDict(headers) |
| | else: |
| | self._headers = CIMultiDict() |
| |
|
| | self._set_status(status, reason) |
| |
|
| | @property |
| | def prepared(self) -> bool: |
| | return self._eof_sent or self._payload_writer is not None |
| |
|
| | @property |
| | def task(self) -> "Optional[asyncio.Task[None]]": |
| | if self._req: |
| | return self._req.task |
| | else: |
| | return None |
| |
|
| | @property |
| | def status(self) -> int: |
| | return self._status |
| |
|
| | @property |
| | def chunked(self) -> bool: |
| | return self._chunked |
| |
|
| | @property |
| | def compression(self) -> bool: |
| | return self._compression |
| |
|
| | @property |
| | def reason(self) -> str: |
| | return self._reason |
| |
|
| | def set_status( |
| | self, |
| | status: int, |
| | reason: Optional[str] = None, |
| | ) -> None: |
| | assert ( |
| | not self.prepared |
| | ), "Cannot change the response status code after the headers have been sent" |
| | self._set_status(status, reason) |
| |
|
| | def _set_status(self, status: int, reason: Optional[str]) -> None: |
| | self._status = int(status) |
| | if reason is None: |
| | reason = REASON_PHRASES.get(self._status, "") |
| | elif "\n" in reason: |
| | raise ValueError("Reason cannot contain \\n") |
| | self._reason = reason |
| |
|
| | @property |
| | def keep_alive(self) -> Optional[bool]: |
| | return self._keep_alive |
| |
|
| | def force_close(self) -> None: |
| | self._keep_alive = False |
| |
|
| | @property |
| | def body_length(self) -> int: |
| | return self._body_length |
| |
|
| | @property |
| | def output_length(self) -> int: |
| | warnings.warn("output_length is deprecated", DeprecationWarning) |
| | assert self._payload_writer |
| | return self._payload_writer.buffer_size |
| |
|
| | def enable_chunked_encoding(self, chunk_size: Optional[int] = None) -> None: |
| | """Enables automatic chunked transfer encoding.""" |
| | if hdrs.CONTENT_LENGTH in self._headers: |
| | raise RuntimeError( |
| | "You can't enable chunked encoding when a content length is set" |
| | ) |
| | if chunk_size is not None: |
| | warnings.warn("Chunk size is deprecated #1615", DeprecationWarning) |
| | self._chunked = True |
| |
|
| | def enable_compression( |
| | self, |
| | force: Optional[Union[bool, ContentCoding]] = None, |
| | strategy: int = zlib.Z_DEFAULT_STRATEGY, |
| | ) -> None: |
| | """Enables response compression encoding.""" |
| | |
| | if isinstance(force, bool): |
| | force = ContentCoding.deflate if force else ContentCoding.identity |
| | warnings.warn( |
| | "Using boolean for force is deprecated #3318", DeprecationWarning |
| | ) |
| | elif force is not None: |
| | assert isinstance( |
| | force, ContentCoding |
| | ), "force should one of None, bool or ContentEncoding" |
| |
|
| | self._compression = True |
| | self._compression_force = force |
| | self._compression_strategy = strategy |
| |
|
| | @property |
| | def headers(self) -> "CIMultiDict[str]": |
| | return self._headers |
| |
|
| | @property |
| | def cookies(self) -> SimpleCookie: |
| | if self._cookies is None: |
| | self._cookies = SimpleCookie() |
| | return self._cookies |
| |
|
| | def set_cookie( |
| | self, |
| | name: str, |
| | value: str, |
| | *, |
| | expires: Optional[str] = None, |
| | domain: Optional[str] = None, |
| | max_age: Optional[Union[int, str]] = None, |
| | path: str = "/", |
| | secure: Optional[bool] = None, |
| | httponly: Optional[bool] = None, |
| | version: Optional[str] = None, |
| | samesite: Optional[str] = None, |
| | ) -> None: |
| | """Set or update response cookie. |
| | |
| | Sets new cookie or updates existent with new value. |
| | Also updates only those params which are not None. |
| | """ |
| | if self._cookies is None: |
| | self._cookies = SimpleCookie() |
| |
|
| | self._cookies[name] = value |
| | c = self._cookies[name] |
| |
|
| | if expires is not None: |
| | c["expires"] = expires |
| | elif c.get("expires") == "Thu, 01 Jan 1970 00:00:00 GMT": |
| | del c["expires"] |
| |
|
| | if domain is not None: |
| | c["domain"] = domain |
| |
|
| | if max_age is not None: |
| | c["max-age"] = str(max_age) |
| | elif "max-age" in c: |
| | del c["max-age"] |
| |
|
| | c["path"] = path |
| |
|
| | if secure is not None: |
| | c["secure"] = secure |
| | if httponly is not None: |
| | c["httponly"] = httponly |
| | if version is not None: |
| | c["version"] = version |
| | if samesite is not None: |
| | c["samesite"] = samesite |
| |
|
| | def del_cookie( |
| | self, |
| | name: str, |
| | *, |
| | domain: Optional[str] = None, |
| | path: str = "/", |
| | secure: Optional[bool] = None, |
| | httponly: Optional[bool] = None, |
| | samesite: Optional[str] = None, |
| | ) -> None: |
| | """Delete cookie. |
| | |
| | Creates new empty expired cookie. |
| | """ |
| | |
| | if self._cookies is not None: |
| | self._cookies.pop(name, None) |
| | self.set_cookie( |
| | name, |
| | "", |
| | max_age=0, |
| | expires="Thu, 01 Jan 1970 00:00:00 GMT", |
| | domain=domain, |
| | path=path, |
| | secure=secure, |
| | httponly=httponly, |
| | samesite=samesite, |
| | ) |
| |
|
| | @property |
| | def content_length(self) -> Optional[int]: |
| | |
| | return super().content_length |
| |
|
| | @content_length.setter |
| | def content_length(self, value: Optional[int]) -> None: |
| | if value is not None: |
| | value = int(value) |
| | if self._chunked: |
| | raise RuntimeError( |
| | "You can't set content length when chunked encoding is enable" |
| | ) |
| | self._headers[hdrs.CONTENT_LENGTH] = str(value) |
| | else: |
| | self._headers.pop(hdrs.CONTENT_LENGTH, None) |
| |
|
| | @property |
| | def content_type(self) -> str: |
| | |
| | return super().content_type |
| |
|
| | @content_type.setter |
| | def content_type(self, value: str) -> None: |
| | self.content_type |
| | self._content_type = str(value) |
| | self._generate_content_type_header() |
| |
|
| | @property |
| | def charset(self) -> Optional[str]: |
| | |
| | return super().charset |
| |
|
| | @charset.setter |
| | def charset(self, value: Optional[str]) -> None: |
| | ctype = self.content_type |
| | if ctype == "application/octet-stream": |
| | raise RuntimeError( |
| | "Setting charset for application/octet-stream " |
| | "doesn't make sense, setup content_type first" |
| | ) |
| | assert self._content_dict is not None |
| | if value is None: |
| | self._content_dict.pop("charset", None) |
| | else: |
| | self._content_dict["charset"] = str(value).lower() |
| | self._generate_content_type_header() |
| |
|
| | @property |
| | def last_modified(self) -> Optional[datetime.datetime]: |
| | """The value of Last-Modified HTTP header, or None. |
| | |
| | This header is represented as a `datetime` object. |
| | """ |
| | return parse_http_date(self._headers.get(hdrs.LAST_MODIFIED)) |
| |
|
| | @last_modified.setter |
| | def last_modified( |
| | self, value: Optional[Union[int, float, datetime.datetime, str]] |
| | ) -> None: |
| | if value is None: |
| | self._headers.pop(hdrs.LAST_MODIFIED, None) |
| | elif isinstance(value, (int, float)): |
| | self._headers[hdrs.LAST_MODIFIED] = time.strftime( |
| | "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(math.ceil(value)) |
| | ) |
| | elif isinstance(value, datetime.datetime): |
| | self._headers[hdrs.LAST_MODIFIED] = time.strftime( |
| | "%a, %d %b %Y %H:%M:%S GMT", value.utctimetuple() |
| | ) |
| | elif isinstance(value, str): |
| | self._headers[hdrs.LAST_MODIFIED] = value |
| |
|
| | @property |
| | def etag(self) -> Optional[ETag]: |
| | quoted_value = self._headers.get(hdrs.ETAG) |
| | if not quoted_value: |
| | return None |
| | elif quoted_value == ETAG_ANY: |
| | return ETag(value=ETAG_ANY) |
| | match = QUOTED_ETAG_RE.fullmatch(quoted_value) |
| | if not match: |
| | return None |
| | is_weak, value = match.group(1, 2) |
| | return ETag( |
| | is_weak=bool(is_weak), |
| | value=value, |
| | ) |
| |
|
| | @etag.setter |
| | def etag(self, value: Optional[Union[ETag, str]]) -> None: |
| | if value is None: |
| | self._headers.pop(hdrs.ETAG, None) |
| | elif (isinstance(value, str) and value == ETAG_ANY) or ( |
| | isinstance(value, ETag) and value.value == ETAG_ANY |
| | ): |
| | self._headers[hdrs.ETAG] = ETAG_ANY |
| | elif isinstance(value, str): |
| | validate_etag_value(value) |
| | self._headers[hdrs.ETAG] = f'"{value}"' |
| | elif isinstance(value, ETag) and isinstance(value.value, str): |
| | validate_etag_value(value.value) |
| | hdr_value = f'W/"{value.value}"' if value.is_weak else f'"{value.value}"' |
| | self._headers[hdrs.ETAG] = hdr_value |
| | else: |
| | raise ValueError( |
| | f"Unsupported etag type: {type(value)}. " |
| | f"etag must be str, ETag or None" |
| | ) |
| |
|
| | def _generate_content_type_header( |
| | self, CONTENT_TYPE: istr = hdrs.CONTENT_TYPE |
| | ) -> None: |
| | assert self._content_dict is not None |
| | assert self._content_type is not None |
| | params = "; ".join(f"{k}={v}" for k, v in self._content_dict.items()) |
| | if params: |
| | ctype = self._content_type + "; " + params |
| | else: |
| | ctype = self._content_type |
| | self._headers[CONTENT_TYPE] = ctype |
| |
|
| | async def _do_start_compression(self, coding: ContentCoding) -> None: |
| | if coding is ContentCoding.identity: |
| | return |
| | assert self._payload_writer is not None |
| | self._headers[hdrs.CONTENT_ENCODING] = coding.value |
| | self._payload_writer.enable_compression( |
| | coding.value, self._compression_strategy |
| | ) |
| | |
| | |
| | self._headers.popall(hdrs.CONTENT_LENGTH, None) |
| |
|
| | async def _start_compression(self, request: "BaseRequest") -> None: |
| | if self._compression_force: |
| | await self._do_start_compression(self._compression_force) |
| | return |
| | |
| | |
| | accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower() |
| | for value, coding in CONTENT_CODINGS.items(): |
| | if value in accept_encoding: |
| | await self._do_start_compression(coding) |
| | return |
| |
|
| | async def prepare(self, request: "BaseRequest") -> Optional[AbstractStreamWriter]: |
| | if self._eof_sent: |
| | return None |
| | if self._payload_writer is not None: |
| | return self._payload_writer |
| | self._must_be_empty_body = must_be_empty_body(request.method, self.status) |
| | return await self._start(request) |
| |
|
| | async def _start(self, request: "BaseRequest") -> AbstractStreamWriter: |
| | self._req = request |
| | writer = self._payload_writer = request._payload_writer |
| |
|
| | await self._prepare_headers() |
| | await request._prepare_hook(self) |
| | await self._write_headers() |
| |
|
| | return writer |
| |
|
| | async def _prepare_headers(self) -> None: |
| | request = self._req |
| | assert request is not None |
| | writer = self._payload_writer |
| | assert writer is not None |
| | keep_alive = self._keep_alive |
| | if keep_alive is None: |
| | keep_alive = request.keep_alive |
| | self._keep_alive = keep_alive |
| |
|
| | version = request.version |
| |
|
| | headers = self._headers |
| | if self._cookies: |
| | for cookie in self._cookies.values(): |
| | value = cookie.output(header="")[1:] |
| | headers.add(hdrs.SET_COOKIE, value) |
| |
|
| | if self._compression: |
| | await self._start_compression(request) |
| |
|
| | if self._chunked: |
| | if version != HttpVersion11: |
| | raise RuntimeError( |
| | "Using chunked encoding is forbidden " |
| | "for HTTP/{0.major}.{0.minor}".format(request.version) |
| | ) |
| | if not self._must_be_empty_body: |
| | writer.enable_chunking() |
| | headers[hdrs.TRANSFER_ENCODING] = "chunked" |
| | elif self._length_check: |
| | writer.length = self.content_length |
| | if writer.length is None: |
| | if version >= HttpVersion11: |
| | if not self._must_be_empty_body: |
| | writer.enable_chunking() |
| | headers[hdrs.TRANSFER_ENCODING] = "chunked" |
| | elif not self._must_be_empty_body: |
| | keep_alive = False |
| |
|
| | |
| | |
| | if self._must_be_empty_body: |
| | if hdrs.CONTENT_LENGTH in headers and should_remove_content_length( |
| | request.method, self.status |
| | ): |
| | del headers[hdrs.CONTENT_LENGTH] |
| | |
| | |
| | if hdrs.TRANSFER_ENCODING in headers: |
| | del headers[hdrs.TRANSFER_ENCODING] |
| | elif (writer.length if self._length_check else self.content_length) != 0: |
| | |
| | headers.setdefault(hdrs.CONTENT_TYPE, "application/octet-stream") |
| | headers.setdefault(hdrs.DATE, rfc822_formatted_time()) |
| | headers.setdefault(hdrs.SERVER, SERVER_SOFTWARE) |
| |
|
| | |
| | if hdrs.CONNECTION not in headers: |
| | if keep_alive: |
| | if version == HttpVersion10: |
| | headers[hdrs.CONNECTION] = "keep-alive" |
| | elif version == HttpVersion11: |
| | headers[hdrs.CONNECTION] = "close" |
| |
|
| | async def _write_headers(self) -> None: |
| | request = self._req |
| | assert request is not None |
| | writer = self._payload_writer |
| | assert writer is not None |
| | |
| | version = request.version |
| | status_line = f"HTTP/{version[0]}.{version[1]} {self._status} {self._reason}" |
| | await writer.write_headers(status_line, self._headers) |
| |
|
| | async def write(self, data: Union[bytes, bytearray, memoryview]) -> None: |
| | assert isinstance( |
| | data, (bytes, bytearray, memoryview) |
| | ), "data argument must be byte-ish (%r)" % type(data) |
| |
|
| | if self._eof_sent: |
| | raise RuntimeError("Cannot call write() after write_eof()") |
| | if self._payload_writer is None: |
| | raise RuntimeError("Cannot call write() before prepare()") |
| |
|
| | await self._payload_writer.write(data) |
| |
|
| | async def drain(self) -> None: |
| | assert not self._eof_sent, "EOF has already been sent" |
| | assert self._payload_writer is not None, "Response has not been started" |
| | warnings.warn( |
| | "drain method is deprecated, use await resp.write()", |
| | DeprecationWarning, |
| | stacklevel=2, |
| | ) |
| | await self._payload_writer.drain() |
| |
|
| | async def write_eof(self, data: bytes = b"") -> None: |
| | assert isinstance( |
| | data, (bytes, bytearray, memoryview) |
| | ), "data argument must be byte-ish (%r)" % type(data) |
| |
|
| | if self._eof_sent: |
| | return |
| |
|
| | assert self._payload_writer is not None, "Response has not been started" |
| |
|
| | await self._payload_writer.write_eof(data) |
| | self._eof_sent = True |
| | self._req = None |
| | self._body_length = self._payload_writer.output_size |
| | self._payload_writer = None |
| |
|
| | def __repr__(self) -> str: |
| | if self._eof_sent: |
| | info = "eof" |
| | elif self.prepared: |
| | assert self._req is not None |
| | info = f"{self._req.method} {self._req.path} " |
| | else: |
| | info = "not prepared" |
| | return f"<{self.__class__.__name__} {self.reason} {info}>" |
| |
|
| | def __getitem__(self, key: str) -> Any: |
| | return self._state[key] |
| |
|
| | def __setitem__(self, key: str, value: Any) -> None: |
| | self._state[key] = value |
| |
|
| | def __delitem__(self, key: str) -> None: |
| | del self._state[key] |
| |
|
| | def __len__(self) -> int: |
| | return len(self._state) |
| |
|
| | def __iter__(self) -> Iterator[str]: |
| | return iter(self._state) |
| |
|
| | def __hash__(self) -> int: |
| | return hash(id(self)) |
| |
|
| | def __eq__(self, other: object) -> bool: |
| | return self is other |
| |
|
| |
|
| | class Response(StreamResponse): |
| |
|
| | _compressed_body: Optional[bytes] = None |
| |
|
| | def __init__( |
| | self, |
| | *, |
| | body: Any = None, |
| | status: int = 200, |
| | reason: Optional[str] = None, |
| | text: Optional[str] = None, |
| | headers: Optional[LooseHeaders] = None, |
| | content_type: Optional[str] = None, |
| | charset: Optional[str] = None, |
| | zlib_executor_size: Optional[int] = None, |
| | zlib_executor: Optional[Executor] = None, |
| | ) -> None: |
| | if body is not None and text is not None: |
| | raise ValueError("body and text are not allowed together") |
| |
|
| | if headers is None: |
| | real_headers: CIMultiDict[str] = CIMultiDict() |
| | elif not isinstance(headers, CIMultiDict): |
| | real_headers = CIMultiDict(headers) |
| | else: |
| | real_headers = headers |
| |
|
| | if content_type is not None and "charset" in content_type: |
| | raise ValueError("charset must not be in content_type argument") |
| |
|
| | if text is not None: |
| | if hdrs.CONTENT_TYPE in real_headers: |
| | if content_type or charset: |
| | raise ValueError( |
| | "passing both Content-Type header and " |
| | "content_type or charset params " |
| | "is forbidden" |
| | ) |
| | else: |
| | |
| | if not isinstance(text, str): |
| | raise TypeError("text argument must be str (%r)" % type(text)) |
| | if content_type is None: |
| | content_type = "text/plain" |
| | if charset is None: |
| | charset = "utf-8" |
| | real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset |
| | body = text.encode(charset) |
| | text = None |
| | elif hdrs.CONTENT_TYPE in real_headers: |
| | if content_type is not None or charset is not None: |
| | raise ValueError( |
| | "passing both Content-Type header and " |
| | "content_type or charset params " |
| | "is forbidden" |
| | ) |
| | elif content_type is not None: |
| | if charset is not None: |
| | content_type += "; charset=" + charset |
| | real_headers[hdrs.CONTENT_TYPE] = content_type |
| |
|
| | super().__init__(status=status, reason=reason, _real_headers=real_headers) |
| |
|
| | if text is not None: |
| | self.text = text |
| | else: |
| | self.body = body |
| |
|
| | self._zlib_executor_size = zlib_executor_size |
| | self._zlib_executor = zlib_executor |
| |
|
| | @property |
| | def body(self) -> Optional[Union[bytes, Payload]]: |
| | return self._body |
| |
|
| | @body.setter |
| | def body(self, body: Any) -> None: |
| | if body is None: |
| | self._body = None |
| | elif isinstance(body, (bytes, bytearray)): |
| | self._body = body |
| | else: |
| | try: |
| | self._body = body = payload.PAYLOAD_REGISTRY.get(body) |
| | except payload.LookupError: |
| | raise ValueError("Unsupported body type %r" % type(body)) |
| |
|
| | headers = self._headers |
| |
|
| | |
| | if hdrs.CONTENT_TYPE not in headers: |
| | headers[hdrs.CONTENT_TYPE] = body.content_type |
| |
|
| | |
| | if body.headers: |
| | for key, value in body.headers.items(): |
| | if key not in headers: |
| | headers[key] = value |
| |
|
| | self._compressed_body = None |
| |
|
| | @property |
| | def text(self) -> Optional[str]: |
| | if self._body is None: |
| | return None |
| | return self._body.decode(self.charset or "utf-8") |
| |
|
| | @text.setter |
| | def text(self, text: str) -> None: |
| | assert text is None or isinstance( |
| | text, str |
| | ), "text argument must be str (%r)" % type(text) |
| |
|
| | if self.content_type == "application/octet-stream": |
| | self.content_type = "text/plain" |
| | if self.charset is None: |
| | self.charset = "utf-8" |
| |
|
| | self._body = text.encode(self.charset) |
| | self._compressed_body = None |
| |
|
| | @property |
| | def content_length(self) -> Optional[int]: |
| | if self._chunked: |
| | return None |
| |
|
| | if hdrs.CONTENT_LENGTH in self._headers: |
| | return int(self._headers[hdrs.CONTENT_LENGTH]) |
| |
|
| | if self._compressed_body is not None: |
| | |
| | return len(self._compressed_body) |
| | elif isinstance(self._body, Payload): |
| | |
| | return None |
| | elif self._body is not None: |
| | return len(self._body) |
| | else: |
| | return 0 |
| |
|
| | @content_length.setter |
| | def content_length(self, value: Optional[int]) -> None: |
| | raise RuntimeError("Content length is set automatically") |
| |
|
| | async def write_eof(self, data: bytes = b"") -> None: |
| | if self._eof_sent: |
| | return |
| | if self._compressed_body is None: |
| | body: Optional[Union[bytes, Payload]] = self._body |
| | else: |
| | body = self._compressed_body |
| | assert not data, f"data arg is not supported, got {data!r}" |
| | assert self._req is not None |
| | assert self._payload_writer is not None |
| | if body is None or self._must_be_empty_body: |
| | await super().write_eof() |
| | elif isinstance(self._body, Payload): |
| | await self._body.write(self._payload_writer) |
| | await super().write_eof() |
| | else: |
| | await super().write_eof(cast(bytes, body)) |
| |
|
| | async def _start(self, request: "BaseRequest") -> AbstractStreamWriter: |
| | if hdrs.CONTENT_LENGTH in self._headers: |
| | if should_remove_content_length(request.method, self.status): |
| | del self._headers[hdrs.CONTENT_LENGTH] |
| | elif not self._chunked: |
| | if isinstance(self._body, Payload): |
| | if self._body.size is not None: |
| | self._headers[hdrs.CONTENT_LENGTH] = str(self._body.size) |
| | else: |
| | body_len = len(self._body) if self._body else "0" |
| | |
| | if body_len != "0" or ( |
| | self.status != 304 and request.method not in hdrs.METH_HEAD_ALL |
| | ): |
| | self._headers[hdrs.CONTENT_LENGTH] = str(body_len) |
| |
|
| | return await super()._start(request) |
| |
|
| | async def _do_start_compression(self, coding: ContentCoding) -> None: |
| | if self._chunked or isinstance(self._body, Payload): |
| | return await super()._do_start_compression(coding) |
| | if coding is ContentCoding.identity: |
| | return |
| | |
| | |
| | compressor = ZLibCompressor( |
| | encoding=coding.value, |
| | max_sync_chunk_size=self._zlib_executor_size, |
| | executor=self._zlib_executor, |
| | ) |
| | assert self._body is not None |
| | if self._zlib_executor_size is None and len(self._body) > LARGE_BODY_SIZE: |
| | warnings.warn( |
| | "Synchronous compression of large response bodies " |
| | f"({len(self._body)} bytes) might block the async event loop. " |
| | "Consider providing a custom value to zlib_executor_size/" |
| | "zlib_executor response properties or disabling compression on it." |
| | ) |
| | self._compressed_body = ( |
| | await compressor.compress(self._body) + compressor.flush() |
| | ) |
| | self._headers[hdrs.CONTENT_ENCODING] = coding.value |
| | self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body)) |
| |
|
| |
|
| | def json_response( |
| | data: Any = sentinel, |
| | *, |
| | text: Optional[str] = None, |
| | body: Optional[bytes] = None, |
| | status: int = 200, |
| | reason: Optional[str] = None, |
| | headers: Optional[LooseHeaders] = None, |
| | content_type: str = "application/json", |
| | dumps: JSONEncoder = json.dumps, |
| | ) -> Response: |
| | if data is not sentinel: |
| | if text or body: |
| | raise ValueError("only one of data, text, or body should be specified") |
| | else: |
| | text = dumps(data) |
| | return Response( |
| | text=text, |
| | body=body, |
| | status=status, |
| | reason=reason, |
| | headers=headers, |
| | content_type=content_type, |
| | ) |
| |
|