# src/evaluation.py
import pandas as pd | |
import numpy as np | |
from sacrebleu.metrics import BLEU, CHRF | |
from rouge_score import rouge_scorer | |
import Levenshtein | |
from collections import defaultdict | |
from transformers.models.whisper.english_normalizer import BasicTextNormalizer | |
from typing import Dict, List, Tuple, Optional | |
from scipy import stats | |
import warnings | |
from config import ( | |
ALL_UG40_LANGUAGES, | |
GOOGLE_SUPPORTED_LANGUAGES, | |
METRICS_CONFIG, | |
EVALUATION_TRACKS, | |
MODEL_CATEGORIES, | |
) | |
from src.utils import get_all_language_pairs | |
warnings.filterwarnings("ignore", category=RuntimeWarning) | |
def calculate_sentence_metrics(reference: str, prediction: str) -> Dict[str, float]:
    """Calculate all metrics for a single sentence pair.

    Both texts are passed through ``BasicTextNormalizer`` before scoring.
    Any input that is falsy or not a ``str`` (``None``, NaN, numbers, ...)
    is treated as the empty string.

    Args:
        reference: Gold target text.
        prediction: Model output scored against ``reference``.

    Returns:
        A dict with the keys ``bleu`` (0-100 scale), ``chrf`` (sacreBLEU
        score divided by 100), ``cer`` and ``wer`` (edit distance divided by
        reference length, so they can exceed 1.0), ``rouge1`` and ``rougeL``
        (F-measures) and ``quality_score`` (mean of the six metrics after
        mapping BLEU to 0-1 and inverting the clamped error rates).

    A metric whose scorer raises falls back to its worst value (0.0 for
    scores, 1.0 for error rates) so a single bad sample cannot abort a run.
    Only ``Exception`` is caught: ``KeyboardInterrupt`` and ``SystemExit``
    propagate so a long evaluation can still be interrupted.
    """
    # Handle empty / non-string inputs
    if not prediction or not isinstance(prediction, str):
        prediction = ""
    if not reference or not isinstance(reference, str):
        reference = ""

    # Normalize texts
    normalizer = BasicTextNormalizer()
    pred_norm = normalizer(prediction)
    ref_norm = normalizer(reference)

    metrics = {}

    # BLEU score (0-100 scale)
    try:
        bleu = BLEU(effective_order=True)
        metrics["bleu"] = bleu.sentence_score(pred_norm, [ref_norm]).score
    except Exception:
        metrics["bleu"] = 0.0

    # ChrF score (normalize to 0-1)
    try:
        chrf = CHRF()
        metrics["chrf"] = chrf.sentence_score(pred_norm, [ref_norm]).score / 100.0
    except Exception:
        metrics["chrf"] = 0.0

    # Character Error Rate (CER)
    try:
        if len(ref_norm) > 0:
            metrics["cer"] = Levenshtein.distance(ref_norm, pred_norm) / len(ref_norm)
        else:
            # Empty reference: any predicted text counts as a full error.
            metrics["cer"] = 1.0 if len(pred_norm) > 0 else 0.0
    except Exception:
        metrics["cer"] = 1.0

    # Word Error Rate (WER)
    try:
        ref_words = ref_norm.split()
        pred_words = pred_norm.split()
        if len(ref_words) > 0:
            metrics["wer"] = Levenshtein.distance(ref_words, pred_words) / len(ref_words)
        else:
            metrics["wer"] = 1.0 if len(pred_words) > 0 else 0.0
    except Exception:
        metrics["wer"] = 1.0

    # ROUGE scores
    try:
        scorer = rouge_scorer.RougeScorer(
            ["rouge1", "rougeL"], use_stemmer=True
        )
        rouge_scores = scorer.score(ref_norm, pred_norm)
        metrics["rouge1"] = rouge_scores["rouge1"].fmeasure
        metrics["rougeL"] = rouge_scores["rougeL"].fmeasure
    except Exception:
        metrics["rouge1"] = 0.0
        metrics["rougeL"] = 0.0

    # Quality score (composite metric)
    try:
        quality_components = [
            metrics["bleu"] / 100.0,  # Normalize BLEU to 0-1
            metrics["chrf"],  # Already 0-1
            1.0 - min(metrics["cer"], 1.0),  # Invert error rates, clamped at 1
            1.0 - min(metrics["wer"], 1.0),
            metrics["rouge1"],
            metrics["rougeL"],
        ]
        # Plain float keeps the result type consistent with the other metrics.
        metrics["quality_score"] = float(np.mean(quality_components))
    except Exception:
        metrics["quality_score"] = 0.0

    return metrics
