#!/usr/bin/env python3
"""
Generate a CSV file with simple metrics for each model.

Reads tactic_counts_summary.json and generates a CSV file containing
F1, accuracy, precision, recall, and other metrics for each model.

Usage:
    python generate_metrics_csv.py [--input INPUT_PATH] [--output OUTPUT_PATH]
"""
import argparse
import csv
import json
import statistics
from pathlib import Path
from typing import Any, Dict, List
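
# Expected layout of tactic_counts_summary.json, inferred from the fields this
# script reads below; the real file may contain additional keys, which are ignored:
#
# {
#   "results": [
#     {
#       "model": "<model name>",
#       "tactic": "<tactic name>",
#       "tactic_detected": 0 or 1,
#       "total_abnormal_events_detected": <integer>
#     },
#     ...
#   ]
# }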


class MetricsCSVGenerator:
    """Generates a CSV file with simple metrics for each model."""

    def __init__(self, tactic_counts_file: Path):
        self.tactic_counts_file = tactic_counts_file
        self.tactic_data: List[Dict[str, Any]] = []
        self.load_tactic_counts()

    def load_tactic_counts(self):
        """Load the tactic counts summary data."""
        if not self.tactic_counts_file.exists():
            raise FileNotFoundError(f"Tactic counts file not found: {self.tactic_counts_file}")
        data = json.loads(self.tactic_counts_file.read_text(encoding='utf-8'))
        self.tactic_data = data.get('results', [])
        print(f"[INFO] Loaded {len(self.tactic_data)} tactic analysis results")

    def group_by_model(self) -> Dict[str, List[Dict]]:
        """Group tactic data by model."""
        models: Dict[str, List[Dict]] = {}
        for item in self.tactic_data:
            models.setdefault(item['model'], []).append(item)
        return models

    def calculate_model_metrics(self, model_data: List[Dict]) -> Dict[str, Any]:
        """Calculate comprehensive metrics for a single model."""
        if not model_data:
            return self._empty_metrics()

        # Aggregate by tactic for this model
        tactic_aggregates = {}
        for item in model_data:
            tactic = item['tactic']
            if tactic not in tactic_aggregates:
                tactic_aggregates[tactic] = {
                    'total_files': 0,
                    'files_detected': 0,
                    'total_events': 0,
                    'true_positives': 0,
                    'false_positives': 0,
                    'false_negatives': 0
                }
            tactic_aggregates[tactic]['total_files'] += 1
            tactic_aggregates[tactic]['files_detected'] += item['tactic_detected']
            tactic_aggregates[tactic]['total_events'] += item['total_abnormal_events_detected']

            # Per-file binary classification:
            # - True positive:  tactic_detected = 1 (tactic correctly detected)
            # - False negative: tactic_detected = 0 but abnormal events were present (missed detection)
            # - True negative:  tactic_detected = 0 and no abnormal events (correctly identified as normal)
            # - False positive: cannot be derived from this data, so the counter stays at 0
            if item['tactic_detected'] == 1:
                tactic_aggregates[tactic]['true_positives'] += 1
            elif item['total_abnormal_events_detected'] > 0:
                tactic_aggregates[tactic]['false_negatives'] += 1
            # else: true negative, nothing to count

        # Overall totals across all tactics
        total_files = sum(agg['total_files'] for agg in tactic_aggregates.values())
        total_detected = sum(agg['files_detected'] for agg in tactic_aggregates.values())
        total_events = sum(agg['total_events'] for agg in tactic_aggregates.values())

        # Detection rate (recall over all files), as a percentage
        detection_rate = (total_detected / total_files * 100) if total_files > 0 else 0.0

        # Coverage: share of tactics with at least one detection
        total_tactics = len(tactic_aggregates)
        tactics_with_detection = sum(1 for agg in tactic_aggregates.values() if agg['files_detected'] > 0)
        coverage_percent = (tactics_with_detection / total_tactics * 100) if total_tactics > 0 else 0.0

        # Accuracy: without ground-truth negatives this reduces to the detection
        # rate expressed as a fraction
        accuracy = (total_detected / total_files) if total_files > 0 else 0.0

        # Per-tactic precision, recall, and F1, averaged across tactics.
        # False positives cannot be measured from this data, so precision and
        # recall both fall back to the per-tactic detection rate (TP / total files).
        precision_scores = []
        recall_scores = []
        f1_scores = []
        for agg in tactic_aggregates.values():
            tp = agg['true_positives']
            precision = (tp / agg['total_files']) if agg['total_files'] > 0 else 0.0
            recall = (tp / agg['total_files']) if agg['total_files'] > 0 else 0.0
            # F1 = 2 * (precision * recall) / (precision + recall)
            f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
            precision_scores.append(precision)
            recall_scores.append(recall)
            f1_scores.append(f1)

        avg_precision = statistics.mean(precision_scores) if precision_scores else 0.0
        avg_recall = statistics.mean(recall_scores) if recall_scores else 0.0
        avg_f1 = statistics.mean(f1_scores) if f1_scores else 0.0
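
        # NOTE: detection_rate and coverage_percent are on a 0-100 scale, while
        # avg_f1 is on a 0-1 scale, so F1 is multiplied by 100 below before
        # weighting; the 0.4 / 0.3 / 0.3 weights are this script's own heuristic.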
        # Calculate effectiveness score (weighted combination)
        effectiveness_score = (
            detection_rate * 0.4 +
            coverage_percent * 0.3 +
            avg_f1 * 100 * 0.3
        )

        # Grade the model
        if effectiveness_score >= 80:
            grade = 'EXCELLENT'
        elif effectiveness_score >= 60:
            grade = 'GOOD'
        elif effectiveness_score >= 40:
            grade = 'FAIR'
        elif effectiveness_score >= 20:
            grade = 'POOR'
        else:
            grade = 'CRITICAL'

        return {
            'model_name': model_data[0]['model'],
            'total_files_analyzed': total_files,
            'total_files_detected': total_detected,
            'total_files_missed': total_files - total_detected,
            'total_abnormal_events_detected': total_events,
            'total_tactics_tested': total_tactics,
            'tactics_with_detection': tactics_with_detection,
            'tactics_with_zero_detection': total_tactics - tactics_with_detection,
            'detection_rate_percent': detection_rate,
            'coverage_percent': coverage_percent,
            'accuracy': accuracy,
            'precision': avg_precision,
            'recall': avg_recall,
            'f1_score': avg_f1,
            'effectiveness_score': effectiveness_score,
            'grade': grade
        }

    def _empty_metrics(self) -> Dict[str, Any]:
        """Return an empty metrics structure."""
        return {
            'model_name': 'unknown',
            'total_files_analyzed': 0,
            'total_files_detected': 0,
            'total_files_missed': 0,
            'total_abnormal_events_detected': 0,
            'total_tactics_tested': 0,
            'tactics_with_detection': 0,
            'tactics_with_zero_detection': 0,
            'detection_rate_percent': 0.0,
            'coverage_percent': 0.0,
            'accuracy': 0.0,
            'precision': 0.0,
            'recall': 0.0,
            'f1_score': 0.0,
            'effectiveness_score': 0.0,
            'grade': 'CRITICAL'
        }

    def generate_csv(self, output_path: Path) -> bool:
        """Generate a CSV file with metrics for all models."""
        print("\n" + "=" * 80)
        print("GENERATING METRICS CSV")
        print("=" * 80 + "\n")

        # Group data by model
        models_data = self.group_by_model()
        if not models_data:
            print("[WARNING] No model data found")
            return False
        print(f"Found {len(models_data)} models: {', '.join(models_data.keys())}")

        # Calculate metrics for each model
        all_metrics = []
        for model_name, model_data in models_data.items():
            print(f"Calculating metrics for {model_name} ({len(model_data)} files)...")
            metrics = self.calculate_model_metrics(model_data)
            all_metrics.append(metrics)

        # CSV columns (one row per model)
        fieldnames = [
            'model_name',
            'total_files_analyzed',
            'total_files_detected',
            'total_files_missed',
            'total_abnormal_events_detected',
            'total_tactics_tested',
            'tactics_with_detection',
            'tactics_with_zero_detection',
            'detection_rate_percent',
            'coverage_percent',
            'accuracy',
            'precision',
            'recall',
            'f1_score',
            'effectiveness_score',
            'grade'
        ]

        # Write the CSV file, rounding floats to four decimal places
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for metrics in all_metrics:
                row = {}
                for field in fieldnames:
                    value = metrics.get(field, 0)
                    row[field] = round(value, 4) if isinstance(value, float) else value
                writer.writerow(row)

        print(f"\nCSV file generated: {output_path}")
        print(f"Models included: {len(all_metrics)}")

        # Display a per-model summary
        print("\nSummary:")
        for metrics in all_metrics:
            print(f"  {metrics['model_name']}: F1={metrics['f1_score']:.3f}, "
                  f"Accuracy={metrics['accuracy']:.3f}, "
                  f"Precision={metrics['precision']:.3f}, "
                  f"Recall={metrics['recall']:.3f}, "
                  f"Grade={metrics['grade']}")
        return True


def main():
    parser = argparse.ArgumentParser(
        description="Generate CSV file with simple metrics for each model"
    )
    parser.add_argument(
        "--input",
        default="full_pipeline_evaluation/results/tactic_counts_summary.json",
        help="Path to tactic_counts_summary.json"
    )
    parser.add_argument(
        "--output",
        default="full_pipeline_evaluation/results/model_metrics.csv",
        help="Output file for CSV metrics"
    )
    args = parser.parse_args()

    input_path = Path(args.input)
    output_path = Path(args.output)
    if not input_path.exists():
        print(f"[ERROR] Input file not found: {input_path}")
        print("Run count_tactics.py first to generate tactic counts")
        return 1

    # Generate the CSV
    generator = MetricsCSVGenerator(input_path)
    success = generator.generate_csv(output_path)
    if not success:
        print("[ERROR] Failed to generate CSV file")
        return 1

    print("\n" + "=" * 80)
    print("CSV GENERATION COMPLETE")
    print("=" * 80 + "\n")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())