# Codette-Reasoning/evaluation/failure_analyzer.py
# Raiff1982's picture
# Upload 120 files
# ed1b365 verified
"""
Failure Analyzer - examines evaluation logs to find patterns in
low-scoring responses, cluster failures by topic, and recommend
dataset improvements.
"""
from __future__ import annotations
import json
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
_THIS_DIR = Path(__file__).resolve().parent
_PROJECT_ROOT = _THIS_DIR.parent
if str(_PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(_PROJECT_ROOT))
# ---------------------------------------------------------------------------
# Keyword extraction (lightweight, no external deps)
# ---------------------------------------------------------------------------
_STOP_WORDS: Set[str] = {
"the", "a", "an", "is", "are", "was", "were", "be", "been", "being",
"have", "has", "had", "do", "does", "did", "will", "would", "shall",
"should", "may", "might", "must", "can", "could", "to", "of", "in",
"for", "on", "with", "at", "by", "from", "as", "into", "through",
"during", "before", "after", "above", "below", "between", "out",
"off", "over", "under", "again", "further", "then", "once", "here",
"there", "when", "where", "why", "how", "all", "both", "each",
"few", "more", "most", "other", "some", "such", "no", "nor", "not",
"only", "own", "same", "so", "than", "too", "very", "just", "don",
"now", "and", "but", "or", "if", "while", "that", "this", "what",
"which", "who", "whom", "it", "its", "they", "them", "their",
"he", "she", "him", "her", "his", "we", "us", "our", "you", "your",
"i", "me", "my", "about", "up",
}
def _extract_keywords(text: str, top_n: int = 8) -> List[str]:
    """Return up to *top_n* of the most frequent non-stop-words in *text*.

    The text is lower-cased and split into runs of at least three ASCII
    letters; anything listed in ``_STOP_WORDS`` is discarded. The surviving
    words are returned most-frequent first.
    """
    tally = Counter(
        token
        for token in re.findall(r"[a-z]{3,}", text.lower())
        if token not in _STOP_WORDS
    )
    return [token for token, _count in tally.most_common(top_n)]
