Spaces:
Sleeping
Sleeping
| import json | |
| import re | |
| import typing | |
| from agent_server.sanitizing_think_tags import scrub_think_tags | |
def _format_reasoning_chunk(text: str, tag: str, idx: int) -> str:
    """
    Turn one raw chunk of the reasoning stream into markdown-friendly text.

    The chunk is passed through ``scrub_think_tags`` and trimmed. It is
    dropped (an empty string is returned) when it is:

    * empty after trimming,
    * made up only of separator / box-drawing characters,
    * prefixed by one of the known log-banner strings, or
    * longer than 240 characters without any run of three ASCII
      alphanumerics.

    Otherwise the trimmed chunk is returned followed by a blank line.
    ``tag`` and ``idx`` are accepted but do not influence the output.
    """
    separator_chars = frozenset(" ─━╭╮╰╯│═·—-_=+•")
    banner_prefixes = (
        "OpenAIServerModel",
        "Output message of the LLM",
        "─ Executing parsed code",
        "New run",
        "╭",
        "╰",
        "│",
        "━",
        "─",
    )

    body = scrub_think_tags(text).strip()
    if not body:
        return ""

    # Pure separator / box-drawing line: every character is in the noise set.
    if set(body) <= separator_chars:
        return ""

    # str.startswith accepts a tuple and tests each prefix in turn.
    if body.startswith(banner_prefixes):
        return ""

    # Very long chunk with no word-like run carries no readable signal.
    too_long = len(body) > 240
    if too_long and re.search(r"[A-Za-z0-9]{3,}", body) is None:
        return ""

    # No tag/idx prefix; the trailing blank line separates markdown paragraphs.
    return body + "\n\n"
def _extract_final_text(item: typing.Any) -> typing.Optional[str]:
    """
    Best-effort conversion of a streamed item into final-answer text.

    Handling, in order:

    * ``None`` -> ``None``.
    * A dict containing the key ``"__stdout__"`` or ``"__step__"`` -> ``None``.
    * ``bytes`` / ``bytearray`` -> decoded as UTF-8 (undecodable bytes are
      dropped) and then handled as a string.
    * ``str`` -> stripped and passed through ``scrub_think_tags``; an empty
      result becomes ``None``.
    * Any other non-dict, non-list object with a non-``None`` ``output``
      attribute -> ``str(output)`` scrubbed and stripped, if non-empty.
    * ``dict`` -> the first of ``content``, ``text``, ``message``, ``output``,
      ``final``, ``answer`` whose value is not ``None``. Nested dict/list
      values are JSON-encoded. If no such key exists, the whole dict is
      JSON-encoded (falling back to ``str`` when not serialisable).
    * Anything else -> ``str(item)`` scrubbed.

    Returns:
        The extracted text after ``scrub_think_tags``, or ``None`` when there
        is nothing to extract.
    """
    if item is None:
        # Without this guard str(None) would leak the literal text "None"
        # into the final answer.
        return None
    if isinstance(item, dict) and ("__stdout__" in item or "__step__" in item):
        return None

    if isinstance(item, (bytes, bytearray)):
        # errors="ignore" drops undecodable bytes instead of raising.
        item = item.decode("utf-8", errors="ignore")
    if isinstance(item, str):
        return scrub_think_tags(item.strip()) or None

    # Step-like object: prefer its 'output' attribute when it yields text.
    if not isinstance(item, (dict, list)):
        try:
            out = getattr(item, "output", None)
            if out is not None:
                text = scrub_think_tags(str(out)).strip()
                if text:
                    return text
        except Exception:
            # Deliberate best effort: attribute access or str() on an
            # arbitrary object may raise; fall through to the generic path.
            pass

    if isinstance(item, dict):
        for key in ("content", "text", "message", "output", "final", "answer"):
            val = item.get(key)
            if val is None:
                # Missing or explicitly null: try the next candidate key
                # rather than returning the string "None".
                continue
            if isinstance(val, (dict, list)):
                try:
                    return scrub_think_tags(json.dumps(val, ensure_ascii=False))
                except Exception:
                    # Not JSON-serialisable; use the repr-style text instead.
                    return scrub_think_tags(str(val))
            if isinstance(val, (bytes, bytearray)):
                val = val.decode("utf-8", errors="ignore")
            # The first usable key decides the result, even if it is empty.
            return scrub_think_tags(str(val).strip()) or None
        try:
            return scrub_think_tags(json.dumps(item, ensure_ascii=False))
        except Exception:
            return scrub_think_tags(str(item))

    try:
        return scrub_think_tags(str(item))
    except Exception:
        return None
# Matches "Final answer: <text>" case-insensitively and captures <text> up to
# the end of the string. In a raw string the escapes must be single
# backslashes (\b, \s); doubled backslashes would match a literal "\" instead.
_FINAL_RE = re.compile(r"(?:^|\b)Final\s+answer:\s*(.+)$", flags=re.IGNORECASE)


def _maybe_parse_final_from_stdout(line: str) -> typing.Optional[str]:
    """
    Extract the text following a "Final answer:" marker in a stdout line.

    Args:
        line: One line of captured stdout. Non-string input yields ``None``.

    Returns:
        The text after the marker, passed through ``scrub_think_tags`` and
        stripped, or ``None`` when the marker is absent or the remaining
        text is empty.
    """
    if not isinstance(line, str):
        return None
    match = _FINAL_RE.search(line.strip())
    if match is None:
        return None
    return scrub_think_tags(match.group(1)).strip() or None