def calculate_confidence_interval(values: List[float], confidence_level: float = 0.95) -> Tuple[float, float, float]:
    """Calculate the mean and a confidence interval for a collection of values.

    NaN entries are dropped before any statistic is computed.

    Args:
        values: Numeric observations. Lists, NumPy arrays and pandas Series
            are all accepted; ``None`` is treated as empty.
        confidence_level: Coverage of the interval, e.g. ``0.95``.

    Returns:
        ``(mean, ci_lower, ci_upper)`` as plain floats. ``(0.0, 0.0, 0.0)``
        when there is no usable value. When fewer than
        ``METRICS_CONFIG["min_samples_for_ci"]`` values remain, both bounds
        equal the mean.

    The interval is a percentile bootstrap (capped at 1000 resamples, drawn
    from NumPy's global RNG, so bounds vary between calls). If the bootstrap
    raises, a t-distribution interval is used instead; if that also fails
    or is not finite, both bounds collapse to the mean.
    """
    if values is None:
        return 0.0, 0.0, 0.0

    # Converting first avoids the ambiguous truth value of ``not values``
    # when the caller hands in an array or Series instead of a list.
    samples = np.asarray(values, dtype=float).ravel()
    samples = samples[~np.isnan(samples)]  # Remove NaN values
    if samples.size == 0:
        return 0.0, 0.0, 0.0

    mean_val = float(np.mean(samples))

    if samples.size < METRICS_CONFIG["min_samples_for_ci"]:
        # Not enough samples for a meaningful CI
        return mean_val, mean_val, mean_val

    try:
        # Bootstrap confidence interval
        n_bootstrap = min(METRICS_CONFIG["bootstrap_samples"], 1000)
        bootstrap_means = [
            np.mean(np.random.choice(samples, size=samples.size, replace=True))
            for _ in range(n_bootstrap)
        ]
        alpha = 1 - confidence_level
        ci_lower = np.percentile(bootstrap_means, 100 * alpha / 2)
        ci_upper = np.percentile(bootstrap_means, 100 * (1 - alpha / 2))
        return mean_val, float(ci_lower), float(ci_upper)
    except Exception:
        # Fallback to t-distribution CI
        try:
            std_err = stats.sem(samples)
            h = float(std_err * stats.t.ppf((1 + confidence_level) / 2, samples.size - 1))
        except Exception:
            return mean_val, mean_val, mean_val
        if not np.isfinite(h):
            # e.g. a single sample: the standard error is undefined.
            return mean_val, mean_val, mean_val
        return mean_val, mean_val - h, mean_val + h
