import time import logging import asyncio from collections import defaultdict from typing import Dict, List, Set, Optional import datetime logger = logging.getLogger(__name__) class MetricsTracker: """ Tracks usage metrics across the API server. """ def __init__(self): # Total metrics since server start self.total_requests = { 'chat': 0, 'video': 0, 'search': 0, 'other': 0, } # Per-user metrics self.user_metrics = defaultdict(lambda: { 'requests': { 'chat': 0, 'video': 0, 'search': 0, 'other': 0, }, 'first_seen': time.time(), 'last_active': time.time(), 'role': 'anon' }) # Rate limiting buckets (per minute) self.rate_limits = { 'anon': { 'video': 30, 'search': 45, 'chat': 90, 'other': 45 }, 'normal': { 'video': 60, 'search': 90, 'chat': 180, 'other': 90 }, 'pro': { 'video': 120, 'search': 180, 'chat': 300, 'other': 180 }, 'admin': { 'video': 240, 'search': 360, 'chat': 450, 'other': 360 } } # Minute-based rate limiting buckets self.time_buckets = defaultdict(lambda: defaultdict(lambda: defaultdict(int))) # Lock for thread safety self.lock = asyncio.Lock() # Track concurrent sessions by IP self.ip_sessions = defaultdict(set) # Server start time self.start_time = time.time() async def record_request(self, user_id: str, ip: str, request_type: str, role: str): """Record a request for metrics and rate limiting""" async with self.lock: # Update total metrics if request_type in self.total_requests: self.total_requests[request_type] += 1 else: self.total_requests['other'] += 1 # Update user metrics user_data = self.user_metrics[user_id] user_data['last_active'] = time.time() user_data['role'] = role if request_type in user_data['requests']: user_data['requests'][request_type] += 1 else: user_data['requests']['other'] += 1 # Update time bucket for rate limiting current_minute = int(time.time() / 60) self.time_buckets[user_id][current_minute][request_type] += 1 # Clean up old time buckets (keep only last 10 minutes) cutoff = current_minute - 10 for minute in list(self.time_buckets[user_id].keys()): if minute < cutoff: del self.time_buckets[user_id][minute] def register_session(self, user_id: str, ip: str): """Register a new session for an IP address""" self.ip_sessions[ip].add(user_id) def unregister_session(self, user_id: str, ip: str): """Unregister a session when it disconnects""" if user_id in self.ip_sessions[ip]: self.ip_sessions[ip].remove(user_id) if not self.ip_sessions[ip]: del self.ip_sessions[ip] def get_session_count_for_ip(self, ip: str) -> int: """Get the number of active sessions for an IP address""" return len(self.ip_sessions.get(ip, set())) async def is_rate_limited(self, user_id: str, request_type: str, role: str) -> bool: """Check if a user is currently rate limited for a request type""" async with self.lock: current_minute = int(time.time() / 60) prev_minute = current_minute - 1 # Count requests in current and previous minute current_count = self.time_buckets[user_id][current_minute][request_type] prev_count = self.time_buckets[user_id][prev_minute][request_type] # Calculate requests per minute rate (weighted average) # Weight current minute more as it's more recent rate = (current_count * 0.7) + (prev_count * 0.3) # Get rate limit based on user role limit = self.rate_limits.get(role, self.rate_limits['anon']).get( request_type, self.rate_limits['anon']['other']) # Check if rate exceeds limit return rate >= limit def get_metrics(self) -> Dict: """Get a snapshot of current metrics""" active_users = { 'total': len(self.user_metrics), 'anon': 0, 'normal': 0, 'pro': 0, 'admin': 0, } # Count active users in the last 5 minutes active_cutoff = time.time() - (5 * 60) for user_data in self.user_metrics.values(): if user_data['last_active'] >= active_cutoff: active_users[user_data['role']] += 1 return { 'uptime_seconds': int(time.time() - self.start_time), 'total_requests': dict(self.total_requests), 'active_users': active_users, 'active_ips': len(self.ip_sessions), 'timestamp': datetime.datetime.now().isoformat() } def get_detailed_metrics(self) -> Dict: """Get detailed metrics including per-user data""" metrics = self.get_metrics() # Add anonymized user metrics user_list = [] for user_id, data in self.user_metrics.items(): # Skip users inactive for more than 1 hour if time.time() - data['last_active'] > 3600: continue user_list.append({ 'id': user_id[:8] + '...', # Anonymize ID 'role': data['role'], 'requests': data['requests'], 'active_ago': int(time.time() - data['last_active']), 'session_duration': int(time.time() - data['first_seen']) }) metrics['users'] = user_list return metrics