#!/usr/bin/python
# -*- coding: utf-8 -*-

# Hive Appier Framework
# Copyright (c) 2008-2024 Hive Solutions Lda.
#
# This file is part of Hive Appier Framework.
#
# Hive Appier Framework is free software: you can redistribute it and/or modify
# it under the terms of the Apache License as published by the Apache
# Foundation, either version 2.0 of the License, or (at your option) any
# later version.
#
# Hive Appier Framework is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache License for more details.
#
# You should have received a copy of the Apache License along with
# Hive Appier Framework. If not, see <http://www.apache.org/licenses/>.

__author__ = "João Magalhães "
""" The author(s) of the module """

__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda."
""" The copyright for the module """

__license__ = "Apache License, Version 2.0"
""" The license for the module """

import os
import json
import base64
import string
import random
import logging
import threading

from . import util
from . import common
from . import legacy
from . import typesf
from . import config
from . import exceptions
from . import structures

TIMEOUT = 60
""" The timeout in seconds to be used for the blocking
operations in the HTTP connection, this value avoids unwanted
blocking operations remaining open for an infinite time """

RANGE = string.ascii_letters + string.digits
""" The range of characters that are going to be used in
the generation of the boundary value for the mime """

SEQUENCE_TYPES = (list, tuple)
""" The sequence defining the various types that are
considered to be sequence based for python """

AUTH_ERRORS = (401, 403, 440, 499)
""" The sequence that defines the various HTTP errors
considered to be authentication related and for which a
new authentication try will be performed """

ACCESS_LOCK = threading.RLock()
""" Global access lock used for locking global operations
that require thread safety under the HTTP infra-structure """


def file_g(path, chunk=40960):
    """
    Generator that yields first the size (in bytes) of the file
    at the provided path and then its contents in chunks of at
    most `chunk` bytes.

    :type path: String
    :param path: The path to the file to be read (binary mode).
    :type chunk: int
    :param chunk: The maximum size in bytes of each yielded chunk.
    """

    yield os.path.getsize(path)
    with open(path, "rb") as file:
        while True:
            data = file.read(chunk)
            if not data:
                break
            yield data


def get_f(*args, **kwargs):
    """
    Runs a GET request and wraps the retrieved contents into a
    file object, built from the (name, mime, data) tuple.

    The `name` keyword argument (defaults to "default") is used as
    the file name, the `handle` and `redirect` flags default to
    true (the handle is required to read the content type).

    :rtype: File
    :return: The file object with the retrieved contents.
    """

    name = kwargs.pop("name", "default")
    kwargs["handle"] = kwargs.get("handle", True)
    kwargs["redirect"] = kwargs.get("redirect", True)
    data, response = get(*args, **kwargs)
    info = response.info()
    mime = info.get("Content-Type", None)
    file_tuple = util.FileTuple((name, mime, data))
    return typesf.File(file_tuple)


def get(
    url,
    params=None,
    headers=None,
    handle=None,
    silent=None,
    redirect=None,
    timeout=None,
    auth_callback=None,
    **kwargs
):
    """
    Performs a GET request to the provided URL, retrying once
    through `auth_callback` on an authentication related error.

    :rtype: Object
    :return: The decoded result, or a (result, response) tuple
    in case the `handle` flag is set.
    """

    return _method(
        _get,
        url,
        params=params,
        headers=headers,
        handle=handle,
        silent=silent,
        redirect=redirect,
        timeout=timeout,
        auth_callback=auth_callback,
        **kwargs
    )


def post(
    url,
    params=None,
    data=None,
    data_j=None,
    data_m=None,
    headers=None,
    mime=None,
    handle=None,
    silent=None,
    redirect=None,
    timeout=None,
    auth_callback=None,
    **kwargs
):
    """
    Performs a POST request to the provided URL, the payload may
    be provided as raw data (`data`), as a JSON serializable
    object (`data_j`) or as multipart fields (`data_m`).

    :rtype: Object
    :return: The decoded result, or a (result, response) tuple
    in case the `handle` flag is set.
    """

    return _method(
        _post,
        url,
        params=params,
        data=data,
        data_j=data_j,
        data_m=data_m,
        headers=headers,
        mime=mime,
        handle=handle,
        silent=silent,
        redirect=redirect,
        timeout=timeout,
        auth_callback=auth_callback,
        **kwargs
    )


def put(
    url,
    params=None,
    data=None,
    data_j=None,
    data_m=None,
    headers=None,
    mime=None,
    handle=None,
    silent=None,
    redirect=None,
    timeout=None,
    auth_callback=None,
    **kwargs
):
    """
    Performs a PUT request to the provided URL, accepting the
    same payload options as the `post` function.

    :rtype: Object
    :return: The decoded result, or a (result, response) tuple
    in case the `handle` flag is set.
    """

    return _method(
        _put,
        url,
        params=params,
        data=data,
        data_j=data_j,
        data_m=data_m,
        headers=headers,
        mime=mime,
        handle=handle,
        silent=silent,
        redirect=redirect,
        timeout=timeout,
        auth_callback=auth_callback,
        **kwargs
    )