def evaluate_predictions_by_track(
    predictions: pd.DataFrame, test_set: pd.DataFrame, track: str
) -> Dict:
    """Evaluate predictions for a specific track.

    The test set is restricted to rows whose source and target language both
    belong to the track, inner-joined with ``predictions`` on ``sample_id``,
    scored sentence by sentence, and aggregated per language pair and per
    track.

    Args:
        predictions: Frame with at least ``sample_id`` and ``prediction``.
        test_set: Frame with at least ``sample_id``, ``source_language``,
            ``target_language`` and ``target_text``.
        track: Key into ``EVALUATION_TRACKS``.

    Returns:
        When no sample matches: ``{"error": <message>, "evaluated_samples": 0,
        "track": track}``. Otherwise a dict with ``pair_metrics`` (per
        ``"<src>_to_<tgt>"`` statistics), ``track_averages``,
        ``track_confidence``, ``summary``, ``evaluated_samples``, ``track``
        and ``error`` (``None``).

    Raises:
        KeyError: If ``track`` is not a key of ``EVALUATION_TRACKS``.
    """
    print(f"🔄 Evaluating for {track} track...")
    track_config = EVALUATION_TRACKS[track]
    track_languages = track_config["languages"]
    # The summary reports the track's own threshold, so that is the value that
    # must be applied when filtering pairs; the module constant is a fallback.
    # NOTE(review): assumes track_config is a dict-like mapping with .get().
    min_samples = track_config.get("min_samples_per_pair", MIN_SAMPLES_PER_PAIR)

    # Filter test set to track languages
    track_test_set = test_set[
        (test_set["source_language"].isin(track_languages))
        & (test_set["target_language"].isin(track_languages))
    ].copy()

    # Merge predictions with test set
    merged = track_test_set.merge(
        predictions, on="sample_id", how="inner", suffixes=("", "_pred")
    )
    if len(merged) == 0:
        return {
            "error": f"No matching samples found for {track} track",
            "evaluated_samples": 0,
            "track": track,
        }

    print(f"📊 Evaluating {len(merged)} samples for {track} track...")

    # Calculate metrics for each sample (column-wise iteration avoids building
    # a Series per row as iterrows does).
    sample_metrics = []
    for sample_id, src_lang, tgt_lang, target_text, prediction in zip(
        merged["sample_id"],
        merged["source_language"],
        merged["target_language"],
        merged["target_text"],
        merged["prediction"],
    ):
        metrics = calculate_sentence_metrics(target_text, prediction)
        metrics["sample_id"] = sample_id
        metrics["source_language"] = src_lang
        metrics["target_language"] = tgt_lang
        sample_metrics.append(metrics)
    sample_df = pd.DataFrame(sample_metrics)

    # Aggregate by language pairs
    pair_metrics = {}
    overall_metrics = defaultdict(list)
    metric_names = METRICS_CONFIG["primary_metrics"] + METRICS_CONFIG["secondary_metrics"]

    for src_lang in track_languages:
        for tgt_lang in track_languages:
            if src_lang == tgt_lang:
                continue
            pair_data = sample_df[
                (sample_df["source_language"] == src_lang)
                & (sample_df["target_language"] == tgt_lang)
            ]
            if len(pair_data) < min_samples:
                continue

            pair_key = f"{src_lang}_to_{tgt_lang}"
            pair_metrics[pair_key] = {}

            # Calculate statistics for each metric
            for metric in metric_names:
                if metric not in pair_data.columns:
                    continue
                values = pair_data[metric].replace([np.inf, -np.inf], np.nan).dropna()
                if len(values) == 0:
                    continue
                mean_val, ci_lower, ci_upper = calculate_confidence_interval(values.tolist())
                pair_metrics[pair_key][metric] = {
                    "mean": mean_val,
                    "ci_lower": ci_lower,
                    "ci_upper": ci_upper,
                    "std": float(np.std(values)) if len(values) > 1 else 0.0,
                    "count": len(values),
                }
                # Track-level statistics are computed over per-pair means.
                overall_metrics[metric].append(mean_val)

            pair_metrics[pair_key]["sample_count"] = len(pair_data)

    # Calculate track-level aggregated statistics
    track_averages = {}
    track_confidence = {}
    for metric, pair_means in overall_metrics.items():
        if not pair_means:
            continue
        mean_val, ci_lower, ci_upper = calculate_confidence_interval(pair_means)
        track_averages[metric] = mean_val
        track_confidence[metric] = {
            "mean": mean_val,
            "ci_lower": ci_lower,
            "ci_upper": ci_upper,
            "std": float(np.std(pair_means)) if len(pair_means) > 1 else 0.0,
        }

    # Generate evaluation summary
    summary = {
        "track": track,
        "track_name": track_config["name"],
        "total_samples": len(sample_df),
        "language_pairs_evaluated": sum(
            1 for stats_for_pair in pair_metrics.values()
            if stats_for_pair.get("sample_count", 0) > 0
        ),
        "languages_covered": len(
            set(sample_df["source_language"]) | set(sample_df["target_language"])
        ),
        "min_samples_per_pair": min_samples,
    }

    return {
        "pair_metrics": pair_metrics,
        "track_averages": track_averages,
        "track_confidence": track_confidence,
        "summary": summary,
        "evaluated_samples": len(sample_df),
        "track": track,
        "error": None,
    }
