#!/usr/bin/python
# -*- coding: utf-8 -*-
# Hive Appier Framework
# Copyright (c) 2008-2024 Hive Solutions Lda.
#
# This file is part of Hive Appier Framework.
#
# Hive Appier Framework is free software: you can redistribute it and/or modify
# it under the terms of the Apache License as published by the Apache
# Foundation, either version 2.0 of the License, or (at your option) any
# later version.
#
# Hive Appier Framework is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache License for more details.
#
# You should have received a copy of the Apache License along with
# Hive Appier Framework. If not, see .
__author__ = "João Magalhães "
""" The author(s) of the module """
__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda."
""" The copyright for the module """
__license__ = "Apache License, Version 2.0"
""" The license for the module """
import io
import asyncio
import inspect
import tempfile
from . import util
from . import legacy
from . import exceptions
class ASGIApp(object):
@classmethod
async def asgi_entry(cls, scope, receive, send):
if hasattr(cls, "_asgi") and cls._asgi:
return await cls._asgi.app_asgi(scope, receive, send)
cls._asgi = cls()
return await cls._asgi.app_asgi(scope, receive, send)
def serve_uvicorn(self, host, port, **kwargs):
util.ensure_pip("uvicorn")
import uvicorn
self.server_version = uvicorn.__version__
reload = kwargs.get("reload", False)
forwarded_allow_ips = kwargs.get("forwarded_allow_ips", None)
app_asgi = build_asgi_i(self)
config = uvicorn.Config(
app_asgi,
host=host,
port=port,
reload=reload,
forwarded_allow_ips=forwarded_allow_ips,
)
self._server = uvicorn.Server(config=config)
self._server.run()
def serve_hypercorn(
self, host, port, ssl=False, key_file=None, cer_file=None, **kwargs
):
util.ensure_pip("hypercorn")
import hypercorn.config
import hypercorn.asyncio
self.server_version = hypercorn.__version__
app_asgi = build_asgi_i(self)
config = hypercorn.config.Config()
config.bind = ["%s:%d" % (host, port)]
config.keyfile = key_file if ssl else None
config.certfile = cer_file if ssl else None
server_coro = hypercorn.asyncio.serve(app_asgi, config)
asyncio.run(server_coro)
def serve_daphne(self, host, port, **kwargs):
util.ensure_pip("daphne")
import daphne.server
import daphne.cli
self.server_version = daphne.__version__
app_asgi = build_asgi_i(self)
app_daphne = daphne.cli.ASGI3Middleware(app_asgi)
self._server = daphne.server.Server(
app_daphne, endpoints=["tcp:port=%d:interface=%s" % (port, host)]
)
self._server.run()
async def send(self, data, content_type=None):
if content_type:
self.response.set_content_type(content_type)
return await self.response.send(data)
async def app_asgi(self, *args, **kwargs):
return await self.application_asgi(*args, **kwargs)
async def application_asgi(self, scope, receive, send):
"""
ASGI version of the application entrypoint, should define
the proper asynchronous workflow for an HTTP request handling.
:type scope: Dictionary
:param scope: The connection scope, a dictionary that contains
at least a type key specifying the protocol that is incoming.
:type receive: Coroutine
:param receive: An awaitable callable that will yield a new
event dictionary when one is available.
:type send: Coroutine
:param send: an awaitable callable taking a single event dictionary
as a positional argument that will return once the send has been
completed or the connection has been closed.
:see: https://asgi.readthedocs.io
"""
scope_type = scope.get("type", None)
scope_method = getattr(self, "asgi_" + scope_type, None)
if not scope_method:
raise exceptions.OperationalError(
message="Unexpected scope type '%s'" % scope_type
)
return await scope_method(scope, receive, send)
async def asgi_lifespan(self, scope, receive, send):
running = True
while running:
event = await receive()
if event["type"] == "lifespan.startup":
self.start()
await send(dict(type="lifespan.startup.complete"))
elif event["type"] == "lifespan.shutdown":
self.stop()
await send(dict(type="lifespan.shutdown.complete"))
running = False
async def asgi_http(self, scope, receive, send):
try:
# sets the initials body value, to be replaced by the "real"
# body file instance once it's created
body = None
# creates the context dictionary so that this new "pseudo" request
# can have its own context for futures placement
ctx = dict(start_task=None, encoding="utf-8")
# runs the asynchronous building of the intermediate structures
# to get to the final WSGI compliant environment dictionary
start_response = await self._build_start_response(ctx, send)
sender = await self._build_sender(ctx, send, start_response)
body = await self._build_body(receive)
environ = await self._build_environ(scope, body, sender)
self.prepare()
try:
result = self.application_l(environ, start_response, ensure_gen=False)
self.set_request_ctx()
finally:
self.restore()
# verifies if the resulting value is an awaitable and if
# that's the case waits for it's "real" result value (async)
if inspect.isawaitable(result):
result = await result # @UndefinedVariable
# waits for the start (code and headers) send operation to be
# completed (async) so that we can proceed with body sending
self._ensure_start(ctx, start_response)
await ctx["start_task"]
# iterates over the complete set of chunks in the response
# iterator to send each of them to the client side
for chunk in result if result else []:
if asyncio.iscoroutine(chunk):
await chunk
elif asyncio.isfuture(chunk):
await chunk
elif isinstance(chunk, int):
continue
else:
if legacy.is_string(chunk):
chunk = chunk.encode(ctx["encoding"])
await send(
{"type": "http.response.body", "body": chunk, "more_body": True}
)
# sends the final empty chunk indicating the end
# of the body payload to the "owning" server
await send({"type": "http.response.body", "body": b""})
finally:
if body:
body.close()
self.unset_request_ctx()
async def _build_start_response(self, ctx, send):
def start_response(status, headers):
if ctx["start_task"]:
return
code = status.split(" ", 1)[0]
code = int(code)
headers = [
(name.lower().encode("ascii"), value.encode("ascii"))
for name, value in headers
]
send_coro = send(
{"type": "http.response.start", "status": code, "headers": headers}
)
ctx["start_task"] = asyncio.create_task(send_coro)
return start_response
async def _build_sender(self, ctx, send, start_response):
async def sender(data):
self._ensure_start(ctx, start_response)
await ctx["start_task"]
if legacy.is_string(data):
data = data.encode(ctx["encoding"])
return await send(
{"type": "http.response.body", "body": data, "more_body": True}
)
return sender
async def _build_body(self, receive, max_size=65536):
body = tempfile.SpooledTemporaryFile(max_size=max_size)
while True:
message = await receive()
util.verify(message["type"] == "http.request")
body.write(message.get("body", b""))
if not message.get("more_body"):
break
body.seek(0)
return body
async def _build_environ(self, scope, body, sender):
"""
Builds a scope and request body into a WSGI environ object.
:type scope: Dictionary
:param scope: The scope dictionary from ASGI.
:type: body: File
:param body: The body callable to be used for the reading
of the input.
:type sender: Function
:param sender: The sender function responsible for the sending
of data to the client side (response).
:rtype: Dictionary
:return: The WSGI compatible environ dictionary converted
from ASGI and ready to be used by WSGI apps.
"""
environ = {
"REQUEST_METHOD": scope["method"],
"SCRIPT_NAME": scope.get("root_path", ""),
"PATH_INFO": scope["path"],
"QUERY_STRING": scope["query_string"].decode("ascii"),
"SERVER_PROTOCOL": "HTTP/%s" % scope["http_version"],
"wsgi.version": (1, 0),
"wsgi.url_scheme": scope.get("scheme", "http"),
"wsgi.input": body,
"wsgi.output": sender,
"wsgi.errors": io.BytesIO(),
"wsgi.multithread": True,
"wsgi.multiprocess": True,
"wsgi.run_once": False,
}
if "server" in scope:
environ["SERVER_NAME"] = scope["server"][0]
environ["SERVER_PORT"] = str(scope["server"][1])
else:
environ["SERVER_NAME"] = "localhost"
environ["SERVER_PORT"] = "80"
if "client" in scope:
environ["REMOTE_ADDR"] = scope["client"][0]
for name, value in scope.get("headers", []):
name = name.decode("latin1")
if name == "content-length":
corrected_name = "CONTENT_LENGTH"
elif name == "content-type":
corrected_name = "CONTENT_TYPE"
else:
corrected_name = "HTTP_%s" % name.upper().replace("-", "_")
value = value.decode("latin1")
if corrected_name in environ:
value = environ[corrected_name] + "," + value
environ[corrected_name] = value
return environ
def _ensure_start(self, ctx, start_response):
if ctx["start_task"]:
return
self.request_ctx.set_headers_b()
code_s = self.request_ctx.get_code_s()
headers = self.request_ctx.get_headers() or []
if self.sort_headers:
headers.sort()
start_response(code_s, headers)
def build_asgi(app_cls):
async def app_asgi(scope, receive, send):
return await app_cls.asgi_entry(scope, receive, send)
return app_asgi
def build_asgi_i(app):
async def app_asgi(scope, receive, send):
return await app.app_asgi(scope, receive, send)
return app_asgi