def delete(
    url,
    params=None,
    headers=None,
    handle=None,
    silent=None,
    redirect=None,
    timeout=None,
    auth_callback=None,
    **kwargs
):
    """
    Performs a DELETE request to the provided URL, retrying once
    through `auth_callback` on an authentication related error.

    :rtype: Object
    :return: The decoded result, or a (result, response) tuple
    in case the `handle` flag is set.
    """

    return _method(
        _delete,
        url,
        params=params,
        headers=headers,
        handle=handle,
        silent=silent,
        redirect=redirect,
        timeout=timeout,
        auth_callback=auth_callback,
        **kwargs
    )


def patch(
    url,
    params=None,
    data=None,
    data_j=None,
    data_m=None,
    headers=None,
    mime=None,
    handle=None,
    silent=None,
    redirect=None,
    timeout=None,
    auth_callback=None,
    **kwargs
):
    """
    Performs a PATCH request to the provided URL, accepting the
    same payload options as the `post` function.

    :rtype: Object
    :return: The decoded result, or a (result, response) tuple
    in case the `handle` flag is set.
    """

    return _method(
        _patch,
        url,
        params=params,
        data=data,
        data_j=data_j,
        data_m=data_m,
        headers=headers,
        mime=mime,
        handle=handle,
        silent=silent,
        redirect=redirect,
        timeout=timeout,
        auth_callback=auth_callback,
        **kwargs
    )


def basic_auth(username, password=None):
    """
    Builds the value of the HTTP basic "Authorization" header
    for the provided credentials, in case no password is
    provided the username is used as the password.

    :rtype: String
    :return: The "Basic ..." header value.
    """

    if not password:
        password = username
    authorization = _authorization(username, password)
    return "Basic %s" % authorization


def _try_auth(auth_callback, params, headers=None):
    """
    Calls the authentication callback with the provided params
    and headers so that it may update them before a retry.

    Must be called from inside an exception handler, as the
    exception being handled is re-raised (bare raise) in case
    no callback is available.
    """

    if not auth_callback:
        raise
    if headers is None:
        headers = dict()
    auth_callback(params, headers)


def _method(method, *args, **kwargs):
    """
    Runs the provided (concrete) method and, in case it fails with
    an authentication related HTTP error, runs the authentication
    callback and retries the operation one more time.

    Any legacy HTTP error that remains after that process is
    converted into the framework's own HTTP error exception.
    """

    try:
        auth_callback = kwargs.pop("auth_callback", None)
        result = method(*args, **kwargs)
    except legacy.HTTPError as error:
        try:
            params = kwargs.get("params", None)
            headers = kwargs.get("headers", None)
            if error.code not in AUTH_ERRORS:
                raise
            _try_auth(auth_callback, params, headers)
            result = method(*args, **kwargs)
        except legacy.HTTPError as error:
            code = error.getcode()
            raise exceptions.HTTPError(error, code)

    return result


def _get(
    url,
    params=None,
    headers=None,
    handle=None,
    silent=None,
    redirect=None,
    timeout=None,
    **kwargs
):
    """Concrete (no retry) GET request, see `_method_empty`."""

    return _method_empty(
        "GET",
        url,
        params=params,
        headers=headers,
        handle=handle,
        silent=silent,
        redirect=redirect,
        timeout=timeout,
        **kwargs
    )


def _post(
    url,
    params=None,
    data=None,
    data_j=None,
    data_m=None,
    headers=None,
    mime=None,
    handle=None,
    silent=None,
    redirect=None,
    timeout=None,
    **kwargs
):
    """Concrete (no retry) POST request, see `_method_payload`."""

    return _method_payload(
        "POST",
        url,
        params=params,
        data=data,
        data_j=data_j,
        data_m=data_m,
        headers=headers,
        mime=mime,
        handle=handle,
        silent=silent,
        redirect=redirect,
        timeout=timeout,
        **kwargs
    )


def _put(
    url,
    params=None,
    data=None,
    data_j=None,
    data_m=None,
    headers=None,
    mime=None,
    handle=None,
    silent=None,
    redirect=None,
    timeout=None,
    **kwargs
):
    """Concrete (no retry) PUT request, see `_method_payload`."""

    return _method_payload(
        "PUT",
        url,
        params=params,
        data=data,
        data_j=data_j,
        data_m=data_m,
        headers=headers,
        mime=mime,
        handle=handle,
        silent=silent,
        redirect=redirect,
        timeout=timeout,
        **kwargs
    )


def _delete(
    url,
    params=None,
    headers=None,
    handle=None,
    silent=None,
    redirect=None,
    timeout=None,
    **kwargs
):
    """Concrete (no retry) DELETE request, see `_method_empty`."""

    return _method_empty(
        "DELETE",
        url,
        params=params,
        headers=headers,
        handle=handle,
        silent=silent,
        redirect=redirect,
        timeout=timeout,
        **kwargs
    )


def _patch(
    url,
    params=None,
    data=None,
    data_j=None,
    data_m=None,
    headers=None,
    mime=None,
    handle=None,
    silent=None,
    redirect=None,
    timeout=None,
    **kwargs
):
    """Concrete (no retry) PATCH request, see `_method_payload`."""

    return _method_payload(
        "PATCH",
        url,
        params=params,
        data=data,
        data_j=data_j,
        data_m=data_m,
        headers=headers,
        mime=mime,
        handle=handle,
        silent=silent,
        redirect=redirect,
        timeout=timeout,
        **kwargs
    )


