# reframr/evaluation.py
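"""Evaluation utilities for Reframr models.

Covers manifest-driven next-token scoring (memorization / generalization
splits), open-ended sample scoring, and an open-prompt benchmark with
anti-regurgitation, tool-protocol, and quality-gate checks.
"""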
import json
import unicodedata
from pathlib import Path
from typing import Sequence
from .model import ReframrModel
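# Phrases that signal the model is describing what an answer *should* be
# instead of answering in its own voice.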
META_VOICE_PHRASES = (
"the answer should",
"the response should",
"a strong answer",
"a safe answer",
"the safe answer",
"the safe move",
"the passage",
)
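# Structured protocol tags that may legitimately open a generated response.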
PROTOCOL_STARTS = (
"<tool_call>",
"<tool_result>",
"<source>",
"<final>",
"<reason>",
"<answer>",
)
def load_manifest(path: str | Path) -> dict[str, object]:
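    """Load a JSON evaluation manifest from ``path``."""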
return json.loads(Path(path).read_text(encoding="utf-8"))
def _expected_next_token(model: ReframrModel, expected_text: str) -> str:
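    """Return the first token of the expected continuation.

    Assumes the tokenizer encodes text into string tokens, so the result is
    comparable with the keys of ``predict_next_token_distribution``; the
    leading space mirrors how the continuation follows the context.
    """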
assert model.tokenizer is not None
encoded = model.tokenizer.encode(f" {expected_text}")
return encoded[0] if encoded else ""
def _normalize_text(text: str) -> str:
return " ".join(text.casefold().split())
def _word_ngrams(words: list[str], size: int) -> list[tuple[str, ...]]:
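    """Return the sliding-window word n-grams of ``size`` over ``words``."""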
if size <= 0 or len(words) < size:
return []
return [tuple(words[index : index + size]) for index in range(len(words) - size + 1)]
def _distinct_ratio(words: list[str], size: int) -> float:
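    """Fraction of n-grams that are distinct (0.0 when too few words)."""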
grams = _word_ngrams(words, size)
if not grams:
return 0.0
return len(set(grams)) / len(grams)
def _repetition_ratio(words: list[str], size: int) -> float:
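    """Fraction of n-grams that repeat an earlier n-gram in the text."""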
grams = _word_ngrams(words, size)
if not grams:
return 0.0
repeated = len(grams) - len(set(grams))
return repeated / len(grams)
def _source_replay_index(
sources: Sequence[str] | None,
*,
ngram_size: int,
) -> list[tuple[str, set[tuple[str, ...]]]]:
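    """Precompute (normalized text, n-gram set) pairs for each replay source."""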
if not sources:
return []
index: list[tuple[str, set[tuple[str, ...]]]] = []
for source in sources:
normalized = _normalize_text(str(source))
grams = set(_word_ngrams(normalized.split(), ngram_size))
if grams:
index.append((normalized, grams))
return index
def _source_replay_overlap(
generated: str,
replay_index: list[tuple[str, set[tuple[str, ...]]]],
*,
ngram_size: int,
) -> tuple[float, str]:
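    """Return the best overlap with any replay source and that source's text.

    Overlap is the fraction of the generated text's n-grams that also occur
    in a single source, so a high value indicates near-verbatim replay.
    """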
generated_grams = set(_word_ngrams(_normalize_text(generated).split(), ngram_size))
if not generated_grams or not replay_index:
return 0.0, ""
best_overlap = 0.0
best_source = ""
for normalized_source, source_grams in replay_index:
overlap = len(generated_grams & source_grams) / len(generated_grams)
if overlap > best_overlap:
best_overlap = overlap
best_source = normalized_source
return best_overlap, best_source
def _text_from_replay_row(row: object) -> str:
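    """Extract completion text from a replay row of varying shape.

    Accepts a bare string, a dict with a common completion field, or a dict
    with a ``messages`` payload (flattened via ``_content_to_text``).
    """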
if isinstance(row, str):
return row.strip()
if not isinstance(row, dict):
return ""
for field in ("answer", "response", "chosen", "text", "content", "completion"):
value = row.get(field)
if isinstance(value, str) and value.strip():
return value.strip()
if "messages" in row:
return _content_to_text(row["messages"])
return ""
def load_replay_sources(
paths: Sequence[str | Path],
*,
limit: int = 10_000,
) -> list[str]:
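    """Collect replay source texts from ``.jsonl``, ``.json``, or plain-text files.

    Missing paths are skipped; ``.json`` payloads may be a list or a dict
    with ``records``/``texts`` keys. A positive ``limit`` caps the total.
    """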
sources: list[str] = []
for source_path in paths:
path = Path(source_path)
if not path.exists():
continue
suffix = path.suffix.lower()
if suffix == ".jsonl":
for line in path.read_text(encoding="utf-8").splitlines():
if limit > 0 and len(sources) >= limit:
return sources
if not line.strip():
continue
text = _text_from_replay_row(json.loads(line))
if text:
sources.append(text)
continue
if suffix == ".json":
payload = json.loads(path.read_text(encoding="utf-8"))
            if isinstance(payload, dict):
                rows = payload.get("records", payload.get("texts", payload))
            else:
                rows = payload
if isinstance(rows, list):
for row in rows:
if limit > 0 and len(sources) >= limit:
return sources
text = _text_from_replay_row(row)
if text:
sources.append(text)
else:
text = _text_from_replay_row(rows)
if text:
sources.append(text)
continue
text = path.read_text(encoding="utf-8").strip()
if text:
sources.append(text)
if limit > 0 and len(sources) >= limit:
return sources[:limit]
return sources[:limit] if limit > 0 else sources
def _normalize_phrase_list(value: object) -> list[str]:
if not isinstance(value, list):
return []
phrases: list[str] = []
for item in value:
if isinstance(item, str):
phrase = item.strip()
if phrase:
phrases.append(phrase)
return phrases
def _normalize_required_groups(value: object) -> list[list[str]]:
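    """Coerce required-group config into lists of casefolded terms.

    A bare (non-list) entry becomes a single-term group; empty terms and
    empty groups are dropped.
    """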
if not isinstance(value, list):
return []
groups: list[list[str]] = []
for raw_group in value:
if isinstance(raw_group, list):
group = [
str(term).casefold().strip()
for term in raw_group
if str(term).strip()
]
else:
term = str(raw_group).casefold().strip()
group = [term] if term else []
if group:
groups.append(group)
return groups
def _required_group_summary(
normalized_text: str,
required_groups: object,
) -> tuple[int, int, float]:
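    """Return (groups hit, total groups, coverage) for the normalized text.

    A group counts as hit when any of its terms appears in the text.
    """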
groups = _normalize_required_groups(required_groups)
hit_count = sum(
1
for group in groups
if any(term in normalized_text for term in group)
)
group_count = len(groups)
coverage = hit_count / group_count if group_count else 0.0
return hit_count, group_count, coverage
def _banned_phrase_hit(normalized_text: str, banned_phrases: object) -> bool:
    normalized_phrases = (
        _normalize_text(phrase) for phrase in _normalize_phrase_list(banned_phrases)
    )
    return any(phrase in normalized_text for phrase in normalized_phrases if phrase)
def _meta_voice_hit(normalized_text: str) -> bool:
return any(phrase in normalized_text for phrase in META_VOICE_PHRASES)
def _has_malformed_sentence_start(text: str) -> bool:
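    """Heuristic: does the text start badly (empty, or a bare lowercase letter)?

    Protocol-tagged and quote-led openings are exempt; leading punctuation,
    digits, or uppercase letters are not flagged.
    """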
stripped = text.strip()
if not stripped:
return True
if any(stripped.startswith(protocol) for protocol in PROTOCOL_STARTS):
return False
leading_quote = False
for character in stripped:
if character.isspace():
continue
category = unicodedata.category(character)
if category.startswith(("P", "S")):
if character in {"'", '"', "‘", "’", "“", "”"}:
leading_quote = True
continue
if character.isalpha():
if leading_quote:
return False
return character.islower()
return False
return False
def _quality_gate_passed(
*,
word_count: int,
punctuation_hit: bool,
required_group_coverage: float,
exact_copy: bool,
banned_phrase_hit: bool,
meta_voice_hit: bool,
malformed_start: bool,
repetition_3: float,
tool_call_hit: bool,
fabricated_tool_result_hit: bool,
fabricated_source_hit: bool,
source_replay_hit: bool,
item: dict[str, object],
) -> bool:
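    """Decide whether a generated sample passes the per-item quality gate.

    Blocking failures (copying, banned or meta-voice phrasing, malformed
    starts, fabricated tool output or sources, source replay) always fail.
    An allowed tool call skips the length/coverage thresholds, and items
    grounded in source evidence get a relaxed word-count requirement.
    """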
blocking_failure = any(
(
exact_copy,
banned_phrase_hit,
meta_voice_hit,
malformed_start,
fabricated_tool_result_hit,
fabricated_source_hit,
source_replay_hit,
)
)
if bool(item.get("allow_tool_call", False)) and tool_call_hit:
return not blocking_failure
min_words = int(item.get("min_words", 1))
required_min_coverage = float(
item.get(
"min_required_group_coverage",
1.0 if item.get("required_groups") else 0.0,
)
)
require_punctuation = bool(item.get("require_punctuation", False))
max_repetition_3 = float(item.get("max_repetition_3", 0.35))
if (
_item_contains_source_evidence(item)
and required_group_coverage >= required_min_coverage
and (punctuation_hit or not require_punctuation)
and repetition_3 <= max_repetition_3
):
return not blocking_failure
if word_count < min_words:
return False
if required_group_coverage < required_min_coverage:
return False
if require_punctuation and not punctuation_hit:
return False
if repetition_3 > max_repetition_3:
return False
return not blocking_failure
def _item_contains_source_evidence(value: object) -> bool:
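    """Recursively check whether the item embeds source evidence.

    Looks for dicts under a ``sources`` list, or dicts carrying
    ``title``/``url``/``snippet`` fields, anywhere in the structure.
    """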
if isinstance(value, dict):
sources = value.get("sources")
if isinstance(sources, list) and any(isinstance(source, dict) for source in sources):
return True
if {"title", "url", "snippet"}.intersection(value.keys()) and (
value.get("title") or value.get("snippet")
):
return True
return any(_item_contains_source_evidence(child) for child in value.values())
if isinstance(value, list):
return any(_item_contains_source_evidence(child) for child in value)
return False
def _variation_group_summary(samples: list[dict[str, object]]) -> dict[str, dict[str, object]]:
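    """Summarize response diversity per ``variation_key`` group.

    Reports sample, unique-response, and duplicate-response counts and rates
    so prompt variations that collapse to one answer are easy to spot.
    """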
grouped: dict[str, list[str]] = {}
for sample in samples:
key = str(sample.get("variation_key", "")).strip()
if not key:
continue
grouped.setdefault(key, []).append(
_normalize_text(str(sample.get("generated_text", "")))
)
summaries: dict[str, dict[str, object]] = {}
for key, responses in grouped.items():
sample_count = len(responses)
unique_count = len(set(responses))
summaries[key] = {
"sample_count": sample_count,
"unique_response_count": unique_count,
"unique_response_rate": unique_count / sample_count if sample_count else 0.0,
"duplicate_response_rate": (
(sample_count - unique_count) / sample_count
if sample_count
else 0.0
),
}
return summaries
def _content_to_text(content: object) -> str:
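    """Flatten message content (string, part list, or other) to plain text."""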
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, dict):
if "text" in item:
parts.append(str(item["text"]))
elif item.get("type") == "text" and "content" in item:
parts.append(str(item["content"]))
elif item is not None:
parts.append(str(item))
return " ".join(part.strip() for part in parts if part and part.strip()).strip()
if content is None:
return ""
return str(content).strip()
def _render_tool_call(call: object) -> str:
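    """Render a tool call as a ``<tool_call>`` protocol line.

    Supports both top-level and OpenAI-style nested ``function`` payloads.
    """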
if not isinstance(call, dict):
return f"<tool_call> {str(call).strip()}"
function_payload = call.get("function", {})
function = function_payload if isinstance(function_payload, dict) else {}
name = str(call.get("name", function.get("name", "tool"))).strip() or "tool"
arguments = call.get("arguments", function.get("arguments", {}))
if not isinstance(arguments, str):
arguments = json.dumps(arguments, ensure_ascii=False, separators=(",", ":"))
return f"<tool_call> {name} {arguments}".strip()
def _render_tool_result(tool_name: str, result: object) -> list[str]:
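    """Render a tool result as ``<tool_result>`` (and ``<source>``) lines."""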
if isinstance(result, dict):
status = str(result.get("status", "ok")).strip() or "ok"
if status != "ok":
error = str(result.get("error", status)).strip() or status
return [f"<tool_result> {tool_name} failed: {error}"]
lines = [f"<tool_result> {tool_name} ok"]
sources = result.get("sources", [])
if isinstance(sources, list):
for source in sources:
if not isinstance(source, dict):
continue
title = str(source.get("title", "Source")).strip() or "Source"
url = str(source.get("url", "")).strip()
snippet = str(source.get("snippet", source.get("text", ""))).strip()
lines.append(f"<source> {title} | {url} | {snippet}".strip())
return lines
content = _content_to_text(result)
return [f"<tool_result> {tool_name} {content or 'empty'}"]
def _compose_prompt_context(item: dict[str, object]) -> str:
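    """Flatten an item's system text, chat messages, and tool results into a
    plain-text context, closing with ``<final>`` when tool protocol appeared.
    """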
prompt = str(item.get("prompt", "")).strip()
system = str(item.get("system", "")).strip()
lines: list[str] = []
tool_protocol_seen = False
if system:
lines.append(system)
messages = item.get("messages")
if isinstance(messages, list):
for message in messages:
if not isinstance(message, dict):
continue
role = str(message.get("role", "")).casefold()
content = _content_to_text(message.get("content", ""))
if role == "system":
if content:
lines.append(f"System instruction: {content}")
elif role == "user":
if content:
lines.append(f"User: {content}")
elif role == "assistant":
if content:
lines.append(f"Assistant: {content}")
if "<tool_call>" in content:
tool_protocol_seen = True
tool_calls = message.get("tool_calls", [])
if isinstance(tool_calls, list):
for call in tool_calls:
lines.append(_render_tool_call(call))
tool_protocol_seen = True
elif role == "tool":
tool_name = str(message.get("name", message.get("tool_call_id", "tool")))
lines.extend(_render_tool_result(tool_name, message.get("content", "")))
tool_protocol_seen = True
elif content:
lines.append(f"{role.capitalize()}: {content}")
if prompt:
lines.append(f"User: {prompt}" if isinstance(messages, list) else prompt)
tool_results = item.get("tool_results")
if isinstance(tool_results, list):
for result in tool_results:
tool_name = "tool"
if isinstance(result, dict):
tool_name = str(result.get("name", result.get("tool", "tool")))
lines.extend(_render_tool_result(tool_name, result))
tool_protocol_seen = True
elif tool_results:
lines.extend(_render_tool_result("tool", tool_results))
tool_protocol_seen = True
if tool_protocol_seen:
lines.append("<final>")
return "\n".join(line for line in lines if line).strip()
def _open_ended_score(
model: ReframrModel,
sample: dict[str, object],
*,
reasoning_mode: str | None,
) -> dict[str, object]:
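    """Generate a completion for the sample and score it on required-group
    coverage, length, novelty, and (optionally) punctuation; note that group
    coverage contributes 0.0 when the sample defines no required groups.
    """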
generated = model.generate_text(
str(sample["context"]),
max_tokens=int(sample.get("max_tokens", 56)),
reasoning_mode=reasoning_mode,
)
normalized = _normalize_text(generated)
    # Reuse the shared normalizer so bare-string groups and stray whitespace
    # are handled the same way as in the open-prompt benchmark.
    required_groups = _normalize_required_groups(sample.get("required_groups", []))
satisfied_groups = sum(
1
for group in required_groups
if any(term in normalized for term in group)
)
group_coverage = (
satisfied_groups / len(required_groups) if required_groups else 0.0
)
punctuation_hit = any(mark in generated for mark in ".,;:?!")
min_words = int(sample.get("min_words", 12))
min_word_hit = len(generated.split()) >= min_words
banned_phrases = [str(phrase) for phrase in sample.get("banned_phrases", [])]
exact_copy = any(normalized == _normalize_text(phrase) for phrase in banned_phrases)
novelty_hit = not exact_copy
require_punctuation = bool(sample.get("require_punctuation", True))
score_components = [
group_coverage,
1.0 if min_word_hit else 0.0,
1.0 if novelty_hit else 0.0,
]
if require_punctuation:
score_components.append(1.0 if punctuation_hit else 0.0)
return {
"section": str(sample["section"]),
"context": str(sample["context"]),
"generated_text": generated,
"group_coverage": group_coverage,
"punctuation_hit": punctuation_hit,
"min_word_hit": min_word_hit,
"exact_copy": exact_copy,
"score": sum(score_components) / len(score_components) if score_components else 0.0,
}
def evaluate_manifest(
model: ReframrModel,
manifest: dict[str, object],
*,
reasoning_mode: str | None = None,
top_k: int = 5,
) -> dict[str, object]:
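    """Evaluate next-token accuracy on the ``memorization`` and
    ``generalization`` splits, plus open-ended scores when the manifest
    provides an ``open_ended`` split.
    """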
results: dict[str, object] = {
"corpus_name": manifest["name"],
"reasoning_mode": reasoning_mode or model.config.default_reasoning_profile,
"splits": {},
}
splits = manifest["splits"]
for split_name in ("memorization", "generalization"):
samples = splits[split_name]
top1_hits = 0
topk_hits = 0
        expected_probabilities: list[float] = []
for sample in samples:
distribution = model.predict_next_token_distribution(
sample["context"],
reasoning_mode=reasoning_mode,
)
ranked = sorted(distribution.items(), key=lambda item: item[1], reverse=True)
predicted = ranked[0][0] if ranked else ""
top_tokens = [token for token, _ in ranked[:top_k]]
expected = _expected_next_token(model, sample["expected"])
expected_probability = distribution.get(expected, 0.0)
if predicted == expected:
top1_hits += 1
if expected in top_tokens:
topk_hits += 1
expected_probabilities.append(expected_probability)
sample_count = len(samples)
mean_expected_probability = (
sum(expected_probabilities) / sample_count if sample_count else 0.0
)
results["splits"][split_name] = {
"sample_count": sample_count,
"top1_accuracy": top1_hits / sample_count if sample_count else 0.0,
"topk_accuracy": topk_hits / sample_count if sample_count else 0.0,
"mean_expected_probability": mean_expected_probability,
}
open_ended_samples = splits.get("open_ended", [])
if open_ended_samples:
sample_results = [
_open_ended_score(
model,
sample,
reasoning_mode=reasoning_mode,
)
for sample in open_ended_samples
]
sample_count = len(sample_results)
results["open_ended"] = {
"sample_count": sample_count,
"mean_score": (
sum(float(sample["score"]) for sample in sample_results) / sample_count
if sample_count
else 0.0
),
"mean_group_coverage": (
sum(float(sample["group_coverage"]) for sample in sample_results) / sample_count
if sample_count
else 0.0
),
"punctuation_rate": (
sum(1 for sample in sample_results if bool(sample["punctuation_hit"])) / sample_count
if sample_count
else 0.0
),
"min_word_rate": (
sum(1 for sample in sample_results if bool(sample["min_word_hit"])) / sample_count
if sample_count
else 0.0
),
"exact_copy_rate": (
sum(1 for sample in sample_results if bool(sample["exact_copy"])) / sample_count
if sample_count
else 0.0
),
"samples": sample_results,
}
return results
def benchmark_open_prompts(
model: ReframrModel,
prompts: list[dict[str, object]],
*,
reasoning_mode: str | None = None,
max_tokens: int = 64,
temperature: float = 0.82,
top_k: int = 24,
top_p: float = 0.92,
repetition_penalty: float = 1.18,
replay_sources: Sequence[str] | None = None,
replay_ngram_size: int = 8,
replay_overlap_threshold: float = 0.70,
) -> dict[str, object]:
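    """Run the open-prompt benchmark and return a detailed report.

    Each prompt is generated once under the given sampling policy, then
    checked for copying, banned or meta-voice phrasing, fabricated tool
    protocol, replay of ``replay_sources``, repetition, and the per-item
    quality gate; aggregate rates feed ``v2_readiness_score``.
    """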
samples: list[dict[str, object]] = []
normalized_replay_ngram_size = max(3, int(replay_ngram_size))
replay_index = _source_replay_index(
replay_sources,
ngram_size=normalized_replay_ngram_size,
)
avoid_texts = list(replay_sources or [])
for item in prompts:
prompt = str(item["prompt"])
context = _compose_prompt_context(item)
generated = model.generate_text(
context,
max_tokens=max_tokens,
reasoning_mode=reasoning_mode,
temperature=temperature,
top_k=top_k,
top_p=top_p,
repetition_penalty=repetition_penalty,
avoid_texts=avoid_texts,
)
normalized = _normalize_text(generated)
banned_phrases = [str(phrase) for phrase in item.get("banned_phrases", [])]
exact_copy = any(
normalized == _normalize_text(phrase)
for phrase in banned_phrases
)
words = generated.split()
punctuation_hit = any(mark in generated for mark in ".,;:?!")
tool_call_hit = "<tool_call>" in generated
generated_tool_result_hit = "<tool_result>" in generated
generated_source_hit = "<source>" in generated
fabricated_tool_result_hit = generated_tool_result_hit and "<tool_result>" not in context
fabricated_source_hit = generated_source_hit and "<source>" not in context
required_group_hits, required_group_count, required_group_coverage = (
_required_group_summary(normalized, item.get("required_groups", []))
)
source_replay_overlap, source_replay_source = _source_replay_overlap(
generated,
replay_index,
ngram_size=normalized_replay_ngram_size,
)
source_replay_hit = (
bool(replay_index)
and source_replay_overlap >= float(replay_overlap_threshold)
)
banned_hit = _banned_phrase_hit(normalized, item.get("banned_phrases", []))
meta_hit = _meta_voice_hit(normalized)
malformed_start = _has_malformed_sentence_start(generated)
distinct_2 = _distinct_ratio(words, 2)
distinct_3 = _distinct_ratio(words, 3)
repetition_3 = _repetition_ratio(words, 3)
passed_quality_gate = _quality_gate_passed(
word_count=len(words),
punctuation_hit=punctuation_hit,
required_group_coverage=required_group_coverage,
exact_copy=exact_copy,
banned_phrase_hit=banned_hit,
meta_voice_hit=meta_hit,
malformed_start=malformed_start,
repetition_3=repetition_3,
tool_call_hit=tool_call_hit,
fabricated_tool_result_hit=fabricated_tool_result_hit,
fabricated_source_hit=fabricated_source_hit,
source_replay_hit=source_replay_hit,
item=item,
)
samples.append(
{
"prompt": prompt,
"context": context,
"tags": [str(tag) for tag in item.get("tags", [])],
"variation_key": str(item.get("variation_key", "")).strip(),
"generated_text": generated,
"word_count": len(words),
"char_count": len(generated),
"punctuation_hit": punctuation_hit,
"distinct_2": distinct_2,
"distinct_3": distinct_3,
"repetition_3": repetition_3,
"exact_copy": exact_copy,
"banned_phrase_hit": banned_hit,
"tool_call_hit": tool_call_hit,
"generated_tool_result_hit": generated_tool_result_hit,
"generated_source_hit": generated_source_hit,
"fabricated_tool_result_hit": fabricated_tool_result_hit,
"fabricated_source_hit": fabricated_source_hit,
"source_replay_overlap": source_replay_overlap,
"source_replay_hit": source_replay_hit,
"source_replay_source": source_replay_source,
"required_group_hits": required_group_hits,
"required_group_count": required_group_count,
"required_group_coverage": required_group_coverage,
"malformed_start": malformed_start,
"meta_voice_hit": meta_hit,
"passed_quality_gate": passed_quality_gate,
}
)
sample_count = len(samples)
normalized_responses = [
_normalize_text(str(sample["generated_text"]))
for sample in samples
]
unique_response_count = len(set(normalized_responses))
exact_copy_count = sum(1 for sample in samples if bool(sample["exact_copy"]))
banned_phrase_count = sum(
1 for sample in samples if bool(sample["banned_phrase_hit"])
)
malformed_start_count = sum(
1 for sample in samples if bool(sample["malformed_start"])
)
meta_voice_count = sum(1 for sample in samples if bool(sample["meta_voice_hit"]))
tool_call_count = sum(1 for sample in samples if bool(sample["tool_call_hit"]))
fabricated_tool_result_count = sum(
1 for sample in samples if bool(sample["fabricated_tool_result_hit"])
)
fabricated_source_count = sum(
1 for sample in samples if bool(sample["fabricated_source_hit"])
)
source_replay_count = sum(
1 for sample in samples if bool(sample["source_replay_hit"])
)
quality_pass_count = sum(
1 for sample in samples if bool(sample["passed_quality_gate"])
)
variation_groups = _variation_group_summary(samples)
worst_variation_group_unique_rate = (
min(
float(summary["unique_response_rate"])
for summary in variation_groups.values()
)
if variation_groups
else 1.0
)
required_group_samples = [
sample
for sample in samples
if int(sample.get("required_group_count", 0)) > 0
]
required_group_sample_count = len(required_group_samples)
mean_required_group_coverage = (
sum(float(sample["required_group_coverage"]) for sample in required_group_samples)
/ required_group_sample_count
if required_group_sample_count
else 0.0
)
quality_scores = [
quality_pass_count / sample_count if sample_count else 0.0,
unique_response_count / sample_count if sample_count else 0.0,
mean_required_group_coverage,
1.0 - (exact_copy_count / sample_count if sample_count else 0.0),
1.0 - (banned_phrase_count / sample_count if sample_count else 0.0),
1.0 - (fabricated_tool_result_count / sample_count if sample_count else 0.0),
1.0 - (fabricated_source_count / sample_count if sample_count else 0.0),
1.0 - (source_replay_count / sample_count if sample_count else 0.0),
1.0 - (malformed_start_count / sample_count if sample_count else 0.0),
1.0 - (meta_voice_count / sample_count if sample_count else 0.0),
worst_variation_group_unique_rate,
]
return {
"schema_version": "reframr.open_benchmark.v2",
"sample_count": sample_count,
"reasoning_mode": reasoning_mode or model.config.default_reasoning_profile,
"generation_policy": {
"temperature": temperature,
"top_k": top_k,
"top_p": top_p,
"repetition_penalty": repetition_penalty,
},
"mean_word_count": (
sum(int(sample["word_count"]) for sample in samples) / sample_count
if sample_count
else 0.0
),
"mean_char_count": (
sum(int(sample["char_count"]) for sample in samples) / sample_count
if sample_count
else 0.0
),
"punctuation_rate": (
sum(1 for sample in samples if bool(sample["punctuation_hit"])) / sample_count
if sample_count
else 0.0
),
"required_group_sample_count": required_group_sample_count,
"mean_required_group_coverage": mean_required_group_coverage,
"mean_distinct_2": (
sum(float(sample["distinct_2"]) for sample in samples) / sample_count
if sample_count
else 0.0
),
"mean_distinct_3": (
sum(float(sample["distinct_3"]) for sample in samples) / sample_count
if sample_count
else 0.0
),
"mean_repetition_3": (
sum(float(sample["repetition_3"]) for sample in samples) / sample_count
if sample_count
else 0.0
),
"exact_copy_count": exact_copy_count,
"exact_copy_rate": exact_copy_count / sample_count if sample_count else 0.0,
"banned_phrase_count": banned_phrase_count,
"banned_phrase_rate": (
banned_phrase_count / sample_count if sample_count else 0.0
),
"malformed_start_count": malformed_start_count,
"malformed_start_rate": (
malformed_start_count / sample_count if sample_count else 0.0
),
"meta_voice_count": meta_voice_count,
"meta_voice_rate": meta_voice_count / sample_count if sample_count else 0.0,
"tool_call_count": tool_call_count,
"tool_call_rate": tool_call_count / sample_count if sample_count else 0.0,
"fabricated_tool_result_count": fabricated_tool_result_count,
"fabricated_tool_result_rate": (
fabricated_tool_result_count / sample_count if sample_count else 0.0
),
"fabricated_source_count": fabricated_source_count,
"fabricated_source_rate": (
fabricated_source_count / sample_count if sample_count else 0.0
),
"source_replay_count": source_replay_count,
"source_replay_rate": (
source_replay_count / sample_count if sample_count else 0.0
),
"replay_ngram_size": normalized_replay_ngram_size,
"replay_overlap_threshold": float(replay_overlap_threshold),
"quality_pass_count": quality_pass_count,
"quality_pass_rate": quality_pass_count / sample_count if sample_count else 0.0,
"unique_response_count": unique_response_count,
"unique_response_rate": unique_response_count / sample_count if sample_count else 0.0,
"duplicate_response_rate": (
(sample_count - unique_response_count) / sample_count
if sample_count
else 0.0
),
"variation_groups": variation_groups,
"worst_variation_group_unique_rate": worst_variation_group_unique_rate,
"v2_readiness_score": sum(quality_scores) / len(quality_scores),
"samples": samples,
}
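# Example usage (a minimal sketch; the checkpoint path and the ReframrModel
# loader shown here are assumptions — adapt to the real model API):
#
#     from reframr.model import ReframrModel
#     from reframr.evaluation import load_manifest, evaluate_manifest
#
#     model = ReframrModel.load("checkpoints/rfm-v2-base")  # hypothetical loader
#     manifest = load_manifest("eval/manifest.json")        # hypothetical path
#     report = evaluate_manifest(model, manifest, top_k=5)
#     print(report["splits"]["generalization"]["top1_accuracy"])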