| import json |
| import re |
| from typing import Any |
|
|
|
|
| def _strip_code_fences(text: str) -> str: |
| """ |
| Removes markdown code fences like ```json ... ``` or ``` ... ```. |
| """ |
| text = text.strip() |
| if text.startswith("```"): |
| first_newline = text.find("\n") |
| if first_newline != -1: |
| text = text[first_newline + 1 :] |
| if text.rstrip().endswith("```"): |
| text = text.rstrip()[:-3] |
| return text.strip() |
|
|
|
|
| def _extract_first_json_object(text: str) -> str | None: |
| """ |
| Extracts the first valid JSON object substring using brace counting. |
| Works even if additional text exists after JSON. |
| """ |
| start = text.find("{") |
| if start == -1: |
| return None |
|
|
| depth = 0 |
| in_str = False |
| escape = False |
|
|
| for idx in range(start, len(text)): |
| ch = text[idx] |
|
|
| if in_str: |
| if escape: |
| escape = False |
| elif ch == "\\": |
| escape = True |
| elif ch == '"': |
| in_str = False |
| continue |
|
|
| if ch == '"': |
| in_str = True |
| continue |
|
|
| if ch == "{": |
| depth += 1 |
| elif ch == "}": |
| depth -= 1 |
| if depth == 0: |
| return text[start : idx + 1] |
|
|
| return text[start:] |
|
|
|
|
| def _close_open_braces(text: str) -> str: |
| """ |
| If JSON is truncated, add missing closing braces. |
| """ |
| open_braces = text.count("{") |
| close_braces = text.count("}") |
| if close_braces < open_braces: |
| text = text + ("}" * (open_braces - close_braces)) |
| return text |
|
|
|
|
| def _remove_trailing_commas(text: str) -> str: |
| """ |
| Removes trailing commas before closing ] or } |
| """ |
| return re.sub(r",\s*([}\]])", r"\1", text) |
|
|
|
|
| def _truncate_to_last_safe_boundary(text: str) -> str | None: |
| """ |
| Truncates to the last comma outside of strings to drop incomplete tail data. |
| Also handles cases where we're in the middle of a field value. |
| """ |
| depth = 0 |
| in_str = False |
| escape = False |
| last_cut = None |
| last_colon = None |
|
|
| for idx, ch in enumerate(text): |
| if in_str: |
| if escape: |
| escape = False |
| elif ch == "\\": |
| escape = True |
| elif ch == '"': |
| in_str = False |
| continue |
|
|
| if ch == '"': |
| in_str = True |
| continue |
|
|
| if ch == "{": |
| depth += 1 |
| elif ch == "}": |
| depth -= 1 |
| elif ch == ":" and depth >= 1: |
| last_colon = idx |
| elif ch == "," and depth >= 1: |
| last_cut = idx |
|
|
| |
| if last_cut is not None: |
| return text[:last_cut] |
| |
| |
| |
| if last_colon is not None: |
| |
| rest = text[last_colon:] |
| |
| for i, c in enumerate(rest[1:], 1): |
| if c in ['\n', ',', '}']: |
| return text[:last_colon + i] |
| |
| return None |
|
|
|
|
def try_repair_json(text: str) -> dict[str, Any] | None:
    """
    Best-effort recovery of a JSON object from LLM output.

    Steps:
    - Strip markdown code fences.
    - Extract the first ``{...}`` object by brace counting.
    - Close unbalanced braces and remove trailing commas, then try
      ``json.loads()``.
    - If that fails, truncate to the last safe boundary, repair again and
      retry ``json.loads()`` once.

    Returns the parsed value, or ``None`` when ``text`` is empty, contains no
    ``{``, or cannot be parsed after both attempts. Parsing errors are never
    raised to the caller.
    """
    if not text:
        return None

    candidate = _extract_first_json_object(_strip_code_fences(text))
    if candidate is None:
        return None

    # First attempt: repair the extracted object as-is.
    repaired = _remove_trailing_commas(_close_open_braces(candidate))
    try:
        return json.loads(repaired)
    except Exception:
        # Deliberately swallowed: fall through to the truncation attempt.
        pass

    # Second attempt: drop the incomplete tail, then repair what is left.
    shortened = _truncate_to_last_safe_boundary(repaired)
    if not shortened:
        return None

    shortened = _remove_trailing_commas(_close_open_braces(shortened))
    try:
        return json.loads(shortened)
    except Exception:
        return None
|
|