shinka-backup / scripts /llm_completion_viewer.py
JustinTX's picture
Add files using upload-large-folder tool
6f90f5c verified
#!/usr/bin/env python3
"""
LLM Completion Viewer - Streamlit visualization for LLM completion JSON files.
Usage:
streamlit run llm_completion_viewer.py --server.port 8502 -- --dir /path/to/llm_completions
Default port: 8502
"""
import argparse
import json
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import streamlit as st
# Badge styling per chat role: the label shown in the message header, the text
# colour and the background colour (CSS hex values). Roles that are missing
# here are expected to be handled by the caller with its own fallback.
ROLE_STYLE = {
    role_name: {"label": badge_label, "color": text_color, "bg": background}
    for role_name, badge_label, text_color, background in (
        ("system", "SYSTEM", "#4B5563", "#F3F4F6"),
        ("user", "USER", "#1D4ED8", "#DBEAFE"),
        ("assistant", "ASSISTANT", "#065F46", "#D1FAE5"),
        ("tool", "TOOL", "#7C2D12", "#FFEDD5"),
    )
}
def parse_args() -> argparse.Namespace:
    """Parse the viewer's command-line options from ``sys.argv``.

    Returns:
        A namespace whose ``dir`` attribute holds the value of ``--dir``,
        or an empty string when the flag was not supplied.
    """
    cli = argparse.ArgumentParser(
        description="Streamlit viewer for LLM completion files."
    )
    cli.add_argument(
        "--dir",
        default="",
        type=str,
        help="Directory containing LLM completion JSON files.",
    )
    namespace = cli.parse_args()
    return namespace
def extract_timestamp_from_filename(filename: str) -> float:
    """Pull the embedded timestamp out of a completion file name.

    The name is expected to end in ``-<digits>.<digits>-<hex>.json``, e.g.
    'vertex_ai__gemini-2.5-flash-1771538250.607-8981.json'.

    Returns:
        The ``<digits>.<digits>`` part as a float, or ``0.0`` when the name
        does not end with that pattern.
    """
    found = re.search(r'-(\d+\.\d+)-[a-f0-9]+\.json$', filename)
    if found is None:
        return 0.0
    return float(found.group(1))
def file_sort_key(path: Path) -> Tuple[float, str]:
    """Build the sort key used to order completion files.

    The timestamp parsed from the file name is negated so that an ascending
    sort puts the latest file first; the file name itself breaks ties.
    """
    name = path.name
    return (-extract_timestamp_from_filename(name), name)
def try_load_json(path: Path) -> Optional[Any]:
    """Read ``path`` as UTF-8 and parse it as JSON, best effort.

    Returns:
        The decoded JSON value, or ``None`` if reading or parsing raised any
        exception. A file whose content is the JSON literal ``null`` also
        yields ``None``, so callers cannot tell the two cases apart.
    """
    try:
        with open(path, "r", encoding="utf-8") as handle:
            raw_text = handle.read()
        parsed = json.loads(raw_text)
    except Exception:
        # Deliberately broad: the caller treats every failure the same way.
        return None
    return parsed
def extract_text_from_message(message: Dict[str, Any]) -> str:
    """Return the plain text carried by a message's ``content`` field.

    * A string ``content`` is returned unchanged.
    * A list ``content`` contributes the non-empty string ``text`` of every
      dict item whose ``type`` is ``"text"``; the pieces are joined with
      newlines and the result is stripped.
    * Anything else (including a missing field) yields an empty string.
    """
    body = message.get("content")
    if isinstance(body, str):
        return body
    if not isinstance(body, list):
        return ""

    pieces: List[str] = [
        part["text"]
        for part in body
        if isinstance(part, dict)
        and part.get("type") == "text"
        and isinstance(part.get("text"), str)
        and part.get("text")
    ]
    return "\n".join(pieces).strip()
def extract_thinking_blocks(message: Dict[str, Any]) -> List[str]:
    """Collect the thinking texts stored in a message's ``thinking_blocks``.

    Each dict entry of the ``thinking_blocks`` list contributes at most one
    string: its ``thinking`` value, or its ``text`` value when ``thinking``
    is missing or empty.

    Only non-empty *strings* are collected. Callers take ``len()`` of every
    returned item and embed it in markup, so a stray number, dict or list in
    the JSON must not leak into the result.

    Args:
        message: A chat message dict; ``thinking_blocks`` is optional.

    Returns:
        The thinking texts in their original order (possibly empty).
    """
    thinking_parts: List[str] = []
    thinking_blocks = message.get("thinking_blocks")
    if not isinstance(thinking_blocks, list):
        return thinking_parts

    for block in thinking_blocks:
        if not isinstance(block, dict):
            continue
        # Gemini/Vertex AI format: {"type": "thinking", "thinking": "...", "signature": "..."}
        for field in ("thinking", "text"):
            candidate = block.get(field)
            if isinstance(candidate, str) and candidate:
                thinking_parts.append(candidate)
                break
    return thinking_parts
