# HINTECH / core/logger.py
# Factor Studios
# Upload 73 files
# aaaaa79 verified
"""
Logger Module
Centralized logging system for the virtual ISP stack:
- Structured logging with multiple levels
- Log aggregation and filtering
- Real-time log streaming
- Log persistence and rotation
"""
import logging
import logging.handlers
import time
import threading
import json
import os
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, asdict
from enum import Enum
from collections import deque
import queue
class LogLevel(Enum):
    """Severity levels understood by the logger.

    Each member's value is the name of the matching constant in the
    standard ``logging`` module, so the numeric severity can be looked up
    with ``getattr(logging, level.value)``.
    """

    # Listed from least to most severe.
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
class LogCategory(Enum):
    """Subsystem tags attached to every log entry.

    The string value of a member is what gets stored in
    ``LogEntry.category`` and used as a key in the per-category statistics.
    """

    SYSTEM = "SYSTEM"
    DHCP = "DHCP"
    NAT = "NAT"
    FIREWALL = "FIREWALL"
    TCP = "TCP"
    ROUTER = "ROUTER"
    BRIDGE = "BRIDGE"
    SOCKET = "SOCKET"
    SESSION = "SESSION"
    SECURITY = "SECURITY"
    PERFORMANCE = "PERFORMANCE"
@dataclass
class LogEntry:
    """Structured log entry.

    Attributes:
        timestamp: Seconds since the epoch. A value of ``0`` (or ``None``)
            is replaced with the current time on construction.
        level: Severity name (e.g. ``"INFO"``).
        category: Category name (e.g. ``"SYSTEM"``).
        module: Name of the component that produced the entry.
        message: Human-readable log text.
        session_id: Optional session identifier.
        client_id: Optional client identifier.
        source_ip: Optional source address.
        dest_ip: Optional destination address.
        protocol: Optional protocol name.
        metadata: Free-form extra fields; ``None`` becomes a fresh empty
            dict so instances never share one mapping.
    """

    timestamp: float
    level: str
    category: str
    module: str
    message: str
    session_id: Optional[str] = None
    client_id: Optional[str] = None
    source_ip: Optional[str] = None
    dest_ip: Optional[str] = None
    protocol: Optional[str] = None
    # Annotated Optional because None is the real default; the empty dict is
    # created per instance in __post_init__.
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        """Fill in defaults that cannot be expressed as plain field defaults."""
        # A missing timestamp (0 or None) means "now"; leaving None in place
        # would break any later ordering by timestamp.
        if not self.timestamp:
            self.timestamp = time.time()
        if self.metadata is None:
            self.metadata = {}

    def to_dict(self) -> Dict:
        """Convert to dictionary"""
        return asdict(self)

    def to_json(self) -> str:
        """Convert to JSON string; non-JSON values are rendered with ``str``."""
        return json.dumps(self.to_dict(), default=str)
class LogFilter:
    """Set of optional criteria used to select log entries.

    Every attribute starts as ``None`` (criterion disabled). An entry passes
    the filter only when it satisfies all enabled criteria.
    """

    def __init__(self):
        self.level_filter: Optional[LogLevel] = None        # minimum severity
        self.category_filter: Optional[LogCategory] = None  # exact category
        self.module_filter: Optional[str] = None            # substring, case-insensitive
        self.session_filter: Optional[str] = None           # exact session id
        self.client_filter: Optional[str] = None            # exact client id
        self.ip_filter: Optional[str] = None                # source or destination IP
        self.text_filter: Optional[str] = None              # substring of message, case-insensitive
        self.time_range: Optional[tuple] = None             # (start, end), inclusive

    def matches(self, entry: LogEntry) -> bool:
        """Return True when ``entry`` satisfies every enabled criterion."""
        # Minimum severity, compared through the numeric logging levels.
        if self.level_filter:
            entry_severity = getattr(logging, entry.level)
            required_severity = getattr(logging, self.level_filter.value)
            if entry_severity < required_severity:
                return False

        # Exact category.
        wanted_category = self.category_filter
        if wanted_category and entry.category != wanted_category.value:
            return False

        # Module name fragment (case-insensitive).
        module_fragment = self.module_filter
        if module_fragment and module_fragment.lower() not in entry.module.lower():
            return False

        # Exact session, then exact client.
        if self.session_filter and entry.session_id != self.session_filter:
            return False
        if self.client_filter and entry.client_id != self.client_filter:
            return False

        # The address may appear on either side of the flow.
        wanted_ip = self.ip_filter
        if wanted_ip:
            source_differs = entry.source_ip != wanted_ip
            if source_differs and entry.dest_ip != wanted_ip:
                return False

        # Message text fragment (case-insensitive).
        needle = self.text_filter
        if needle and needle.lower() not in entry.message.lower():
            return False

        # Inclusive time window.
        if self.time_range:
            window_start, window_end = self.time_range
            inside_window = window_start <= entry.timestamp <= window_end
            if not inside_window:
                return False

        return True