def _method_empty(
    name,
    url,
    params=None,
    headers=None,
    handle=None,
    silent=None,
    redirect=None,
    timeout=None,
    **kwargs
):
    """
    Executes an HTTP request that carries no payload (body), all
    of the parameters are encoded as part of the query string.

    Unset (none) `silent`, `redirect` and `timeout` values are
    loaded from the `HTTP_SILENT`, `HTTP_REDIRECT` and
    `HTTP_TIMEOUT` configuration values.

    :rtype: Object
    :return: The decoded result, a (result, response) tuple in
    case the `handle` flag is set, or the raw value returned by
    the resolver in case it's none or a tuple (deferred request).
    """

    if handle is None:
        handle = False
    if silent is None:
        silent = config.conf("HTTP_SILENT", False, cast=bool)
    if redirect is None:
        redirect = config.conf("HTTP_REDIRECT", False, cast=bool)
    if timeout is None:
        timeout = config.conf("HTTP_TIMEOUT", TIMEOUT, cast=int)

    values = params or dict()

    values_s = " with '%s'" % str(values) if values else ""
    if not silent:
        logging.debug("%s %s%s" % (name, url, values_s))

    # merges the parameters found in the URL's query string into a
    # copy of the values, avoiding the mutation of the caller's params
    url, scheme, host, authorization, extra = _parse_url(url)
    if extra:
        values = dict(values)
        values.update(extra)
    data = _urlencode(values)

    headers = dict(headers) if headers else dict()
    if host:
        headers["Host"] = host
    if authorization:
        headers["Authorization"] = "Basic %s" % authorization
    url = url + "?" + data if data else url
    url = str(url)

    _method_callback(handle, kwargs)

    # runs the concrete resolution method (taking into account the adapter)
    # providing it with the required parameters for request execution
    file = _resolve(url, name, headers, None, silent, timeout, **kwargs)

    # verifies if the resulting "file" from the resolution process is either
    # an invalid or tuple value and if that's the case returns it as the
    # request has probably been deferred for asynchronous execution
    if file is None:
        return file
    if isinstance(file, tuple):
        return file

    try:
        result = file.read()
    finally:
        file.close()

    code = file.getcode()
    info = file.info()

    location = info.get("Location", None) if redirect else None
    if location:
        return _redirect(
            location,
            scheme,
            host,
            handle=handle,
            silent=silent,
            redirect=redirect,
            timeout=timeout,
            **kwargs
        )

    if not silent:
        logging.debug("%s %s returned '%d'" % (name, url, code))

    result = _result(result, info)
    return (result, file) if handle else result


def _method_payload(
    name,
    url,
    params=None,
    data=None,
    data_j=None,
    data_m=None,
    headers=None,
    mime=None,
    handle=None,
    silent=None,
    redirect=None,
    timeout=None,
    **kwargs
):
    """
    Executes an HTTP request that carries a payload (body).

    The payload is selected by priority: raw `data`, JSON (`data_j`),
    multipart (`data_m`) and finally the URL encoded parameters, for
    the first three the parameters are sent in the query string.

    Unset (none) `silent`, `redirect` and `timeout` values are
    loaded from the `HTTP_SILENT`, `HTTP_REDIRECT` and
    `HTTP_TIMEOUT` configuration values.

    :rtype: Object
    :return: The decoded result, a (result, response) tuple in
    case the `handle` flag is set, or the raw value returned by
    the resolver in case it's none or a tuple (deferred request).
    """

    if handle is None:
        handle = False
    if silent is None:
        silent = config.conf("HTTP_SILENT", False, cast=bool)
    if redirect is None:
        redirect = config.conf("HTTP_REDIRECT", False, cast=bool)
    if timeout is None:
        timeout = config.conf("HTTP_TIMEOUT", TIMEOUT, cast=int)

    values = params or dict()

    values_s = " with '%s'" % str(values) if values else ""
    if not silent:
        logging.debug("%s %s%s" % (name, url, values_s))

    # merges the parameters found in the URL's query string into a
    # copy of the values, avoiding the mutation of the caller's params
    url, scheme, host, authorization, extra = _parse_url(url)
    if extra:
        values = dict(values)
        values.update(extra)
    data_e = _urlencode(values)

    if data is not None:
        url = url + "?" + data_e if data_e else url
    elif data_j is not None:
        data = json.dumps(data_j)
        url = url + "?" + data_e if data_e else url
        mime = mime or "application/json"
    elif data_m is not None:
        url = url + "?" + data_e if data_e else url
        content_type, data = _encode_multipart(data_m, mime=mime, doseq=True)
        mime = content_type
    elif data_e:
        data = data_e
        mime = mime or "application/x-www-form-urlencoded"

    if legacy.is_unicode(data):
        data = legacy.bytes(data, force=True)

    # computes the length of the payload, a negative value means
    # that the length is not known (no content length header is sent)
    if not data:
        length = 0
    elif legacy.is_bytes(data):
        length = len(data)
    else:
        length = -1

    headers = dict(headers) if headers else dict()
    if not length == -1:
        headers["Content-Length"] = str(length)
    if mime:
        headers["Content-Type"] = mime
    if host:
        headers["Host"] = host
    if authorization:
        headers["Authorization"] = "Basic %s" % authorization
    url = str(url)

    _method_callback(handle, kwargs)
    file = _resolve(url, name, headers, data, silent, timeout, **kwargs)

    # an invalid or tuple value means that the request has probably been
    # deferred for asynchronous execution (same handling as the requests
    # with no payload), there's nothing to be read in such case
    if file is None:
        return file
    if isinstance(file, tuple):
        return file

    try:
        result = file.read()
    finally:
        file.close()

    code = file.getcode()
    info = file.info()

    # NOTE(review): unlike the requests with no payload the extra keyword
    # arguments are not forwarded on redirect, confirm if intended
    location = info.get("Location", None) if redirect else None
    if location:
        return _redirect(
            location,
            scheme,
            host,
            handle=handle,
            silent=silent,
            redirect=redirect,
            timeout=timeout,
        )

    if not silent:
        logging.debug("%s %s returned '%d'" % (name, url, code))

    result = _result(result, info)
    return (result, file) if handle else result


