# nsys-llm-explainer / space_utils.py
# Commit 65a157d (verified), KokosDev: "Fix sample path resolution for Space root layout"
from __future__ import annotations
import json
import sys
import tempfile
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple
def _bootstrap_src_path() -> None:
here = Path(__file__).resolve()
candidates = [parent / "src" for parent in (here.parent, *tuple(here.parents))]
for candidate in candidates:
if candidate.exists() and str(candidate) not in sys.path:
sys.path.insert(0, str(candidate))
return
_bootstrap_src_path()
from nsys_llm_explainer.queries import TraceDB # type: ignore
from nsys_llm_explainer.report import AnalysisOutputs, analyze, render_markdown, write_artifacts # type: ignore
@dataclass(frozen=True)
class SpaceBundle:
    """Immutable container for everything one analysis run produces for the Space UI."""

    source_path: Path                      # input file that was analyzed
    source_kind: str                       # "sqlite" or "json" (set by _load_report)
    report: Dict[str, Any]                 # normalized machine-readable report
    markdown: str                          # rendered human-readable report
    artifacts_dir: Path                    # temp directory the artifacts were written to
    artifact_paths: List[Path]             # every file under artifacts_dir, sorted by relative path
    summary_rows: List[Dict[str, str]]     # section/metric/value rows (see _summary_rows)
    manifest_rows: List[Dict[str, str]]    # artifact/purpose/path rows (see _artifact_manifest)
    findings_markdown: str                 # findings + warnings markdown (see _findings_markdown)
    status_markdown: str                   # one-line load/status message