class LogSubscriber:
    """Receiver of streamed log entries.

    Wraps a callback together with an optional filter and keeps simple
    delivery bookkeeping (message count, time of last delivery, active flag).
    """

    def __init__(self, subscriber_id: str, callback: Callable[[LogEntry], None],
                 log_filter: Optional[LogFilter] = None):
        self.subscriber_id = subscriber_id
        self.callback = callback
        # Without an explicit filter a blank one is used, which accepts everything.
        self.filter = log_filter if log_filter else LogFilter()
        self.created_time = time.time()
        self.message_count = 0
        self.last_message_time = None
        self.is_active = True

    def send_log(self, entry: LogEntry) -> bool:
        """Deliver ``entry`` to the callback when the subscriber is active and
        the filter accepts it.

        Returns:
            True if the callback ran without raising. False if the subscriber
            is inactive, the entry was filtered out, or the callback raised
            (in which case the subscriber is also marked inactive).
        """
        if not (self.is_active and self.filter.matches(entry)):
            return False

        try:
            self.callback(entry)
            self.message_count += 1
            self.last_message_time = time.time()
        except Exception as exc:
            print(f"Error sending log to subscriber {self.subscriber_id}: {exc}")
            # A failing callback disables the subscriber for good.
            self.is_active = False
            return False
        return True