def _method_callback(handle, kwargs):
    """
    Replaces (in place) the `callback` value of the keyword
    arguments with a wrapper that reads and decodes the response
    before calling the original callback.

    The original callback receives (result, file) in case the
    `handle` flag is set or just (result,) otherwise.
    """

    # tries to determine if a callback value has been registered
    # in the set of keyword argument and if that's not the case
    # returns immediately (nothing to be done)
    callback = kwargs.get("callback", None)
    if not callback:
        return

    def callback_wrap(file):
        # determines if the received file is valid (no error)
        # or if instead there's an error with the connection
        # and an invalid/unset value has been provided
        if file:
            info = file.info()
            try:
                result = file.read()
            finally:
                file.close()
            result = _result(result, info)
        else:
            result = None

        # taking into account the handle flag determines the
        # kind of result structure that should be "returned"
        result = (result, file) if handle else (result,)
        callback(*result)

    # sets the "new" callback closure in the set of keyword
    # based arguments for the calling of the HTTP handler method
    # so that this new callback is called instead of the original
    kwargs["callback"] = callback_wrap


def _redirect(
    location,
    scheme,
    host,
    handle=None,
    silent=None,
    redirect=None,
    timeout=None,
    **kwargs
):
    """
    Follows a redirection by running a GET request to the provided
    location, a location starting with a slash is considered to be
    relative to the provided scheme and host.
    """

    is_relative = location.startswith("/")
    if is_relative:
        location = scheme + "://" + host + location
    logging.debug("Redirecting to %s" % location)
    return get(
        location,
        handle=handle,
        silent=silent,
        redirect=redirect,
        timeout=timeout,
        **kwargs
    )


def _resolve(*args, **kwargs):
    """
    Runs the request using the resolver of the selected HTTP
    client (`_resolve_<client>`), falling back to the legacy
    resolver if the client is unknown or raises an import error.

    The `client` and `reuse` keyword arguments have priority over
    the `HTTP_CLIENT` and `HTTP_REUSE` configuration values.
    """

    # obtains the reference to the global set of variables, so
    # that it's possible to obtain the proper resolver method
    # according to the requested client
    _global = globals()

    # tries to retrieve the global configuration values that
    # will condition the way the request is going to be performed
    client = config.conf("HTTP_CLIENT", "netius")
    reuse = config.conf("HTTP_REUSE", True, cast=bool)

    # tries to determine the set of configurations requested on
    # a request basis (not global) these have priority when
    # compared with the global configuration ones
    client = kwargs.pop("client", client)
    reuse = kwargs.pop("reuse", reuse)

    # sets the value for connection re-usage so that a connection
    # pool is used if requested, otherwise one connection per request
    # is going to be used (at the expense of resources)
    kwargs["reuse"] = reuse

    # tries to retrieve the reference to the resolve method for the
    # current client and then runs it, retrieving then the final result,
    # note that the result structure may be engine dependent
    resolver = _global.get("_resolve_" + client, _resolve_legacy)
    try:
        result = resolver(*args, **kwargs)
    except ImportError:
        result = _resolve_legacy(*args, **kwargs)
    return result


def _resolve_legacy(url, method, headers, data, silent, timeout, **kwargs):
    """
    Resolver that uses the opener/request infra-structure exposed
    by the legacy module, generator and file based payloads are
    completely loaded into memory before the request is sent.
    """

    # a generator payload yields its size as the first value, which
    # must be discarded before joining the remaining chunks
    is_generator = data is not None and legacy.is_generator(data)
    if is_generator:
        next(data)
        data = b"".join(data)
    is_file = hasattr(data, "tell")
    if is_file:
        data = data.read()
    opener = legacy.build_opener(legacy.HTTPHandler)
    request = legacy.Request(url, data=data, headers=headers)
    request.get_method = lambda: method
    return opener.open(request, timeout=timeout)


