Spaces:
Runtime error
Runtime error
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
# Hive Appier Framework | |
# Copyright (c) 2008-2024 Hive Solutions Lda. | |
# | |
# This file is part of Hive Appier Framework. | |
# | |
# Hive Appier Framework is free software: you can redistribute it and/or modify | |
# it under the terms of the Apache License as published by the Apache | |
# Foundation, either version 2.0 of the License, or (at your option) any | |
# later version. | |
# | |
# Hive Appier Framework is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# Apache License for more details. | |
# | |
# You should have received a copy of the Apache License along with | |
# Hive Appier Framework. If not, see <http://www.apache.org/licenses/>. | |
__author__ = "João Magalhães <joamag@hive.pt>" | |
""" The author(s) of the module """ | |
__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda." | |
""" The copyright for the module """ | |
__license__ = "Apache License, Version 2.0" | |
""" The license for the module """ | |
import json | |
import time | |
import base64 | |
import datetime | |
from . import util | |
from . import config | |
from . import legacy | |
from . import session | |
from . import exceptions | |
CODE_STRINGS = { | |
100: "Continue", | |
101: "Switching Protocols", | |
200: "OK", | |
201: "Created", | |
202: "Accepted", | |
203: "Non-Authoritative Information", | |
204: "No Content", | |
205: "Reset Content", | |
206: "Partial Content", | |
207: "Multi-Status", | |
301: "Moved permanently", | |
302: "Found", | |
303: "See Other", | |
304: "Not Modified", | |
305: "Use Proxy", | |
306: "(Unused)", | |
307: "Temporary Redirect", | |
400: "Bad Request", | |
401: "Unauthorized", | |
402: "Payment Required", | |
403: "Forbidden", | |
404: "Not Found", | |
405: "Method Not Allowed", | |
406: "Not Acceptable", | |
407: "Proxy Authentication Required", | |
408: "Request Timeout", | |
409: "Conflict", | |
410: "Gone", | |
411: "Length Required", | |
412: "Precondition Failed", | |
413: "Request Entity Too Large", | |
414: "Request-URI Too Long", | |
415: "Unsupported Media Type", | |
416: "Requested Range Not Satisfiable", | |
417: "Expectation Failed", | |
500: "Internal Server Error", | |
501: "Not Implemented", | |
502: "Bad Gateway", | |
503: "Service Unavailable", | |
504: "Gateway Timeout", | |
505: "HTTP Version Not Supported", | |
} | |
""" Dictionary associating the error code as integers | |
with the official descriptive message for it """ | |
class Request(object): | |
""" | |
Request class that acts as a proxy for the both | |
the input and output of an HTTP request. | |
Other responsibilities should include indirect | |
session management and serialization. | |
""" | |
ALIAS = ("session_id", "_sid") | |
""" The list of strings that are considered to represent | |
and alias to the session identifier name, this values may | |
be changed in case the app required loading of the session | |
using a different attribute name """ | |
def __init__( | |
self, | |
owner=None, | |
method="GET", | |
path="/", | |
prefix="/", | |
query="", | |
scheme=None, | |
address=None, | |
protocol=None, | |
params={}, | |
data_j={}, | |
environ={}, | |
session_c=session.FileSession, | |
): | |
self.owner = owner | |
self.method = method | |
self.path = path | |
self.prefix = prefix | |
self.query = query | |
self.scheme = scheme | |
self.address = address | |
self.protocol = protocol | |
self.params = params | |
self.data_j = data_j | |
self.environ = environ | |
self.session_c = session_c | |
self.handled = False | |
self.stime = None | |
self.etime = None | |
self.context = None | |
self.method_i = None | |
self.exception = None | |
self.stacktrace = None | |
self.json = False | |
self.code = 200 | |
self.location = prefix + util.quote(path).lstrip("/") | |
self.encoding = "utf-8" | |
self.content_type = None | |
self.cache_control = None | |
self.authorization = None | |
self.data = None | |
self.result = None | |
self.result_l = None | |
self.locale = None | |
self.language = None | |
self.query_s = None | |
self.prefixes = None | |
self.session = session.MockSession(self) | |
self.set_cookie = None | |
self.set_cookie_prefix = None | |
self.set_cookie_suffix = None | |
self.post = {} | |
self.files = {} | |
self.args = {} | |
self.cookies = {} | |
self.in_headers = {} | |
self.out_headers = {} | |
self.warnings = [] | |
self.properties = {} | |
self._closed = False | |
self._params = None | |
def __str__(self): | |
return str(self.repr()) | |
def __unicode__(self): | |
return self.__str__() | |
def close(self): | |
""" | |
Called upon the closing of the request so that the internal references | |
the request contains may be related (avoiding memory leaks). | |
After a request is closed it cannot be re-opened and a new request must | |
be created instead if a new one is required. | |
""" | |
self.owner = None | |
self.method = None | |
self.path = None | |
self.prefix = None | |
self.query = None | |
self.scheme = None | |
self.address = None | |
self.protocol = None | |
self.params = None | |
self.data_j = None | |
self.environ = None | |
self.session_c = None | |
self.handled = None | |
self.context = None | |
self.method_i = None | |
self.json = None | |
self.code = None | |
self.location = None | |
self.encoding = None | |
self.content_type = None | |
self.cache_control = None | |
self.authorization = None | |
self.data = None | |
self.result = None | |
self.result_l = None | |
self.locale = None | |
self.language = None | |
self.query_s = None | |
self.prefixes = None | |
self.session = None | |
self.set_cookie = None | |
self.set_cookie_prefix = None | |
self.set_cookie_suffix = None | |
self.post = None | |
self.files = None | |
self.args = None | |
self.cookies = None | |
self.in_headers = None | |
self.out_headers = None | |
self.warnings = None | |
self.properties = None | |
self._closed = True | |
self._params = None | |
def handle(self, code=None, result=""): | |
""" | |
Handles the current request, setting the response code | |
and the resulting data that is going to be returned to | |
the current client. | |
This method should not be called directly by any of the | |
action functions but only by the internal appier structures | |
or any other extra middleware handler. Because of that | |
this method should be considered internal and must be | |
used with extreme care. | |
If this method is called the request is considered to be | |
handled and the "typical" action function based handling | |
is going to be skipped. | |
:type code: int | |
:param code: The status code that is going to be set for | |
the response associated with the current request. | |
:type result: Object | |
:param result: Value that is returned to the handling | |
infra-structure as the result of the handling, proper data | |
types may vary and the include: strings, dictionaries or | |
generator objects. | |
""" | |
self.handled = True | |
self.code = self.code if self.code else 200 | |
self.code = code if code else self.code | |
self.result = result | |
def flush(self): | |
""" | |
Flushes the current request information meaning that the | |
current request information is updated so that it's state | |
is persisted into the current client's information. | |
This method should always be called at the end of the request | |
handling workflow. | |
""" | |
self.session.flush(self) | |
def repr(self): | |
""" | |
Returns the dictionary based representation of the current | |
request this information may be used both for debugging or | |
logging purposes. | |
For performance reasons this method should be used the least | |
amount of times possible. | |
:rtype: Dictionary | |
:return: The dictionary containing the information about | |
the current request. | |
""" | |
return dict( | |
method=self.method, | |
path=self.path, | |
prefix=self.prefix, | |
query=self.query, | |
scheme=self.scheme, | |
encoding=self.encoding, | |
content_type=self.content_type, | |
cache_conrtol=self.cache_control, | |
) | |
def warning(self, message): | |
message_t = type(message) | |
if message_t in legacy.STRINGS: | |
message = dict(message=message) | |
elif not message_t == dict: | |
raise exceptions.OperationalError( | |
message="Invalid message type '%s'" % message_t | |
) | |
self.warnings.append(message) | |
def get_params(self): | |
if self._params: | |
return self._params | |
self._params = {} | |
for key, value in self.params.items(): | |
self._params[key] = value[0] | |
return self._params | |
def get_param(self, name, default=None): | |
if not name in self.params: | |
return default | |
value = self.params[name] | |
return value[0] if value else default | |
def set_params(self, params): | |
self.params = params | |
self.extend_args(params) | |
def set_post(self, post): | |
self.post = post | |
self.extend_args(post) | |
def set_files(self, files): | |
self.files = files | |
self.extend_args(files) | |
def set_multipart(self, post, files, ordered): | |
self.post = post | |
self.files = files | |
self.extend_args(ordered) | |
def get_data(self): | |
return self.data | |
def set_data(self, data): | |
self.data = data | |
def get_json(self): | |
return self.data_j | |
def set_json(self, data_j): | |
self.data_j = data_j | |
def set_code(self, code): | |
self.code = code | |
def get_encoded(self, encoding=None, safe=True): | |
if encoding == None: | |
encoding = self.get_encoding() | |
if not encoding: | |
return self.data | |
try: | |
return legacy.u(self.data, encoding=encoding, force=True) | |
except UnicodeDecodeError: | |
if not safe: | |
raise | |
return self.data | |
def extend_args(self, args): | |
is_dict = isinstance(args, dict) | |
args = args.items() if is_dict else args | |
for key, value in args: | |
_value = self.args.get(key, []) | |
_value.extend(value) | |
self.args[key] = _value | |
def get_encoding(self): | |
return self.encoding | |
def set_encoding(self, encoding): | |
self.encoding = encoding | |
def get_content_type(self): | |
return self.content_type | |
def set_content_type(self, content_type): | |
self.content_type = content_type | |
def default_content_type(self, default): | |
if self.content_type: | |
return | |
if self.has_header_out("Content-Type"): | |
return | |
self.content_type = default | |
def get_cache_control(self): | |
return self.cache_control | |
def set_cache_control(self, cache_control): | |
self.cache_control = cache_control | |
def default_cache_control(self, default): | |
if self.cache_control: | |
return | |
if self.has_header_out("Cache-Control"): | |
return | |
self.cache_control = default | |
def get_header(self, name, default=None, normalize=True): | |
if normalize: | |
name = name.title() | |
return self.in_headers.get(name, default) | |
def set_header(self, name, value, normalize=False, replace=True): | |
if normalize: | |
name = name.title() | |
if not replace and name in self.out_headers: | |
return | |
self.out_headers[name] = legacy.ascii(value) | |
def ensure_header(self, name, value, normalize=False): | |
self.set_header(name, value, normalize=normalize, replace=False) | |
def set_headers(self, headers): | |
if not headers: | |
return | |
for name, value in headers.items(): | |
self.set_header(name, value) | |
def set_headers_l(self, headers_l): | |
for name, value in headers_l: | |
self.set_header(name, value) | |
def has_header_in(self, name, insensitive=True): | |
if insensitive: | |
name = name.title() | |
return name in self.in_headers | |
def has_header_out(self, name, insensitive=True): | |
if insensitive: | |
name = name.title() | |
return name in self.out_headers | |
def set_headers_b(self): | |
content_type = self.get_content_type() or "text/plain" | |
cache_control = self.get_cache_control() | |
self.set_header("Content-Type", content_type) | |
if cache_control: | |
self.set_header("Cache-Control", cache_control) | |
def get_address(self, resolve=True, cleanup=True): | |
""" | |
Retrieves the client (network) address associated with the | |
current request/connection. | |
In case the resolve flag is set the resolution takes a more | |
complex approach to avoid the common problems when used proxy | |
based connections (indirection in connection). | |
If cleanup mode is used it means that IPv4 addresses encapsulated | |
in IPv6 addresses will have the prefix removed (eg: ::ffff:127.0.0.01 | |
will become just 127.0.0.1). | |
:type resolve: bool | |
:param resolve: If the complex resolution process for the | |
network address should take place. | |
:type cleanup: bool | |
:param cleanup: If the final value should have the possible | |
IPv6 to IPv4 address prefix removed. | |
:rtype: String | |
:return: The resolved client (network) address. | |
""" | |
if resolve: | |
address = self.get_header("X-Forwarded-For", self.address) | |
address = self.get_header("X-Client-IP", address) | |
address = self.get_header("X-Real-IP", address) | |
address = address.split(",", 1)[0].strip() if address else address | |
else: | |
address = self.address | |
if cleanup and address and address.startswith("::ffff:"): | |
address = address[7:] | |
return address | |
def get_host(self, resolve=True): | |
""" | |
Retrieves the hostname (as a string) associated with the | |
current request. | |
In case the resolve flag is set the resolution takes a more | |
complex approach to avoid the common problems when used proxy | |
based connections (indirection in connection). | |
:type resolve: bool | |
:param resolve: If the complex resolution process for the | |
hostname should take place. | |
:rtype: String | |
:return: The resolved hostname string value. | |
""" | |
server_name = self.environ.get("SERVER_NAME", "localhost") | |
server_port = int(self.environ.get("SERVER_PORT", "80")) | |
server_host = "%s:%d" % (server_name, server_port) | |
host = self.get_header("Host", server_host) | |
if resolve: | |
host = self.get_header("X-Forwarded-Host", host) | |
return host | |
def get_url(self, resolve=True): | |
host = self.get_host(resolve=resolve) | |
if not host: | |
return | |
if not self.scheme: | |
return | |
query_s = "?%s" % self.query if self.query else "" | |
return "%s://%s%s%s" % (self.scheme, host, self.path, query_s) | |
def resolve_params(self): | |
self.params = self._resolve_p(self.params) | |
def resolve_query_s(self, prefixes=("x-",)): | |
parts = [ | |
part for part in self.query.split("&") if not part.startswith(prefixes) | |
] | |
self.query_s = "&".join(parts) | |
self.prefixes = prefixes | |
def load_base(self): | |
self.load_data() | |
self.load_form() | |
self.load_authorization() | |
self.load_headers() | |
self.load_session() | |
def load_data(self): | |
# initializes the various structures associated with the data loading | |
# and/or parsing, so that the request is correctly populated | |
self.data_j = None | |
self.post = {} | |
self.files = {} | |
# verifies if the current data attribute contains a valid value in case | |
# it does not returns immediately as there's nothing to be loaded | |
if not self.data: | |
return | |
# tries to retrieve the current content type value set in the environment | |
# then splits it around the separator to retrieve the mime type | |
content_type = self.environ.get("CONTENT_TYPE", "application/json") | |
content_type = self.environ.get("HTTP_CONTENT_TYPE", content_type) | |
content_type_s = content_type.split(";") | |
mime_type = content_type_s[0].strip() | |
if mime_type == "application/json": | |
data = self.data.decode("utf-8") if self.data else None | |
try: | |
self.data_j = json.loads(data) if data else None | |
except Exception: | |
pass | |
elif mime_type == "application/x-www-form-urlencoded": | |
data = legacy.str(self.data) if self.data else None | |
post = legacy.parse_qs(data, keep_blank_values=True) if self.data else {} | |
post = util.decode_params(post) | |
self.set_post(post) | |
elif mime_type == "multipart/form-data": | |
boundary = content_type_s[1].strip() | |
post, files, ordered = util.parse_multipart(self.data, boundary) | |
self.set_multipart(post, files, ordered) | |
def load_form(self): | |
self.params_s = util.load_form(self.params) | |
self.post_s = util.load_form(self.post) | |
self.files_s = util.load_form(self.files) | |
def load_authorization(self): | |
# tries to decode the provided authorization header into it's own | |
# components of username and password, in case the structure of the | |
# provided string is not compliant an exception is raised | |
authorization = self.environ.get("HTTP_AUTHORIZATION", None) | |
if not authorization: | |
return | |
parts = authorization.split(" ", 1) | |
if not len(parts) == 2: | |
raise exceptions.OperationalError(message="Invalid authorization header") | |
_method, value = parts | |
value = base64.b64decode(value) | |
value = legacy.str(value) | |
parts = value.split(":", 1) | |
if not len(parts) == 2: | |
raise exceptions.OperationalError(message="Invalid authorization header") | |
self.authorization = tuple(parts) | |
def load_headers(self): | |
for key, value in self.environ.items(): | |
if not key.startswith("HTTP_"): | |
continue | |
key = key[5:] | |
parts = key.split("_") | |
parts = [part.title() for part in parts] | |
key_s = "-".join(parts) | |
self.in_headers[key_s] = value | |
def load_session(self): | |
self.load_mock() | |
self.load_cookies() | |
self.set_alias() | |
self.set_session() | |
def load_mock(self): | |
self.session.address = self.get_address() | |
def load_cookies(self): | |
cookie_s = self.environ.get("HTTP_COOKIE", "") | |
self.cookies = util.parse_cookie(cookie_s) | |
def locale_s(self, value=None): | |
value = value or self.locale | |
return value.replace("_", "-").lower() | |
def locale_b(self, value=None): | |
value = value or self.locale | |
return value.replace("-", "_").lower() | |
def locale_l(self, value=None): | |
value = value or self.locale | |
return value.split("_", 1)[0] | |
def load_locale(self, available, fallback="en_us", ensure=True): | |
# tries to gather the best locale value using the currently | |
# available strategies and in case the retrieved local is part | |
# of the valid locales for the app returns the locale, otherwise | |
# returns the fallback value instead (using the first available | |
# locale in case the ensure flag is set) | |
locale = self.get_locale(fallback=fallback) | |
locale = self.locale_b(locale) | |
language = self.locale_l(locale) | |
if locale in available: | |
return self.set_locale(locale) | |
if language in available: | |
return self.set_locale(locale) | |
if ensure and available: | |
return self.set_locale(available[0]) | |
return self.set_locale(fallback) | |
def get_locale(self, fallback="en_us"): | |
# tries to retrieve the locale value from the provided URL | |
# parameters (this is the highest priority) and in case it | |
# exists returns this locale immediately | |
locale = self.params.get("locale", None) | |
if locale: | |
return locale[0] | |
# uses the currently loaded session to try to gather the locale | |
# value from it and in case it's valid and exists returns it | |
locale = self.session.get("locale", None) | |
if locale: | |
return locale | |
# gathers the complete set of language values set in the accept | |
# language header and in case there's at least one value returned | |
# returns the first of these values as the locale | |
langs = self.get_langs() | |
if langs: | |
return langs[0] | |
# defines the locale as the global wide appier configuration and | |
# tries to retrieve the value of it in case it's valid uses it | |
locale = config.conf("LOCALE", None) | |
if locale: | |
return locale | |
# in case this code entry is reached all the strategies for locale | |
# retrieval have failed and so the fallback value is returned | |
return fallback | |
def set_locale(self, locale): | |
# sets both the base locale value but also the language attribute | |
# that may be used to determine the base (less specific) language value | |
self.locale = locale | |
self.language = self.locale_l(locale) | |
def get_langs(self): | |
# gathers the value of the accept language header and in case | |
# it's not defined returns immediately as no language can be | |
# determined using the currently provided headers | |
accept_language = self.in_headers.get("Accept-Language", None) | |
if not accept_language: | |
return () | |
# starts the list that is going to be used to store the various | |
# languages "recovered" from the accept language header, note that | |
# the order of these languages should be from the most relevant to | |
# the least relevant as defined in HTTP specification | |
langs = [] | |
# splits the accept language header into the various components of | |
# it and then iterates over each of them splitting each of the | |
# components into the proper language string and priority | |
parts = accept_language.split(",") | |
for part in parts: | |
values = part.split(";", 1) | |
value_l = len(values) | |
if value_l == 1: | |
(lang,) = values | |
else: | |
lang, _priority = values | |
lang = lang.replace("-", "_") | |
lang = lang.lower() | |
langs.append(lang) | |
# returns the complete list of languages that have been extracted | |
# from the accept language header these list may be empty in case | |
# the header was not parsed correctly or there's no contents in it | |
return langs | |
def set_alias(self): | |
for alias in Request.ALIAS: | |
if not alias in self.params: | |
continue | |
self.params["sid"] = self.params[alias] | |
for alias in Request.ALIAS: | |
if not alias in self.post: | |
continue | |
self.post["sid"] = self.post[alias] | |
for alias in Request.ALIAS: | |
if not alias in self.args: | |
continue | |
self.args["sid"] = self.args[alias] | |
def set_session(self, create=False): | |
# tries to retrieves the session id (SID) from all the | |
# possible sources so that something may be used in the | |
# identification of the current request | |
sid = self.cookies.get("sid", None) | |
sid = self.post.get("sid", (None,))[0] or sid | |
sid = self.params.get("sid", (None,))[0] or sid | |
# in case the data type of the currently provided session | |
# identifier is not unicode based converts it into a string | |
# so that it may be used to correctly retrieve the associated | |
# session object from the underlying session class repository | |
sid = str(sid) if type(sid) == legacy.UNICODE else sid | |
# tries to retrieve the session reference for the | |
# provided SID (session id) in case there's an exception | |
# defaults to unset session so that a new one gets created | |
try: | |
session = self.session_c.get_s(sid, request=self) | |
except Exception: | |
session = None | |
# in case no valid session exists a new one must be created | |
# so that the user may be able to interact with the system | |
# with some kind of memory/persistence otherwise sets the | |
# loaded session in the current request (to be used) | |
if session: | |
self.session = session | |
elif create: | |
self.session = self.session_c.new(address=self.address) | |
def get_session(self): | |
return self.session | |
def get_session_agent(self): | |
if not self.session: | |
return None | |
if not self.session.is_loaded(): | |
return None | |
username = self.session.get("username", None) | |
if username: | |
return username | |
email = self.session.get("email", None) | |
if email: | |
return email | |
id = self.session.get("id", None) | |
if id: | |
return id | |
return None | |
def get_warnings(self): | |
return self.warnings | |
def get_set_cookie(self, lang="en", path="/", delta=31536000): | |
base = self.set_cookie | |
if not base: | |
return base | |
expires = time.time() + delta | |
expires_d = datetime.datetime.fromtimestamp(expires) | |
with util.ctx_locale(): | |
expires_s = expires_d.strftime("%a, %m %b %Y %H:%M:%S GMT") | |
set_cookie = "%s%s;lang=%s;path=%s;expires=%s;%s" % ( | |
self.set_cookie_prefix or self.owner.set_cookie_prefix or "", | |
base, | |
lang, | |
path, | |
expires_s, | |
self.set_cookie_suffix or self.owner.set_cookie_suffix or "", | |
) | |
return set_cookie | |
def get_headers(self): | |
return self.get_out_headers() | |
def get_in_headers(self): | |
headers = self.in_headers.items() | |
return legacy.eager(headers) | |
def get_out_headers(self): | |
headers = self.out_headers.items() | |
return legacy.eager(headers) | |
def get_code_s(self): | |
code_s = CODE_STRINGS.get(self.code, "Unknown") | |
code_s = str(self.code) + " " + code_s | |
return code_s | |
def get_sdate(self, format="%d/%b/%Y:%H:%M:%S +0000"): | |
sdate = datetime.datetime.utcfromtimestamp(self.stime) | |
return sdate.strftime(format) | |
def get_edate(self, format="%d/%b/%Y:%H:%M:%S +0000"): | |
edate = datetime.datetime.utcfromtimestamp(self.etime) | |
return edate.strftime(format) | |
def is_mobile(self): | |
user_agent = self.get_header("User-Agent") | |
return util.is_mobile(user_agent) | |
def is_tablet(self): | |
user_agent = self.get_header("User-Agent") | |
return util.is_tablet(user_agent) | |
def is_browser(self): | |
user_agent = self.get_header("User-Agent") | |
return util.is_browser(user_agent) | |
def is_bot(self): | |
user_agent = self.get_header("User-Agent") | |
return util.is_bot(user_agent) | |
def is_success(self): | |
return self.code == 200 | |
def is_empty(self): | |
return self.code in (204, 304) | |
def is_mock(self): | |
return False | |
def location_f(self, safe=True): | |
query = self.query_s if safe else self.query | |
if not query: | |
return self.location | |
return self.location + "?" + query | |
def params_f(self, safe=True): | |
if not safe: | |
return self.params_s | |
if hasattr(self, "_params_f"): | |
return self._params_f | |
self._params_f = dict( | |
[ | |
(key, value) | |
for key, value in self.params_s.items() | |
if not key.startswith(self.prefixes) | |
] | |
) | |
return self._params_f | |
def duration(self, milliseconds=True, safe=True): | |
if not self.stime: | |
return None | |
if not self.etime and not safe: | |
return None | |
etime = self.etime or time.time() | |
duration = etime - self.stime | |
if not milliseconds: | |
return duration | |
return duration * 1000.0 | |
def in_length(self): | |
data = self.get_data() | |
if not data: | |
return 0 | |
return len(data) | |
def out_length(self, safe=True): | |
return self.result_l or 0 | |
def asynchronous(self): | |
return True if self.get_header("X-Async") else False | |
def partial(self): | |
return True if self.get_header("X-Partial") else False | |
def browser_info(self): | |
user_agent = self.get_header("User-Agent") | |
return util.browser_info(user_agent) | |
def _resolve_p(self, params): | |
secret = self.session.get("secret", None) | |
if not secret: | |
return params | |
raise exceptions.AppierException(message="Not implemented") | |
class MockRequest(Request): | |
""" | |
Mock request class, that is meant to be used for situations | |
where no web oriented request is possible to be retried or | |
the logic being running outside of web request. | |
It's recommended to keep only one instance of a mock request | |
per each execution life-cycle. | |
""" | |
def __init__(self, locale="en_us", *args, **kwargs): | |
Request.__init__(self, *args, **kwargs) | |
self.files_s = dict() | |
self.post_s = dict() | |
self.params_s = dict() | |
self.set_locale(locale) | |
def close(self): | |
pass | |
def is_mock(self): | |
return True | |