| """ |
| Baseline inference script for IndicScriptureQA. |
| |
| Runs an LLM agent against all 3 tasks via the OpenEnv HTTP API. |
| Emits structured [START]/[STEP]/[END] logs per the OpenEnv spec. |
| |
| The agent evaluates BOTH factual accuracy AND semantic structure: |
| - factual: hallucination detection, correction |
| - structural: coherence, completeness, terminology, logical ordering |
| |
| Environment variables: |
| API_BASE_URL LLM endpoint (default: HF router) |
| MODEL_NAME Model identifier (default: Qwen2.5-72B-Instruct) |
| HF_TOKEN API key |
| PING_URL Environment server (default: http://localhost:8000) |
| """ |
|
|
| import json |
| import os |
| import textwrap |
| from typing import Dict, List, Optional |
|
|
| import requests |
| from openai import OpenAI |
|
|
| |
|
|
| API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY") |
| API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") |
| MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct") |
| ENV_URL = os.getenv("PING_URL", "http://localhost:8000") |
| BENCHMARK = "indic_scripture_qa" |
| TEMPERATURE = 0.4 |
| MAX_TOKENS = 600 |
|
|
| TASKS = [ |
| {"name": "verify-factual", "max_steps": 5}, |
| {"name": "correct-and-cite", "max_steps": 8}, |
| {"name": "fix-hallucination", "max_steps": 12}, |
| ] |
|
|
| SYSTEM_PROMPT = textwrap.dedent("""\ |
| You are an expert agent that both CORRECTS hallucinations and IMPROVES the |
| semantic structure of answers about Indic scriptures (Vedas, Upanishads, |
| Ramayana, Mahabharata, Bhagavad Gita, Puranas). |
| |
| Each turn you receive an observation with: |
| - question, current_answer, retrieved_passages, current_citations, |
| steps_remaining, feedback, structural_hints |
| |
| You must reply with EXACTLY ONE JSON object (no markdown, no explanation): |
| { |
| "action_type": "RETRIEVE" | "EDIT" | "RESTRUCTURE" | "CITE" | "ACCEPT" | "REJECT", |
| "payload": "<string or null>" |
| } |
| |
| Actions: |
| RETRIEVE β fetch source passages to verify facts |
| EDIT β rewrite the answer to fix factual errors AND improve content |
| RESTRUCTURE β reorganise the answer's flow, ordering, and coherence WITHOUT |
| changing facts (use when facts are right but structure is poor) |
| CITE β add a scripture citation (e.g. "Bhagavad Gita 2.47") |
| ACCEPT β finalise when answer is both accurate and well-structured |
| REJECT β only if the answer is fundamentally unsalvageable |
| |
| Strategy: |
| 1. RETRIEVE first (1β2 times) to get authoritative source passages. |
| 2. Check facts against retrieved passages. EDIT to fix any errors. |
| 3. Read structural_hints. If the answer's flow, terminology, or completeness |
| is poor, use RESTRUCTURE to reorganise it. |
| 4. CITE relevant scripture references. |
| 5. ACCEPT when the answer is factually accurate, well-structured, uses |
| correct Sanskrit terminology, and covers all required aspects. |
| 6. Be efficient β fewer steps score higher. |
| |
| Evaluation axes (the grader checks ALL of these): |
| - Factual similarity to ground truth |
| - Citation accuracy |
| - Terminology precision (correct Sanskrit/domain terms, no misconception markers) |
| - Completeness (all required conceptual aspects covered) |
| - Logical ordering (concepts in proper sequence) |
| - Coherence (smooth transitions, balanced sentence structure) |
| """) |
|
|
|
|
| |
|
|
| def log_start(task: str, env: str, model: str) -> None: |
| print(f"[START] task={task} env={env} model={model}", flush=True) |
|
|
|
|
| def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None: |
| print( |
| f"[STEP] step={step} action={action} reward={reward:.2f} " |
| f"done={str(done).lower()} error={error or 'null'}", |
| flush=True, |
| ) |
|
|
|
|
| def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None: |
| rewards_str = ",".join(f"{r:.2f}" for r in rewards) |
| print( |
| f"[END] success={str(success).lower()} steps={steps} " |
| f"score={score:.2f} rewards={rewards_str}", |
| flush=True, |
| ) |
|
|
|
|
| |
|
|
| def env_reset(task_name: str, scenario_index: int = 0) -> Dict: |
| resp = requests.post( |
| f"{ENV_URL}/reset", |
| json={"task_name": task_name, "scenario_index": scenario_index}, |
| timeout=30, |
| ) |
| resp.raise_for_status() |
| return resp.json() |
|
|
|
|
| def env_step(action_type: str, payload: Optional[str] = None) -> Dict: |
| resp = requests.post( |
| f"{ENV_URL}/step", |
| json={"action_type": action_type, "payload": payload}, |
| timeout=30, |
| ) |
| resp.raise_for_status() |
| return resp.json() |
|
|
|
|
| |
|
|
| def build_user_prompt(obs: Dict, step: int) -> str: |
| return json.dumps({ |
| "step": step, |
| "question": obs["question"], |
| "current_answer": obs["current_answer"], |
| "retrieved_passages": obs["retrieved_passages"], |
| "current_citations": obs["current_citations"], |
| "steps_remaining": obs["steps_remaining"], |
| "feedback": obs.get("feedback"), |
| "structural_hints": obs.get("structural_hints", []), |
| }, indent=2) |
|
|
|
|
| def get_agent_action(client: OpenAI, obs: Dict, step: int) -> Dict: |
| """Ask the LLM for the next action.""" |
| user_prompt = build_user_prompt(obs, step) |
| try: |
| completion = client.chat.completions.create( |
| model=MODEL_NAME, |
| messages=[ |
| {"role": "system", "content": SYSTEM_PROMPT}, |
| {"role": "user", "content": user_prompt}, |
| ], |
| temperature=TEMPERATURE, |
| max_tokens=MAX_TOKENS, |
| stream=False, |
| ) |
| raw = (completion.choices[0].message.content or "").strip() |
| if raw.startswith("```"): |
| raw = raw.split("\n", 1)[-1].rsplit("```", 1)[0].strip() |
| return json.loads(raw) |
| except Exception as exc: |
| print(f"[DEBUG] LLM parse error: {exc}", flush=True) |
| if step <= 2: |
| return {"action_type": "RETRIEVE", "payload": None} |
| return {"action_type": "ACCEPT", "payload": None} |
|
|
|
|
| |
|
|
| def run_task(client: OpenAI, task_name: str, max_steps: int, scenario_index: int = 0) -> float: |
| """Run one episode. Returns score in [0, 1].""" |
| log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME) |
|
|
| rewards: List[float] = [] |
| steps_taken = 0 |
| score = 0.0 |
| success = False |
|
|
| try: |
| result = env_reset(task_name, scenario_index) |
| obs = result["observation"] |
|
|
| for step in range(1, max_steps + 1): |
| if result.get("done", False): |
| break |
|
|
| agent_action = get_agent_action(client, obs, step) |
| action_type = agent_action.get("action_type", "ACCEPT") |
| payload = agent_action.get("payload") |
|
|
| result = env_step(action_type, payload) |
| obs = result["observation"] |
| reward = result.get("reward", 0.0) |
| done = result.get("done", False) |
|
|
| rewards.append(reward) |
| steps_taken = step |
|
|
| action_str = f"{action_type}({payload!r})" if payload else action_type |
| log_step(step=step, action=action_str, reward=reward, done=done, error=None) |
|
|
| if done: |
| score = result.get("info", {}).get("score", 0.0) |
| break |
|
|
| success = score >= 0.10 |
|
|
| except Exception as exc: |
| print(f"[DEBUG] Episode error: {exc}", flush=True) |
|
|
| finally: |
| log_end(success=success, steps=steps_taken, score=score, rewards=rewards) |
|
|
| return score |
|
|
|
|
| def main() -> None: |
| client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY) |
|
|
| all_scores: Dict[str, float] = {} |
|
|
| for task in TASKS: |
| task_name = task["name"] |
| max_steps = task["max_steps"] |
| score = run_task(client, task_name, max_steps, scenario_index=0) |
| all_scores[task_name] = score |
| print(flush=True) |
|
|
| print("=" * 60, flush=True) |
| print("BASELINE RESULTS", flush=True) |
| for name, sc in all_scores.items(): |
| print(f" {name:25s} score={sc:.3f}", flush=True) |
| avg = sum(all_scores.values()) / len(all_scores) if all_scores else 0.0 |
| print(f" {'AVERAGE':25s} score={avg:.3f}", flush=True) |
| print("=" * 60, flush=True) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|