Rajan Sharma
Update pipeline/run_two_phase.py
54746f6 verified
"""
Reference pipeline orchestrator.
Wire into your LLM runner. Supports packs built from free-form scenarios.
"""
from pathlib import Path
from typing import Tuple, Dict, Any
import json
from ..validators import schema_validator, unit_validator, math_validator, policy_validator
from ..graders.rule_grader import grade
from .io_utils import load_json
# -------------------------
# REPLACE with your actual LLM runner
def call_model(system_prompt: str, user_prompt: str, temperature: float = 0.2, top_p: float = 0.9) -> str:
    """Placeholder for the LLM backend; replace with a real runner.

    This reference stub ignores all of its arguments and always raises.

    Args:
        system_prompt: System prompt text handed to the model.
        user_prompt: User prompt text handed to the model.
        temperature: Defaults to 0.2; unused by this stub.
        top_p: Defaults to 0.9; unused by this stub.

    Raises:
        NotImplementedError: Always, until an implementation is supplied.
    """
    not_wired = NotImplementedError("call_model must be implemented in your environment.")
    raise not_wired
# -------------------------
def build_user_prompt(template_text: str, context: str, data_inputs: str, constraints: str) -> str:
    """Fill the ``{CONTEXT}``, ``{DATA_INPUTS}`` and ``{CONSTRAINTS}`` placeholders.

    All placeholders are substituted in a single pass over ``template_text``.
    Substituted values are therefore never rescanned: if ``context`` happens
    to contain the literal text ``{DATA_INPUTS}``, it is left as-is instead of
    being expanded by a later replacement (which chained ``str.replace``
    calls would do).

    Args:
        template_text: Template containing zero or more of the placeholders.
        context: Replacement for every ``{CONTEXT}`` occurrence.
        data_inputs: Replacement for every ``{DATA_INPUTS}`` occurrence.
        constraints: Replacement for every ``{CONSTRAINTS}`` occurrence.

    Returns:
        The template with every placeholder occurrence replaced.
    """
    import re  # local import: the block stays self-contained

    substitutions = {
        "{CONTEXT}": context,
        "{DATA_INPUTS}": data_inputs,
        "{CONSTRAINTS}": constraints,
    }
    placeholder_re = re.compile("|".join(re.escape(token) for token in substitutions))
    # A callable replacement is inserted verbatim, so backslashes in the
    # values are not interpreted as regex group references.
    return placeholder_re.sub(lambda found: substitutions[found.group(0)], template_text)