class VirtualISPLogger:
    """Centralized logger for Virtual ISP stack.

    Entries are written immediately through a standard ``logging`` logger
    (console and/or rotating file) and are also queued for a background
    thread that stores them in a bounded in-memory buffer and forwards them
    to subscribers.

    Recognised ``config`` keys (with defaults):
        max_memory_logs (10000): size of the in-memory buffer.
        max_queue_size (0): bound of the processing queue; 0 means unbounded.
        log_level ('INFO'): minimum level name.
        log_to_file (True), log_file_path ('/tmp/virtual_isp.log'),
        log_file_max_size (10 MiB), log_file_backup_count (5).
        log_to_console (True).
        structured_logging (True): append the JSON form of the entry to the
            message and use a timestamped formatter.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.log_entries: deque = deque(maxlen=config.get('max_memory_logs', 10000))
        self.subscribers: Dict[str, LogSubscriber] = {}
        self.lock = threading.Lock()

        # Configuration
        self.log_level = LogLevel(config.get('log_level', 'INFO'))
        self.log_to_file = config.get('log_to_file', True)
        self.log_file_path = config.get('log_file_path', '/tmp/virtual_isp.log')
        self.log_file_max_size = config.get('log_file_max_size', 10 * 1024 * 1024)  # 10MB
        self.log_file_backup_count = config.get('log_file_backup_count', 5)
        self.log_to_console = config.get('log_to_console', True)
        self.structured_logging = config.get('structured_logging', True)

        # Statistics
        self.stats = self._new_stats()

        # Setup logging
        self._setup_logging()

        # Background processing
        self.running = False
        # With the default of 0 the queue is unbounded and nothing is dropped;
        # a positive bound makes log() count overflowing entries as dropped.
        self.log_queue = queue.Queue(maxsize=config.get('max_queue_size', 0))
        self.processing_thread = None

    @staticmethod
    def _new_stats(active_subscribers: int = 0) -> Dict:
        """Build a zeroed statistics dictionary."""
        return {
            'total_logs': 0,
            'logs_by_level': {level.value: 0 for level in LogLevel},
            'logs_by_category': {cat.value: 0 for cat in LogCategory},
            'active_subscribers': active_subscribers,
            'file_logs_written': 0,
            'console_logs_written': 0,
            'dropped_logs': 0
        }

    def _setup_logging(self):
        """Setup Python logging infrastructure"""
        self.logger = logging.getLogger('virtual_isp')
        self.logger.setLevel(getattr(logging, self.log_level.value))

        # The named logger is process-wide, so handlers from an earlier
        # instance may still be attached. Close them as well as removing
        # them, otherwise the old log file stays open.
        for handler in self.logger.handlers[:]:
            self.logger.removeHandler(handler)
            handler.close()

        if self.structured_logging:
            line_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        else:
            line_format = '%(message)s'

        # Console handler
        if self.log_to_console:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(logging.Formatter(line_format))
            self.logger.addHandler(console_handler)

        # File handler with rotation
        if self.log_to_file:
            log_dir = os.path.dirname(self.log_file_path)
            if log_dir:
                os.makedirs(log_dir, exist_ok=True)
            file_handler = logging.handlers.RotatingFileHandler(
                self.log_file_path,
                maxBytes=self.log_file_max_size,
                backupCount=self.log_file_backup_count
            )
            file_handler.setFormatter(logging.Formatter(line_format))
            self.logger.addHandler(file_handler)

    def _process_log_queue(self):
        """Background thread to process log queue.

        Keeps running after ``running`` is cleared until the queue is empty,
        so entries queued before ``stop()`` are not lost.
        """
        while self.running or not self.log_queue.empty():
            try:
                entry = self.log_queue.get(timeout=1.0)
            except queue.Empty:
                continue
            try:
                self._dispatch_entry(entry)
            except Exception as e:
                print(f"Error processing log queue: {e}")
            finally:
                # Always balance get() so Queue.join() can never hang on an
                # entry whose processing failed.
                self.log_queue.task_done()

    def _dispatch_entry(self, entry: LogEntry):
        """Store one entry, forward it to subscribers and update statistics."""
        with self.lock:
            self.log_entries.append(entry)
            targets = list(self.subscribers.items())

        # Callbacks run outside the lock so they may call back into the
        # logger (the lock is not reentrant).
        failed = []
        for subscriber_id, subscriber in targets:
            subscriber.send_log(entry)
            # send_log() also returns False when the filter simply did not
            # match, so only the active flag says whether to drop it.
            if not subscriber.is_active:
                failed.append((subscriber_id, subscriber))

        with self.lock:
            for subscriber_id, subscriber in failed:
                # Do not remove a different subscriber re-registered under
                # the same id in the meantime.
                if self.subscribers.get(subscriber_id) is subscriber:
                    del self.subscribers[subscriber_id]
            self.stats['active_subscribers'] = len(self.subscribers)
            self.stats['total_logs'] += 1
            self.stats['logs_by_level'][entry.level] += 1
            self.stats['logs_by_category'][entry.category] += 1

    def log(self, level: LogLevel, category: LogCategory, module: str, message: str,
            session_id: Optional[str] = None, client_id: Optional[str] = None,
            source_ip: Optional[str] = None, dest_ip: Optional[str] = None,
            protocol: Optional[str] = None, **metadata):
        """Log a message.

        Entries below the configured level are ignored. Otherwise the entry
        is queued for background processing and written through the Python
        logger. Extra keyword arguments are stored as entry metadata.
        """
        level_value = getattr(logging, level.value)
        if level_value < getattr(logging, self.log_level.value):
            return

        entry = LogEntry(
            timestamp=time.time(),
            level=level.value,
            category=category.value,
            module=module,
            message=message,
            session_id=session_id,
            client_id=client_id,
            source_ip=source_ip,
            dest_ip=dest_ip,
            protocol=protocol,
            metadata=metadata
        )

        # Add to queue for background processing
        try:
            self.log_queue.put_nowait(entry)
        except queue.Full:
            self.stats['dropped_logs'] += 1

        # Also log through Python logging system
        if self.structured_logging:
            log_message = f"{message} | {json.dumps(entry.to_dict(), default=str)}"
        else:
            log_message = message
        self.logger.log(level_value, log_message)

        # Update console/file stats
        if self.log_to_console:
            self.stats['console_logs_written'] += 1
        if self.log_to_file:
            self.stats['file_logs_written'] += 1

    def debug(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log debug message"""
        self.log(LogLevel.DEBUG, category, module, message, **kwargs)

    def info(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log info message"""
        self.log(LogLevel.INFO, category, module, message, **kwargs)

    def warning(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log warning message"""
        self.log(LogLevel.WARNING, category, module, message, **kwargs)

    def error(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log error message"""
        self.log(LogLevel.ERROR, category, module, message, **kwargs)

    def critical(self, category: LogCategory, module: str, message: str, **kwargs):
        """Log critical message"""
        self.log(LogLevel.CRITICAL, category, module, message, **kwargs)

    def add_subscriber(self, subscriber_id: str, callback: Callable[[LogEntry], None],
                       log_filter: Optional[LogFilter] = None) -> bool:
        """Add log subscriber for real-time streaming.

        Returns False when ``subscriber_id`` is already registered.
        """
        with self.lock:
            if subscriber_id in self.subscribers:
                return False
            self.subscribers[subscriber_id] = LogSubscriber(subscriber_id, callback, log_filter)
            self.stats['active_subscribers'] = len(self.subscribers)
            return True

    def remove_subscriber(self, subscriber_id: str) -> bool:
        """Remove log subscriber; returns False when the id is unknown."""
        with self.lock:
            if subscriber_id not in self.subscribers:
                return False
            del self.subscribers[subscriber_id]
            self.stats['active_subscribers'] = len(self.subscribers)
            return True

    def get_logs(self, limit: int = 100, offset: int = 0,
                 log_filter: Optional[LogFilter] = None) -> List[Dict]:
        """Get logs with filtering and pagination (newest first)."""
        with self.lock:
            # Snapshot so filtering and sorting happen outside the lock.
            all_logs = list(self.log_entries)

        if log_filter:
            filtered_logs = [entry for entry in all_logs if log_filter.matches(entry)]
        else:
            filtered_logs = all_logs

        # Sort by timestamp (newest first)
        filtered_logs.sort(key=lambda x: x.timestamp, reverse=True)

        paginated_logs = filtered_logs[offset:offset + limit]
        return [entry.to_dict() for entry in paginated_logs]

    def search_logs(self, query: str, limit: int = 100) -> List[Dict]:
        """Search logs by text query"""
        log_filter = LogFilter()
        log_filter.text_filter = query
        return self.get_logs(limit=limit, log_filter=log_filter)

    def get_logs_by_session(self, session_id: str, limit: int = 100) -> List[Dict]:
        """Get logs for specific session"""
        log_filter = LogFilter()
        log_filter.session_filter = session_id
        return self.get_logs(limit=limit, log_filter=log_filter)

    def get_logs_by_client(self, client_id: str, limit: int = 100) -> List[Dict]:
        """Get logs for specific client"""
        log_filter = LogFilter()
        log_filter.client_filter = client_id
        return self.get_logs(limit=limit, log_filter=log_filter)

    def get_logs_by_ip(self, ip_address: str, limit: int = 100) -> List[Dict]:
        """Get logs for specific IP address"""
        log_filter = LogFilter()
        log_filter.ip_filter = ip_address
        return self.get_logs(limit=limit, log_filter=log_filter)

    def get_recent_errors(self, limit: int = 50) -> List[Dict]:
        """Get recent error and critical logs"""
        log_filter = LogFilter()
        log_filter.level_filter = LogLevel.ERROR
        return self.get_logs(limit=limit, log_filter=log_filter)

    def clear_logs(self):
        """Clear all logs from memory"""
        with self.lock:
            self.log_entries.clear()

    def get_stats(self) -> Dict:
        """Get logging statistics as an independent snapshot."""
        with self.lock:
            # Copy the nested per-level/per-category dicts too, so the
            # snapshot does not change under the caller.
            stats = {
                key: (dict(value) if isinstance(value, dict) else value)
                for key, value in self.stats.items()
            }
            stats['memory_logs_count'] = len(self.log_entries)
            stats['active_subscribers'] = len(self.subscribers)
            stats['queue_size'] = self.log_queue.qsize()
            return stats

    def reset_stats(self):
        """Reset logging statistics"""
        with self.lock:
            self.stats = self._new_stats(active_subscribers=len(self.subscribers))

    def export_logs(self, format: str = 'json', log_filter: Optional[LogFilter] = None) -> str:
        """Export logs in specified format.

        Args:
            format: ``'json'`` or ``'csv'``.
            log_filter: Optional filter applied before exporting.

        Raises:
            ValueError: If ``format`` is not supported.
        """
        logs = self.get_logs(limit=10000, log_filter=log_filter)
        if format == 'json':
            return json.dumps(logs, indent=2, default=str)
        elif format == 'csv':
            import csv
            import io
            output = io.StringIO()
            if logs:
                writer = csv.DictWriter(output, fieldnames=logs[0].keys())
                writer.writeheader()
                writer.writerows(logs)
            return output.getvalue()
        else:
            raise ValueError(f"Unsupported export format: {format}")

    def set_log_level(self, level: LogLevel):
        """Set logging level"""
        self.log_level = level
        self.logger.setLevel(getattr(logging, level.value))

    def start(self):
        """Start logger (no-op if the processing thread is already running)."""
        if self.processing_thread is not None and self.processing_thread.is_alive():
            return
        self.running = True
        self.processing_thread = threading.Thread(target=self._process_log_queue, daemon=True)
        self.processing_thread.start()
        self.info(LogCategory.SYSTEM, 'logger', 'Virtual ISP Logger started')

    def stop(self):
        """Stop logger.

        The processing thread drains the queue before it exits, so this
        waits on the thread rather than on the queue. Waiting on the queue
        would block forever if the thread had already exited or was never
        started.
        """
        self.info(LogCategory.SYSTEM, 'logger', 'Virtual ISP Logger stopping')
        self.running = False

        worker = self.processing_thread
        # A thread cannot join itself (stop() called from a subscriber callback).
        if (worker is not None and worker.is_alive()
                and worker is not threading.current_thread()):
            worker.join()
        self.processing_thread = None

        # Remove all subscribers
        with self.lock:
            self.subscribers.clear()
            self.stats['active_subscribers'] = 0
        print("Virtual ISP Logger stopped")
# Module-wide logger instance used by the helper functions below;
# stays None until init_logger() assigns it.
_global_logger: Optional[VirtualISPLogger] = None
def get_logger() -> Optional[VirtualISPLogger]:
    """Return the module-wide logger, or None if it has not been initialized."""
    current = _global_logger
    return current
def init_logger(config: Dict) -> VirtualISPLogger:
    """Create a logger from ``config``, install it as the module-wide
    instance and return it."""
    global _global_logger
    instance = VirtualISPLogger(config)
    _global_logger = instance
    return instance
def log_debug(category: LogCategory, module: str, message: str, **kwargs):
    """Forward a debug message to the module-wide logger; does nothing when
    no logger has been initialized."""
    if not _global_logger:
        return
    _global_logger.debug(category, module, message, **kwargs)
def log_info(category: LogCategory, module: str, message: str, **kwargs):
    """Forward an info message to the module-wide logger; does nothing when
    no logger has been initialized."""
    if not _global_logger:
        return
    _global_logger.info(category, module, message, **kwargs)
def log_warning(category: LogCategory, module: str, message: str, **kwargs):
    """Forward a warning message to the module-wide logger; does nothing when
    no logger has been initialized."""
    if not _global_logger:
        return
    _global_logger.warning(category, module, message, **kwargs)
def log_error(category: LogCategory, module: str, message: str, **kwargs):
    """Forward an error message to the module-wide logger; does nothing when
    no logger has been initialized."""
    if not _global_logger:
        return
    _global_logger.error(category, module, message, **kwargs)
def log_critical(category: LogCategory, module: str, message: str, **kwargs):
    """Forward a critical message to the module-wide logger; does nothing when
    no logger has been initialized."""
    if not _global_logger:
        return
    _global_logger.critical(category, module, message, **kwargs)