| |
| """ |
| LLM Completion Viewer - Streamlit visualization for LLM completion JSON files. |
| |
| Usage: |
| streamlit run llm_completion_viewer.py --server.port 8502 -- --dir /path/to/llm_completions |
| |
| Default port: 8502 |
| """ |
| import argparse |
| import json |
| import re |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| import streamlit as st |
|
|
# Per-role badge styling for the message timeline: display label plus
# foreground ("color") and background ("bg") hex colors. Roles missing from
# this map get a neutral fallback style at the call site in
# render_completion_data.
ROLE_STYLE = {
    "system": {"label": "SYSTEM", "color": "#4B5563", "bg": "#F3F4F6"},
    "user": {"label": "USER", "color": "#1D4ED8", "bg": "#DBEAFE"},
    "assistant": {"label": "ASSISTANT", "color": "#065F46", "bg": "#D1FAE5"},
    "tool": {"label": "TOOL", "color": "#7C2D12", "bg": "#FFEDD5"},
}
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the viewer's command-line options.

    Returns:
        argparse.Namespace with a single ``dir`` attribute (str; defaults
        to "" when ``--dir`` is not supplied).
    """
    arg_parser = argparse.ArgumentParser(
        description="Streamlit viewer for LLM completion files."
    )
    arg_parser.add_argument(
        "--dir",
        type=str,
        default="",
        help="Directory containing LLM completion JSON files.",
    )
    return arg_parser.parse_args()
|
|
|
|
def extract_timestamp_from_filename(filename: str) -> float:
    """Pull the Unix timestamp out of a completion filename.

    Filenames look like 'vertex_ai__gemini-2.5-flash-1771538250.607-8981.json':
    the float between the final two dashes is the timestamp and the trailing
    hex/digit run is a suffix. Returns 0.0 when the name does not match.
    """
    found = re.search(r'-(\d+\.\d+)-[a-f0-9]+\.json$', filename)
    return float(found.group(1)) if found else 0.0
|
|
|
|
def file_sort_key(path: Path) -> Tuple[float, str]:
    """Sort key placing newest completion files first, then by name.

    Negating the timestamp makes a plain ascending sort yield descending
    time order; the filename breaks ties (including all files whose names
    carry no timestamp, which share -0.0).
    """
    newest_first = -extract_timestamp_from_filename(path.name)
    return (newest_first, path.name)
|
|
|
|
def try_load_json(path: Path) -> Optional[Any]:
    """Parse *path* as UTF-8 JSON, returning None on any read/parse failure.

    Deliberately best-effort: the caller treats None as "unparseable" and
    falls back to showing the raw file contents.
    """
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return None
|
|
|
|
def extract_text_from_message(message: Dict[str, Any]) -> str:
    """Return the textual content of *message*.

    A plain-string ``content`` is returned unchanged (not stripped). A
    list-style ``content`` (multi-part format) has its ``type == "text"``
    parts joined with newlines and stripped. Any other shape yields "".
    """
    content = message.get("content")
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    pieces = [
        part["text"]
        for part in content
        if isinstance(part, dict)
        and part.get("type") == "text"
        and isinstance(part.get("text"), str)
        and part.get("text")
    ]
    return "\n".join(pieces).strip()
|
|
|
|
def extract_thinking_blocks(message: Dict[str, Any]) -> List[str]:
    """Collect the non-empty thinking texts attached to *message*.

    Each entry of ``message["thinking_blocks"]`` is expected to be a dict
    carrying its text under "thinking" (preferred) or "text"; non-dict
    entries and empty texts are skipped. Returns [] when the field is
    missing or not a list.
    """
    blocks = message.get("thinking_blocks")
    if not isinstance(blocks, list):
        return []
    collected: List[str] = []
    for entry in blocks:
        if not isinstance(entry, dict):
            continue
        body = entry.get("thinking") or entry.get("text") or ""
        if body:
            collected.append(body)
    return collected
|
|
|
|
def format_timestamp(timestamp: float) -> str:
    """Render a Unix *timestamp* as 'YYYY-MM-DD HH:MM:SS' in local time."""
    # Local import mirrors the module's lazy-dependency style for datetime.
    from datetime import datetime
    return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
def messages_summary(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Build one overview row per message for the summary table.

    Each row records the message index, role, thinking-block count and
    total thinking characters, tool-call count, text length, and a
    120-character text preview (ellipsized when truncated).
    """
    summary: List[Dict[str, Any]] = []
    for position, message in enumerate(messages):
        body = extract_text_from_message(message)
        thoughts = extract_thinking_blocks(message)
        calls = message.get("tool_calls")
        summary.append(
            {
                "idx": position,
                "role": message.get("role", "unknown"),
                "thinking_blocks": len(thoughts),
                "thinking_chars": sum(len(t) for t in thoughts),
                "tool_calls": len(calls) if isinstance(calls, list) else 0,
                "chars": len(body),
                "preview": body[:120] + ("..." if len(body) > 120 else ""),
            }
        )
    return summary
