govon-runtime / src /cli /renderer.py
umyunsang's picture
Upload folder using huggingface_hub
90dad19 verified
"""Result rendering for GovOn CLI.
Uses `rich` when available; falls back to plain print() otherwise.
"""
from __future__ import annotations
from threading import Lock
from typing import Any
from src.cli.terminal import (
get_narrow_terminal_warning,
get_panel_width,
get_terminal_columns,
is_layout_supported,
)
try:
from rich.console import Console, Group
from rich.markdown import Markdown
from rich.panel import Panel
from rich.status import Status
from rich.table import Table
from rich.text import Text
_console = Console()
_RICH_AVAILABLE = True
except ImportError: # pragma: no cover
_console = None # type: ignore[assignment]
_RICH_AVAILABLE = False
_HAS_WARNED_NARROW_TERMINAL = False
_NARROW_WARNING_LOCK = Lock()
# ---------------------------------------------------------------------------
# Node status message mapping
# ---------------------------------------------------------------------------
NODE_STATUS_MESSAGES: dict[str, str] = {
"session_load": "์„ธ์…˜ ๋กœ๋“œ ์ค‘โ€ฆ",
"agent": "์—์ด์ „ํŠธ ์ถ”๋ก  ์ค‘โ€ฆ",
"approval_wait": "์Šน์ธ ๋Œ€๊ธฐ ์ค‘โ€ฆ",
"tools": "๋„๊ตฌ ์‹คํ–‰ ์ค‘โ€ฆ",
"persist": "์ €์žฅ ์ค‘โ€ฆ",
}
MARKDOWN_CODE_THEME = "monokai"
STRUCTURED_TOOL_ORDER = ("stats_lookup", "keyword_analyzer", "demographics_lookup")
STRUCTURED_TOOL_TITLES = {
"stats_lookup": "๋ฏผ์› ํ†ต๊ณ„",
"keyword_analyzer": "ํ‚ค์›Œ๋“œ ๋ถ„์„",
"demographics_lookup": "์ธ๊ตฌํ†ต๊ณ„",
}
STRUCTURED_API_TITLES = {
"doc_count": "์ฑ„๋„๋ณ„ ์ ‘์ˆ˜ ๊ฑด์ˆ˜",
"trend": "์ถ”์ด",
"statistics": "๊ธฐ๊ฐ„๋ณ„ ํ†ต๊ณ„",
"org_ranking": "๊ธฐ๊ด€ ์ˆœ์œ„",
"region_ranking": "์ง€์—ญ ์ˆœ์œ„",
"core_keyword": "ํ•ต์‹ฌ ํ‚ค์›Œ๋“œ",
"related_word": "์—ฐ๊ด€์–ด",
"gender": "์„ฑ๋ณ„ ๋ถ„ํฌ",
"age": "์—ฐ๋ น ๋ถ„ํฌ",
"population": "์ธ๊ตฌ ๋Œ€๋น„ ๋น„์œจ",
}
TABLE_COLUMN_PRIORITY = (
"keyword",
"topic",
"label",
"term",
"hits",
"value",
"ratio",
"prebRatio",
"prevRatio",
"population",
"pttn",
"dfpt",
"saeol",
)
TABLE_COLUMN_LABELS = {
"keyword": "ํ‚ค์›Œ๋“œ",
"topic": "ํ•ญ๋ชฉ",
"label": "ํ•ญ๋ชฉ",
"term": "ํ•ญ๋ชฉ",
"hits": "๊ฑด์ˆ˜",
"value": "๊ฐ’",
"ratio": "๋น„์œจ",
"prebRatio": "์ „์ผ ๋Œ€๋น„",
"prevRatio": "์ „๊ธฐ ๋Œ€๋น„",
"population": "์ธ๊ตฌ",
"pttn": "๊ตญ๋ฏผ์‹ ๋ฌธ๊ณ ",
"dfpt": "๋ฏผ์›24",
"saeol": "์ƒˆ์˜ฌ",
"source_type": "์ถœ์ฒ˜",
"title": "์ œ๋ชฉ",
"page": "ํŽ˜์ด์ง€",
"score": "์ ์ˆ˜",
"link_or_path": "๊ฒฝ๋กœ/๋งํฌ",
}
TABLE_HIDDEN_KEYS = {"_source_api"}
EVIDENCE_SOURCE_LABELS = {
"rag": "๋กœ์ปฌ ๋ฌธ์„œ",
"api": "์™ธ๋ถ€ API",
"llm_generated": "LLM ์ƒ์„ฑ",
}
def get_node_message(node_name: str) -> str:
"""Return a human-readable status message for a given node name."""
return NODE_STATUS_MESSAGES.get(node_name, f"{node_name} ์ฒ˜๋ฆฌ ์ค‘โ€ฆ")
# ---------------------------------------------------------------------------
# Spinner context manager
# ---------------------------------------------------------------------------
class StreamingStatusDisplay:
"""Context manager that shows a spinner and updates the message per node.
Wraps rich.status.Status when rich is available; falls back to plain print().
"""
def __init__(self, initial_message: str = "์ฒ˜๋ฆฌ ์ค‘โ€ฆ") -> None:
self._initial_message = initial_message
self._status: Status | None = None # type: ignore[name-defined]
self._use_rich = False
def __enter__(self) -> "StreamingStatusDisplay":
self._use_rich, _ = _resolve_render_mode()
if self._use_rich:
self._status = _console.status(self._initial_message, spinner="dots")
self._status.__enter__()
else:
print(f"โ†’ {self._initial_message}", flush=True)
return self
def update(self, message: str) -> None:
"""Update the displayed status message."""
if self._use_rich and self._status is not None:
self._status.update(message)
else:
print(f"โ†’ {message}", flush=True)
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
if self._use_rich and self._status is not None:
self._status.__exit__(exc_type, exc_val, exc_tb)
self._status = None
def _warn_narrow_terminal_once(columns: int) -> None:
"""Emit the narrow-terminal fallback warning once per narrow-state entry."""
global _HAS_WARNED_NARROW_TERMINAL
with _NARROW_WARNING_LOCK:
if _HAS_WARNED_NARROW_TERMINAL:
return
_HAS_WARNED_NARROW_TERMINAL = True
print(get_narrow_terminal_warning(columns), flush=True)
def _reset_narrow_warning() -> None:
"""Reset narrow-terminal warning state for tests and wide-terminal recovery."""
global _HAS_WARNED_NARROW_TERMINAL
with _NARROW_WARNING_LOCK:
_HAS_WARNED_NARROW_TERMINAL = False
def _resolve_render_mode() -> tuple[bool, int]:
"""Return (use_rich, terminal_columns) for the current render call."""
columns = get_terminal_columns()
if not is_layout_supported(columns):
_warn_narrow_terminal_once(columns)
return False, columns
_reset_narrow_warning()
return _RICH_AVAILABLE, columns
def _plain_rule(columns: int) -> str:
"""Return a separator that fits within the current terminal."""
return "โ”€" * max(columns - 2, 12)
def _format_table_value(key: str, value: Any) -> str:
"""Format a structured value for rich/plain table rendering."""
if value in ("", None):
return "-"
if key == "source_type":
return EVIDENCE_SOURCE_LABELS.get(str(value), str(value))
if key == "page":
return f"p.{value}"
if key == "score":
try:
return f"{float(value):.2f}"
except (TypeError, ValueError):
return str(value)
if key in {"hits", "population", "pttn", "dfpt", "saeol"}:
try:
return f"{int(float(value)):,}"
except (TypeError, ValueError):
return str(value)
if key == "value":
try:
value_f = float(value)
return f"{value_f:,.1f}" if value_f % 1 else f"{value_f:,.0f}"
except (TypeError, ValueError):
return str(value)
if key in {"ratio", "prebRatio", "prevRatio"}:
text = str(value)
return text if text.endswith("%") else f"{text}%"
return str(value)
def _select_table_columns(rows: list[dict], columns: int) -> list[str]:
"""Select visible table columns based on row shape and terminal width."""
visible_keys: list[str] = []
seen: set[str] = set()
for key in TABLE_COLUMN_PRIORITY:
if any(row.get(key) not in ("", None) for row in rows):
visible_keys.append(key)
seen.add(key)
for row in rows:
for key in row:
if key in TABLE_HIDDEN_KEYS or key in seen:
continue
if row.get(key) not in ("", None):
visible_keys.append(key)
seen.add(key)
max_columns = 5 if columns >= 120 else 4 if columns >= 80 else 2
return visible_keys[:max_columns]
def _build_rich_table(rows: list[dict], columns: int, *, column_keys: list[str] | None = None):
"""Build a Rich table from structured rows."""
selected_keys = column_keys or _select_table_columns(rows, columns)
if not selected_keys:
return None
table = Table(expand=True)
for key in selected_keys:
table.add_column(
TABLE_COLUMN_LABELS.get(key, key),
overflow="fold",
no_wrap=key in {"source_type", "page", "score"},
)
for row in rows:
table.add_row(*(_format_table_value(key, row.get(key)) for key in selected_keys))
return table
def _render_plain_table(
title: str,
rows: list[dict],
columns: int,
*,
column_keys: list[str] | None = None,
) -> str:
"""Render structured rows as a tab-delimited plain-text table."""
selected_keys = column_keys or _select_table_columns(rows, columns)
if not selected_keys:
return ""
lines = [title, "\t".join(TABLE_COLUMN_LABELS.get(key, key) for key in selected_keys)]
for row in rows:
lines.append("\t".join(_format_table_value(key, row.get(key)) for key in selected_keys))
return "\n".join(lines)
def _iter_structured_result_sections(tool_results: dict[str, Any]) -> list[tuple[str, list[dict]]]:
"""Extract table-ready structured result sections from tool results."""
sections: list[tuple[str, list[dict]]] = []
for tool_name in STRUCTURED_TOOL_ORDER:
payload = tool_results.get(tool_name)
if not isinstance(payload, dict):
continue
results = payload.get("results")
if not isinstance(results, list) or not results:
continue
grouped_rows: dict[str, list[dict]] = {}
for row in results:
if not isinstance(row, dict):
continue
grouped_rows.setdefault(str(row.get("_source_api") or "results"), []).append(row)
for source_api, rows in grouped_rows.items():
source_title = STRUCTURED_API_TITLES.get(source_api)
tool_title = STRUCTURED_TOOL_TITLES.get(tool_name, tool_name)
title = f"{tool_title} ยท {source_title}" if source_title else tool_title
sections.append((title, rows))
return sections
def _build_evidence_table_rows(evidence_items: list[dict]) -> list[dict]:
"""Normalize evidence items into a table-oriented row schema."""
rows: list[dict] = []
for item in evidence_items:
rows.append(
{
"source_type": item.get("source_type"),
"title": item.get("title") or item.get("excerpt", ""),
"page": item.get("page"),
"score": item.get("score"),
"link_or_path": item.get("link_or_path"),
}
)
return rows
def _select_evidence_columns(columns: int) -> list[str]:
"""Return evidence table columns based on terminal width."""
if columns >= 120:
return ["source_type", "title", "page", "score", "link_or_path"]
if columns >= 80:
return ["source_type", "title", "score"]
return ["source_type", "title"]
def render_evidence_section(evidence_items: list) -> str:
"""EvidenceItem dict ๋ฆฌ์ŠคํŠธ๋ฅผ ์ถœ์ฒ˜ ์„น์…˜ ํ…์ŠคํŠธ๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค.
source_type๋ณ„๋กœ ๊ทธ๋ฃนํ™”ํ•˜์—ฌ ํ‘œ์‹œํ•œ๋‹ค:
[๋กœ์ปฌ ๋ฌธ์„œ] โ€” rag ์ถœ์ฒ˜ (file_path, page, score ํฌํ•จ)
[์™ธ๋ถ€ API] โ€” api ์ถœ์ฒ˜ (URL ํฌํ•จ)
[LLM ์ƒ์„ฑ] โ€” llm_generated ์ถœ์ฒ˜
Parameters
----------
evidence_items : list
EvidenceItem.to_dict() ํ˜•ํƒœ์˜ dict ๋ฆฌ์ŠคํŠธ.
Returns
-------
str
์ถœ์ฒ˜ ์„น์…˜ ํ…์ŠคํŠธ. items๊ฐ€ ์—†์œผ๋ฉด ๋นˆ ๋ฌธ์ž์—ด.
"""
if not evidence_items:
return ""
# source_type๋ณ„ ๊ทธ๋ฃนํ™”
rag_items = [i for i in evidence_items if i.get("source_type") == "rag"]
api_items = [i for i in evidence_items if i.get("source_type") == "api"]
llm_items = [i for i in evidence_items if i.get("source_type") == "llm_generated"]
lines: list[str] = ["โ”€โ”€ ์ฐธ์กฐ ๊ทผ๊ฑฐ โ”€โ”€"]
idx = 1
if rag_items:
lines.append("[๋กœ์ปฌ ๋ฌธ์„œ]")
for item in rag_items:
title = item.get("title") or item.get("link_or_path", "")
page = item.get("page")
score = item.get("score", 0.0)
page_str = f" (p.{page})" if page is not None else ""
score_str = f" [{score:.2f}]" if score else ""
lines.append(f" {idx}. {title}{page_str}{score_str}")
idx += 1
if api_items:
lines.append("[์™ธ๋ถ€ API]")
for item in api_items:
title = item.get("title", "")
link = item.get("link_or_path", "")
link_str = f" โ€” {link}" if link else ""
lines.append(f" {idx}. {title}{link_str}")
idx += 1
if llm_items:
lines.append("[LLM ์ƒ์„ฑ]")
for item in llm_items:
title = item.get("title", "")
excerpt = item.get("excerpt", "")[:80]
lines.append(f" {idx}. {title}: {excerpt}" if title else f" {idx}. {excerpt}")
idx += 1
return "\n".join(lines) if len(lines) > 1 else ""
def _build_citations_text(citations: list[str]) -> Text:
"""Return a styled fallback citations block for rich rendering."""
content = Text("\n์ถœ์ฒ˜\n", style="bold")
for idx, src in enumerate(citations, 1):
content.append(f" {idx}. {src}\n", style="dim")
return content
def _build_rich_result_content(
text_body: str,
evidence_items: list,
citations: list,
tool_results: dict[str, Any],
columns: int,
) -> Text | Markdown | Group:
"""Build the rich renderable used inside the result panel."""
renderables = []
if text_body:
renderables.append(Markdown(text_body, code_theme=MARKDOWN_CODE_THEME))
for title, rows in _iter_structured_result_sections(tool_results):
table = _build_rich_table(rows, columns)
if table is None:
continue
renderables.append(Text(""))
renderables.append(Text(title, style="bold cyan"))
renderables.append(table)
if evidence_items:
evidence_rows = _build_evidence_table_rows(evidence_items)
evidence_table = _build_rich_table(
evidence_rows,
columns,
column_keys=_select_evidence_columns(columns),
)
if evidence_table is not None:
renderables.append(Text(""))
renderables.append(Text("์ฐธ์กฐ ๊ทผ๊ฑฐ", style="bold"))
renderables.append(evidence_table)
elif citations:
renderables.append(_build_citations_text(citations))
if not renderables:
return Text("")
if len(renderables) == 1:
return renderables[0]
return Group(*renderables)
def render_result(result: dict) -> None:
"""Render the final agent response to the terminal.
Expected keys (at least one required):
- result["text"] or result["response"]: main answer text
- result["evidence_items"]: EvidenceItem dict ๋ฆฌ์ŠคํŠธ (structured, ์šฐ์„ )
- result["citations"] or result["sources"]: list of source strings (fallback)
- result["tool_results"]: stats/keyword/demographics structured result dict
"""
text_body: str = result.get("text") or result.get("response") or ""
evidence_items: list = result.get("evidence_items") or []
citations: list = result.get("citations") or result.get("sources") or []
tool_results: dict[str, Any] = result.get("tool_results") or {}
use_rich, columns = _resolve_render_mode()
if use_rich:
content = _build_rich_result_content(
text_body,
evidence_items,
citations,
tool_results,
columns,
)
_console.print(
Panel(
content,
title="[bold green]GovOn[/bold green]",
border_style="green",
width=get_panel_width(columns),
)
)
else:
rule = _plain_rule(columns)
print(f"\n{rule}")
print("GovOn")
print(text_body)
for title, rows in _iter_structured_result_sections(tool_results):
table_text = _render_plain_table(title, rows, columns)
if table_text:
print(f"\n{table_text}")
if evidence_items:
evidence_table = _render_plain_table(
"์ฐธ์กฐ ๊ทผ๊ฑฐ",
_build_evidence_table_rows(evidence_items),
columns,
column_keys=_select_evidence_columns(columns),
)
if evidence_table:
print(f"\n{evidence_table}")
elif citations:
print("\n์ถœ์ฒ˜")
for idx, src in enumerate(citations, 1):
print(f" {idx}. {src}")
print(f"{rule}\n")
def render_status(message: str) -> None:
"""Render a transient status / progress message."""
use_rich, _ = _resolve_render_mode()
if use_rich:
_console.print(f"[dim]โ†’ {message}[/dim]")
else:
print(f"โ†’ {message}")
def render_error(message: str) -> None:
"""Render an error message in red."""
use_rich, _ = _resolve_render_mode()
if use_rich:
_console.print(f"[bold red]์˜ค๋ฅ˜:[/bold red] {message}")
else:
print(f"์˜ค๋ฅ˜: {message}")
def render_thinking(content: str) -> None:
"""LLM thinking ๊ณผ์ •์„ dim ์Šคํƒ€์ผ๋กœ ํ‘œ์‹œ."""
use_rich, _ = _resolve_render_mode()
if use_rich:
_console.print(f"[dim]{content}[/dim]", end="")
else:
print(content, end="", flush=True)
def render_tool_progress(tool_name: str, status: str, latency_ms: float = 0) -> None:
"""๋„๊ตฌ ์‹คํ–‰ ์ง„ํ–‰ ํ‘œ์‹œ."""
use_rich, _ = _resolve_render_mode()
if status == "start":
msg = f"๋„๊ตฌ ์‹คํ–‰: {tool_name}โ€ฆ"
else:
msg = (
f"๋„๊ตฌ ์™„๋ฃŒ: {tool_name} ({latency_ms:.0f}ms)"
if latency_ms
else f"๋„๊ตฌ ์™„๋ฃŒ: {tool_name}"
)
if use_rich:
style = "yellow" if status == "start" else "green"
_console.print(f"[{style}] โ†’ {msg}[/{style}]")
else:
print(f" โ†’ {msg}", flush=True)
def render_metadata(metadata: dict) -> None:
"""์‹คํ–‰ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ (iterations, tool calls, latency) ํ‘œ์‹œ."""
iterations = metadata.get("total_iterations", 0)
tool_calls = metadata.get("total_tool_calls", 0)
latency = metadata.get("total_latency_ms", 0)
summary = f"iterations={iterations} tools={tool_calls} latency={latency:.0f}ms"
use_rich, _ = _resolve_render_mode()
if use_rich:
_console.print(f"[dim]โŽฏ {summary}[/dim]")
else:
print(f"โŽฏ {summary}")
def render_session_info(session_id: str) -> None:
"""Render session resume hint at shell exit."""
hint = f"[session: {session_id}] govon --session {session_id} ๋กœ ์žฌ๊ฐœ ๊ฐ€๋Šฅ"
use_rich, _ = _resolve_render_mode()
if use_rich:
_console.print(f"[dim]{hint}[/dim]")
else:
print(hint)