| """ |
| robust_parser.py — Universal LLM output parser. Handles JSON, TOML, or free text. |
| |
| TOML is the preferred output format because: |
| - ~40% fewer tokens than JSON (no quotes on keys, no braces, no commas) |
| - More natural for LLMs to generate (looks like config files) |
| - Python 3.11+ has tomllib in stdlib; we include a minimal fallback parser |
| |
| The parser tries in order: TOML → JSON → field extraction → regex fallback. |
| Always returns something usable. Never crashes. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import re |
| import logging |
| from typing import Any |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| def _parse_toml_minimal(text: str) -> dict[str, Any] | None: |
| """ |
| Minimal TOML parser for LLM output. Handles the subset LLMs actually produce: |
| key = "value" |
| key = number |
| [section] |
| key = "value" |
| |
| Uses stdlib tomllib (Python 3.11+) with fallback regex parser. |
| """ |
| try: |
| import tomllib |
| return tomllib.loads(text) |
| except ImportError: |
| pass |
| except Exception: |
| pass |
|
|
| |
| result = {} |
| current_section = result |
| section_path = [] |
|
|
| for line in text.split('\n'): |
| line = line.strip() |
| if not line or line.startswith('#'): |
| continue |
|
|
| |
| sec_match = re.match(r'^\[([^\]]+)\]$', line) |
| if sec_match: |
| parts = sec_match.group(1).split('.') |
| current_section = result |
| for part in parts: |
| part = part.strip() |
| if part not in current_section: |
| current_section[part] = {} |
| current_section = current_section[part] |
| section_path = parts |
| continue |
|
|
| |
| kv_match = re.match(r'^(\w+)\s*=\s*(.+)$', line) |
| if kv_match: |
| key = kv_match.group(1).strip() |
| val = kv_match.group(2).strip() |
| |
| if val.startswith('"') and val.endswith('"'): |
| val = val[1:-1].replace('\\n', '\n').replace('\\"', '"') |
| elif val.startswith("'") and val.endswith("'"): |
| val = val[1:-1] |
| elif val.lower() in ('true', 'false'): |
| val = val.lower() == 'true' |
| else: |
| try: |
| val = float(val) if '.' in val else int(val) |
| except ValueError: |
| pass |
| current_section[key] = val |
|
|
| return result if result else None |
|
|
|
|
def extract_structured(text: str) -> dict[str, Any] | None:
    """
    Pull structured data out of raw LLM text.

    Attempts TOML first (only when a ``key = value`` line is present),
    then falls back to JSON extraction. Returns None if neither works.
    """
    stripped = text.strip()

    # A bare "key = value" line anywhere suggests TOML-style output.
    if re.search(r'^\w+\s*=\s*', stripped, re.MULTILINE):
        # Prefer the contents of a fenced ```toml block when one exists.
        fence = re.search(r'```(?:toml)?\s*\n(.*?)```', stripped, re.DOTALL)
        candidate = fence.group(1) if fence else stripped
        parsed = _parse_toml_minimal(candidate)
        if parsed:
            return parsed

    # TOML failed or was never attempted: fall through to JSON.
    return extract_json(stripped)
|
|
|
|
def extract_json(text: str) -> dict[str, Any] | None:
    """
    Extract a JSON object from arbitrary LLM text.

    Handles: pure JSON, JSON in code blocks, JSON embedded in prose.

    Only dict results are returned (matching the declared return type);
    JSON arrays/scalars count as "no object found" — previously a bare
    list/int leaked through and crashed downstream ``obj.get`` /
    ``"key" in obj`` call sites. Returns None if no valid JSON object
    is found.
    """
    text = text.strip()

    # 1) The whole text is a JSON document.
    try:
        parsed = json.loads(text)
        if isinstance(parsed, dict):
            return parsed
    except (json.JSONDecodeError, ValueError):
        pass

    # 2) A fenced ```json block.
    m = re.search(r'```(?:json)?\s*(\{.*\})\s*```', text, re.DOTALL)
    if m:
        try:
            parsed = json.loads(m.group(1))
            if isinstance(parsed, dict):
                return parsed
        except (json.JSONDecodeError, ValueError):
            pass

    # 3) Brace matching: try every '{' as a candidate object start, not
    #    just the first — the first may be prose like "{placeholder}".
    #    NOTE: braces inside string literals can still confuse this scan.
    start = text.find('{')
    while start != -1:
        depth = 0
        for i in range(start, len(text)):
            if text[i] == '{':
                depth += 1
            elif text[i] == '}':
                depth -= 1
                if depth == 0:
                    try:
                        return json.loads(text[start:i + 1])
                    except (json.JSONDecodeError, ValueError):
                        break  # malformed candidate; try the next '{'
        start = text.find('{', start + 1)

    return None
|
|
|
|
def extract_field(text: str, field_name: str, default: str = "") -> str:
    """
    Extract a named field value from LLM text, regardless of format.

    Handles:
    - TOML: field = "value"
    - JSON: {"field": "value"}
    - Markdown: **field:** value / field: value
    - Labeled: FIELD: value

    The field name is regex-escaped, so names containing metacharacters
    (this module calls with e.g. "Φ(state_before)") match literally —
    previously the parentheses became capture groups and the pattern
    could never match.

    Returns the value as a string, or *default* when not found.
    """
    # Structured fast path: TOML/JSON with the field as a top-level key.
    obj = extract_structured(text)
    if obj and field_name in obj:
        return str(obj[field_name])

    name = re.escape(field_name)  # names may contain (), *, ., etc.
    patterns = [
        rf'"{name}"\s*:\s*"((?:[^"\\]|\\.)*)"',   # JSON string value
        rf'"{name}"\s*:\s*(\d+\.?\d*)',            # JSON numeric value
        rf'\*?\*?{name}\*?\*?\s*:\s*(.+?)(?:\n|$)',  # markdown **field:**
        rf'{name}\s*[=:]\s*(.+?)(?:\n|$)',           # plain field: / field =
    ]
    for pattern in patterns:
        m = re.search(pattern, text, re.IGNORECASE)
        if m:
            return m.group(1).strip().strip('"').strip("'")

    return default
|
|
|
|
def extract_number(text: str, field_name: str, default: float = 0.0) -> float:
    """
    Extract a numeric field from LLM text.

    Delegates to extract_field first, stripping any trailing '.' or ','
    punctuation (e.g. "7.5." or "7,"); falls back to a direct regex scan
    that also accepts a leading minus sign. The field name is
    regex-escaped so metacharacters match literally.

    Returns *default* when no parsable number is found.
    """
    val = extract_field(text, field_name)
    if val:
        try:
            # rstrip('.,') handles any mix of trailing sentence punctuation.
            return float(val.rstrip('.,'))
        except (ValueError, TypeError):
            pass

    # Direct "field: 1.5" / "field = -2" scan.
    m = re.search(rf'{re.escape(field_name)}\s*[=:]\s*(-?[\d.]+)', text,
                  re.IGNORECASE)
    if m:
        try:
            return float(m.group(1).rstrip('.'))
        except ValueError:
            pass  # e.g. the regex matched bare dots

    return default
|
|
|
|
def extract_code(text: str) -> str:
    """
    Extract Python code from LLM text.

    Handles:
    - Code in ``` blocks
    - Code in "code" JSON field
    - Raw code with def/class keywords

    Returns the extracted code string, or "" when nothing code-like is found.
    """
    # 1) Structured response: look for code under action.params.code
    #    (actor-style payload) or a top-level "code" key.
    obj = extract_json(text)
    if obj:
        # assumes an actor payload shaped {"action": {"params": {"code": ...}}}
        action = obj.get("action", {})
        if isinstance(action, dict):
            params = action.get("params", {})
            if isinstance(params, dict) and "code" in params:
                return params["code"]
        if "code" in obj:
            return obj["code"]

    # 2) First fenced ```python (or anonymous ```) block.
    m = re.search(r'```(?:python)?\s*\n(.*?)```', text, re.DOTALL)
    if m:
        return m.group(1).strip()

    # 3) Heuristic line scan: start collecting at the first
    #    def/class/import/from line, stop at the first clearly
    #    non-code line after code has been collected.
    lines = text.split('\n')
    code_lines = []
    in_code = False
    for line in lines:
        if re.match(r'^(def |class |import |from )', line.strip()):
            in_code = True
        if in_code:
            # Keep a blank line inside a body unless the previous line opened
            # a suite with ':' — that case falls to the next branch, whose
            # '$' alternative still appends it.
            if line.strip() == '' and code_lines and not code_lines[-1].strip().endswith(':'):

                code_lines.append(line)
            elif in_code and (line.startswith(' ') or line.startswith('\t') or
                re.match(r'^(def |class |import |from |#|$)', line.strip())):
                code_lines.append(line)
            elif re.match(r'^(def |class )', line.strip()):
                # NOTE(review): this branch looks unreachable — the previous
                # branch's pattern already matches any "def "/"class " line.
                code_lines.append(line)
            else:
                # First non-code line after code started: stop collecting.
                if code_lines:
                    break

    if code_lines:
        return '\n'.join(code_lines).strip()

    return ""
|
|
|
|
def parse_actor_response(text: str) -> dict[str, Any]:
    """
    Parse an actor's response into thought/action/expected_delta.

    Works with TOML, JSON, or free text. The free-text fallback always
    returns a dict with "thought", "action" ({"name", "params"}), and
    "expected_delta" keys; the structured fast path returns the model's
    dict as-is.
    """
    # Fast path: the response is already structured with the expected keys.
    obj = extract_structured(text)
    if obj and ("action" in obj or "thought" in obj):
        return obj

    # Free-text fallback: pull each field out individually.
    thought = extract_field(text, "thought")
    expected_delta = extract_field(text, "expected_delta")

    # The action name may appear under "name" or "action"; a leading brace
    # means we grabbed a serialized object rather than a name — discard it.
    action_name = extract_field(text, "name", "")
    if not action_name:
        action_name = extract_field(text, "action", "")
    if action_name.startswith("{"):
        action_name = ""

    code = extract_code(text)

    action: dict[str, Any] = {"name": action_name or "UNKNOWN", "params": {}}
    if code:
        # BUGFIX: the old `action.get("name", "submit_code")` could never
        # yield "submit_code" because the key always exists. When code is
        # present but no action name was found, default to submit_code.
        if action["name"] == "UNKNOWN":
            action["name"] = "submit_code"
        action["params"]["code"] = code

    if not thought:
        # Fall back to the first line of the raw text as a stand-in thought.
        thought = text.split('\n')[0][:200] if text else ""

    return {
        "thought": thought,
        "action": action,
        "expected_delta": expected_delta or "",
    }
|
|
|
|
def parse_critic_response(text: str) -> dict[str, Any]:
    """
    Parse a critic's response into phi_before/phi_after/reasoning/evidence/
    confidence. Works with TOML, JSON, or free text; never raises.

    Scores are clamped to [0, 10] and confidence to [0, 1] on every path
    (previously the structured path skipped clamping and crashed on
    non-numeric values such as `confidence = "high"`).
    """

    def _to_float(value: Any, fallback: float) -> float:
        # Structured values may be non-numeric strings; never crash on them.
        try:
            return float(value)
        except (TypeError, ValueError):
            return fallback

    # Fast path: structured response already carrying phi keys.
    obj = extract_structured(text)
    if obj and ("phi_before" in obj or "phi_after" in obj):
        return {
            "phi_before": min(10.0, max(0.0, _to_float(obj.get("phi_before", 0), 0.0))),
            "phi_after": min(10.0, max(0.0, _to_float(obj.get("phi_after", 0), 0.0))),
            "reasoning": str(obj.get("reasoning", "")),
            "evidence": str(obj.get("evidence", "")),
            "confidence": min(1.0, max(0.0, _to_float(obj.get("confidence", 0.5), 0.5))),
        }

    # Free-text fallback: try several label spellings for each score.
    phi_before = extract_number(text, "phi_before", 0.0)
    if phi_before == 0.0:
        phi_before = extract_number(text, "Φ(state_before)", 0.0)
    if phi_before == 0.0:
        phi_before = extract_number(text, "state_before", 0.0)

    phi_after = extract_number(text, "phi_after", 0.0)
    if phi_after == 0.0:
        phi_after = extract_number(text, "Φ(state_after)", 0.0)
    if phi_after == 0.0:
        phi_after = extract_number(text, "state_after", 0.0)

    # Last resort: generic "score: N" occurrences, in before/after order.
    if phi_before == 0.0 and phi_after == 0.0:
        scores = re.findall(r'(?:score|SCORE|Score)\s*[=:]\s*([\d.]+)', text)
        if len(scores) >= 2:
            phi_before = _to_float(scores[0].rstrip('.'), 0.0)
            phi_after = _to_float(scores[1].rstrip('.'), 0.0)
        elif len(scores) == 1:
            phi_after = _to_float(scores[0].rstrip('.'), 0.0)

    reasoning = extract_field(text, "reasoning")
    evidence = extract_field(text, "evidence")
    confidence = extract_number(text, "confidence", 0.5)

    # Best-effort defaults: reuse slices of the raw text.
    if not reasoning:
        reasoning = text[:300]
    if not evidence:
        evidence = text[300:500] if len(text) > 300 else ""

    return {
        "phi_before": min(10.0, max(0.0, phi_before)),
        "phi_after": min(10.0, max(0.0, phi_after)),
        "reasoning": reasoning,
        "evidence": evidence,
        "confidence": min(1.0, max(0.0, confidence)),
    }
|
|
|
|
def parse_optimizer_response(text: str) -> dict[str, Any]:
    """
    Parse optimizer output into a {"heuristics": [...]} dict.

    Tries, in order: a JSON object with a "heuristics" key, a bare JSON
    array, pattern/strategy labelled lines, and numbered-list items.
    """
    obj = extract_json(text)
    if obj and "heuristics" in obj:
        return obj

    # A bare JSON array anywhere in the text.
    arr_match = re.search(r'\[.*\]', text, re.DOTALL)
    if arr_match:
        try:
            parsed = json.loads(arr_match.group())
        except (json.JSONDecodeError, ValueError):
            parsed = None
        if isinstance(parsed, list):
            return {"heuristics": parsed}

    # Labelled lines: pair "pattern:"-style lines with "strategy:"-style
    # lines positionally (zip truncates to the shorter list).
    pats = re.findall(r'(?:pattern|when|if)\s*[:\-]\s*(.+?)(?:\n|$)',
                      text, re.IGNORECASE)
    strats = re.findall(r'(?:strategy|do|then|action)\s*[:\-]\s*(.+?)(?:\n|$)',
                        text, re.IGNORECASE)
    heuristics = [
        {"tier": "strategic", "pattern": p.strip(), "strategy": s.strip()}
        for p, s in zip(pats, strats)
    ]

    # Numbered-list fallback: the first five "1. do X" items become
    # generic heuristics.
    if not heuristics:
        heuristics = [
            {"tier": "strategic", "pattern": "General", "strategy": item.strip()}
            for item in re.findall(r'\d+\.\s*(.+?)(?:\n|$)', text)[:5]
        ]

    return {"heuristics": heuristics}
|
|