Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import time | |
import logging | |
import asyncio | |
from collections import defaultdict | |
from typing import Dict, List, Set, Optional | |
import datetime | |
logger = logging.getLogger(__name__) | |
class MetricsTracker: | |
""" | |
Tracks usage metrics across the API server. | |
""" | |
def __init__(self): | |
# Total metrics since server start | |
self.total_requests = { | |
'chat': 0, | |
'video': 0, | |
'search': 0, | |
'other': 0, | |
} | |
# Per-user metrics | |
self.user_metrics = defaultdict(lambda: { | |
'requests': { | |
'chat': 0, | |
'video': 0, | |
'search': 0, | |
'other': 0, | |
}, | |
'first_seen': time.time(), | |
'last_active': time.time(), | |
'role': 'anon' | |
}) | |
# Rate limiting buckets (per minute) | |
self.rate_limits = { | |
'anon': { | |
'video': 30, | |
'search': 45, | |
'chat': 90, | |
'other': 45 | |
}, | |
'normal': { | |
'video': 60, | |
'search': 90, | |
'chat': 180, | |
'other': 90 | |
}, | |
'pro': { | |
'video': 120, | |
'search': 180, | |
'chat': 300, | |
'other': 180 | |
}, | |
'admin': { | |
'video': 240, | |
'search': 360, | |
'chat': 450, | |
'other': 360 | |
} | |
} | |
# Minute-based rate limiting buckets | |
self.time_buckets = defaultdict(lambda: defaultdict(lambda: defaultdict(int))) | |
# Lock for thread safety | |
self.lock = asyncio.Lock() | |
# Track concurrent sessions by IP | |
self.ip_sessions = defaultdict(set) | |
# Server start time | |
self.start_time = time.time() | |
async def record_request(self, user_id: str, ip: str, request_type: str, role: str): | |
"""Record a request for metrics and rate limiting""" | |
async with self.lock: | |
# Update total metrics | |
if request_type in self.total_requests: | |
self.total_requests[request_type] += 1 | |
else: | |
self.total_requests['other'] += 1 | |
# Update user metrics | |
user_data = self.user_metrics[user_id] | |
user_data['last_active'] = time.time() | |
user_data['role'] = role | |
if request_type in user_data['requests']: | |
user_data['requests'][request_type] += 1 | |
else: | |
user_data['requests']['other'] += 1 | |
# Update time bucket for rate limiting | |
current_minute = int(time.time() / 60) | |
self.time_buckets[user_id][current_minute][request_type] += 1 | |
# Clean up old time buckets (keep only last 10 minutes) | |
cutoff = current_minute - 10 | |
for minute in list(self.time_buckets[user_id].keys()): | |
if minute < cutoff: | |
del self.time_buckets[user_id][minute] | |
def register_session(self, user_id: str, ip: str): | |
"""Register a new session for an IP address""" | |
self.ip_sessions[ip].add(user_id) | |
def unregister_session(self, user_id: str, ip: str): | |
"""Unregister a session when it disconnects""" | |
if user_id in self.ip_sessions[ip]: | |
self.ip_sessions[ip].remove(user_id) | |
if not self.ip_sessions[ip]: | |
del self.ip_sessions[ip] | |
def get_session_count_for_ip(self, ip: str) -> int: | |
"""Get the number of active sessions for an IP address""" | |
return len(self.ip_sessions.get(ip, set())) | |
async def is_rate_limited(self, user_id: str, request_type: str, role: str) -> bool: | |
"""Check if a user is currently rate limited for a request type""" | |
async with self.lock: | |
current_minute = int(time.time() / 60) | |
prev_minute = current_minute - 1 | |
# Count requests in current and previous minute | |
current_count = self.time_buckets[user_id][current_minute][request_type] | |
prev_count = self.time_buckets[user_id][prev_minute][request_type] | |
# Calculate requests per minute rate (weighted average) | |
# Weight current minute more as it's more recent | |
rate = (current_count * 0.7) + (prev_count * 0.3) | |
# Get rate limit based on user role | |
limit = self.rate_limits.get(role, self.rate_limits['anon']).get( | |
request_type, self.rate_limits['anon']['other']) | |
# Check if rate exceeds limit | |
return rate >= limit | |
def get_metrics(self) -> Dict: | |
"""Get a snapshot of current metrics""" | |
active_users = { | |
'total': len(self.user_metrics), | |
'anon': 0, | |
'normal': 0, | |
'pro': 0, | |
'admin': 0, | |
} | |
# Count active users in the last 5 minutes | |
active_cutoff = time.time() - (5 * 60) | |
for user_data in self.user_metrics.values(): | |
if user_data['last_active'] >= active_cutoff: | |
active_users[user_data['role']] += 1 | |
return { | |
'uptime_seconds': int(time.time() - self.start_time), | |
'total_requests': dict(self.total_requests), | |
'active_users': active_users, | |
'active_ips': len(self.ip_sessions), | |
'timestamp': datetime.datetime.now().isoformat() | |
} | |
def get_detailed_metrics(self) -> Dict: | |
"""Get detailed metrics including per-user data""" | |
metrics = self.get_metrics() | |
# Add anonymized user metrics | |
user_list = [] | |
for user_id, data in self.user_metrics.items(): | |
# Skip users inactive for more than 1 hour | |
if time.time() - data['last_active'] > 3600: | |
continue | |
user_list.append({ | |
'id': user_id[:8] + '...', # Anonymize ID | |
'role': data['role'], | |
'requests': data['requests'], | |
'active_ago': int(time.time() - data['last_active']), | |
'session_duration': int(time.time() - data['first_seen']) | |
}) | |
metrics['users'] = user_list | |
return metrics |