Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """Coordination module for the Text2SPARQL repair pipeline. | |
| Merges validation and expert feedback into one decision: | |
| accept, repair, or discard. Selects exactly one repair action. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from collections import Counter | |
| from .config import RuntimeConfig | |
| from .models import ( | |
| CandidateQuery, | |
| CoordinatorDecision, | |
| ExpertFeedback, | |
| ValidationResult, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Fixed action priority order for tie-breaking | |
| _ACTION_PRIORITY = [ | |
| "syntax_fix", | |
| "entity_relink", | |
| "predicate_replace", | |
| "direction_fix", | |
| "form_fix", | |
| "projection_fix", | |
| "constraint_fix", | |
| ] | |
def decide_action(
    candidate: CandidateQuery,
    validation: ValidationResult,
    feedbacks: list[ExpertFeedback],
    repair_iteration: int,
    runtime: RuntimeConfig,
) -> CoordinatorDecision:
    """Decide on accept, repair, or discard based on validation and expert feedback.

    Fixed decision logic, evaluated in order:
        1. If ``validation.parse_ok`` is false → repair with ``syntax_fix``.
        2. If feedback is present, every verdict is "ok" and the query
           executed (``validation.execute_ok``) → accept.
        3. If at least two experts report "bad" with confidence >= 0.8 and
           no non-"ok" expert proposes an action → discard.
        4. Otherwise → repair, using the action chosen by
           ``_select_best_action`` (highest confidence, fixed priority order
           for tie-breaking, validation flags as fallback).

    Args:
        candidate: The candidate under inspection; only ``candidate_id`` is read.
        validation: Validation result for this candidate.
        feedbacks: Expert feedback list (may be empty). An empty list never
            leads to accept or discard; it always falls through to repair.
        repair_iteration: Current repair iteration (0-indexed). Not consulted
            by this function; kept for interface compatibility.
        runtime: Runtime configuration. Not consulted by this function; kept
            for interface compatibility.

    Returns:
        CoordinatorDecision with the decision, the selected action (``None``
        for accept/discard) and the rationale messages.
    """
    rationale: list[str] = []
    candidate_id = candidate.candidate_id

    # Rule 1: parse failure → syntax fix. Nothing else is meaningful until
    # the query parses.
    if not validation.parse_ok:
        rationale.append("Query failed to parse — applying syntax fix.")
        return CoordinatorDecision(
            candidate_id=candidate_id,
            decision="repair",
            selected_action="syntax_fix",
            rationale=rationale,
        )

    # Rule 2: all applicable experts ok → accept. parse_ok is already known
    # to be true here, so only execution success remains to be checked.
    if (
        feedbacks
        and validation.execute_ok
        and all(f.verdict == "ok" for f in feedbacks)
    ):
        rationale.append("All applicable judges approved the syntax-valid query.")
        return CoordinatorDecision(
            candidate_id=candidate_id,
            decision="accept",
            selected_action=None,
            rationale=rationale,
        )

    # Rule 3: discard if multiple experts complain strongly and nobody offers
    # a usable fix. Only non-"ok" feedback counts as offering an action,
    # because _select_best_action ignores suggestions attached to "ok"
    # verdicts; counting them here would block the discard and then fall
    # back to a default repair nobody actually proposed.
    complaints = [f for f in feedbacks if f.verdict != "ok"]
    strong_bad_count = sum(
        1 for f in complaints if f.verdict == "bad" and f.confidence >= 0.8
    )
    has_action = any(f.suggested_action for f in complaints)
    if strong_bad_count >= 2 and not has_action:
        rationale.append("Multiple experts reported fatal errors with high confidence and no available actions.")
        return CoordinatorDecision(
            candidate_id=candidate_id,
            decision="discard",
            selected_action=None,
            rationale=rationale,
        )

    # Rule 4: otherwise → repair with the best available action. The helper
    # appends its own reasoning to ``rationale``.
    selected_action = _select_best_action(feedbacks, validation, rationale)
    return CoordinatorDecision(
        candidate_id=candidate_id,
        decision="repair",
        selected_action=selected_action,
        rationale=rationale,
    )
| def _select_best_action( | |
| feedbacks: list[ExpertFeedback], | |
| validation: ValidationResult, | |
| rationale: list[str], | |
| ) -> str: | |
| """Select the best repair action from expert feedback. | |
| Selection rule: | |
| - Choose the highest-confidence suggested action among experts | |
| - On tie, prefer actions earlier in _ACTION_PRIORITY | |
| Args: | |
| feedbacks: Expert feedback list. | |
| validation: Validation result. | |
| rationale: Rationale list to append reasoning to. | |
| Returns: | |
| Selected action string. | |
| """ | |
| # Collect all suggested actions with their confidence | |
| action_scores: list[tuple[str, float, int]] = [] | |
| for feedback in feedbacks: | |
| if feedback.suggested_action and feedback.verdict != "ok": | |
| action = feedback.suggested_action | |
| priority = ( | |
| _ACTION_PRIORITY.index(action) | |
| if action in _ACTION_PRIORITY | |
| else len(_ACTION_PRIORITY) | |
| ) | |
| action_scores.append((action, feedback.confidence, priority)) | |
| if not action_scores: | |
| # No expert suggested an action β infer from validation flags | |
| if "form_mismatch" in validation.suspicious_flags: | |
| rationale.append("No expert action β inferring form_fix from validation flags.") | |
| return "form_fix" | |
| if "execute_fail" in validation.suspicious_flags: | |
| rationale.append("No expert action β inferring entity_relink from execution failure.") | |
| return "entity_relink" | |
| if "empty_result" in validation.suspicious_flags: | |
| rationale.append("No expert action β inferring predicate_replace from empty results.") | |
| return "predicate_replace" | |
| rationale.append("No expert action available β defaulting to entity_relink.") | |
| return "entity_relink" | |
| # Sort by confidence DESC, then by priority ASC | |
| action_scores.sort(key=lambda x: (-x[1], x[2])) | |
| best_action = action_scores[0][0] | |
| best_confidence = action_scores[0][1] | |
| rationale.append( | |
| f"Selected action '{best_action}' with confidence {best_confidence:.2f} " | |
| f"from {len(action_scores)} suggested actions." | |
| ) | |
| return best_action | |
def should_stop(
    decision: CoordinatorDecision,
    repair_iteration: int,
    runtime: RuntimeConfig,
) -> bool:
    """Report whether the repair loop has reached a terminal state.

    The loop ends when any of the following holds:
        - the coordinator decided "accept"
        - the coordinator decided "discard"
        - ``repair_iteration`` is at or beyond ``runtime.max_repair_iterations``

    Each stop reason is logged at INFO level.

    Args:
        decision: The coordinator's decision.
        repair_iteration: Current iteration (0-indexed).
        runtime: Runtime configuration.

    Returns:
        True if the loop should stop, False if another repair round may run.
    """
    # Terminal decisions and the log line emitted for each.
    terminal_messages = {
        "accept": "Stopping: candidate accepted.",
        "discard": "Stopping: candidate discarded.",
    }
    stop_message = terminal_messages.get(decision.decision)
    if stop_message is not None:
        logger.info(stop_message)
        return True

    iteration_cap = runtime.max_repair_iterations
    if repair_iteration >= iteration_cap:
        logger.info(
            "Stopping: reached max repair iterations (%d).",
            iteration_cap,
        )
        return True

    return False