def _resolve_requests(url, method, headers, data, silent, timeout, **kwargs):
    """
    Resolver that uses the requests library, raising a legacy
    HTTP error in case the response code represents an error.

    :rtype: HTTPResponse
    :return: The compatibility response for the request.
    """

    util.ensure_pip("requests")
    import requests

    global _requests_session

    # retrieves the various dynamic parameters for the HTTP client
    # usage under the requests infra-structure
    reuse = kwargs.get("reuse", True)
    connections = kwargs.get("connections", 256)

    # verifies if the provided data is a generator, assumes that if the
    # data is not invalid and is of type generator then it's a generator
    # and then if that's the case encapsulates this size based generator
    # into a generator based file-like object so that it can be used inside
    # the request infra-structure (as it accepts only file objects)
    is_generator = data is not None and legacy.is_generator(data)
    if is_generator:
        data = structures.GeneratorFile(data)

    # verifies if the session for the requests infra-structure is
    # already created and if that's not the case and the re-use
    # flag is set creates a new session for the requested settings
    registered = "_requests_session" in globals()
    if not registered and reuse:
        _requests_session = requests.Session()
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=connections, pool_maxsize=connections
        )
        _requests_session.mount("", adapter)

    # determines the base object from which the concrete methods
    # are going to be loaded by inspecting the re-use flag
    if reuse:
        base = _requests_session
    else:
        base = requests

    # converts the string based method value into a lower cased value
    # and then uses it to retrieve the method object (callable)
    # that is going to be called to perform the request
    method = method.lower()
    caller = getattr(base, method)

    # runs the caller method (according to selected method) and waits for
    # the result object converting it then to the target response object
    result = caller(url, headers=headers, data=data, timeout=timeout)
    response = HTTPResponse(
        data=result.content, code=result.status_code, headers=result.headers
    )

    # retrieves the response code of the created response and verifies if
    # it represents an error, if that's the case raises an error exception
    # to the upper layers to break the current execution logic properly
    code = response.getcode()
    is_error = _is_error(code)
    if is_error:
        raise legacy.HTTPError(url, code, "HTTP retrieval problem", None, response)

    # returns the final response object to the caller method, this object
    # should comply with the proper upper layers structure
    return response


def _resolve_netius(url, method, headers, data, silent, timeout, **kwargs):
    """
    Resolver that uses the netius HTTP client, supporting both
    the synchronous and the asynchronous (`async`/`asynchronous`
    keyword arguments) execution modes.

    Raises a legacy HTTP error in case the response code of a
    synchronous request represents an error.

    :rtype: Object
    :return: The response object for a synchronous request or the
    raw value returned by the client for an asynchronous one.
    """

    util.ensure_pip("netius")
    import netius.clients

    # determines the final value of the silent flag taking into
    # account if the current infra-structure is not running under
    # a development environment
    silent = silent or False
    silent |= not common.is_devel()

    # converts the provided dictionary of headers into a new map to
    # allow any re-writing of values, valuable for a re-connect
    headers = dict(headers)

    # tries to determine the proper level of verbosity to be used by
    # the client, for that the system tries to determine if the current
    # execution environment is a development one (verbose)
    level = logging.CRITICAL if silent else logging.DEBUG

    # retrieves the various dynamic parameters for the HTTP client
    # usage under the netius infra-structure
    retry = kwargs.get("retry", 1)
    reuse = kwargs.get("reuse", True)
    level = kwargs.get("level", level)
    asynchronous = kwargs.get("async", False)
    asynchronous = kwargs.get("asynchronous", asynchronous)
    use_file = kwargs.get("use_file", False)
    callback = kwargs.get("callback", None)
    callback_init = kwargs.get("callback_init", None)
    callback_open = kwargs.get("callback_open", None)
    callback_headers = kwargs.get("callback_headers", None)
    callback_data = kwargs.get("callback_data", None)
    callback_result = kwargs.get("callback_result", None)

    # re-calculates the retry and re-use flags taking into account
    # the async flag, if the execution mode is async we don't want
    # to re-use the HTTP client as it would create issues
    retry, reuse = (0, False) if asynchronous else (retry, reuse)

    # creates the proper set of extra parameters to be sent to the
    # HTTP client taking into account a possible async method request
    extra = (
        _async_netius(
            callback=callback,
            callback_init=callback_init,
            callback_open=callback_open,
            callback_headers=callback_headers,
            callback_data=callback_data,
            callback_result=callback_result,
        )
        if asynchronous
        else dict(
            on_init=lambda c: callback_init and callback_init(c),
            on_open=lambda c: callback_open and callback_open(c),
            on_headers=lambda c, p: callback_headers and callback_headers(p.headers),
            on_data=lambda c, p, d: callback_data and callback_data(d),
            on_result=lambda c, p, r: callback_result and callback_result(r),
        )
    )

    # verifies if client re-usage must be enforced and if that's the
    # case the global client object is requested (singleton) otherwise
    # the client should be created inside the HTTP client static method
    http_client = _client_netius(level=level) if reuse else None
    result = netius.clients.HTTPClient.method_s(
        method,
        url,
        headers=headers,
        data=data,
        asynchronous=asynchronous,
        timeout=timeout,
        use_file=use_file,
        http_client=http_client,
        level=level,
        **extra
    )

    # if the async mode is defined the result (tuple) is returned immediately
    # as the processing will be taking place later (on callback)
    if asynchronous:
        return result

    # tries to retrieve any possible error coming from the result object
    # if this happens it means an exception has been raised internally and
    # the error should be handled in a proper manner, if the error is related
    # to a closed connection a retry may be performed to try to re-establish
    # the connection (allows for reconnection in connection pool)
    error = result.get("error", None)
    if error == "closed" and retry > 0:
        kwargs["retry"] = retry - 1
        return _resolve_netius(url, method, headers, data, silent, timeout, **kwargs)

    # converts the netius specific result map into a response compatible
    # object (equivalent to the urllib one) to be used by the upper layers
    # under an equivalent and compatible approach note that this conversion
    # may raise an exception in case the result represents an error
    response = netius.clients.HTTPClient.to_response(result)

    # retrieves the response code of the created response and verifies if
    # it represents an error, if that's the case raises an error exception
    # to the upper layers to break the current execution logic properly
    code = response.getcode()
    is_error = _is_error(code)
    if is_error:
        raise legacy.HTTPError(url, code, "HTTP retrieval problem", None, response)

    # returns the final response object to the upper layers, this object
    # may be used freely under the compatibility interface it provides
    return response


