import asyncio
import json
import logging
import os
import sys
from collections import deque
from pathlib import Path
from threading import Lock, Semaphore
from typing import TypedDict

import orjson
from loguru import _defaults, logger
from loguru._error_interceptor import ErrorInterceptor
from loguru._file_sink import FileSink
from loguru._simple_sinks import AsyncSink
from platformdirs import user_cache_dir
from rich.logging import RichHandler
from typing_extensions import NotRequired

from langflow.settings import DEV

VALID_LOG_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
# Human-readable
DEFAULT_LOG_FORMAT = (
    "{time:YYYY-MM-DD HH:mm:ss} - "
    "{level: <8} - {module} - {message}"
)


class SizedLogBuffer:
    def __init__(
        self,
        max_readers: int = 20,  # max number of concurrent readers for the buffer
    ):
        """A buffer for storing log messages for the log retrieval API.

        The buffer size can be overridden by the LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE
        env variable because the logger is initialized before the settings service
        is loaded.
        """
        self.buffer: deque = deque()

        self._max_readers = max_readers
        self._wlock = Lock()
        self._rsemaphore = Semaphore(max_readers)
        self._max = 0

    def get_write_lock(self) -> Lock:
        return self._wlock

    def write(self, message: str) -> None:
        record = json.loads(message)
        log_entry = record["text"]
        epoch = int(record["record"]["time"]["timestamp"] * 1000)
        with self._wlock:
            if len(self.buffer) >= self.max:
                # Drop the oldest entries so the buffer never exceeds its maximum size
                for _ in range(len(self.buffer) - self.max + 1):
                    self.buffer.popleft()
            self.buffer.append((epoch, log_entry))

    def __len__(self) -> int:
        return len(self.buffer)

    def get_after_timestamp(self, timestamp: int, lines: int = 5) -> dict[int, str]:
        rc = {}

        self._rsemaphore.acquire()
        try:
            with self._wlock:
                for ts, msg in self.buffer:
                    if lines == 0:
                        break
                    if ts >= timestamp and lines > 0:
                        rc[ts] = msg
                        lines -= 1
        finally:
            self._rsemaphore.release()

        return rc

    def get_before_timestamp(self, timestamp: int, lines: int = 5) -> dict[int, str]:
        self._rsemaphore.acquire()
        try:
            with self._wlock:
                as_list = list(self.buffer)
            max_index = -1
            for i, (ts, _) in enumerate(as_list):
                if ts >= timestamp:
                    max_index = i
                    break
            if max_index == -1:
                return self.get_last_n(lines)
            rc = {}
            start_from = max(max_index - lines, 0)
            for i, (ts, msg) in enumerate(as_list):
                if start_from <= i < max_index:
                    rc[ts] = msg
            return rc
        finally:
            self._rsemaphore.release()

    def get_last_n(self, last_idx: int) -> dict[int, str]:
        self._rsemaphore.acquire()
        try:
            with self._wlock:
                as_list = list(self.buffer)
            return dict(as_list[-last_idx:])
        finally:
            self._rsemaphore.release()

    @property
    def max(self) -> int:
        # Get it dynamically to allow for env variable changes
        if self._max == 0:
            env_buffer_size = os.getenv("LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE", "0")
            if env_buffer_size.isdigit():
                self._max = int(env_buffer_size)
        return self._max

    @max.setter
    def max(self, value: int) -> None:
        self._max = value

    def enabled(self) -> bool:
        return self.max > 0

    def max_size(self) -> int:
        return self.max


# log buffer for capturing log messages
log_buffer = SizedLogBuffer()


def serialize_log(record):
    subset = {
        "timestamp": record["time"].timestamp(),
        "message": record["message"],
        "level": record["level"].name,
        "module": record["module"],
    }
    return orjson.dumps(subset)


def patching(record) -> None:
    record["extra"]["serialized"] = serialize_log(record)
    if DEV is False:
        record.pop("exception", None)


class LogConfig(TypedDict):
    log_level: NotRequired[str]
    log_file: NotRequired[Path]
    disable: NotRequired[bool]
    log_env: NotRequired[str]
    log_format: NotRequired[str]
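# Usage sketch (illustrative only; the values shown are hypothetical): LogConfig is a
# TypedDict, so callers can build a plain dict and unpack it into configure(), e.g.
#
#     config: LogConfig = {"log_level": "DEBUG", "log_file": Path("langflow.log")}
#     configure(**config)
#
# All keys are NotRequired; configure() falls back to environment variables and
# built-in defaults for anything omitted.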
class AsyncFileSink(AsyncSink):
    def __init__(self, file):
        self._sink = FileSink(
            path=file,
            rotation="10 MB",  # Log rotation based on file size
        )
        super().__init__(self.write_async, None, ErrorInterceptor(_defaults.LOGURU_CATCH, -1))

    async def complete(self):
        await asyncio.to_thread(self._sink.stop)
        for task in self._tasks:
            await self._complete_task(task)

    async def write_async(self, message):
        await asyncio.to_thread(self._sink.write, message)


def is_valid_log_format(format_string) -> bool:
    """Validates a logging format string by attempting to format it with a dummy LogRecord.

    Args:
        format_string (str): The format string to validate.

    Returns:
        bool: True if the format string is valid, False otherwise.
    """
    record = logging.LogRecord(
        name="dummy",
        level=logging.INFO,
        pathname="dummy_path",
        lineno=0,
        msg="dummy message",
        args=None,
        exc_info=None,
    )
    formatter = logging.Formatter(format_string)
    try:
        # Attempt to format the record
        formatter.format(record)
    except (KeyError, ValueError, TypeError):
        logger.error("Invalid log format string passed, falling back to default")
        return False
    return True


def configure(
    *,
    log_level: str | None = None,
    log_file: Path | None = None,
    disable: bool | None = False,
    log_env: str | None = None,
    log_format: str | None = None,
    async_file: bool = False,
) -> None:
    if disable and log_level is None and log_file is None:
        logger.disable("langflow")
    # Fall back to environment variables when explicit arguments are not provided
    if os.getenv("LANGFLOW_LOG_LEVEL", "").upper() in VALID_LOG_LEVELS and log_level is None:
        log_level = os.getenv("LANGFLOW_LOG_LEVEL")
    if log_level is None:
        log_level = "ERROR"
    if log_file is None:
        env_log_file = os.getenv("LANGFLOW_LOG_FILE", "")
        log_file = Path(env_log_file) if env_log_file else None
    if log_env is None:
        log_env = os.getenv("LANGFLOW_LOG_ENV", "")

    logger.remove()  # Remove default handlers
    logger.configure(patcher=patching)  # Attach the serialized payload to every record

    if log_env.lower() == "container" or log_env.lower() == "container_json":
        logger.add(sys.stdout, format="{message}", serialize=True)
    elif log_env.lower() == "container_csv":
        logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss.SSS} {level} {file} {line} {function} {message}")
    else:
        if os.getenv("LANGFLOW_LOG_FORMAT") and log_format is None:
            log_format = os.getenv("LANGFLOW_LOG_FORMAT")

        if log_format is None or not is_valid_log_format(log_format):
            log_format = DEFAULT_LOG_FORMAT

        # Configure loguru to use RichHandler
        logger.configure(
            handlers=[
                {
                    "sink": RichHandler(rich_tracebacks=True, markup=True),
                    "format": log_format,
                    "level": log_level.upper(),
                }
            ]
        )

        if not log_file:
            cache_dir = Path(user_cache_dir("langflow"))
            logger.debug(f"Cache directory: {cache_dir}")
            log_file = cache_dir / "langflow.log"
            logger.debug(f"Log file: {log_file}")

        try:
            log_file.parent.mkdir(parents=True, exist_ok=True)
            logger.add(
                sink=AsyncFileSink(log_file) if async_file else log_file,
                level=log_level.upper(),
                format=log_format,
                serialize=True,
            )
        except Exception:  # noqa: BLE001
            logger.exception("Error setting up log file")

    if log_buffer.enabled():
        logger.add(sink=log_buffer.write, format="{time} {level} {message}", serialize=True)

    logger.debug(f"Logger set up with log level: {log_level}")

    setup_uvicorn_logger()
    setup_gunicorn_logger()


def setup_uvicorn_logger() -> None:
    loggers = (logging.getLogger(name) for name in logging.root.manager.loggerDict if name.startswith("uvicorn."))
    for uvicorn_logger in loggers:
        uvicorn_logger.handlers = []

    logging.getLogger("uvicorn").handlers = [InterceptHandler()]


def setup_gunicorn_logger() -> None:
    logging.getLogger("gunicorn.error").handlers = [InterceptHandler()]
    logging.getLogger("gunicorn.access").handlers = [InterceptHandler()]
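# Startup sketch (illustrative only; the level and arguments are hypothetical): an
# application entry point might call
#
#     configure(log_level="INFO", async_file=True)
#
# after which a log-retrieval endpoint can page through the in-memory buffer with
# log_buffer.get_last_n(100) or log_buffer.get_after_timestamp(ts, lines=50),
# provided LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE is set to a positive integer.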
logging.getLogger("gunicorn.access").handlers = [InterceptHandler()] class InterceptHandler(logging.Handler): """Default handler from examples in loguru documentation. See https://loguru.readthedocs.io/en/stable/overview.html#entirely-compatible-with-standard-logging. """ def emit(self, record) -> None: # Get corresponding Loguru level if it exists try: level = logger.level(record.levelname).name except ValueError: level = record.levelno # Find caller from where originated the logged message frame, depth = logging.currentframe(), 2 while frame.f_code.co_filename == logging.__file__ and frame.f_back: frame = frame.f_back depth += 1 logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())