def evaluate_predictions(
    predictions: pd.DataFrame, test_set: pd.DataFrame, model_category: str = "community"
) -> Dict:
    """Run the evaluation for every configured track and bundle the results.

    Args:
        predictions: Submitted predictions, forwarded to each track evaluation.
        test_set: Reference test set, forwarded to each track evaluation.
        model_category: Key into ``MODEL_CATEGORIES``; an unknown key is
            replaced by ``"community"``.

    Returns:
        Dict with ``model_category``, ``category_info``, ``tracks`` (one
        entry per key of ``EVALUATION_TRACKS``) and ``metadata`` (timestamp
        plus submitted / available sample counts).
    """
    print("🔬 Starting evaluation...")

    # Unknown categories fall back to the default bucket.
    category = model_category if model_category in MODEL_CATEGORIES else "community"

    results = {
        "model_category": category,
        "category_info": MODEL_CATEGORIES[category],
        "tracks": {},
        "metadata": {
            "evaluation_timestamp": pd.Timestamp.now().isoformat(),
            "total_samples_submitted": len(predictions),
            "total_samples_available": len(test_set),
        },
    }

    # One result per track, in the order the tracks are configured.
    per_track = results["tracks"]
    for name in EVALUATION_TRACKS:
        per_track[name] = evaluate_predictions_by_track(predictions, test_set, name)

    return results
def generate_evaluation_report(results: Dict, model_name: str = "") -> str:
    """Render evaluation results as a Markdown report.

    Args:
        results: Dict with optional ``tracks`` and ``category_info`` entries,
            as produced by ``evaluate_predictions``.
        model_name: Name shown in the heading; ``"Model"`` is used when empty.

    Returns:
        A fixed error line if any track carries a truthy ``error``; otherwise
        the report lines joined with newlines.
    """
    tracks = results.get("tracks", {})

    # A single failed track aborts the whole report.
    for track_data in tracks.values():
        if track_data.get("error"):
            return "❌ **Evaluation Error**: Unable to complete evaluation"

    category_name = results.get("category_info", {}).get("name", "Unknown")
    lines = [
        f"### 🔬 Evaluation Report: {model_name or 'Model'}",
        "",
        f"**Model Category**: {category_name}",
        "",
    ]

    # Track-by-track analysis
    for track_name, track_data in tracks.items():
        # NOTE(review): cannot trigger after the early return above; kept so
        # the loop stays safe if that guard is ever relaxed.
        if track_data.get("error"):
            continue

        track_config = EVALUATION_TRACKS[track_name]
        summary = track_data.get("summary", {})
        track_confidence = track_data.get("track_confidence", {})

        lines += [
            f"#### {track_config['name']}",
            "",
            "**Summary Statistics:**",
            f"- **Samples Evaluated**: {summary.get('total_samples', 0):,}",
            f"- **Language Pairs**: {summary.get('language_pairs_evaluated', 0)}",
            f"- **Languages Covered**: {summary.get('languages_covered', 0)}",
            "",
            "**Primary Metrics (95% Confidence Intervals):**",
        ]

        for metric in METRICS_CONFIG["primary_metrics"]:
            if metric not in track_confidence:
                continue
            interval = track_confidence[metric]
            lines.append(
                f"- **{metric.upper()}**: {interval['mean']:.4f} "
                f"[{interval['ci_lower']:.4f}, {interval['ci_upper']:.4f}]"
            )
        lines.append("")

    return "\n".join(lines)
# Backwards compatibility: module-level minimum sample count per language
# pair, referenced by evaluate_predictions_by_track when deciding whether a
# pair has enough data to be scored.
MIN_SAMPLES_PER_PAIR = 10