def _client_netius(level=logging.CRITICAL):
    """
    Retrieves the netius HTTP client associated with the current
    thread, creating and registering a new one if required.

    On the first registration the cleanup function is registered
    as an exit callback of the base object.

    :rtype: HTTPClient
    :return: The netius HTTP client for the current thread.
    """

    import netius.clients

    global _netius_clients

    # retrieves the reference to the current thread and uses the value
    # to retrieve the thread identifier (TID) for it, to be used in the
    # identification of the client resource associated with it
    tid = threading.current_thread().ident

    # acquires the global HTTP lock and executes a series of validation
    # and initialization of the netius client infra-structure, this
    # operations require thread safety
    with ACCESS_LOCK:
        registered = "_netius_clients" in globals()
        if not registered:
            _netius_clients = dict()
        netius_client = _netius_clients.get(tid, None)

    # in case a previously created netius client has been retrieved
    # returns it to the caller method for proper re-usage
    if netius_client:
        return netius_client

    # creates the "new" HTTP client for the current thread and registers
    # it under the netius client structure so that it may be re-used
    netius_client = netius.clients.HTTPClient(auto_release=False)
    _netius_clients[tid] = netius_client

    # in case this is the first registration of the dictionary a new on
    # exit callback is registered to cleanup the netius infra-structure
    # then the final client is returned to the caller of the method
    if not registered:
        common.base().on_exit(_cleanup_netius)
    return netius_client


def _async_netius(
    callback=None,
    callback_init=None,
    callback_open=None,
    callback_headers=None,
    callback_data=None,
    callback_result=None,
):
    """
    Builds the dictionary of extra (callback) arguments to be sent
    to the netius HTTP client for an asynchronous request.

    The received data is buffered so that a complete response may
    be built and passed to `callback` at the end of the request, the
    same callback is called with none when the connection is closed.

    :rtype: Dictionary
    :return: The map of extra keyword arguments for the client.
    """

    import netius.clients

    buffer = []
    extra = dict()

    def _on_init(protocol):
        callback_init and callback_init(protocol)

    def _on_open(protocol):
        callback_open and callback_open(protocol)

    def _on_close(protocol):
        callback and callback(None)

    def _on_headers(protocol, parser):
        callback_headers and callback_headers(parser.headers)

    def _on_data(protocol, parser, data):
        data and buffer.append(data)
        callback_data and callback_data(data)

    def _on_result(protocol, parser, result):
        callback_result and callback_result(result)

    def _callback(protocol, parser, message):
        result = netius.clients.HTTPProtocol.set_request(parser, buffer)
        response = netius.clients.HTTPClient.to_response(result)
        callback and callback(response)

    extra["callback"] = _callback
    extra["on_data"] = _on_data

    # registers the init handler under the same "on_init" name that
    # is used for the synchronous requests (it was being registered
    # under "_on_init", a name used nowhere else in this module)
    if callback_init:
        extra["on_init"] = _on_init
    if callback_open:
        extra["on_open"] = _on_open
    if callback:
        extra["on_close"] = _on_close
    if callback_headers:
        extra["on_headers"] = _on_headers
    if callback_result:
        extra["on_result"] = _on_result

    return extra


def _cleanup_netius():
    """
    Runs the cleanup operation in every registered netius client
    and then removes the global registry of clients.
    """

    global _netius_clients
    for netius_client in _netius_clients.values():
        netius_client.cleanup()
    del _netius_clients


def _parse_url(url):
    """
    Splits the provided URL into its components, normalizing it
    so that it always contains an explicit port value.

    :rtype: Tuple
    :return: A tuple with the normalized URL (no query string), the
    scheme, the value for the host header (port omitted for 80 and
    443), the basic authorization token (or none) and the dictionary
    with the query string parameters.
    """

    parse = legacy.urlparse(url)
    scheme = parse.scheme
    secure = scheme == "https"
    default = 443 if secure else 80
    port = parse.port or default
    url = parse.scheme + "://" + parse.hostname + ":" + str(port) + parse.path
    if port in (80, 443):
        host = parse.hostname
    else:
        host = parse.hostname + ":" + str(port)
    authorization = _authorization(parse.username, parse.password)
    params = _params(parse.query)
    return (url, scheme, host, authorization, params)


