trakshan-mishra
Deploy FastAPI & MCP server over SSE
036a2db
Raw
History Blame Contribute Delete
15.1 kB
"""
compiler.py — Compile selected symbols into an LLM-ready context package.
Key additions over the naive "dump code blocks" approach:
- META header: LLM knows repo size, what was dropped, graph confidence,
broken files, and scoring basis BEFORE reading any code.
- Per-symbol relationship annotations: callers / callees, and explicit
"NOT IN CONTEXT" tags so the LLM doesn't hallucinate missing deps.
- Dropped-symbol manifest: explicit list of symbols cut by token budget.
- Graph confidence score: fraction of edges that resolved to known symbols.
- Auto-generated suggestions: rule-based hints derived from graph data.
"""
import ast
import re
from typing import Dict, List, Optional, Set, Tuple
from ..models import Symbol, ContextPackage
def _get_module_docstring(abs_file_path: str) -> str:
"""
Read the first line of the module-level docstring from an ABSOLUTE path.
Returns "" if file is unreadable or has no docstring.
Must receive sym.file (absolute), NOT the relative path from sym_id.
"""
try:
with open(abs_file_path, "r", encoding="utf-8") as f:
source = f.read()
tree = ast.parse(source)
doc = ast.get_docstring(tree)
if doc:
first_line = doc.strip().split("\n")[0]
first_line = re.sub(r'^[\w/\-\.]+\.py\s*[—\-]+\s*', '', first_line).strip()
if len(first_line) > 60:
return first_line[:57] + "..."
return first_line
except Exception:
pass
return ""
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def compile_context(
symbols: Dict[str, Symbol],
selected_ids: List[str],
changed_ids: List[str],
scores: Dict[str, float],
graph: Optional[Dict[str, List[str]]] = None,
dropped_ids: Optional[List[str]] = None,
skipped_files: Optional[List[str]] = None,
notes: Optional[str] = None,
) -> ContextPackage:
"""
Build the final context text from selected symbols.
Args:
symbols: Full symbol table for the repo.
selected_ids: Symbols chosen for this context (respects token budget).
changed_ids: The symbols that actually changed (always selected).
scores: Impact scores for all scored symbols.
graph: Full call graph (id -> [dep ids]). Enables relationship
annotations and confidence calculation.
dropped_ids: Symbols that were scored but cut by token budget.
skipped_files: Files that raised SyntaxError (graph has holes here).
notes: Optional user notes injected into meta header.
"""
dropped_ids = dropped_ids or []
skipped_files = skipped_files or []
changed_set = set(changed_ids)
selected_set = set(selected_ids)
# Build reverse graph for caller annotation
reverse: Dict[str, Set[str]] = {}
if graph:
for caller, callees in graph.items():
for callee in callees:
reverse.setdefault(callee, set()).add(caller)
# Graph confidence: fraction of edges that point to a known symbol
graph_confidence = _compute_confidence(graph, symbols)
# Token bookkeeping
total_repo_code = "\n\n".join(s.code for s in symbols.values())
total_repo_tokens = max(1, len(total_repo_code) // 4)
# --- Bucket symbols into sections ---
sections: Dict[str, List[str]] = {"CHANGED": [], "IMPACTED": [], "DEPENDENCIES": []}
for sym_id in selected_ids:
if sym_id not in symbols:
continue
sym = symbols[sym_id]
score = scores.get(sym_id, 0)
file_name, func_name = sym_id.split(":", 1)
# Relationship annotation block
rel_block = _build_relationship_block(
sym_id, graph, reverse, selected_set, symbols
)
entry = (
f"FILE: {file_name}\n"
f"FUNCTION: {func_name} (score: {score:.0f})\n"
+ rel_block +
f"\n{sym.code}"
)
if sym_id in changed_set:
sections["CHANGED"].append(entry)
elif score >= 70:
sections["IMPACTED"].append(entry)
else:
sections["DEPENDENCIES"].append(entry)
# --- Assemble code sections ---
parts = []
for label, entries in sections.items():
if entries:
parts.append(f"=== {label} SYMBOLS ===\n")
parts.append("\n\n---\n\n".join(entries))
code_text = "\n\n".join(parts)
context_tokens = max(1, len(code_text) // 4)
# --- Build meta-header (prepended so LLM reads it first) ---
meta = _build_meta_header(
symbols = symbols,
selected_ids = selected_ids,
dropped_ids = dropped_ids,
skipped_files = skipped_files,
changed_ids = changed_ids,
graph = graph,
reverse = reverse,
graph_confidence = graph_confidence,
token_budget = total_repo_tokens, # not the budget cap; just total repo
context_tokens = context_tokens,
scores = scores,
notes = notes,
)
# --- Build suggestions block (appended at bottom) ---
suggestions = _build_suggestions(
changed_ids = changed_ids,
dropped_ids = dropped_ids,
skipped_files = skipped_files,
graph = graph,
reverse = reverse,
graph_confidence = graph_confidence,
scores = scores,
)
full_text = meta + "\n\n" + code_text
if suggestions:
full_text += "\n\n" + suggestions
return ContextPackage(
text = full_text,
symbol_count = len(selected_ids),
token_estimate = context_tokens,
total_repo_tokens = total_repo_tokens,
dropped_symbols = dropped_ids,
skipped_files = skipped_files,
graph_confidence = graph_confidence,
)
# ---------------------------------------------------------------------------
# Meta-header
# ---------------------------------------------------------------------------
def _build_meta_header(
symbols: Dict[str, Symbol],
selected_ids: List[str],
dropped_ids: List[str],
skipped_files: List[str],
changed_ids: List[str],
graph: Optional[Dict[str, List[str]]],
reverse: Dict[str, Set[str]],
graph_confidence: float,
token_budget: int,
context_tokens: int,
scores: Dict[str, float],
notes: Optional[str] = None,
) -> str:
total_syms = len(symbols)
selected_cnt = len(selected_ids)
dropped_cnt = len(dropped_ids)
scored_cnt = len(scores)
total_edges = sum(len(v) for v in graph.values()) if graph else 0
direct_callers = sum(
1 for s in changed_ids
for c in reverse.get(s, set())
)
direct_callees = sum(
len(graph.get(s, []))
for s in changed_ids
) if graph else 0
lines = [
"=== DIFFCONTEXT META ===",
f"Repo symbols total : {total_syms}",
f"Symbols scored : {scored_cnt}",
f"Symbols IN context : {selected_cnt}",
f"Symbols DROPPED : {dropped_cnt} ← you cannot see these",
f"Graph edges total : {total_edges}",
f"Graph confidence : {graph_confidence * 100:.0f}%"
+ (" ✓" if graph_confidence >= 0.9 else " ⚠ incomplete"),
f"Changed symbols : {len(changed_ids)}",
f"Direct callers found : {direct_callers}",
f"Direct callees found : {direct_callees}",
f"Context tokens (est.) : {context_tokens:,}",
f"Scoring basis : changed=100 | direct_callee=90 | direct_caller=80 "
f"| 2hop_callee=60 | 2hop_caller=50 | indegree*2+outdegree bonus",
]
# --- Repository Architecture Snapshot ---
# Build rel_file -> absolute_path mapping from symbol table.
# sym_id gives us relative path; sym.file gives us the absolute path we
# need to actually open the file for its docstring.
rel_to_abs: Dict[str, str] = {}
for sym_id, sym in symbols.items():
rel_file = sym_id.split(":", 1)[0]
if rel_file not in rel_to_abs:
rel_to_abs[rel_file] = sym.file # sym.file is always absolute
modules_total = {}
modules_selected = {}
for sym_id in symbols:
file_name = sym_id.split(":", 1)[0]
modules_total[file_name] = modules_total.get(file_name, 0) + 1
for sym_id in selected_ids:
if sym_id in symbols:
file_name = sym_id.split(":", 1)[0]
modules_selected[file_name] = modules_selected.get(file_name, 0) + 1
lines.append("")
lines.append("=== REPOSITORY ARCHITECTURE SNAPSHOT ===")
loaded_files = []
blind_files = []
for file_name, total in sorted(modules_total.items()):
selected = modules_selected.get(file_name, 0)
doc_snippet = ""
if file_name.endswith(".py"):
# FIX: use absolute path, not the relative file_name
abs_path = rel_to_abs.get(file_name, "")
doc_str = _get_module_docstring(abs_path) if abs_path else ""
if doc_str:
doc_snippet = f" — {doc_str}"
if selected > 0:
loaded_files.append(f" - {file_name} ({selected}/{total} symbols loaded){doc_snippet}")
else:
blind_files.append(f" - {file_name} ({total} symbols){doc_snippet}")
lines.append("MODULES IN CONTEXT:")
if loaded_files:
lines.extend(loaded_files)
else:
lines.append(" (none)")
lines.append("")
lines.append("KNOWN MODULES (NOT IN CONTEXT - BLIND SPOTS):")
if blind_files:
lines.extend(blind_files)
else:
lines.append(" (none)")
if skipped_files:
lines.append("")
lines.append(f"FILES WITH SYNTAXERROR ({len(skipped_files)}) — graph has holes here:")
for f in skipped_files:
lines.append(f" ✗ {f}")
if dropped_cnt > 0:
lines.append("")
lines.append(f"DROPPED SYMBOLS ({dropped_cnt}) — scored but cut by token budget:")
for d in dropped_ids[:15]:
lines.append(f" - {d} (score: {scores.get(d, 0):.0f})")
if dropped_cnt > 15:
lines.append(f" ... and {dropped_cnt - 15} more")
lines.append(" → If any of these are critical, re-run with a higher --max-tokens.")
warnings = []
if skipped_files:
warnings.append(
f"⚠ {len(skipped_files)} file(s) had SyntaxErrors. "
"Call graph may be incomplete for those files."
)
if dropped_cnt > 0:
warnings.append(
f"⚠ {dropped_cnt} symbol(s) were dropped. "
"References to them in the code below are NOT backed by visible implementations."
)
if graph_confidence < 0.8:
warnings.append(
f"⚠ Graph confidence is {graph_confidence * 100:.0f}%. "
"Many calls could not be resolved — likely external/stdlib deps or dynamic dispatch."
)
if warnings:
lines.append("")
for w in warnings:
lines.append(w)
if notes:
lines.append(f"\n=== DEVELOPER NOTES ===\n{notes}")
lines.append("=== END META ===")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Per-symbol relationship block
# ---------------------------------------------------------------------------
def _build_relationship_block(
sym_id: str,
graph: Optional[Dict[str, List[str]]],
reverse: Dict[str, Set[str]],
selected_set: Set[str],
symbols: Dict[str, Symbol],
) -> str:
if not graph:
return ""
lines = []
callers = sorted(reverse.get(sym_id, set()))
callees = graph.get(sym_id, [])
if callers:
caller_parts = []
for c in callers[:6]:
tag = "" if c in selected_set else " [NOT IN CONTEXT]"
caller_parts.append(c + tag)
if len(callers) > 6:
caller_parts.append(f"... +{len(callers) - 6} more")
lines.append(f"CALLERS: {', '.join(caller_parts)}")
if callees:
callee_parts = []
for c in callees[:6]:
tag = "" if c in selected_set else " [NOT IN CONTEXT]"
callee_parts.append(c + tag)
if len(callees) > 6:
callee_parts.append(f"... +{len(callees) - 6} more")
lines.append(f"CALLEES: {', '.join(callee_parts)}")
if not callers and not callees:
lines.append("CALLERS: (none found in repo)")
lines.append("CALLEES: (none found in repo)")
return "\n".join(lines) + "\n" if lines else ""
# ---------------------------------------------------------------------------
# Suggestions block
# ---------------------------------------------------------------------------
def _build_suggestions(
changed_ids, dropped_ids, skipped_files,
graph, reverse, graph_confidence, scores,
) -> str:
tips = []
if skipped_files:
tips.append(
f"Fix SyntaxErrors in {len(skipped_files)} file(s) to improve graph accuracy."
)
if graph_confidence < 0.7:
tips.append(
f"Graph confidence is {graph_confidence * 100:.0f}%. "
"Consider running diffcontext on installed site-packages too, "
"or add the dependency source to your repo path."
)
for sym_id in changed_ids:
callers = list(reverse.get(sym_id, set()))
if len(callers) > 20:
tips.append(
f"'{sym_id.split(':')[-1]}' has {len(callers)} callers — "
"unusually high blast radius. Review changes carefully."
)
if dropped_ids:
tips.append(
f"{len(dropped_ids)} symbol(s) were dropped by token budget. "
"Run with --max-tokens=0 (unlimited) to see full context."
)
if not tips:
return ""
lines = ["=== DIFFCONTEXT SUGGESTIONS ==="]
for i, tip in enumerate(tips, 1):
lines.append(f" {i}. {tip}")
lines.append("=== END SUGGESTIONS ===")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Graph confidence
# ---------------------------------------------------------------------------
def _compute_confidence(
graph: Optional[Dict[str, List[str]]],
symbols: Dict[str, Symbol],
) -> float:
"""
Fraction of graph edges that resolve to a known symbol.
Edges to external/stdlib deps count as unresolved.
Returns 1.0 if graph is empty or None (no data = no known holes).
"""
if not graph:
return 1.0
total = 0
resolved = 0
for deps in graph.values():
for d in deps:
total += 1
if d in symbols:
resolved += 1
return resolved / total if total > 0 else 1.0