|
|
"""Logging utilities (placeholder for legacy compatibility).""" |
|
|
import json |
|
|
import logging |
|
|
from uuid import uuid4 |
|
|
from pathlib import Path |
|
|
from threading import Lock |
|
|
from datetime import datetime |
|
|
from typing import Dict, Any, Optional |
|
|
|
|
|
from .config import load_config |
|
|
|
|
|
def save_logs( |
|
|
scheduler=None, |
|
|
json_dataset_path: Path = None, |
|
|
logs_data: Dict[str, Any] = None, |
|
|
feedback: str = None |
|
|
) -> None: |
|
|
""" |
|
|
Save logs (placeholder for legacy compatibility). |
|
|
|
|
|
Args: |
|
|
scheduler: HuggingFace scheduler (not used in refactored version) |
|
|
json_dataset_path: Path to JSON dataset |
|
|
logs_data: Log data dictionary |
|
|
feedback: User feedback |
|
|
|
|
|
Note: |
|
|
This is a placeholder function for backward compatibility. |
|
|
In the refactored version, logging would be handled differently. |
|
|
""" |
|
|
if not is_logging_enabled(): |
|
|
return |
|
|
try: |
|
|
current_time = datetime.now().timestamp() |
|
|
logs_data["time"] = str(current_time) |
|
|
if feedback: |
|
|
logs_data["feedback"] = feedback |
|
|
logs_data["record_id"] = str(uuid4()) |
|
|
field_order = [ |
|
|
"record_id", |
|
|
"session_id", |
|
|
"time", |
|
|
"session_duration_seconds", |
|
|
"client_location", |
|
|
"platform", |
|
|
"system_prompt", |
|
|
"sources", |
|
|
"reports", |
|
|
"subtype", |
|
|
"year", |
|
|
"question", |
|
|
"retriever", |
|
|
"endpoint_type", |
|
|
"reader", |
|
|
"docs", |
|
|
"answer", |
|
|
"feedback" |
|
|
] |
|
|
ordered_logs = {k: logs_data.get(k) for k in field_order if k in logs_data} |
|
|
lock = getattr(scheduler, "lock", None) |
|
|
if lock is None: |
|
|
lock = Lock() |
|
|
with lock: |
|
|
with open(json_dataset_path, 'a') as f: |
|
|
json.dump(ordered_logs, f) |
|
|
f.write("\n") |
|
|
logging.info("logging done") |
|
|
except Exception as e: |
|
|
logging.error(f"Error saving logs: {e}") |
|
|
raise |
|
|
|
|
|
|
|
|
def setup_logging(log_level: str = "INFO", log_file: str = None) -> None: |
|
|
""" |
|
|
Set up logging configuration. |
|
|
|
|
|
Args: |
|
|
log_level: Logging level |
|
|
log_file: Optional log file path |
|
|
""" |
|
|
if not is_logging_enabled(): |
|
|
return |
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
|
level=getattr(logging, log_level.upper()), |
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|
|
handlers=[ |
|
|
logging.StreamHandler(), |
|
|
logging.FileHandler(log_file) if log_file else logging.NullHandler() |
|
|
] |
|
|
) |
|
|
|
|
|
|
|
|
def log_query_response( |
|
|
query: str, |
|
|
response: str, |
|
|
metadata: Dict[str, Any] = None |
|
|
) -> None: |
|
|
""" |
|
|
Log query and response for analysis. |
|
|
|
|
|
Args: |
|
|
query: User query |
|
|
response: System response |
|
|
metadata: Additional metadata |
|
|
""" |
|
|
if not is_logging_enabled(): |
|
|
return |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
log_entry = { |
|
|
"query": query, |
|
|
"response_length": len(response), |
|
|
"metadata": metadata or {} |
|
|
} |
|
|
|
|
|
logger.info(f"Query processed: {log_entry}") |
|
|
|
|
|
|
|
|
def log_error(error: Exception, context: Dict[str, Any] = None) -> None: |
|
|
""" |
|
|
Log error with context. |
|
|
|
|
|
Args: |
|
|
error: Exception that occurred |
|
|
context: Additional context information |
|
|
""" |
|
|
if not is_logging_enabled(): |
|
|
return |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
error_info = { |
|
|
"error_type": type(error).__name__, |
|
|
"error_message": str(error), |
|
|
"context": context or {} |
|
|
} |
|
|
|
|
|
logger.error(f"Error occurred: {error_info}") |
|
|
|
|
|
|
|
|
def log_performance_metrics( |
|
|
operation: str, |
|
|
duration: float, |
|
|
metadata: Dict[str, Any] = None |
|
|
) -> None: |
|
|
""" |
|
|
Log performance metrics. |
|
|
|
|
|
Args: |
|
|
operation: Name of the operation |
|
|
duration: Duration in seconds |
|
|
metadata: Additional metadata |
|
|
""" |
|
|
if not is_logging_enabled(): |
|
|
return |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
metrics = { |
|
|
"operation": operation, |
|
|
"duration_seconds": duration, |
|
|
"metadata": metadata or {} |
|
|
} |
|
|
|
|
|
logger.info(f"Performance metrics: {metrics}") |
|
|
|
|
|
|
|
|
def is_session_enabled() -> bool: |
|
|
""" |
|
|
Returns True if session management is enabled, False otherwise. |
|
|
Checks environment variable ENABLE_SESSION first, then config. |
|
|
""" |
|
|
env = os.getenv("ENABLE_SESSION") |
|
|
if env is not None: |
|
|
return env.lower() in ("1", "true", "yes", "on") |
|
|
config = load_config() |
|
|
return config.get("features", {}).get("enable_session", True) |
|
|
|
|
|
|
|
|
def is_logging_enabled() -> bool: |
|
|
""" |
|
|
Returns True if logging is enabled, False otherwise. |
|
|
Checks environment variable ENABLE_LOGGING first, then config. |
|
|
""" |
|
|
env = os.getenv("ENABLE_LOGGING") |
|
|
if env is not None: |
|
|
return env.lower() in ("1", "true", "yes", "on") |
|
|
config = load_config() |
|
|
return config.get("features", {}).get("enable_logging", True) |
|
|
|
|
|
|