def _result(data, info=None, force=False, strict=False):
    """
    Decodes the provided response data as JSON in case the content
    type of the headers (`info`) is a JSON one or `force` is set.

    :rtype: Object
    :return: The decoded JSON structure, or the original data in
    case it's not JSON, or it's invalid JSON and `strict` is unset.
    """

    # uses a new dictionary per call for the unset headers case
    # (avoids a mutable structure as the default argument value)
    if info is None:
        info = dict()

    # tries to retrieve the content type value from the headers
    # info and verifies if the current data is JSON encoded, so
    # that it gets automatically decoded for such cases
    content_type = info.get("Content-Type", None) or ""
    is_json = (
        util.is_content_type(
            content_type, ("application/json", "text/json", "text/javascript")
        )
        or force
    )

    # verifies if the current result set is JSON encoded and in
    # case it is decodes it and loads it as JSON otherwise returns
    # the "raw" data to the caller method as expected, note that
    # the strict flag is used to determine if the exception should
    # be re-raised to the upper level in case of value error
    if is_json and legacy.is_bytes(data):
        data = data.decode("utf-8")
    try:
        data = json.loads(data) if is_json else data
    except ValueError:
        if strict:
            raise
    return data


def _params(query):
    """
    Parses the provided query string into a dictionary that maps
    each (unquoted) key to the list of its (unquoted) values.

    :rtype: Dictionary
    :return: The dictionary of lists with the query parameters.
    """

    # creates the dictionary that is going to be used to store the
    # complete information regarding the parameters in query
    params = dict()

    # validates that the provided query value is valid and if
    # that's not the case returns the created parameters immediately
    # (empty parameters are returned)
    if not query:
        return params

    # splits the query value around the initial parameter separator
    # symbol and iterates over each of them to parse them and create
    # the proper parameters dictionary (of lists)
    query_s = query.split("&")
    for part in query_s:
        parts = part.split("=", 1)
        if len(parts) == 1:
            value = ""
        else:
            value = parts[1]
        key = parts[0]
        key = legacy.unquote_plus(key)
        value = legacy.unquote_plus(value)
        param = params.get(key, [])
        param.append(value)
        params[key] = param

    # returns the final parameters dictionary to the caller method
    # so that it may be used as a proper structure representation
    return params


def _urlencode(values, as_string=True):
    """
    URL encodes the provided values (dictionary or sequence of
    key and value pairs), none values are discarded and sequence
    values are expanded into multiple occurrences of the key.

    :rtype: String/List
    :return: The encoded string, or the list of (key, values)
    tuples in case the `as_string` flag is unset.
    """

    # creates the list that will hold the final tuple of values
    # (without the unset and invalid values)
    final = []

    # verifies if the provided value is a sequence and in case it's
    # not converts it into a sequence (assuming a map)
    is_sequence = isinstance(values, (list, tuple))
    if not is_sequence:
        values = values.items()

    # iterates over all the items in the values sequence to
    # try to filter the values that are not valid
    for key, value in values:
        # creates the list that will hold the valid values
        # of the current key in iteration (sanitized values)
        _values = []

        # in case the current data type of the key is unicode
        # the value must be converted into a string using the
        # default utf encoding strategy (as defined), note that
        # exact type comparison is kept on purpose as the legacy
        # aliases are not visible here and may not be valid
        # arguments for an instance check
        if type(key) == legacy.UNICODE:
            key = key.encode("utf-8")

        # verifies the type of the current value and in case
        # it's sequence based converts it into a list using
        # the conversion method otherwise creates a new list
        # and includes the value in it
        value_t = type(value)
        if value_t in SEQUENCE_TYPES:
            value = list(value)
        else:
            value = [value]

        # iterates over all the values in the current sequence
        # and adds the valid values to the sanitized sequence,
        # this includes the conversion from unicode string into
        # a simple string using the default utf encoder
        for _value in value:
            if _value is None:
                continue
            is_string = type(_value) in legacy.STRINGS
            if not is_string:
                _value = str(_value)
            is_unicode = type(_value) == legacy.UNICODE
            if is_unicode:
                _value = _value.encode("utf-8")
            _values.append(_value)

        # adds the key and its sanitized list of values to the
        # final list of values
        final.append((key, _values))

    # in case the "as string" flag is not set the list of key to
    # values tuples should be returned to the caller method and not
    # the "default" linear and string based value
    if not as_string:
        return final

    # runs the encoding with sequence support on the final list
    # of sanitized values and returns the encoded result to the
    # caller method as the encoded value
    return legacy.urlencode(final, doseq=True)


def _quote(values, plus=False, safe="/"):
    """
    Quotes the keys and the values of the provided structure,
    only the first valid value of each key is considered.

    :rtype: Dictionary
    :return: The dictionary that maps each quoted key to its
    quoted value.
    """

    method = legacy.quote_plus if plus else legacy.quote

    # the sanitized structure is a list of (key, values) tuples
    # and must be iterated directly (it has no items method)
    values = _urlencode(values, as_string=False)

    final = dict()

    for key, value in values:
        # a key for which every value was unset has no values left,
        # in that case the empty string is used as the value
        first = value[0] if value else ""
        key = method(key, safe=safe)
        final[key] = method(first, safe=safe)

    return final