def format_timestamp(timestamp: float) -> str:
    """Render a Unix timestamp as ``YYYY-MM-DD HH:MM:SS``.

    The conversion uses ``datetime.fromtimestamp`` without a timezone
    argument, so the result is expressed in the machine's local time.
    """
    from datetime import datetime as _datetime

    return _datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
def messages_summary(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Build one overview row per message for the summary table.

    Each row records the message index, its role ("unknown" when absent),
    the number and total length of its thinking blocks, the number of tool
    calls (0 unless ``tool_calls`` is a list), the text length, and a preview
    of the first 120 characters followed by "..." when the text is longer.
    """
    summary: List[Dict[str, Any]] = []
    for position, message in enumerate(messages):
        body = extract_text_from_message(message)
        reasoning = extract_thinking_blocks(message)
        calls = message.get("tool_calls")

        snippet = body[:120]
        if len(body) > 120:
            snippet += "..."

        summary.append(
            {
                "idx": position,
                "role": message.get("role", "unknown"),
                "thinking_blocks": len(reasoning),
                "thinking_chars": sum(map(len, reasoning)),
                "tool_calls": len(calls) if isinstance(calls, list) else 0,
                "chars": len(body),
                "preview": snippet,
            }
        )
    return summary
def render_completion_data(data: Dict[str, Any]):
    """Render one parsed completion file into the Streamlit page.

    The page shows, in order: three headline metrics (context window, message
    count, total text characters), any remaining top-level keys as collapsed
    JSON, a summary table of the messages, and a per-message timeline with
    thinking blocks, text and tool calls.

    Message text, thinking text and the role-derived badge title come straight
    from the completion file and are placed inside HTML that is rendered with
    ``unsafe_allow_html=True``. They are therefore HTML-escaped first, so that
    characters such as ``<`` or ``&`` in model output are displayed literally
    instead of being interpreted as markup.

    Args:
        data: The JSON object loaded from a completion file. ``messages`` is
            read as a list of message dicts; any other value is treated as
            "no messages".
    """
    # Function-scope import, matching how this file already imports datetime
    # locally; keeps the block self-contained.
    import html

    raw_messages = data.get("messages")
    # A null or otherwise non-list value would break len() / iteration below.
    messages = raw_messages if isinstance(raw_messages, list) else []

    # Display metadata
    st.subheader("Completion Metadata")
    metadata_cols = st.columns(3)
    with metadata_cols[0]:
        context_window = data.get("context_window", "N/A")
        st.metric(
            "Context Window",
            f"{context_window:,}" if isinstance(context_window, int) else context_window,
        )
    with metadata_cols[1]:
        st.metric("Total Messages", len(messages))
    with metadata_cols[2]:
        total_chars = sum(len(extract_text_from_message(msg)) for msg in messages)
        st.metric("Total Characters", f"{total_chars:,}")

    # Display additional metadata fields if present
    other_metadata = {
        k: v for k, v in data.items() if k not in ("messages", "context_window")
    }
    if other_metadata:
        with st.expander("📋 Additional Metadata", expanded=False):
            st.json(other_metadata)

    # Display messages
    if not messages:
        st.warning("No messages found in this completion.")
        return

    st.subheader("Messages Overview")
    rows = messages_summary(messages)
    st.dataframe(rows, use_container_width=True)

    st.subheader("Full Message Timeline")
    show_raw = st.checkbox("Show raw dict under each message", value=False)

    for idx, msg in enumerate(messages):
        role = str(msg.get("role", "unknown"))
        style = ROLE_STYLE.get(
            role, {"label": role.upper(), "color": "#111827", "bg": "#F9FAFB"}
        )
        text = extract_text_from_message(msg)
        tool_calls = msg.get("tool_calls")
        tool_call_list = tool_calls if isinstance(tool_calls, list) else []
        tool_call_count = len(tool_call_list)
        thinking_blocks = extract_thinking_blocks(msg)

        title = f"{style['label']} #{idx}"
        if thinking_blocks:
            title += f" | 🧠 thinking×{len(thinking_blocks)}"
        if tool_call_count > 0:
            title += f" | 🔧 tool_calls×{tool_call_count}"

        # The label of an unknown role is taken from the file, so escape it.
        st.markdown(
            (
                f"<div style='margin:8px 0 4px 0;'>"
                f"<span style='background:{style['bg']}; color:{style['color']};"
                " padding:4px 10px; border-radius:999px; font-weight:700;'>"
                f"{html.escape(title)}</span></div>"
            ),
            unsafe_allow_html=True,
        )

        show_msg = st.toggle(f"Show message #{idx}", value=(idx < 3), key=f"show_msg_{idx}")
        if show_msg:
            # Render thinking blocks
            for tb_idx, thinking_text in enumerate(thinking_blocks):
                with st.expander(
                    f"🧠 Thinking block {tb_idx + 1} ({len(thinking_text):,} chars)",
                    expanded=False,
                ):
                    st.markdown(
                        (
                            "<div style='border-left:4px solid #7C3AED; padding:8px 12px;"
                            " background:#EDE9FE; border-radius:6px; white-space:pre-wrap;"
                            " font-family:monospace; font-size:0.85em;'>"
                            f"{html.escape(thinking_text)}</div>"
                        ),
                        unsafe_allow_html=True,
                    )

            # Render text content
            if text:
                st.markdown(
                    (
                        f"<div style='border-left:4px solid {style['color']}; padding:8px 12px;"
                        f" background:{style['bg']}; border-radius:6px; white-space:pre-wrap;'>"
                        f"{html.escape(text)}</div>"
                    ),
                    unsafe_allow_html=True,
                )
            elif not thinking_blocks and not tool_call_list:
                st.caption("<no text content>")

            # Render tool calls
            for tc_idx, tc in enumerate(tool_call_list):
                if isinstance(tc, dict):
                    function_info = tc.get("function")
                    if not isinstance(function_info, dict):
                        # e.g. "function": null — fall back to the flat "name" key.
                        function_info = {}
                    tc_name = function_info.get(
                        "name", tc.get("name", f"tool_{tc_idx}")
                    )
                else:
                    tc_name = str(tc)
                with st.expander(f"🔧 Tool call {tc_idx + 1}: `{tc_name}`", expanded=False):
                    st.json(tc)

            if show_raw:
                st.json(msg)
def main():
    """Streamlit entry point for the completion viewer.

    Resolves the completions directory (``--dir`` or the sidebar field),
    lists its ``.json`` files with the latest timestamp first, lets the user
    pick one in the sidebar, shows basic file information and then renders
    the parsed completion. Each failure mode (no directory given, directory
    missing, no JSON files, unparsable file, non-object root) is reported on
    the page and ends the run early.
    """
    cli_args = parse_args()
    st.set_page_config(page_title="LLM Completion Viewer", layout="wide")
    st.title("🤖 LLM Completion Viewer")

    run_dir_input = st.sidebar.text_input(
        "Completions directory", value=cli_args.dir or ""
    )
    if not run_dir_input:
        st.info("Pass `--dir` or set the directory in the sidebar.")
        usage_markdown = (
            "\n"
            "**Usage:**\n"
            "```bash\n"
            "streamlit run llm_completion_viewer.py --server.port 8502 -- --dir /path/to/llm_completions\n"
            "```\n"
        )
        st.markdown(usage_markdown)
        return

    run_dir = Path(run_dir_input).expanduser()
    if not (run_dir.exists() and run_dir.is_dir()):
        st.error(f"Directory not found: {run_dir_input}")
        return

    # Collect the JSON files that sit directly inside the directory.
    json_files = []
    for entry in run_dir.iterdir():
        if entry.is_file() and entry.suffix == '.json':
            json_files.append(entry)
    if not json_files:
        st.warning("No JSON files found in this directory.")
        return

    # Latest timestamp first (see file_sort_key).
    sorted_files = sorted(json_files, key=file_sort_key)
    st.sidebar.markdown(f"**Found {len(sorted_files)} completion files**")

    def describe(entry: Path) -> str:
        """Label for the select box: file name plus readable time, if any."""
        stamp = extract_timestamp_from_filename(entry.name)
        if stamp > 0:
            return f"{entry.name} ({format_timestamp(stamp)})"
        return entry.name

    file_options = [describe(entry) for entry in sorted_files]

    # Options are indices so the chosen entry maps straight back to a path.
    selected_idx = st.sidebar.selectbox(
        "Select completion file",
        options=range(len(file_options)),
        format_func=lambda position: file_options[position],
        index=0,
    )
    selected_path = sorted_files[selected_idx]

    # File information
    st.caption(f"**Selected:** `{selected_path.name}`")
    file_size = selected_path.stat().st_size
    st.caption(f"**Size:** {file_size:,} bytes ({file_size / 1024:.1f} KB)")
    selected_stamp = extract_timestamp_from_filename(selected_path.name)
    if selected_stamp > 0:
        st.caption(f"**Timestamp:** {format_timestamp(selected_stamp)}")

    # Load the completion and hand it to the renderer.
    data = try_load_json(selected_path)
    if data is None:
        st.error("Failed to parse JSON file.")
        st.code(
            selected_path.read_text(encoding="utf-8", errors="replace"),
            language="json",
        )
        return
    if not isinstance(data, dict):
        st.error("Expected JSON object (dict) at root level.")
        st.json(data)
        return

    render_completion_data(data)
# Build the page only when this module is the one being executed (its
# __name__ is "__main__"), not when it is imported by another module.
if __name__ == "__main__":
    main()