def _jaccard(set_a: Set[str], set_b: Set[str]) -> float:
"""Jaccard similarity between two sets."""
if not set_a and not set_b:
return 1.0
union = set_a | set_b
if not union:
return 0.0
return len(set_a & set_b) / len(union)
# ---------------------------------------------------------------------------
# FailureAnalyzer
# ---------------------------------------------------------------------------
class FailureAnalyzer:
    """Analyze evaluation results to identify failure patterns.

    The ``results`` mapping handed to the analysis methods is read through
    the following keys (each is optional; a missing key simply yields an
    empty analysis for that part):

    * ``"all_scores"``: list of entries, each with a ``"prompt"`` and a
      ``"scores"`` mapping of dimension name -> score.
    * ``"categories"``: mapping of category name -> dict holding
      ``"details"`` (list of entries carrying ``"scores"``) and
      ``"average_scores"`` (dimension name -> average).
    * ``"overall"``: mapping whose ``"overall"`` entry is the aggregate
      score used for the general recommendation.
    """

    # Default score thresholds; also the defaults of ``__init__``.
    FAILURE_THRESHOLD = 0.4  # scores below this = failure
    WEAK_THRESHOLD = 0.55  # scores below this (but not failing) = weak

    def __init__(
        self,
        failure_threshold: float = FAILURE_THRESHOLD,
        weak_threshold: float = WEAK_THRESHOLD,
    ):
        """Create an analyzer.

        Args:
            failure_threshold: Scores strictly below this count as failures.
            weak_threshold: Scores at or above ``failure_threshold`` and
                strictly below this count as weak.
        """
        self.failure_threshold = failure_threshold
        self.weak_threshold = weak_threshold

    # -- loading -----------------------------------------------------------

    @staticmethod
    def load_results(filepath: str) -> Dict[str, Any]:
        """Load a benchmark results JSON file and return the parsed content.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        with open(filepath, "r", encoding="utf-8") as f:
            return json.load(f)

    # -- analysis ----------------------------------------------------------

    @staticmethod
    def _score_or_pass(detail: Dict[str, Any], dimension: str) -> Any:
        """Return the *dimension* score of *detail*, or 1.0 when absent/None."""
        score = detail.get("scores", {}).get(dimension)
        return 1.0 if score is None else score

    def find_failures(
        self,
        results: Dict[str, Any],
        dimension: str = "overall",
    ) -> List[Dict[str, Any]]:
        """Return entries whose *dimension* score is below the failure threshold.

        Entries without a score for *dimension* are ignored. Each returned
        item has the keys ``"prompt"``, ``"score"`` and ``"all_scores"``; the
        list is sorted by score, lowest first.
        """
        failures: List[Dict[str, Any]] = []
        for entry in results.get("all_scores", []):
            scores = entry.get("scores", {})
            score = scores.get(dimension)
            if score is not None and score < self.failure_threshold:
                failures.append({
                    # An entry with no prompt should not abort the analysis.
                    "prompt": entry.get("prompt", ""),
                    "score": score,
                    "all_scores": scores,
                })
        failures.sort(key=lambda item: item["score"])
        return failures

    def find_weak_areas(
        self,
        results: Dict[str, Any],
    ) -> Dict[str, float]:
        """Average every scoring dimension across all prompts.

        Only ``float`` values are averaged, and ``"word_count"`` /
        ``"sentence_count"`` are always excluded.

        Returns:
            Dict of dimension -> average score (rounded to 4 decimals),
            ordered from lowest to highest average.
        """
        dimension_totals: Dict[str, float] = defaultdict(float)
        dimension_counts: Dict[str, int] = defaultdict(int)
        for entry in results.get("all_scores", []):
            for name, value in entry.get("scores", {}).items():
                # NOTE(review): integer-valued scores are skipped by the float
                # check as well - confirm that this is intended.
                if isinstance(value, float) and name not in ("word_count", "sentence_count"):
                    dimension_totals[name] += value
                    dimension_counts[name] += 1
        # Every key in the totals was counted at least once, so the division
        # below is always safe.
        averages = {
            name: round(total / dimension_counts[name], 4)
            for name, total in dimension_totals.items()
        }
        return dict(sorted(averages.items(), key=lambda item: item[1]))

    def failure_rate_by_category(
        self,
        results: Dict[str, Any],
        dimension: str = "overall",
    ) -> Dict[str, Dict[str, Any]]:
        """Calculate failure and weak rates per category.

        Categories with no detail entries are omitted. A detail entry whose
        *dimension* score is missing or ``None`` is scored as 1.0 but still
        counts towards the category total.

        Returns:
            Dict of category -> stats (``total``, ``failures``, ``weak``,
            ``failure_rate``, ``weak_rate``, ``avg_score``), ordered by
            descending failure rate.
        """
        rates: Dict[str, Dict[str, Any]] = {}
        for cat, data in results.get("categories", {}).items():
            details = data.get("details", [])
            total = len(details)
            if total == 0:
                continue
            scores = [self._score_or_pass(d, dimension) for d in details]
            failures = sum(1 for s in scores if s < self.failure_threshold)
            weak = sum(
                1 for s in scores
                if self.failure_threshold <= s < self.weak_threshold
            )
            rates[cat] = {
                "total": total,
                "failures": failures,
                "weak": weak,
                "failure_rate": round(failures / total, 4),
                "weak_rate": round(weak / total, 4),
                "avg_score": data.get("average_scores", {}).get(dimension, 0),
            }
        return dict(sorted(rates.items(), key=lambda item: -item[1]["failure_rate"]))

    def cluster_failures_by_topic(
        self,
        failures: List[Dict[str, Any]],
        similarity_threshold: float = 0.25,
    ) -> List[Dict[str, Any]]:
        """Cluster failure prompts by keyword overlap.

        Uses a simple greedy clustering: each prompt joins the first cluster
        whose keyword set has a Jaccard similarity of at least
        *similarity_threshold* with the prompt's keywords; otherwise it
        starts a new cluster. A cluster's keyword set is the union of the
        keywords of all its members.

        Args:
            failures: Items as produced by ``find_failures`` (``"prompt"``
                and ``"score"`` are read).
            similarity_threshold: Minimum Jaccard similarity to join a cluster.

        Returns:
            One dict per cluster (``cluster_id``, ``topic_keywords``,
            ``num_failures``, ``avg_score``, ``sample_prompts``), largest
            cluster first. ``cluster_id`` reflects creation order.
        """
        clusters: List[Dict[str, Any]] = []
        for failure in failures:
            keywords = set(_extract_keywords(failure["prompt"]))
            for cluster in clusters:
                if _jaccard(keywords, cluster["keywords"]) >= similarity_threshold:
                    cluster["prompts"].append(failure)
                    cluster["keywords"] |= keywords
                    break
            else:
                # No existing cluster was similar enough.
                clusters.append({"keywords": keywords, "prompts": [failure]})

        # Format output
        result = []
        for cluster_id, cluster in enumerate(clusters):
            members = cluster["prompts"]
            avg_score = sum(m["score"] for m in members) / len(members)
            result.append({
                "cluster_id": cluster_id,
                # Alphabetical, capped at 10 keywords.
                "topic_keywords": sorted(cluster["keywords"])[:10],
                "num_failures": len(members),
                "avg_score": round(avg_score, 4),
                "sample_prompts": [m["prompt"] for m in members[:5]],
            })
        result.sort(key=lambda item: -item["num_failures"])
        return result

    def identify_weakest_dimensions(
        self,
        results: Dict[str, Any],
        top_n: int = 3,
    ) -> List[Tuple[str, float]]:
        """Return the *top_n* lowest-averaging dimensions, excluding ``"overall"``."""
        averages = self.find_weak_areas(results)
        items = [(name, avg) for name, avg in averages.items() if name != "overall"]
        return items[:top_n]

    # -- recommendations ---------------------------------------------------

    def generate_recommendations(
        self,
        results: Dict[str, Any],
    ) -> List[str]:
        """Generate actionable recommendations for dataset improvement.

        Combines the three weakest dimensions, categories with a failure
        rate above 30%, the three largest failure clusters and the overall
        score. When nothing triggers, a single "no critical issues" message
        is returned.
        """
        recommendations: List[str] = []

        # Weakest dimensions
        for dim, score in self.identify_weakest_dimensions(results, top_n=3):
            if score < self.failure_threshold:
                recommendations.append(
                    f"CRITICAL: Dimension '{dim}' averages {score:.3f} (below failure threshold). "
                    f"Add training examples that emphasise {dim} explicitly."
                )
            elif score < self.weak_threshold:
                recommendations.append(
                    f"IMPROVE: Dimension '{dim}' averages {score:.3f} (weak). "
                    f"Augment dataset with responses demonstrating strong {dim}."
                )

        # Category failure rates
        for cat, info in self.failure_rate_by_category(results).items():
            if info["failure_rate"] > 0.3:
                recommendations.append(
                    f"CATEGORY '{cat}': {info['failure_rate']:.0%} failure rate. "
                    f"Add more diverse training examples for {cat} topics."
                )

        # Failure clustering
        failures = self.find_failures(results)
        if failures:
            for cluster in self.cluster_failures_by_topic(failures)[:3]:
                kw = ", ".join(cluster["topic_keywords"][:5])
                recommendations.append(
                    f"TOPIC CLUSTER: {cluster['num_failures']} failures around "
                    f"[{kw}]. Create targeted training data for these concepts."
                )

        # General. A missing overall score is unknown, not zero: skip the
        # general advice rather than reporting a "very low" score.
        overall_score = results.get("overall", {}).get("overall")
        if overall_score is not None:
            if overall_score < 0.5:
                recommendations.append(
                    "GENERAL: Overall score is very low. Consider increasing dataset size "
                    "and diversity before next training run."
                )
            elif overall_score < 0.65:
                recommendations.append(
                    "GENERAL: Overall score is moderate. Focus on the weakest categories "
                    "and dimensions for the next dataset iteration."
                )

        if not recommendations:
            recommendations.append(
                "No critical issues detected. Continue monitoring with additional benchmarks."
            )
        return recommendations

    # -- report ------------------------------------------------------------

    def format_report(self, results: Dict[str, Any]) -> str:
        """Generate a full plain-text failure analysis report.

        Sections: weakest dimensions (up to 6), failure rates by category,
        failure clusters (up to 5, only when failures exist) and the
        recommendations.
        """
        heavy_rule = "=" * 70
        light_rule = "-" * 70
        lines: List[str] = [heavy_rule, " FAILURE ANALYSIS REPORT", heavy_rule]

        # Weakest dimensions
        lines += ["", light_rule, " WEAKEST SCORING DIMENSIONS", light_rule]
        weak_areas = self.find_weak_areas(results)
        for dim, score in list(weak_areas.items())[:6]:
            if score < self.failure_threshold:
                status = "FAIL"
            elif score < self.weak_threshold:
                status = "WEAK"
            else:
                status = "OK "
            lines.append(f" [{status}] {dim:<22s} {score:.4f}")

        # Category failure rates
        lines += ["", light_rule, " FAILURE RATES BY CATEGORY", light_rule]
        for cat, info in self.failure_rate_by_category(results).items():
            lines.append(
                f" {cat:<18s} fail: {info['failure_rate']:>5.1%} "
                f"weak: {info['weak_rate']:>5.1%} "
                f"avg: {info['avg_score']:.4f}"
            )

        # Failure clusters
        failures = self.find_failures(results)
        if failures:
            lines += [
                "",
                light_rule,
                f" FAILURE CLUSTERS ({len(failures)} total failures)",
                light_rule,
            ]
            for c in self.cluster_failures_by_topic(failures)[:5]:
                kw = ", ".join(c["topic_keywords"][:6])
                lines.append(f" Cluster {c['cluster_id']}: "
                             f"{c['num_failures']} failures, "
                             f"avg score {c['avg_score']:.4f}")
                lines.append(f" Topics: {kw}")
                for p in c["sample_prompts"][:2]:
                    # Only mark the prompt as truncated when it really was.
                    snippet = p if len(p) <= 70 else f"{p[:70]}..."
                    lines.append(f" - {snippet}")

        # Recommendations
        lines += ["", light_rule, " RECOMMENDATIONS", light_rule]
        for i, rec in enumerate(self.generate_recommendations(results), 1):
            lines.append(f" {i}. {rec}")

        lines += ["", heavy_rule]
        return "\n".join(lines)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
    """Command-line entry point: load a results file and print the report.

    Options:
        --results / -r: path to the benchmark results JSON (required).
        --failure-threshold / -f: score below which a result is a failure.
        --weak-threshold / -w: score below which a result is weak.

    An unreadable or malformed results file is reported through the argument
    parser's error mechanism (usage message on stderr, exit status 2)
    instead of an unhandled traceback.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Codette Failure Analyzer - identify patterns in evaluation failures"
    )
    parser.add_argument(
        "--results", "-r",
        required=True,
        help="Path to benchmark results JSON",
    )
    # Defaults come from the analyzer's class constants so the CLI and the
    # library cannot drift apart.
    parser.add_argument(
        "--failure-threshold", "-f",
        type=float,
        default=FailureAnalyzer.FAILURE_THRESHOLD,
        help="Score threshold for failure (default: %(default)s)",
    )
    parser.add_argument(
        "--weak-threshold", "-w",
        type=float,
        default=FailureAnalyzer.WEAK_THRESHOLD,
        help="Score threshold for weak (default: %(default)s)",
    )
    args = parser.parse_args()

    analyzer = FailureAnalyzer(
        failure_threshold=args.failure_threshold,
        weak_threshold=args.weak_threshold,
    )
    try:
        results = analyzer.load_results(args.results)
    except (OSError, ValueError) as exc:
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError (its subclasses).
        parser.error(f"could not load results from {args.results!r}: {exc}")
    print(analyzer.format_report(results))


if __name__ == "__main__":
    main()