def _authorization(username, password):
    """
    Builds the base64 encoded token for HTTP basic authentication.

    :rtype: String
    :return: The encoded "username:password" token or none in
    case either the username or the password is not set.
    """

    if not username:
        return None
    if not password:
        return None
    payload = "%s:%s" % (username, password)
    payload = legacy.bytes(payload)
    authorization = base64.b64encode(payload)
    authorization = legacy.str(authorization)
    return authorization


def _encode_multipart(fields, mime=None, doseq=False):
    """
    Encodes the provided dictionary of fields as a multipart
    message, each value may be a plain value, a (name, contents)
    or (name, content type, contents) tuple, or a dictionary of
    headers in which the "data" key holds the contents.

    :rtype: Tuple
    :return: The content type (including the boundary) and the
    body of the multipart message as bytes.
    """

    mime = mime or "multipart/form-data"
    boundary = _create_boundary(fields, doseq=doseq)
    boundary_b = legacy.bytes(boundary)

    buffer = []

    for key, values in fields.items():
        is_list = doseq and type(values) == list
        values = values if is_list else [values]

        for value in values:
            if value is None:
                continue

            if isinstance(value, dict):
                # uses dedicated names for the header iteration so that
                # the name of the field (key) is not overwritten, as it's
                # still required by the remaining values of the field
                header_l = []
                data = None
                for header_key, item in value.items():
                    if header_key == "data":
                        data = item
                    else:
                        header_l.append("%s: %s" % (header_key, item))
                value = data
                header = "\r\n".join(header_l)
            elif isinstance(value, tuple):
                content_type = None
                if len(value) == 2:
                    name, contents = value
                else:
                    name, content_type, contents = value
                header = 'Content-Disposition: form-data; name="%s"; filename="%s"' % (
                    key,
                    name,
                )
                if content_type:
                    header += "\r\nContent-Type: %s" % content_type
                value = contents
            else:
                header = 'Content-Disposition: form-data; name="%s"' % key
                value = _encode(value)

            header = _encode(header)
            value = _encode(value)

            buffer.append(b"--" + boundary_b)
            buffer.append(header)
            buffer.append(b"")
            buffer.append(value)

    buffer.append(b"--" + boundary_b + b"--")
    buffer.append(b"")
    body = b"\r\n".join(buffer)
    content_type = "%s; boundary=%s" % (mime, boundary)

    return content_type, body


def _create_boundary(fields, size=32, doseq=False):
    """
    Generates a random multipart boundary that does not occur in
    any of the keys, names or contents of the provided fields.

    :rtype: String
    :return: The generated boundary string.
    """

    while True:
        base = "".join(random.choice(RANGE) for _value in range(size))
        boundary = "----------" + base
        result = _try_boundary(fields, boundary, doseq=doseq)
        if result:
            break

    return boundary


def _try_boundary(fields, boundary, doseq=False):
    """
    Verifies that the provided boundary is not found in any of the
    keys, file names or (encoded) contents of the provided fields.

    :rtype: bool
    :return: If the boundary is safe to be used for the fields.
    """

    boundary_b = legacy.bytes(boundary)

    for key, values in fields.items():
        is_list = doseq and type(values) == list
        values = values if is_list else [values]

        for value in values:
            if isinstance(value, dict):
                name = ""
                value = value.get("data", b"")
            elif isinstance(value, tuple):
                if len(value) == 2:
                    name = value[0] or ""
                    value = value[1] or b""
                else:
                    name = value[0] or ""
                    value = value[2] or b""
            else:
                name = ""

            # encodes the contents the same way the multipart encoder
            # does, so that the search is always done in bytes (string
            # contents would otherwise fail the bytes based search)
            value = _encode(value)

            if not key.find(boundary) == -1:
                return False
            if not name.find(boundary) == -1:
                return False
            if not value.find(boundary_b) == -1:
                return False

    return True


def _is_error(code):
    """
    Determines if the provided HTTP status code represents an
    error (4xx or 5xx), an unset code is considered an error.
    """

    return code // 100 in (4, 5) if code else True


def _encode(value, encoding="utf-8"):
    """
    Converts the provided value into bytes, encoding unicode
    strings with the provided encoding and using the string
    representation for any other (non bytes) value.
    """

    value_t = type(value)
    if value_t == legacy.BYTES:
        return value
    elif value_t == legacy.UNICODE:
        return value.encode(encoding)
    return legacy.bytes(str(value))


class HTTPResponse(object):
    """
    Compatibility object to be used by HTTP libraries that do
    not support the legacy HTTP response object as a return
    for any of their structures.
    """

    def __init__(self, data=None, code=200, status=None, headers=None):
        self.data = data
        self.code = code
        self.status = status
        self.headers = headers

    def read(self):
        """Returns the complete data (payload) of the response."""

        return self.data

    def readline(self):
        """Returns the complete data, no line splitting is done."""

        return self.read()

    def close(self):
        """No-op, there are no resources to be released."""

        pass

    def getcode(self):
        """Returns the status code of the response."""

        return self.code

    def info(self):
        """Returns the headers structure of the response."""

        return self.headers