#!/usr/bin/env python3
"""
Analytics Analysis Tool for Neural OS Multi-GPU System

This script analyzes the structured analytics logs to generate reports and insights.

Usage: python analyze_analytics.py [--since HOURS] [--type TYPE]
"""

import json
import argparse
import glob
import time
from collections import Counter
from datetime import datetime
import statistics


class AnalyticsAnalyzer:
    def __init__(self, since_hours=24):
        self.since_timestamp = time.time() - (since_hours * 3600)
        self.data = {
            'gpu_metrics': [],
            'connection_events': [],
            'queue_metrics': [],
            'ip_stats': []
        }
        self.load_data()

    def load_data(self):
        """Load all analytics data files"""
        file_types = {
            'gpu_metrics': 'gpu_metrics_*.jsonl',
            'connection_events': 'connection_events_*.jsonl',
            'queue_metrics': 'queue_metrics_*.jsonl',
            'ip_stats': 'ip_stats_*.jsonl'
        }
        for data_type, pattern in file_types.items():
            files = glob.glob(pattern)
            for file_path in files:
                try:
                    with open(file_path, 'r') as f:
                        for line in f:
                            try:
                                record = json.loads(line.strip())
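                                # Skip 'metadata' records and anything older than the analysis window.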
                                if record.get('type') != 'metadata' and record.get('timestamp', 0) >= self.since_timestamp:
                                    self.data[data_type].append(record)
                            except json.JSONDecodeError:
                                continue
                except FileNotFoundError:
                    continue
        print(f"Loaded data from the last {(time.time() - self.since_timestamp) / 3600:.1f} hours:")
        for data_type, records in self.data.items():
            print(f" {data_type}: {len(records)} records")
        print()

    def analyze_gpu_utilization(self):
        """Analyze GPU utilization patterns"""
        print("🖥️ GPU UTILIZATION ANALYSIS")
        print("=" * 40)
        gpu_records = [r for r in self.data['gpu_metrics'] if r.get('type') == 'gpu_status']
        if not gpu_records:
            print("No GPU utilization data found.")
            return
        utilizations = [r['utilization_percent'] for r in gpu_records]
        total_gpus = gpu_records[-1].get('total_gpus', 0)
        print(f"Total GPUs: {total_gpus}")
        print(f"Average utilization: {statistics.mean(utilizations):.1f}%")
        print(f"Peak utilization: {max(utilizations):.1f}%")
        print(f"Minimum utilization: {min(utilizations):.1f}%")
        std_dev = statistics.stdev(utilizations) if len(utilizations) > 1 else 0.0
        print(f"Utilization std dev: {std_dev:.1f}%")
        # Utilization distribution
        high_util = sum(1 for u in utilizations if u >= 80)
        med_util = sum(1 for u in utilizations if 40 <= u < 80)
        low_util = sum(1 for u in utilizations if u < 40)
        print("\nUtilization distribution:")
        print(f" High (≥80%): {high_util} samples ({high_util/len(utilizations)*100:.1f}%)")
        print(f" Medium (40-79%): {med_util} samples ({med_util/len(utilizations)*100:.1f}%)")
        print(f" Low (<40%): {low_util} samples ({low_util/len(utilizations)*100:.1f}%)")
        print()

    def analyze_connections(self):
        """Analyze connection patterns"""
        print("🔗 CONNECTION ANALYSIS")
        print("=" * 40)
        opens = [r for r in self.data['connection_events'] if r.get('type') == 'connection_open']
        closes = [r for r in self.data['connection_events'] if r.get('type') == 'connection_close']
        if not opens and not closes:
            print("No connection data found.")
            return
        print(f"Total connections opened: {len(opens)}")
        print(f"Total connections closed: {len(closes)}")
        if closes:
            durations = [r['duration'] for r in closes]
            interactions = [r['interactions'] for r in closes]
            reasons = [r['reason'] for r in closes]
            print("\nSession durations:")
            print(f" Average: {statistics.mean(durations):.1f}s")
            print(f" Median: {statistics.median(durations):.1f}s")
            print(f" Max: {max(durations):.1f}s")
            print(f" Min: {min(durations):.1f}s")
            print("\nInteractions per session:")
            print(f" Average: {statistics.mean(interactions):.1f}")
            print(f" Median: {statistics.median(interactions):.1f}")
            print(f" Max: {max(interactions)}")
            print("\nSession end reasons:")
            reason_counts = Counter(reasons)
            for reason, count in reason_counts.most_common():
                print(f" {reason}: {count} ({count/len(closes)*100:.1f}%)")
        print()

    def analyze_queue_performance(self):
        """Analyze queue performance"""
        print("📝 QUEUE PERFORMANCE ANALYSIS")
        print("=" * 40)
        bypasses = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_bypass']
        waits = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_wait']
        statuses = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_status']
        limit_applications = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_limits_applied']
        total_users = len(bypasses) + len(waits)
        if total_users == 0:
            print("No queue data found.")
            return
        print(f"Total users processed: {total_users}")
        print(f"Users bypassed queue: {len(bypasses)} ({len(bypasses)/total_users*100:.1f}%)")
        print(f"Users waited in queue: {len(waits)} ({len(waits)/total_users*100:.1f}%)")
        if waits:
            wait_times = [r['wait_time'] for r in waits]
            positions = [r['queue_position'] for r in waits]
            print("\nWait time statistics:")
            print(f" Average wait: {statistics.mean(wait_times):.1f}s")
            print(f" Median wait: {statistics.median(wait_times):.1f}s")
            print(f" Max wait: {max(wait_times):.1f}s")
            print(f" Average queue position: {statistics.mean(positions):.1f}")
        if statuses:
            queue_sizes = [r['queue_size'] for r in statuses]
            # Handle both old 'estimated_wait' and new 'maximum_wait' fields for backwards compatibility
            maximum_waits = []
            for r in statuses:
                if r['queue_size'] > 0:
                    if 'maximum_wait' in r:
                        maximum_waits.append(r['maximum_wait'])
                    elif 'estimated_wait' in r:
                        maximum_waits.append(r['estimated_wait'])
            print("\nQueue size statistics:")
            print(f" Average queue size: {statistics.mean(queue_sizes):.1f}")
            print(f" Max queue size: {max(queue_sizes)}")
            if maximum_waits:
                print(f" Average maximum wait: {statistics.mean(maximum_waits):.1f}s")
                print(f" Peak maximum wait: {max(maximum_waits):.1f}s")
        if limit_applications:
            total_affected = sum(r['affected_sessions'] for r in limit_applications)
            print("\nQueue limit applications:")
            print(f" Times limits applied to existing sessions: {len(limit_applications)}")
            print(f" Total sessions affected: {total_affected}")
            print(f" Average sessions affected per application: {total_affected/len(limit_applications):.1f}")
        print()

    def analyze_ip_usage(self):
        """Analyze IP address usage patterns"""
        print("🌍 IP USAGE ANALYSIS")
        print("=" * 40)
        ip_records = self.data['ip_stats']
        if not ip_records:
            print("No IP usage data found.")
            return
        # Keep the most recently seen connection count per IP
        # (later 'ip_update' records overwrite earlier ones).
        latest_ip_data = {}
        for record in ip_records:
            if record.get('type') == 'ip_update':
                ip = record['ip_address']
                latest_ip_data[ip] = record['connection_count']
        if not latest_ip_data:
            print("No IP connection data found.")
            return
        total_connections = sum(latest_ip_data.values())
        unique_ips = len(latest_ip_data)
        print(f"Total unique IP addresses: {unique_ips}")
        print(f"Total connections: {total_connections}")
        print(f"Average connections per IP: {total_connections/unique_ips:.1f}")
        print("\nTop IP addresses by connection count:")
        sorted_ips = sorted(latest_ip_data.items(), key=lambda x: x[1], reverse=True)
        for i, (ip, count) in enumerate(sorted_ips[:10], 1):
            percentage = count / total_connections * 100
            print(f" {i:2d}. {ip}: {count} connections ({percentage:.1f}%)")
        print()

    def generate_summary_report(self):
        """Generate a comprehensive summary report"""
        print("📊 SYSTEM SUMMARY REPORT")
        print("=" * 50)
        # Time range
        start_time = datetime.fromtimestamp(self.since_timestamp)
        end_time = datetime.now()
        duration_hours = (end_time.timestamp() - self.since_timestamp) / 3600
        print(f"Report period: {start_time.strftime('%Y-%m-%d %H:%M:%S')} to {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Duration: {duration_hours:.1f} hours")
        print()
        self.analyze_gpu_utilization()
        self.analyze_connections()
        self.analyze_queue_performance()
        self.analyze_ip_usage()


def main():
    parser = argparse.ArgumentParser(description='Analyze Neural OS analytics data')
    parser.add_argument('--since', type=float, default=24,
                        help='Analyze data from the last N hours (default: 24)')
    parser.add_argument('--type', choices=['gpu', 'connections', 'queue', 'ip', 'summary'],
                        default='summary', help='Type of analysis to perform')
    args = parser.parse_args()
    analyzer = AnalyticsAnalyzer(since_hours=args.since)
    if args.type == 'gpu':
        analyzer.analyze_gpu_utilization()
    elif args.type == 'connections':
        analyzer.analyze_connections()
    elif args.type == 'queue':
        analyzer.analyze_queue_performance()
    elif args.type == 'ip':
        analyzer.analyze_ip_usage()
    else:
        analyzer.generate_summary_report()


if __name__ == '__main__':
    main()
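

# Example invocations (from the directory containing the analytics logs):
#   python analyze_analytics.py                          # full summary over the last 24 hours
#   python analyze_analytics.py --since 6 --type queue   # queue analysis over the last 6 hours
#   python analyze_analytics.py --type ip                # IP usage analysis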