def _coerce_float(value: Any, default: float = 0.0) -> float:
try:
return float(value)
except Exception:
return float(default)
def _safe_text(value: Any, default: str = "-") -> str:
text = str(value).strip() if value is not None else ""
return text if text else default
def _safe_trace_name(report: Mapping[str, Any]) -> str:
trace_path = ((report.get("trace") or {}).get("path") or report.get("_source_name") or "")
return Path(str(trace_path)).name if trace_path else "unknown"
def _top_kernel_row(report: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
rows = ((report.get("metrics") or {}).get("top_kernels") or {}).get("kernels") or []
return rows[0] if rows else None
def _top_nccl_row(report: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
rows = ((report.get("metrics") or {}).get("nccl") or {}).get("ops") or []
return rows[0] if rows else None
def _format_ms(value: Any) -> str:
return "{:.3f} ms".format(_coerce_float(value))
def _format_us(value: Any) -> str:
return "{:.2f} us".format(_coerce_float(value))
def _format_pct(value: Any) -> str:
return "{:.1f}%".format(_coerce_float(value))
def _bottleneck_sentence(report: Mapping[str, Any]) -> str:
    """One-line description of the dominant GPU bottleneck, when detectable."""
    metrics = report.get("metrics") or {}
    kernel_total_ns = (metrics.get("top_kernels") or {}).get("total_kernel_time_ns")
    total_gpu_ms = _coerce_float(kernel_total_ns) / 1_000_000.0
    top_kernel = _top_kernel_row(report)
    top_nccl = _top_nccl_row(report)
    # Report the NCCL op when its share of total GPU time is at least as large
    # as the top kernel's share; otherwise fall through to the kernel.
    if top_nccl and total_gpu_ms > 0.0:
        nccl_share = (_coerce_float(top_nccl.get("total_time_ms")) / total_gpu_ms) * 100.0
        kernel_share = _coerce_float(top_kernel.get("pct_total_kernel_time") if top_kernel else 0.0)
        if nccl_share >= kernel_share:
            op_name = str(top_nccl.get("op_name") or "NCCL")
            return "{} dominates {:.1f}% of GPU time".format(op_name, nccl_share)
    if top_kernel:
        kernel_name = str(top_kernel.get("kernel_name") or "Top kernel")
        kernel_share = _coerce_float(top_kernel.get("pct_total_kernel_time"))
        return "{} dominates {:.1f}% of GPU time".format(kernel_name, kernel_share)
    return "No dominant GPU bottleneck detected from available metrics"
def _summary_rows(report: Mapping[str, Any]) -> List[Dict[str, str]]:
    """Flatten *report* into ``{"section", "metric", "value"}`` rows for a summary table.

    Emits, in order: "Overview" rows (always), "Evidence" rows for the top
    kernel / top NCCL op / first NVLink row when present, then one
    "Capabilities" row per optional report table marked present/missing.
    """
    metrics = report.get("metrics") or {}
    timeline = metrics.get("timeline") or {}
    gpu_total_ms = _coerce_float(timeline.get("total_gpu_time_ms"))
    if gpu_total_ms <= 0:
        # No timeline total: fall back to the kernel-table total (ns -> ms).
        gpu_total_ms = _coerce_float((metrics.get("top_kernels") or {}).get("total_kernel_time_ns")) / 1_000_000.0
    cpu_total_ms = _coerce_float(timeline.get("total_cpu_time_ms"))
    if cpu_total_ms <= 0:
        # No CPU timeline total: approximate with the sum of sync-call durations.
        sync_rows = (metrics.get("sync") or {}).get("sync_calls") or []
        cpu_total_ms = sum(_coerce_float(row.get("total_time_ms")) for row in sync_rows)
    warnings = report.get("warnings") or []
    report_version = _safe_text((report.get("tool") or {}).get("version"), default="unknown")
    top_kernel = _top_kernel_row(report)
    top_nccl = _top_nccl_row(report)
    nvlink = (metrics.get("nvlink_during_nccl") or {}).get("rows") or []
    nvlink_row = nvlink[0] if nvlink else None
    # Which optional report sections this trace actually provided.
    capability_checks = {
        "Kernel table": bool((metrics.get("top_kernels") or {}).get("present")),
        "Runtime table": bool((metrics.get("sync") or {}).get("present")),
        "NVTX ranges": bool((metrics.get("nvtx") or {}).get("present")),
        "GPU metrics": bool((metrics.get("nvlink_during_nccl") or {}).get("present")),
        "Per-process breakdown": bool((metrics.get("per_pid") or {}).get("present")),
    }
    rows: List[Dict[str, str]] = [
        {"section": "Overview", "metric": "Trace", "value": _safe_trace_name(report)},
        {"section": "Overview", "metric": "Tool version", "value": report_version},
        {"section": "Overview", "metric": "Generated at (UTC)", "value": _safe_text(report.get("generated_at"))},
        {"section": "Overview", "metric": "Total GPU time", "value": _format_ms(gpu_total_ms)},
        {"section": "Overview", "metric": "Total CPU time", "value": _format_ms(cpu_total_ms)},
        {"section": "Overview", "metric": "Top bottleneck", "value": _bottleneck_sentence(report)},
        {"section": "Overview", "metric": "Warnings", "value": str(len(warnings))},
    ]
    if top_kernel:
        rows.extend(
            [
                {"section": "Evidence", "metric": "Top kernel", "value": _safe_text(top_kernel.get("kernel_name"))},
                {"section": "Evidence", "metric": "Top kernel time", "value": _format_ms(top_kernel.get("total_time_ms"))},
                {"section": "Evidence", "metric": "Top kernel share", "value": _format_pct(top_kernel.get("pct_total_kernel_time"))},
            ]
        )
    if top_nccl:
        rows.extend(
            [
                {"section": "Evidence", "metric": "Top NCCL op", "value": _safe_text(top_nccl.get("op_name"))},
                {"section": "Evidence", "metric": "Top NCCL time", "value": _format_ms(top_nccl.get("total_time_ms"))},
                {"section": "Evidence", "metric": "Top NCCL overlap", "value": _format_pct(top_nccl.get("compute_overlap_pct"))},
            ]
        )
    if nvlink_row:
        rows.extend(
            [
                {"section": "Evidence", "metric": "NVLink metric(s)", "value": _safe_text(nvlink_row.get("metric_names"))},
                {
                    "section": "Evidence",
                    "metric": "NVLink during NCCL",
                    "value": "{:.2f} export units".format(_coerce_float(nvlink_row.get("avg_metric_during_nccl"), 0.0)),
                },
                {
                    "section": "Evidence",
                    "metric": "NVLink outside NCCL",
                    "value": "{:.2f} export units".format(_coerce_float(nvlink_row.get("avg_metric_outside_nccl"), 0.0)),
                },
                {
                    "section": "Evidence",
                    "metric": "NVLink correlation",
                    "value": "{:.3f}".format(_coerce_float(nvlink_row.get("nccl_activity_correlation"), 0.0)),
                },
            ]
        )
    for label, present in capability_checks.items():
        rows.append({"section": "Capabilities", "metric": label, "value": "present" if present else "missing"})
    return rows
def _findings_markdown(report: Mapping[str, Any]) -> str:
findings = report.get("findings") or []
warnings = report.get("warnings") or []
lines: List[str] = ["## What to do next", ""]
if not findings:
lines.append("No findings were generated for this trace.")
else:
for finding in findings:
severity = _safe_text(finding.get("severity"), default="unknown").upper()
title = _safe_text(finding.get("title"), default="Untitled finding")
lines.append("### [{}] {}".format(severity, title))
evidence = finding.get("evidence") or []
recommendations = finding.get("recommendation") or finding.get("recommendations") or []
if evidence:
lines.append("Evidence:")
for item in evidence:
lines.append("- {}".format(item))
if recommendations:
lines.append("Recommendation:")
if isinstance(recommendations, (list, tuple)):
for item in recommendations:
lines.append("- {}".format(item))
else:
lines.append("- {}".format(recommendations))
lines.append("")
if warnings:
lines.append("## Warnings")
lines.append("")
for warning in warnings:
lines.append("- {}".format(warning))
return "\n".join(lines).strip()
def _artifact_manifest(out_dir: Path) -> List[Dict[str, str]]:
purpose_map = {
"report.md": "Human-readable report",
"report.json": "Machine-readable report",
"kernels.csv": "Top kernels",
"barriers.csv": "CPU/GPU barriers",
"nccl_ops.csv": "Top NCCL ops",
"nccl_rank_skew.csv": "Per-rank NCCL skew",
"nccl_by_pid.csv": "NCCL per PID",
"nvlink_during_nccl.csv": "NVLink correlation rows",
"nvlink_timeseries.csv": "NVLink correlation timeseries",
"timeline_events.csv": "Timeline events",
"copy_engine_events.csv": "Copy engine events",
"launch_latency_rows.csv": "Launch latency rows",
"launch_latency_histogram.csv": "Launch latency histogram",
"stream_overlap.csv": "Stream overlap summary",
"phase_split.csv": "Phase split",
"roofline.csv": "Roofline rows",
"gpu_idle_gaps.csv": "GPU idle gaps",
"kernels_by_pid.csv": "Per-PID kernels",
"sync_by_pid.csv": "Per-PID sync calls",
"nvtx_by_pid.csv": "Per-PID NVTX ranges",
"nvtx_ranges.csv": "NVTX ranges",
"bundle.zip": "Download all artifacts as a zip",
}
rows: List[Dict[str, str]] = []
for name, purpose in purpose_map.items():
path = out_dir / name
if not path.exists():
path = out_dir / "tables" / name
if path.exists():
rows.append({"artifact": name, "purpose": purpose, "path": str(path)})
return rows
def _zip_artifacts(out_dir: Path) -> Path:
zip_path = out_dir / "bundle.zip"
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for path in sorted(out_dir.rglob("*")):
if path.is_file() and path != zip_path:
zf.write(path, arcname=path.relative_to(out_dir).as_posix())
return zip_path
def _normalize_report_for_artifacts(report: Mapping[str, Any]) -> Dict[str, Any]:
normalized: Dict[str, Any] = dict(report)
metrics: Dict[str, Any] = dict(normalized.get("metrics") or {})
metrics.setdefault("top_kernels", {"present": False, "kernels": []})
metrics.setdefault("barriers", {"present": False, "barriers": []})
metrics.setdefault("nccl", {"present": False, "ops": [], "rank_rows": [], "pids": []})
metrics.setdefault("nvlink_during_nccl", {"present": False, "rows": [], "timeseries": []})
metrics.setdefault("timeline", {"present": False, "events": []})
metrics.setdefault("copy_engine", {"present": False, "events": []})
metrics.setdefault("launch_latency", {"present": False, "rows": [], "histogram": []})
metrics.setdefault("stream_overlap", {"present": False, "summary": []})
metrics.setdefault("phase_split", {"present": False, "rows": []})
metrics.setdefault("roofline", {"present": False, "rows": []})
metrics.setdefault("gpu_idle", {"present": False, "gaps": []})
metrics.setdefault("nvtx", {"present": False, "ranges": []})
by_pid = dict(metrics.get("by_pid") or {})
by_pid.setdefault("kernels", {"kernels": []})
by_pid.setdefault("sync", {"sync_calls": []})
by_pid.setdefault("nvtx", {"present": False, "ranges": []})
metrics["by_pid"] = by_pid
normalized["metrics"] = metrics
return normalized
def _load_report(path: Path) -> Tuple[str, Dict[str, Any], str]:
lower = path.suffix.lower()
if lower in (".sqlite", ".db"):
db = TraceDB.open(path)
try:
outputs = analyze(
db,
phase_map_path=None,
kernel_limit=50,
compute_kernel_percentiles=True,
compute_nvtx_kernel_map=True,
)
return "sqlite", dict(outputs.report), str(outputs.markdown)
finally:
db.close()
if lower == ".json":
report = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(report, dict):
raise ValueError("Input JSON root must be an object.")
try:
markdown = render_markdown(report)
except Exception:
markdown = "# Nsight Systems LLM Hotspot Report\n\nJSON loaded, but markdown rendering failed for this input."
return "json", report, markdown
header = path.read_bytes()[:32]
if header.startswith(b"SQLite format 3"):
db = TraceDB.open(path)
try:
outputs = analyze(
db,
phase_map_path=None,
kernel_limit=50,
compute_kernel_percentiles=True,
compute_nvtx_kernel_map=True,
)
return "sqlite", dict(outputs.report), str(outputs.markdown)
finally:
db.close()
report = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(report, dict):
raise ValueError("Input JSON root must be an object.")
try:
markdown = render_markdown(report)
except Exception:
markdown = "# Nsight Systems LLM Hotspot Report\n\nJSON loaded, but markdown rendering failed for this input."
return "json", report, markdown
def analyze_path(path: Path) -> SpaceBundle:
    """Run the full analysis pipeline on *path* and package the UI bundle.

    Loads the input (SQLite trace or report JSON), normalizes the report,
    writes all artifacts to a unique temporary directory, zips them, and
    returns a SpaceBundle with every derived view the Space UI needs.
    """
    source_kind, raw_report, markdown = _load_report(path)
    report = _normalize_report_for_artifacts(raw_report)
    outputs = AnalysisOutputs(report=report, markdown=markdown)
    # mkdtemp gives a unique directory per invocation; nest under the input stem.
    artifacts_dir = Path(tempfile.mkdtemp(prefix="nsys-llm-explainer-space-")) / path.stem
    write_artifacts(outputs, artifacts_dir)
    _zip_artifacts(artifacts_dir)
    artifact_paths = sorted(
        (item for item in artifacts_dir.rglob("*") if item.is_file()),
        key=lambda item: item.relative_to(artifacts_dir).as_posix(),
    )
    status = "Loaded `{}` as `{}` and wrote artifacts to `{}`.".format(path.name, source_kind, artifacts_dir)
    return SpaceBundle(
        source_path=path,
        source_kind=source_kind,
        report=report,
        markdown=markdown,
        artifacts_dir=artifacts_dir,
        artifact_paths=artifact_paths,
        summary_rows=_summary_rows(report),
        manifest_rows=_artifact_manifest(artifacts_dir),
        findings_markdown=_findings_markdown(report),
        status_markdown=status,
    )
def find_local_sample() -> Optional[Path]:
    """Locate a bundled sample report near this file, or return None.

    Checks ``sample_report.json`` next to this module first, then the two
    known ``examples/...`` report locations under every ancestor directory.
    """
    this_file = Path(__file__).resolve()
    candidates: List[Path] = [this_file.parent / "sample_report.json"]
    for ancestor in (this_file.parent, *tuple(this_file.parents)):
        candidates.append(ancestor / "examples" / "synthetic" / "report.json")
        candidates.append(ancestor / "examples" / "a100_vllm" / "report.json")
    for candidate in candidates:
        if candidate.exists():
            return candidate
    return None
def coerce_upload_path(uploaded: Any) -> Optional[Path]:
    """Resolve an upload-widget value to an existing Path, else None.

    Accepts a path string/Path directly, or the first element of a non-empty
    sequence of such values; anything else — or a path that does not exist on
    disk — yields None.
    """
    if uploaded is None:
        return None
    candidate: Any = uploaded
    # Unwrap multi-file uploads: take the first entry of a non-string sequence.
    if isinstance(candidate, Sequence) and not isinstance(candidate, (str, Path)) and candidate:
        candidate = candidate[0]
    if isinstance(candidate, (str, Path)):
        resolved = Path(candidate)
        if resolved.exists():
            return resolved
    return None