|
|
|
|
def render_completion_data(data: Dict[str, Any]):
    """Render one parsed completion file: metric header, per-message
    summary table, and an expandable full message timeline.

    Args:
        data: Parsed JSON object. Expected keys are "messages" (list of
            message dicts) and optionally "context_window"; every other
            top-level key is shown raw under "Additional Metadata".
    """
    # --- Top-line metrics ---------------------------------------------
    st.subheader("Completion Metadata")
    metadata_cols = st.columns(3)

    with metadata_cols[0]:
        context_window = data.get("context_window", "N/A")
        # Thousands separator only applies when the value is actually an int.
        st.metric("Context Window", f"{context_window:,}" if isinstance(context_window, int) else context_window)

    with metadata_cols[1]:
        messages = data.get("messages", [])
        st.metric("Total Messages", len(messages))

    with metadata_cols[2]:
        # Counts extracted text only (excludes thinking blocks / tool calls).
        total_chars = sum(len(extract_text_from_message(msg)) for msg in messages)
        st.metric("Total Characters", f"{total_chars:,}")

    # Any top-level keys beyond the two well-known ones go in an expander.
    other_metadata = {k: v for k, v in data.items() if k not in ["messages", "context_window"]}
    if other_metadata:
        with st.expander("📋 Additional Metadata", expanded=False):
            st.json(other_metadata)

    messages = data.get("messages", [])
    if not messages:
        st.warning("No messages found in this completion.")
        return

    # --- Summary table --------------------------------------------------
    st.subheader("Messages Overview")
    rows = messages_summary(messages)
    st.dataframe(rows, use_container_width=True)

    # --- Full timeline ----------------------------------------------------
    st.subheader("Full Message Timeline")
    show_raw = st.checkbox("Show raw dict under each message", value=False)

    for idx, msg in enumerate(messages):
        role = str(msg.get("role", "unknown"))
        # Roles not in ROLE_STYLE fall back to a neutral gray badge.
        style = ROLE_STYLE.get(role, {"label": role.upper(), "color": "#111827", "bg": "#F9FAFB"})
        text = extract_text_from_message(msg)
        tool_calls = msg.get("tool_calls")
        tool_call_list = tool_calls if isinstance(tool_calls, list) else []
        tool_call_count = len(tool_call_list)
        thinking_blocks = extract_thinking_blocks(msg)

        # Badge title, e.g. "ASSISTANT #3 | 🧠 thinking×1 | 🔧 tool_calls×2".
        title = f"{style['label']} #{idx}"
        if thinking_blocks:
            title += f" | 🧠 thinking×{len(thinking_blocks)}"
        if tool_call_count > 0:
            title += f" | 🔧 tool_calls×{tool_call_count}"

        # Role badge rendered as a colored pill via raw HTML.
        st.markdown(
            (
                f"<div style='margin:8px 0 4px 0;'>"
                f"<span style='background:{style['bg']}; color:{style['color']};"
                " padding:4px 10px; border-radius:999px; font-weight:700;'>"
                f"{title}</span></div>"
            ),
            unsafe_allow_html=True,
        )

        # Only the first three messages start expanded; the key keeps each
        # toggle's widget state distinct per message index.
        show_msg = st.toggle(f"Show message #{idx}", value=(idx < 3), key=f"show_msg_{idx}")
        if show_msg:
            # Thinking blocks first, each inside its own collapsed expander.
            if thinking_blocks:
                for tb_idx, thinking_text in enumerate(thinking_blocks):
                    with st.expander(f"🧠 Thinking block {tb_idx + 1} ({len(thinking_text):,} chars)", expanded=False):
                        st.markdown(
                            (
                                "<div style='border-left:4px solid #7C3AED; padding:8px 12px;"
                                " background:#EDE9FE; border-radius:6px; white-space:pre-wrap;"
                                " font-family:monospace; font-size:0.85em;'>"
                                f"{thinking_text}</div>"
                            ),
                            unsafe_allow_html=True,
                        )

            # Main text content, styled to match the role badge.
            # NOTE(review): text/thinking are interpolated into HTML with
            # unsafe_allow_html=True and no escaping — message content
            # containing markup will be rendered as HTML; confirm intended.
            if text:
                st.markdown(
                    (
                        f"<div style='border-left:4px solid {style['color']}; padding:8px 12px;"
                        f" background:{style['bg']}; border-radius:6px; white-space:pre-wrap;'>"
                        f"{text}</div>"
                    ),
                    unsafe_allow_html=True,
                )
            elif not thinking_blocks and not tool_call_list:
                st.caption("<no text content>")

            # Tool calls, each as a collapsed JSON expander.
            if tool_call_list:
                for tc_idx, tc in enumerate(tool_call_list):
                    # Tool name lives under "function.name" or flat "name";
                    # non-dict entries are shown via str().
                    tc_name = tc.get("function", {}).get("name", tc.get("name", f"tool_{tc_idx}")) if isinstance(tc, dict) else str(tc)
                    with st.expander(f"🔧 Tool call {tc_idx + 1}: `{tc_name}`", expanded=False):
                        st.json(tc)

            if show_raw:
                st.json(msg)
