iris-at-text2sparql / src/coordination.py
Alex Latipov
Harden frozen eval prompts and judge JSON handling
d745844
"""Coordination module for the Text2SPARQL repair pipeline.
Merges validation and expert feedback into one decision:
accept, repair, or discard. Selects exactly one repair action.
"""
from __future__ import annotations
import logging
from collections import Counter
from .config import RuntimeConfig
from .models import (
CandidateQuery,
CoordinatorDecision,
ExpertFeedback,
ValidationResult,
)
logger = logging.getLogger(__name__)
# Fixed action priority order for tie-breaking: when two suggested actions
# carry equal confidence, the action appearing earlier in this list wins
# (see _select_best_action, which uses list position as a secondary sort key;
# actions not listed here sort after all listed ones).
_ACTION_PRIORITY = [
    "syntax_fix",
    "entity_relink",
    "predicate_replace",
    "direction_fix",
    "form_fix",
    "projection_fix",
    "constraint_fix",
]
def decide_action(
    candidate: CandidateQuery,
    validation: ValidationResult,
    feedbacks: list[ExpertFeedback],
    repair_iteration: int,
    runtime: RuntimeConfig,
) -> CoordinatorDecision:
    """Decide on accept, repair, or discard based on validation and expert feedback.

    Fixed decision logic:
    1. If parse_ok is false → repair with syntax_fix
    2. If all applicable experts say "ok" and execution succeeded → accept
    3. If two or more experts report fatal errors with high confidence and
       no non-"ok" expert suggests an action → discard
    4. Otherwise → repair using the best actionable judge suggestion

    Action selection: highest-confidence suggested action among experts,
    with fixed priority order for tie-breaking (see _select_best_action).

    Args:
        candidate: The candidate under inspection.
        validation: Validation result for this candidate.
        feedbacks: Expert feedback list (may be empty).
        repair_iteration: Current repair iteration (0-indexed). Currently
            unused in the decision itself; kept for interface stability.
        runtime: Runtime configuration. Currently unused here; kept for
            interface stability.

    Returns:
        CoordinatorDecision with decision and optional action.
    """
    rationale: list[str] = []

    # Rule 1: parse failure → syntax fix. No expert opinion can override a
    # syntactically broken query.
    if not validation.parse_ok:
        rationale.append("Query failed to parse — applying syntax fix.")
        return CoordinatorDecision(
            candidate_id=candidate.candidate_id,
            decision="repair",
            selected_action="syntax_fix",
            rationale=rationale,
        )

    # Rule 2: all applicable experts ok AND the query executed → accept.
    # parse_ok is guaranteed true here after Rule 1, so it is not re-checked.
    if feedbacks and validation.execute_ok and all(f.verdict == "ok" for f in feedbacks):
        rationale.append("All applicable judges approved the syntax-valid query.")
        return CoordinatorDecision(
            candidate_id=candidate.candidate_id,
            decision="accept",
            selected_action=None,
            rationale=rationale,
        )

    # Rule 3: discard when multiple experts complain strongly and no
    # actionable repair is on the table. Only suggestions from non-"ok"
    # feedback count, matching the filter used by _select_best_action.
    bad_count = sum(1 for f in feedbacks if f.verdict == "bad" and f.confidence >= 0.8)
    has_action = any(f.suggested_action for f in feedbacks if f.verdict != "ok")
    if bad_count >= 2 and not has_action:
        rationale.append("Multiple experts reported fatal errors with high confidence and no available actions.")
        return CoordinatorDecision(
            candidate_id=candidate.candidate_id,
            decision="discard",
            selected_action=None,
            rationale=rationale,
        )

    # Rule 4: otherwise → repair with the best available action.
    selected_action = _select_best_action(feedbacks, validation, rationale)
    return CoordinatorDecision(
        candidate_id=candidate.candidate_id,
        decision="repair",
        selected_action=selected_action,
        rationale=rationale,
    )
def _select_best_action(
    feedbacks: list[ExpertFeedback],
    validation: ValidationResult,
    rationale: list[str],
) -> str:
    """Select the best repair action from expert feedback.

    Selection rule:
    - Choose the highest-confidence suggested action among non-"ok" experts
    - On tie, prefer actions earlier in _ACTION_PRIORITY
    - With no suggestion at all, fall back to heuristics based on the
      validator's suspicious_flags, defaulting to entity_relink.

    Args:
        feedbacks: Expert feedback list.
        validation: Validation result.
        rationale: Rationale list to append reasoning to (mutated in place).

    Returns:
        Selected action string.
    """
    # Collect suggested actions as (action, confidence, priority) triples.
    # Actions unknown to _ACTION_PRIORITY sort after all known ones.
    action_scores: list[tuple[str, float, int]] = []
    for feedback in feedbacks:
        if feedback.suggested_action and feedback.verdict != "ok":
            action = feedback.suggested_action
            priority = (
                _ACTION_PRIORITY.index(action)
                if action in _ACTION_PRIORITY
                else len(_ACTION_PRIORITY)
            )
            action_scores.append((action, feedback.confidence, priority))

    if not action_scores:
        # No expert suggested an action — infer one from validation flags,
        # checked in fixed precedence order.
        if "form_mismatch" in validation.suspicious_flags:
            rationale.append("No expert action — inferring form_fix from validation flags.")
            return "form_fix"
        if "execute_fail" in validation.suspicious_flags:
            rationale.append("No expert action — inferring entity_relink from execution failure.")
            return "entity_relink"
        if "empty_result" in validation.suspicious_flags:
            rationale.append("No expert action — inferring predicate_replace from empty results.")
            return "predicate_replace"
        rationale.append("No expert action available — defaulting to entity_relink.")
        return "entity_relink"

    # Highest confidence wins; ties break toward earlier _ACTION_PRIORITY.
    # Single-pass min instead of a full sort — only the best entry is needed.
    best_action, best_confidence, _ = min(action_scores, key=lambda s: (-s[1], s[2]))
    rationale.append(
        f"Selected action '{best_action}' with confidence {best_confidence:.2f} "
        f"from {len(action_scores)} suggested actions."
    )
    return best_action
def should_stop(
    decision: CoordinatorDecision,
    repair_iteration: int,
    runtime: RuntimeConfig,
) -> bool:
    """Determine whether the repair loop should stop.

    Stop conditions:
    - Decision is "accept"
    - Decision is "discard"
    - Repair iteration reached max_repair_iterations

    Args:
        decision: The coordinator's decision.
        repair_iteration: Current iteration (0-indexed).
        runtime: Runtime configuration.

    Returns:
        True if the loop should stop.
    """
    # Terminal decisions end the loop immediately.
    terminal_messages = {
        "accept": "Stopping: candidate accepted.",
        "discard": "Stopping: candidate discarded.",
    }
    message = terminal_messages.get(decision.decision)
    if message is not None:
        logger.info(message)
        return True

    # Otherwise the only remaining stop condition is the iteration budget.
    if repair_iteration < runtime.max_repair_iterations:
        return False
    logger.info(
        "Stopping: reached max repair iterations (%d).",
        runtime.max_repair_iterations,
    )
    return True