"""
Centralized Logging System for Flare Platform
"""
import json
import logging
import os
import sys
import threading
import time
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Optional, Dict, Any
class LogLevel(Enum):
    """Severity levels accepted by ``FlareLogger``.

    Each member's value is the matching ``logging`` level name, so it can be
    passed directly to ``logging.Logger.setLevel``.
    """

    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


class FlareLogger:
    """Singleton wrapper around the ``'flare'`` standard-library logger.

    Configuration is read from the environment the first time the class is
    instantiated:

    * ``LOG_LEVEL``   - name of a ``LogLevel`` member (case-insensitive,
      defaults to ``INFO``; unknown values fall back to ``INFO``).
    * ``LOG_TO_FILE`` - when ``"true"`` (case-insensitive) a dated file under
      ``logs/`` is written in addition to stdout.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    # Finish preparing the object before publishing it, so a
                    # concurrent caller never sees an instance that lacks
                    # the ``_initialized`` flag.
                    instance = super().__new__(cls)
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        with self._lock:
            # Re-check under the lock: another thread may have completed the
            # setup while this one was waiting.
            if self._initialized:
                return
            self._configure()
            # Only mark as initialized once setup succeeded, so a failed
            # setup is retried instead of leaving a half-built singleton.
            self._initialized = True

    @staticmethod
    def _level_from_env():
        """Return ``(level, raw_value_if_invalid)`` parsed from ``LOG_LEVEL``."""
        raw = os.getenv('LOG_LEVEL', 'INFO')
        try:
            return LogLevel[raw.strip().upper()], None
        except KeyError:
            return LogLevel.INFO, raw

    def _configure(self) -> None:
        """Build the underlying logger and attach its handlers."""
        # Log level from environment
        self.log_level, invalid_level = self._level_from_env()

        # Configure Python logging
        self.logger = logging.getLogger('flare')
        self.logger.setLevel(self.log_level.value)

        # Remove default handlers
        self.logger.handlers = []

        # Console handler with custom format
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(self._get_formatter())
        self.logger.addHandler(console_handler)

        # File handler for production
        if os.getenv('LOG_TO_FILE', 'false').lower() == 'true':
            log_dir = Path('logs')
            log_dir.mkdir(exist_ok=True)
            # Messages are built with ensure_ascii=False, so the file must be
            # able to hold any Unicode text regardless of the platform
            # default encoding.
            file_handler = logging.FileHandler(
                log_dir / f"flare_{datetime.now().strftime('%Y%m%d')}.log",
                encoding='utf-8',
            )
            file_handler.setFormatter(self._get_formatter())
            self.logger.addHandler(file_handler)

        # Future: Add ElasticSearch handler here
        # if os.getenv('ELASTICSEARCH_URL'):
        #     from elasticsearch_handler import ElasticsearchHandler
        #     es_handler = ElasticsearchHandler(
        #         hosts=[os.getenv('ELASTICSEARCH_URL')],
        #         index='flare-logs'
        #     )
        #     self.logger.addHandler(es_handler)

        if invalid_level is not None:
            # Reported only now because the handlers did not exist earlier.
            self.logger.warning(
                "Invalid LOG_LEVEL %r; falling back to INFO", invalid_level
            )

    def _get_formatter(self):
        """Return the formatter shared by the console and file handlers."""
        return logging.Formatter(
            '[%(asctime)s.%(msecs)03d] [%(levelname)s] [%(name)s] %(message)s',
            datefmt='%H:%M:%S'
        )

    def log(self, level: LogLevel, message: str, **kwargs):
        """Central logging method with structured data.

        Args:
            level: Severity to log at.
            message: Human-readable text.
            **kwargs: Structured fields. They are appended to the message as
                JSON and also placed, together with context data, on the
                log record under the ``data`` attribute.
        """
        # Add context data
        extra_data = {
            # Naive UTC timestamp in the same format utcnow().isoformat()
            # produced, without calling the deprecated utcnow().
            'timestamp': datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            'service': 'flare',
            'thread_id': threading.get_ident(),
            **kwargs
        }

        # Log with structured data
        log_message = message
        if kwargs:
            # Format kwargs for readability
            kwargs_str = json.dumps(kwargs, ensure_ascii=False, default=str)
            log_message = f"{message} | {kwargs_str}"

        self.logger.log(
            getattr(logging, level.value), log_message, extra={'data': extra_data}
        )

        # Always flush for real-time debugging
        sys.stdout.flush()

    def debug(self, message: str, **kwargs):
        """Log debug message"""
        self.log(LogLevel.DEBUG, message, **kwargs)

    def info(self, message: str, **kwargs):
        """Log info message"""
        self.log(LogLevel.INFO, message, **kwargs)

    def warning(self, message: str, **kwargs):
        """Log warning message"""
        self.log(LogLevel.WARNING, message, **kwargs)

    def error(self, message: str, **kwargs):
        """Log error message"""
        self.log(LogLevel.ERROR, message, **kwargs)

    def critical(self, message: str, **kwargs):
        """Log critical message"""
        self.log(LogLevel.CRITICAL, message, **kwargs)

    def set_level(self, level: str):
        """Dynamically change log level.

        Unknown level names leave the current level untouched and are
        reported with a warning.
        """
        try:
            new_level = LogLevel[level.upper()]
        except KeyError:
            self.warning(f"Invalid log level: {level}")
            return
        self.log_level = new_level
        self.logger.setLevel(new_level.value)
        self.info(f"Log level changed to {level}")
# Shared logger instance used by the module-level helper functions
logger = FlareLogger()
# Convenience functions: module-level shortcuts that delegate to ``logger``
def log_debug(message: str, **kwargs):
    """Emit ``message`` at DEBUG level through the shared ``logger``.

    Keyword arguments are forwarded unchanged as structured fields.
    """
    logger.debug(message, **kwargs)
def log_info(message: str, **kwargs):
    """Emit ``message`` at INFO level through the shared ``logger``.

    Keyword arguments are forwarded unchanged as structured fields.
    """
    logger.info(message, **kwargs)
def log_warning(message: str, **kwargs):
    """Emit ``message`` at WARNING level through the shared ``logger``.

    Keyword arguments are forwarded unchanged as structured fields.
    """
    logger.warning(message, **kwargs)
def log_error(message: str, **kwargs):
    """Emit ``message`` at ERROR level through the shared ``logger``.

    Keyword arguments are forwarded unchanged as structured fields.
    """
    logger.error(message, **kwargs)
def log_critical(message: str, **kwargs):
    """Emit ``message`` at CRITICAL level through the shared ``logger``.

    Keyword arguments are forwarded unchanged as structured fields.
    """
    logger.critical(message, **kwargs)
# Backward compatibility
def log(message: str, level: str = "INFO", **kwargs):
    """Legacy log function kept for older call sites.

    Args:
        message: Text to log.
        level: Name of a ``LogLevel`` member, case-insensitive.
        **kwargs: Structured fields forwarded to the shared ``logger``.

    An unrecognised ``level`` no longer raises: a warning is emitted and the
    message is logged at INFO, so a bad level name cannot crash the caller.
    """
    # Resolve through the enum instead of getattr() on the logger object, so
    # only real severity names are accepted (not e.g. "set_level" or "log").
    try:
        resolved = LogLevel[str(level).upper()]
    except KeyError:
        logger.warning(f"Invalid log level: {level}")
        resolved = LogLevel.INFO
    logger.log(resolved, message, **kwargs)
# Performance logging helpers
class LogTimer:
    """Context manager for timing operations.

    Entering logs a DEBUG "Starting ..." line. Leaving logs an INFO
    "completed" line, or an ERROR "failed" line when the body raised. The
    exception is not suppressed.

    Args:
        operation_name: Label used in the log messages.
        **extra_kwargs: Structured fields attached to every line logged.

    Attributes:
        start_time: ``datetime`` captured on entry (``None`` before entry).
        duration_ms: Elapsed milliseconds, set on exit (``None`` before).
    """

    def __init__(self, operation_name: str, **extra_kwargs):
        self.operation_name = operation_name
        self.extra_kwargs = extra_kwargs
        self.start_time = None
        self.duration_ms = None
        self._started_at = None

    def __enter__(self):
        self.start_time = datetime.now()
        # Wall-clock time can jump (NTP, DST); durations are measured on a
        # monotonic high-resolution clock instead.
        self._started_at = time.perf_counter()
        log_debug(f"Starting {self.operation_name}", **self.extra_kwargs)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        duration_ms = (time.perf_counter() - self._started_at) * 1000
        self.duration_ms = duration_ms

        if exc_type:
            log_error(
                f"{self.operation_name} failed after {duration_ms:.2f}ms",
                **self._fields(error=str(exc_val), duration_ms=duration_ms)
            )
        else:
            log_info(
                f"{self.operation_name} completed in {duration_ms:.2f}ms",
                **self._fields(duration_ms=duration_ms)
            )

    def _fields(self, **measured):
        """Merge measured values with the caller's extra fields.

        Measured values come first and win on a name clash, so an extra
        field called ``error`` or ``duration_ms`` cannot raise a duplicate
        keyword ``TypeError`` that would mask the original exception.
        """
        merged = dict(measured)
        for key, value in self.extra_kwargs.items():
            merged.setdefault(key, value)
        return merged