|
|
|
|
def main():
    """Streamlit entry point: pick a completion JSON file from a directory
    and render it.

    The directory comes from the ``--dir`` CLI flag (after Streamlit's
    ``--``) or the sidebar text input; files are listed newest-first.
    """
    args = parse_args()

    st.set_page_config(page_title="LLM Completion Viewer", layout="wide")
    st.title("🤖 LLM Completion Viewer")

    # CLI flag seeds the sidebar input; the sidebar value wins thereafter.
    default_dir = args.dir or ""
    run_dir_input = st.sidebar.text_input("Completions directory", value=default_dir)
    run_dir = Path(run_dir_input).expanduser() if run_dir_input else None

    # No directory yet: show usage and stop rendering.
    if not run_dir_input:
        st.info("Pass `--dir` or set the directory in the sidebar.")
        st.markdown("""
**Usage:**
```bash
streamlit run llm_completion_viewer.py --server.port 8502 -- --dir /path/to/llm_completions
```
""")
        return

    if not run_dir or not run_dir.exists() or not run_dir.is_dir():
        st.error(f"Directory not found: {run_dir_input}")
        return

    # Non-recursive scan: only top-level *.json files are considered.
    json_files = [p for p in run_dir.iterdir() if p.is_file() and p.suffix == '.json']

    if not json_files:
        st.warning("No JSON files found in this directory.")
        return

    # file_sort_key negates the timestamp, so this is newest-first.
    sorted_files = sorted(json_files, key=file_sort_key)

    st.sidebar.markdown(f"**Found {len(sorted_files)} completion files**")

    # Human-readable labels: append the parsed timestamp when available.
    file_options = []
    for f in sorted_files:
        timestamp = extract_timestamp_from_filename(f.name)
        if timestamp > 0:
            time_str = format_timestamp(timestamp)
            file_options.append(f"{f.name} ({time_str})")
        else:
            file_options.append(f.name)

    # Select by index so the label list and file list stay aligned.
    selected_idx = st.sidebar.selectbox(
        "Select completion file",
        options=range(len(file_options)),
        format_func=lambda i: file_options[i],
        index=0
    )

    selected_path = sorted_files[selected_idx]

    # File facts shown above the rendered content.
    st.caption(f"**Selected:** `{selected_path.name}`")
    file_size = selected_path.stat().st_size
    st.caption(f"**Size:** {file_size:,} bytes ({file_size / 1024:.1f} KB)")

    timestamp = extract_timestamp_from_filename(selected_path.name)
    if timestamp > 0:
        st.caption(f"**Timestamp:** {format_timestamp(timestamp)}")

    data = try_load_json(selected_path)

    # Unparseable JSON: fall back to showing the raw file text.
    if data is None:
        st.error("Failed to parse JSON file.")
        raw = selected_path.read_text(encoding="utf-8", errors="replace")
        st.code(raw, language="json")
        return

    # The renderer expects a dict; show anything else raw and stop.
    if not isinstance(data, dict):
        st.error("Expected JSON object (dict) at root level.")
        st.json(data)
        return

    render_completion_data(data)
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|