import asyncio
import logging
import signal

from gunicorn import glogging
from gunicorn.app.base import BaseApplication
from uvicorn.workers import UvicornWorker

from langflow.logging.logger import InterceptHandler


class LangflowUvicornWorker(UvicornWorker):
    """Uvicorn worker for gunicorn that installs its own SIGINT handler."""

    # Keyword arguments for the uvicorn config; selects the asyncio loop.
    CONFIG_KWARGS = {"loop": "asyncio"}

    def _install_sigint_handler(self) -> None:
        """Install a SIGINT handler on workers.

        Registers ``self.handle_exit(signal.SIGINT, None)`` as the callback
        the running event loop invokes when the worker receives SIGINT.

        - https://github.com/encode/uvicorn/issues/1116
        - https://github.com/benoitc/gunicorn/issues/2604
        """
        # Must be called from within a running loop (it is, via ``_serve``).
        loop = asyncio.get_running_loop()
        loop.add_signal_handler(signal.SIGINT, self.handle_exit, signal.SIGINT, None)

    async def _serve(self) -> None:
        """Install the SIGINT handler, then delegate to the parent ``_serve``."""
        # We do this to not log the "Worker (pid:XXXXX) was sent SIGINT"
        self._install_sigint_handler()
        await super()._serve()


class Logger(glogging.Logger):
    """Implements and overrides the gunicorn logging interface.

    This class inherits from the standard gunicorn logger and overrides it by
    replacing the handlers with `InterceptHandler` in order to route the
    gunicorn logs to loguru.
    """

    def __init__(self, cfg) -> None:
        """Initialise the gunicorn logger, then swap in intercepting handlers.

        Args:
            cfg: The gunicorn configuration object, forwarded unchanged to
                ``glogging.Logger.__init__``.
        """
        super().__init__(cfg)
        # Replace (not append to) whatever handlers the parent set up, so the
        # "gunicorn.error" and "gunicorn.access" loggers only emit through
        # InterceptHandler.
        logging.getLogger("gunicorn.error").handlers = [InterceptHandler()]
        logging.getLogger("gunicorn.access").handlers = [InterceptHandler()]


class LangflowApplication(BaseApplication):
    """Gunicorn application wrapper that serves an already-built app object."""

    def __init__(self, app, options=None) -> None:
        """Store the app and options, forcing the Langflow worker and logger.

        Args:
            app: The application object returned by :meth:`load`.
            options: Optional mapping of gunicorn setting names to values.
                It is copied, so the caller's mapping is never modified.
        """
        # Shallow-copy so that injecting worker_class/logger_class below does
        # not leak into the dict owned by the caller.
        self.options = dict(options or {})
        self.options["worker_class"] = "langflow.server.LangflowUvicornWorker"
        self.options["logger_class"] = Logger
        self.application = app
        # The attributes above are assigned first because load_config() reads
        # self.options.
        # NOTE(review): presumably BaseApplication.__init__ triggers
        # load_config() — confirm against gunicorn.
        super().__init__()

    def load_config(self) -> None:
        """Apply every recognised, non-``None`` option to the gunicorn config.

        Options whose key is not present in ``self.cfg.settings`` or whose
        value is ``None`` are skipped.
        """
        config = {
            key: value
            for key, value in self.options.items()
            if key in self.cfg.settings and value is not None
        }
        for key, value in config.items():
            self.cfg.set(key.lower(), value)

    def load(self):
        """Return the application object passed to the constructor."""
        return self.application