def run_clarityops(pack_dir: str) -> Tuple[Dict[str, Any], Any]:
    """Run the two-phase pipeline (clarify, then analyse) for one pack.

    Steps, in order:
      1. Read the shared prompts from ``<root>/prompts`` and the pack's
         ``inputs.json``, ``constraints.json`` and ``schema.json``, where
         ``<root>`` is two directory levels above ``pack_dir``.
      2. Phase 1: ask the model for clarification questions.
      3. Read the operator's answers from ``<pack>/clarifications.json``.
      4. Phase 2: ask the model for the final analysis and parse it as JSON.
      5. Run the schema, unit, math and policy validators on the parsed
         output (presumably each raises on failure -- confirm in the
         validator modules).
      6. Grade against ``<pack>/rubric.json`` when that file exists.

    Args:
        pack_dir: Path to the pack directory; may be relative to the current
            working directory.

    Returns:
        ``(output, clarif_raw)``: the parsed Phase 2 output with ``_grader``
        and ``_clarifications_summary`` keys added, and the raw Phase 1
        model response.

    Raises:
        RuntimeError: ``clarifications.json`` is missing, or it still carries
            a truthy ``_note`` key. The message ends with the Phase 1
            questions so the operator can see what needs answering.
        ValueError: The Phase 2 response could not be parsed as JSON.
    """
    pack = Path(pack_dir)
    # Walk up from the absolute path: a single-component relative path such
    # as "my_pack" has only one lexical parent, so parents[1] on the raw
    # path would raise IndexError.
    root = pack.absolute().parents[1]
    prompts_dir = root / "prompts"
    system_prompt = (prompts_dir / "system_two_phase.txt").read_text(encoding="utf-8")
    user_template = (prompts_dir / "user_template.txt").read_text(encoding="utf-8")

    inputs = load_json(pack / "inputs.json")
    constraints = load_json(pack / "constraints.json")
    # NOTE(review): schema_cfg, rubric and expected are not read again in
    # this function. The loads are kept so that a missing schema.json or a
    # malformed optional file fails here, before any model call is made.
    schema_cfg = load_json(pack / "schema.json")

    # Optional — only required if you want grading
    rubric_path = pack / "rubric.json"
    expected_path = pack / "expected.json"
    has_rubric = rubric_path.exists()
    rubric = load_json(rubric_path) if has_rubric else {"set_equals": [], "must_contain": [], "numeric_equals": []}
    expected = load_json(expected_path) if expected_path.exists() else {"note": "No expected gold provided."}

    # Prompt fragments shared by both phases.
    context_block = inputs.get("context", "No context provided.")
    data_block = json.dumps(inputs.get("data_inputs", {}), ensure_ascii=False, indent=2)
    constraints_block = json.dumps(constraints, ensure_ascii=False, indent=2)

    # ---- Phase 1: Clarification Questions
    user_prompt_phase1 = build_user_prompt(user_template, context_block, data_block, constraints_block)
    user_prompt_phase1 += "\n\n[INSTRUCTION TO MODEL] Produce **Phase 1** only. Do not produce Phase 2 yet."
    clarif_raw = call_model(system_prompt, user_prompt_phase1)

    # ---- Collect answers
    # Both failure paths below would otherwise throw away the Phase 1
    # response, leaving the operator with nothing to answer; attach it.
    questions_suffix = f"\n\nPhase 1 clarification questions:\n{clarif_raw}"
    clarif_answers_path = pack / "clarifications.json"
    if not clarif_answers_path.exists():
        raise RuntimeError(f"Clarification answers file missing: {clarif_answers_path}" + questions_suffix)
    clarif_answers = load_json(clarif_answers_path)
    # If the file is still the placeholder, raise to force operator to fill it.
    # The isinstance guard keeps a non-object JSON file (e.g. a list of
    # answers) from failing with AttributeError on .get().
    if isinstance(clarif_answers, dict) and clarif_answers.get("_note"):
        raise RuntimeError(
            f"Clarification answers required. Edit and remove _note in: {clarif_answers_path}" + questions_suffix
        )

    # Merge clarifications into inputs for Phase 2 (shallow copy, so the
    # loaded inputs mapping itself is not modified).
    merged_inputs = inputs.copy()
    merged_inputs["clarifications"] = clarif_answers

    # ---- Phase 2: Structured Analysis
    # NOTE(review): Phase 2 serialises the whole merged inputs object into
    # the {DATA_INPUTS} slot, whereas Phase 1 used only inputs["data_inputs"].
    user_prompt_phase2 = build_user_prompt(
        user_template,
        context_block,
        json.dumps(merged_inputs, ensure_ascii=False, indent=2),
        constraints_block,
    )
    user_prompt_phase2 += "\n\n[INSTRUCTION TO MODEL] Produce **Phase 2** only (final structured analysis), using clarified inputs."
    final_raw = call_model(system_prompt, user_prompt_phase2)

    # Parse final output JSON. json.loads raises JSONDecodeError (a
    # ValueError) on bad text and TypeError on a non-string response.
    try:
        output = json.loads(final_raw)
    except (TypeError, ValueError) as e:
        raise ValueError(f"Failed to parse model output as JSON. Raw:\n{final_raw}") from e

    # Validators (hard guardrails)
    schema_validator.assert_valid(output, str(root / "schemas" / "analysis_output.schema.json"))
    unit_validator.assert_valid(output, str(root / "core" / "policy_global.json"))
    math_validator.assert_valid(output)
    policy_validator.assert_valid(output, str(pack / "constraints.json"))

    # Optional grading
    if has_rubric:
        grader_result = grade(output, str(rubric_path))
    else:
        grader_result = {"score": 0, "max_score": 0, "notes": ["No rubric."]}
    output["_grader"] = grader_result
    output["_clarifications_summary"] = clarif_raw
    return output, clarif_raw