Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import sys | |
| import tempfile | |
| import zipfile | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple | |
| def _bootstrap_src_path() -> None: | |
| here = Path(__file__).resolve() | |
| candidates = [parent / "src" for parent in (here.parent, *tuple(here.parents))] | |
| for candidate in candidates: | |
| if candidate.exists() and str(candidate) not in sys.path: | |
| sys.path.insert(0, str(candidate)) | |
| return | |
| _bootstrap_src_path() | |
| from nsys_llm_explainer.queries import TraceDB # type: ignore | |
| from nsys_llm_explainer.report import AnalysisOutputs, analyze, render_markdown, write_artifacts # type: ignore | |
| class SpaceBundle: | |
| source_path: Path | |
| source_kind: str | |
| report: Dict[str, Any] | |
| markdown: str | |
| artifacts_dir: Path | |
| artifact_paths: List[Path] | |
| summary_rows: List[Dict[str, str]] | |
| manifest_rows: List[Dict[str, str]] | |
| findings_markdown: str | |
| status_markdown: str | |
| def _coerce_float(value: Any, default: float = 0.0) -> float: | |
| try: | |
| return float(value) | |
| except Exception: | |
| return float(default) | |
| def _safe_text(value: Any, default: str = "-") -> str: | |
| text = str(value).strip() if value is not None else "" | |
| return text if text else default | |
| def _safe_trace_name(report: Mapping[str, Any]) -> str: | |
| trace_path = ((report.get("trace") or {}).get("path") or report.get("_source_name") or "") | |
| return Path(str(trace_path)).name if trace_path else "unknown" | |
| def _top_kernel_row(report: Mapping[str, Any]) -> Optional[Mapping[str, Any]]: | |
| rows = ((report.get("metrics") or {}).get("top_kernels") or {}).get("kernels") or [] | |
| return rows[0] if rows else None | |
| def _top_nccl_row(report: Mapping[str, Any]) -> Optional[Mapping[str, Any]]: | |
| rows = ((report.get("metrics") or {}).get("nccl") or {}).get("ops") or [] | |
| return rows[0] if rows else None | |
| def _format_ms(value: Any) -> str: | |
| return "{:.3f} ms".format(_coerce_float(value)) | |
| def _format_us(value: Any) -> str: | |
| return "{:.2f} us".format(_coerce_float(value)) | |
| def _format_pct(value: Any) -> str: | |
| return "{:.1f}%".format(_coerce_float(value)) | |
| def _bottleneck_sentence(report: Mapping[str, Any]) -> str: | |
| metrics = report.get("metrics") or {} | |
| total_gpu_ms = _coerce_float((metrics.get("top_kernels") or {}).get("total_kernel_time_ns")) / 1_000_000.0 | |
| top_kernel = _top_kernel_row(report) | |
| top_nccl = _top_nccl_row(report) | |
| if total_gpu_ms > 0.0 and top_nccl: | |
| nccl_pct = (_coerce_float(top_nccl.get("total_time_ms")) / total_gpu_ms) * 100.0 | |
| kernel_pct = _coerce_float(top_kernel.get("pct_total_kernel_time") if top_kernel else 0.0) | |
| if nccl_pct >= kernel_pct: | |
| return "{} dominates {:.1f}% of GPU time".format(str(top_nccl.get("op_name") or "NCCL"), nccl_pct) | |
| if top_kernel: | |
| return "{} dominates {:.1f}% of GPU time".format( | |
| str(top_kernel.get("kernel_name") or "Top kernel"), | |
| _coerce_float(top_kernel.get("pct_total_kernel_time")), | |
| ) | |
| return "No dominant GPU bottleneck detected from available metrics" | |
| def _summary_rows(report: Mapping[str, Any]) -> List[Dict[str, str]]: | |
| metrics = report.get("metrics") or {} | |
| timeline = metrics.get("timeline") or {} | |
| gpu_total_ms = _coerce_float(timeline.get("total_gpu_time_ms")) | |
| if gpu_total_ms <= 0: | |
| gpu_total_ms = _coerce_float((metrics.get("top_kernels") or {}).get("total_kernel_time_ns")) / 1_000_000.0 | |
| cpu_total_ms = _coerce_float(timeline.get("total_cpu_time_ms")) | |
| if cpu_total_ms <= 0: | |
| sync_rows = (metrics.get("sync") or {}).get("sync_calls") or [] | |
| cpu_total_ms = sum(_coerce_float(row.get("total_time_ms")) for row in sync_rows) | |
| warnings = report.get("warnings") or [] | |
| report_version = _safe_text((report.get("tool") or {}).get("version"), default="unknown") | |
| top_kernel = _top_kernel_row(report) | |
| top_nccl = _top_nccl_row(report) | |
| nvlink = (metrics.get("nvlink_during_nccl") or {}).get("rows") or [] | |
| nvlink_row = nvlink[0] if nvlink else None | |
| capability_checks = { | |
| "Kernel table": bool((metrics.get("top_kernels") or {}).get("present")), | |
| "Runtime table": bool((metrics.get("sync") or {}).get("present")), | |
| "NVTX ranges": bool((metrics.get("nvtx") or {}).get("present")), | |
| "GPU metrics": bool((metrics.get("nvlink_during_nccl") or {}).get("present")), | |
| "Per-process breakdown": bool((metrics.get("per_pid") or {}).get("present")), | |
| } | |
| rows: List[Dict[str, str]] = [ | |
| {"section": "Overview", "metric": "Trace", "value": _safe_trace_name(report)}, | |
| {"section": "Overview", "metric": "Tool version", "value": report_version}, | |
| {"section": "Overview", "metric": "Generated at (UTC)", "value": _safe_text(report.get("generated_at"))}, | |
| {"section": "Overview", "metric": "Total GPU time", "value": _format_ms(gpu_total_ms)}, | |
| {"section": "Overview", "metric": "Total CPU time", "value": _format_ms(cpu_total_ms)}, | |
| {"section": "Overview", "metric": "Top bottleneck", "value": _bottleneck_sentence(report)}, | |
| {"section": "Overview", "metric": "Warnings", "value": str(len(warnings))}, | |
| ] | |
| if top_kernel: | |
| rows.extend( | |
| [ | |
| {"section": "Evidence", "metric": "Top kernel", "value": _safe_text(top_kernel.get("kernel_name"))}, | |
| {"section": "Evidence", "metric": "Top kernel time", "value": _format_ms(top_kernel.get("total_time_ms"))}, | |
| {"section": "Evidence", "metric": "Top kernel share", "value": _format_pct(top_kernel.get("pct_total_kernel_time"))}, | |
| ] | |
| ) | |
| if top_nccl: | |
| rows.extend( | |
| [ | |
| {"section": "Evidence", "metric": "Top NCCL op", "value": _safe_text(top_nccl.get("op_name"))}, | |
| {"section": "Evidence", "metric": "Top NCCL time", "value": _format_ms(top_nccl.get("total_time_ms"))}, | |
| {"section": "Evidence", "metric": "Top NCCL overlap", "value": _format_pct(top_nccl.get("compute_overlap_pct"))}, | |
| ] | |
| ) | |
| if nvlink_row: | |
| rows.extend( | |
| [ | |
| {"section": "Evidence", "metric": "NVLink metric(s)", "value": _safe_text(nvlink_row.get("metric_names"))}, | |
| { | |
| "section": "Evidence", | |
| "metric": "NVLink during NCCL", | |
| "value": "{:.2f} export units".format(_coerce_float(nvlink_row.get("avg_metric_during_nccl"), 0.0)), | |
| }, | |
| { | |
| "section": "Evidence", | |
| "metric": "NVLink outside NCCL", | |
| "value": "{:.2f} export units".format(_coerce_float(nvlink_row.get("avg_metric_outside_nccl"), 0.0)), | |
| }, | |
| { | |
| "section": "Evidence", | |
| "metric": "NVLink correlation", | |
| "value": "{:.3f}".format(_coerce_float(nvlink_row.get("nccl_activity_correlation"), 0.0)), | |
| }, | |
| ] | |
| ) | |
| for label, present in capability_checks.items(): | |
| rows.append({"section": "Capabilities", "metric": label, "value": "present" if present else "missing"}) | |
| return rows | |
| def _findings_markdown(report: Mapping[str, Any]) -> str: | |
| findings = report.get("findings") or [] | |
| warnings = report.get("warnings") or [] | |
| lines: List[str] = ["## What to do next", ""] | |
| if not findings: | |
| lines.append("No findings were generated for this trace.") | |
| else: | |
| for finding in findings: | |
| severity = _safe_text(finding.get("severity"), default="unknown").upper() | |
| title = _safe_text(finding.get("title"), default="Untitled finding") | |
| lines.append("### [{}] {}".format(severity, title)) | |
| evidence = finding.get("evidence") or [] | |
| recommendations = finding.get("recommendation") or finding.get("recommendations") or [] | |
| if evidence: | |
| lines.append("Evidence:") | |
| for item in evidence: | |
| lines.append("- {}".format(item)) | |
| if recommendations: | |
| lines.append("Recommendation:") | |
| if isinstance(recommendations, (list, tuple)): | |
| for item in recommendations: | |
| lines.append("- {}".format(item)) | |
| else: | |
| lines.append("- {}".format(recommendations)) | |
| lines.append("") | |
| if warnings: | |
| lines.append("## Warnings") | |
| lines.append("") | |
| for warning in warnings: | |
| lines.append("- {}".format(warning)) | |
| return "\n".join(lines).strip() | |
| def _artifact_manifest(out_dir: Path) -> List[Dict[str, str]]: | |
| purpose_map = { | |
| "report.md": "Human-readable report", | |
| "report.json": "Machine-readable report", | |
| "kernels.csv": "Top kernels", | |
| "barriers.csv": "CPU/GPU barriers", | |
| "nccl_ops.csv": "Top NCCL ops", | |
| "nccl_rank_skew.csv": "Per-rank NCCL skew", | |
| "nccl_by_pid.csv": "NCCL per PID", | |
| "nvlink_during_nccl.csv": "NVLink correlation rows", | |
| "nvlink_timeseries.csv": "NVLink correlation timeseries", | |
| "timeline_events.csv": "Timeline events", | |
| "copy_engine_events.csv": "Copy engine events", | |
| "launch_latency_rows.csv": "Launch latency rows", | |
| "launch_latency_histogram.csv": "Launch latency histogram", | |
| "stream_overlap.csv": "Stream overlap summary", | |
| "phase_split.csv": "Phase split", | |
| "roofline.csv": "Roofline rows", | |
| "gpu_idle_gaps.csv": "GPU idle gaps", | |
| "kernels_by_pid.csv": "Per-PID kernels", | |
| "sync_by_pid.csv": "Per-PID sync calls", | |
| "nvtx_by_pid.csv": "Per-PID NVTX ranges", | |
| "nvtx_ranges.csv": "NVTX ranges", | |
| "bundle.zip": "Download all artifacts as a zip", | |
| } | |
| rows: List[Dict[str, str]] = [] | |
| for name, purpose in purpose_map.items(): | |
| path = out_dir / name | |
| if not path.exists(): | |
| path = out_dir / "tables" / name | |
| if path.exists(): | |
| rows.append({"artifact": name, "purpose": purpose, "path": str(path)}) | |
| return rows | |
| def _zip_artifacts(out_dir: Path) -> Path: | |
| zip_path = out_dir / "bundle.zip" | |
| with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: | |
| for path in sorted(out_dir.rglob("*")): | |
| if path.is_file() and path != zip_path: | |
| zf.write(path, arcname=path.relative_to(out_dir).as_posix()) | |
| return zip_path | |
| def _normalize_report_for_artifacts(report: Mapping[str, Any]) -> Dict[str, Any]: | |
| normalized: Dict[str, Any] = dict(report) | |
| metrics: Dict[str, Any] = dict(normalized.get("metrics") or {}) | |
| metrics.setdefault("top_kernels", {"present": False, "kernels": []}) | |
| metrics.setdefault("barriers", {"present": False, "barriers": []}) | |
| metrics.setdefault("nccl", {"present": False, "ops": [], "rank_rows": [], "pids": []}) | |
| metrics.setdefault("nvlink_during_nccl", {"present": False, "rows": [], "timeseries": []}) | |
| metrics.setdefault("timeline", {"present": False, "events": []}) | |
| metrics.setdefault("copy_engine", {"present": False, "events": []}) | |
| metrics.setdefault("launch_latency", {"present": False, "rows": [], "histogram": []}) | |
| metrics.setdefault("stream_overlap", {"present": False, "summary": []}) | |
| metrics.setdefault("phase_split", {"present": False, "rows": []}) | |
| metrics.setdefault("roofline", {"present": False, "rows": []}) | |
| metrics.setdefault("gpu_idle", {"present": False, "gaps": []}) | |
| metrics.setdefault("nvtx", {"present": False, "ranges": []}) | |
| by_pid = dict(metrics.get("by_pid") or {}) | |
| by_pid.setdefault("kernels", {"kernels": []}) | |
| by_pid.setdefault("sync", {"sync_calls": []}) | |
| by_pid.setdefault("nvtx", {"present": False, "ranges": []}) | |
| metrics["by_pid"] = by_pid | |
| normalized["metrics"] = metrics | |
| return normalized | |
| def _load_report(path: Path) -> Tuple[str, Dict[str, Any], str]: | |
| lower = path.suffix.lower() | |
| if lower in (".sqlite", ".db"): | |
| db = TraceDB.open(path) | |
| try: | |
| outputs = analyze( | |
| db, | |
| phase_map_path=None, | |
| kernel_limit=50, | |
| compute_kernel_percentiles=True, | |
| compute_nvtx_kernel_map=True, | |
| ) | |
| return "sqlite", dict(outputs.report), str(outputs.markdown) | |
| finally: | |
| db.close() | |
| if lower == ".json": | |
| report = json.loads(path.read_text(encoding="utf-8")) | |
| if not isinstance(report, dict): | |
| raise ValueError("Input JSON root must be an object.") | |
| try: | |
| markdown = render_markdown(report) | |
| except Exception: | |
| markdown = "# Nsight Systems LLM Hotspot Report\n\nJSON loaded, but markdown rendering failed for this input." | |
| return "json", report, markdown | |
| header = path.read_bytes()[:32] | |
| if header.startswith(b"SQLite format 3"): | |
| db = TraceDB.open(path) | |
| try: | |
| outputs = analyze( | |
| db, | |
| phase_map_path=None, | |
| kernel_limit=50, | |
| compute_kernel_percentiles=True, | |
| compute_nvtx_kernel_map=True, | |
| ) | |
| return "sqlite", dict(outputs.report), str(outputs.markdown) | |
| finally: | |
| db.close() | |
| report = json.loads(path.read_text(encoding="utf-8")) | |
| if not isinstance(report, dict): | |
| raise ValueError("Input JSON root must be an object.") | |
| try: | |
| markdown = render_markdown(report) | |
| except Exception: | |
| markdown = "# Nsight Systems LLM Hotspot Report\n\nJSON loaded, but markdown rendering failed for this input." | |
| return "json", report, markdown | |
| def analyze_path(path: Path) -> SpaceBundle: | |
| source_kind, report, markdown = _load_report(path) | |
| report = _normalize_report_for_artifacts(report) | |
| outputs = AnalysisOutputs(report=report, markdown=markdown) | |
| artifacts_dir = Path(tempfile.mkdtemp(prefix="nsys-llm-explainer-space-")) / path.stem | |
| write_artifacts(outputs, artifacts_dir) | |
| _zip_artifacts(artifacts_dir) | |
| artifact_paths = sorted( | |
| [p for p in artifacts_dir.rglob("*") if p.is_file()], | |
| key=lambda item: item.relative_to(artifacts_dir).as_posix(), | |
| ) | |
| return SpaceBundle( | |
| source_path=path, | |
| source_kind=source_kind, | |
| report=report, | |
| markdown=markdown, | |
| artifacts_dir=artifacts_dir, | |
| artifact_paths=artifact_paths, | |
| summary_rows=_summary_rows(report), | |
| manifest_rows=_artifact_manifest(artifacts_dir), | |
| findings_markdown=_findings_markdown(report), | |
| status_markdown="Loaded `{}` as `{}` and wrote artifacts to `{}`.".format(path.name, source_kind, artifacts_dir), | |
| ) | |
| def find_local_sample() -> Optional[Path]: | |
| here = Path(__file__).resolve() | |
| candidates = [here.parent / "sample_report.json"] | |
| for parent in (here.parent, *tuple(here.parents)): | |
| candidates.append(parent / "examples" / "synthetic" / "report.json") | |
| candidates.append(parent / "examples" / "a100_vllm" / "report.json") | |
| for candidate in candidates: | |
| if candidate.exists(): | |
| return candidate | |
| return None | |
| def coerce_upload_path(uploaded: Any) -> Optional[Path]: | |
| if uploaded is None: | |
| return None | |
| if isinstance(uploaded, (str, Path)): | |
| path = Path(uploaded) | |
| return path if path.exists() else None | |
| if isinstance(uploaded, Sequence) and uploaded: | |
| first = uploaded[0] | |
| if isinstance(first, (str, Path)): | |
| path = Path(first) | |
| return path if path.exists() else None | |
| return None | |