Spaces:
Running
Running
import asyncio | |
import logging | |
import signal | |
from gunicorn import glogging | |
from gunicorn.app.base import BaseApplication | |
from uvicorn.workers import UvicornWorker | |
from langflow.logging.logger import InterceptHandler | |
class LangflowUvicornWorker(UvicornWorker):
    """Uvicorn worker for gunicorn that handles SIGINT itself.

    The worker is pinned to the ``asyncio`` event loop via ``CONFIG_KWARGS``
    and registers its own SIGINT handler on that loop before serving.
    """

    CONFIG_KWARGS = {"loop": "asyncio"}

    def _install_sigint_handler(self) -> None:
        """Register a SIGINT handler on the running event loop.

        The handler is ``self.handle_exit``, invoked with ``signal.SIGINT``
        and ``None`` as its arguments. Must be called from within a running
        event loop, since it relies on ``asyncio.get_running_loop()``.

        Background:

        - https://github.com/encode/uvicorn/issues/1116
        - https://github.com/benoitc/gunicorn/issues/2604
        """
        asyncio.get_running_loop().add_signal_handler(
            signal.SIGINT,
            self.handle_exit,
            signal.SIGINT,
            None,
        )

    async def _serve(self) -> None:
        """Install the SIGINT handler, then delegate to the parent's serve loop."""
        # Handling SIGINT ourselves avoids the
        # "Worker (pid:XXXXX) was sent SIGINT" log line.
        self._install_sigint_handler()
        await super()._serve()
class Logger(glogging.Logger):
    """Gunicorn logger whose output is forwarded through ``InterceptHandler``.

    After the standard gunicorn logger has been set up, the handler lists of
    the ``gunicorn.error`` and ``gunicorn.access`` loggers are replaced with a
    single ``InterceptHandler`` each, so that gunicorn's log records are
    routed to loguru.
    """

    def __init__(self, cfg) -> None:
        """Initialise the gunicorn logger from ``cfg`` and swap its handlers."""
        super().__init__(cfg)
        # Each logger gets its own handler instance; any handlers installed by
        # the base class are discarded.
        for logger_name in ("gunicorn.error", "gunicorn.access"):
            logging.getLogger(logger_name).handlers = [InterceptHandler()]
class LangflowApplication(BaseApplication):
    """Gunicorn application that serves an already-constructed app object.

    The supplied options are applied to the gunicorn configuration, with the
    worker class and logger class always forced to the Langflow-specific ones.
    """

    def __init__(self, app, options=None) -> None:
        """Store the app and options, then initialise the gunicorn application.

        Args:
            app: The application object returned by :meth:`load`.
            options: Optional mapping of gunicorn setting names to values.
                It is copied, so the caller's mapping is never modified.
        """
        # Copy instead of aliasing: the forced entries below must not leak
        # into the dict object owned by the caller.
        self.options = dict(options or {})
        self.options["worker_class"] = "langflow.server.LangflowUvicornWorker"
        self.options["logger_class"] = Logger
        self.application = app
        # These attributes must be set before the base initialiser runs,
        # because load_config() reads self.options.
        super().__init__()

    def load_config(self) -> None:
        """Apply every recognised, non-``None`` option to the gunicorn config.

        Options whose key is not a known gunicorn setting, or whose value is
        ``None``, are silently skipped.
        """
        known_settings = self.cfg.settings
        for key, value in self.options.items():
            if value is None or key not in known_settings:
                continue
            self.cfg.set(key.lower(), value)

    def load(self):
        """Return the application object passed to the constructor."""
        return self.application