| """
|
| Logging Configuration Module.
|
|
|
| Provides structured logging for:
|
| - API request/response logging
|
| - Detection event logging
|
| - Error logging
|
| - Audit logging
|
| """
|
|
|
| import logging
|
| import sys
|
| from typing import Optional, Dict, Any
|
| from datetime import datetime
|
|
|
|
|
|
|
| LOG_FORMAT = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
|
| LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
|
|
|
|
|
| def setup_logging(
|
| level: str = "INFO",
|
| log_format: Optional[str] = None,
|
| log_file: Optional[str] = None,
|
| ) -> None:
|
| """
|
| Configure logging for the application.
|
|
|
| Args:
|
| level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
|
| log_format: Custom log format string
|
| log_file: Optional file path for file logging
|
| """
|
|
|
| log_level = getattr(logging, level.upper(), logging.INFO)
|
|
|
|
|
| handlers = []
|
|
|
|
|
| console_handler = logging.StreamHandler(sys.stdout)
|
| console_handler.setLevel(log_level)
|
| console_handler.setFormatter(logging.Formatter(
|
| log_format or LOG_FORMAT,
|
| LOG_DATE_FORMAT,
|
| ))
|
| handlers.append(console_handler)
|
|
|
|
|
| if log_file:
|
| file_handler = logging.FileHandler(log_file)
|
| file_handler.setLevel(log_level)
|
| file_handler.setFormatter(logging.Formatter(
|
| log_format or LOG_FORMAT,
|
| LOG_DATE_FORMAT,
|
| ))
|
| handlers.append(file_handler)
|
|
|
|
|
| logging.basicConfig(
|
| level=log_level,
|
| format=log_format or LOG_FORMAT,
|
| datefmt=LOG_DATE_FORMAT,
|
| handlers=handlers,
|
| )
|
|
|
|
|
| def get_logger(name: str) -> logging.Logger:
|
| """
|
| Get a logger instance.
|
|
|
| Args:
|
| name: Logger name (typically __name__)
|
|
|
| Returns:
|
| Logger instance
|
| """
|
| return logging.getLogger(name)
|
|
|
|
|
| class RequestLogger:
|
| """Logger for API requests."""
|
|
|
| def __init__(self) -> None:
|
| """Initialize request logger."""
|
| self.logger = get_logger("scamshield.request")
|
|
|
| def log_request(
|
| self,
|
| method: str,
|
| path: str,
|
| client_ip: Optional[str] = None,
|
| request_id: Optional[str] = None,
|
| ) -> None:
|
| """
|
| Log incoming API request.
|
|
|
| Args:
|
| method: HTTP method
|
| path: Request path
|
| client_ip: Client IP address
|
| request_id: Unique request identifier
|
| """
|
| self.logger.info(
|
| f"REQUEST | {method} {path} | client={client_ip} | request_id={request_id}"
|
| )
|
|
|
| def log_response(
|
| self,
|
| status_code: int,
|
| duration_ms: int,
|
| request_id: Optional[str] = None,
|
| ) -> None:
|
| """
|
| Log API response.
|
|
|
| Args:
|
| status_code: HTTP status code
|
| duration_ms: Response time in milliseconds
|
| request_id: Request identifier
|
| """
|
| self.logger.info(
|
| f"RESPONSE | status={status_code} | duration={duration_ms}ms | request_id={request_id}"
|
| )
|
|
|
|
|
| class DetectionLogger:
|
| """Logger for scam detection events."""
|
|
|
| def __init__(self) -> None:
|
| """Initialize detection logger."""
|
| self.logger = get_logger("scamshield.detection")
|
|
|
| def log_detection(
|
| self,
|
| session_id: str,
|
| scam_detected: bool,
|
| confidence: float,
|
| language: str,
|
| indicators: Optional[list] = None,
|
| ) -> None:
|
| """
|
| Log scam detection event.
|
|
|
| Args:
|
| session_id: Session identifier
|
| scam_detected: Detection result
|
| confidence: Detection confidence
|
| language: Detected language
|
| indicators: Matched scam indicators
|
| """
|
| result = "SCAM" if scam_detected else "LEGITIMATE"
|
| self.logger.info(
|
| f"DETECTION | session={session_id} | result={result} | "
|
| f"confidence={confidence:.2f} | language={language} | "
|
| f"indicators={indicators}"
|
| )
|
|
|
| def log_extraction(
|
| self,
|
| session_id: str,
|
| intel_summary: Dict[str, int],
|
| confidence: float,
|
| ) -> None:
|
| """
|
| Log intelligence extraction event.
|
|
|
| Args:
|
| session_id: Session identifier
|
| intel_summary: Summary of extracted entities
|
| confidence: Extraction confidence
|
| """
|
| self.logger.info(
|
| f"EXTRACTION | session={session_id} | "
|
| f"entities={intel_summary} | confidence={confidence:.2f}"
|
| )
|
|
|
|
|
| class AuditLogger:
|
| """Logger for audit events."""
|
|
|
| def __init__(self) -> None:
|
| """Initialize audit logger."""
|
| self.logger = get_logger("scamshield.audit")
|
|
|
| def log_event(
|
| self,
|
| event_type: str,
|
| details: Dict[str, Any],
|
| session_id: Optional[str] = None,
|
| ) -> None:
|
| """
|
| Log audit event.
|
|
|
| Args:
|
| event_type: Type of event
|
| details: Event details
|
| session_id: Session identifier if applicable
|
| """
|
| timestamp = datetime.utcnow().isoformat()
|
| self.logger.info(
|
| f"AUDIT | time={timestamp} | type={event_type} | "
|
| f"session={session_id} | details={details}"
|
| )
|
|
|
|
|
|
|
| request_logger = RequestLogger()
|
| detection_logger = DetectionLogger()
|
| audit_logger = AuditLogger()
|
|
|