""" CommitLens — Information Extraction Pipeline ============================================= Fetches the two latest commits from a GitHub repository, diffs them, and returns a list of per-file structured prompts ready for Melum 2. Usage: python commitlens.py [--token ] Example: python commitlens.py https://github.com/psf/requests --token ghp_xxx """ from __future__ import annotations import argparse import sys from dataclasses import dataclass, field from typing import Optional from urllib.parse import urlparse import requests # pip install requests # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- GITHUB_API = "https://api.github.com" # Source-code extensions to keep (lower-cased, with leading dot) KEEP_EXTENSIONS: set[str] = { ".py", ".js", ".ts", ".tsx", ".jsx", ".java", ".cpp", ".c", ".h", ".hpp", ".go", ".rs", ".php", ".rb", ".cs", ".swift", ".kt", ".scala", ".r", ".m", ".sh", ".bash", ".zsh", } # Config / infra filenames to keep regardless of extension KEEP_FILENAMES: set[str] = { "dockerfile", "docker-compose.yml", "docker-compose.yaml", "requirements.txt", "package.json", "pyproject.toml", "setup.py", "setup.cfg", "cargo.toml", "go.mod", ".env.example", "makefile", } # Extensions/patterns to ignore explicitly IGNORE_EXTENSIONS: set[str] = { ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", ".webp", ".mp4", ".mov", ".avi", ".mkv", ".webm", ".pdf", ".docx", ".xlsx", ".zip", ".tar", ".gz", ".bz2", ".7z", ".exe", ".dll", ".so", ".dylib", ".woff", ".woff2", ".ttf", ".eot", ".lock", # package-lock.json, yarn.lock, Cargo.lock, etc. ".map", # source maps ".min.js", # minified assets (handled via endswith below) } IGNORE_FILENAME_PATTERNS: tuple[str, ...] = ( ".min.js", ".min.css", "-lock.json", ".lock", ".pb", # protobuf binaries ) # --------------------------------------------------------------------------- # Data classes # --------------------------------------------------------------------------- @dataclass class FileContext: filename: str status: str # added | modified | removed | renamed additions: int deletions: int patch: Optional[str] before_content: Optional[str] = None # full file at old commit after_content: Optional[str] = None # full file at new commit @dataclass class CommitContext: message: str author: str timestamp: str old_sha: str new_sha: str total_additions: int total_deletions: int total_changed_files: int files: list[FileContext] = field(default_factory=list) # --------------------------------------------------------------------------- # GitHub helpers # --------------------------------------------------------------------------- class GitHubClient: def __init__(self, token: Optional[str] = None): self.session = requests.Session() self.session.headers["Accept"] = "application/vnd.github+json" if token: self.session.headers["Authorization"] = f"Bearer {token}" def _get(self, url: str, params: Optional[dict] = None) -> dict | list: resp = self.session.get(url, params=params, timeout=30) resp.raise_for_status() return resp.json() def get_raw(self, url: str) -> str: resp = self.session.get(url, timeout=30) resp.raise_for_status() return resp.text # ---- domain methods --------------------------------------------------- def latest_two_shas(self, owner: str, repo: str) -> tuple[str, str]: """Return (old_sha, new_sha) for the two most recent commits.""" commits = self._get( f"{GITHUB_API}/repos/{owner}/{repo}/commits", params={"per_page": 2}, ) if len(commits) < 2: raise ValueError("Repository has fewer than 2 commits.") new_sha: str = commits[0]["sha"] old_sha: str = commits[1]["sha"] return old_sha, new_sha def compare(self, owner: str, repo: str, old: str, new: str) -> dict: return self._get( f"{GITHUB_API}/repos/{owner}/{repo}/compare/{old}...{new}" ) def file_content_at(self, owner: str, repo: str, path: str, ref: str) -> Optional[str]: """Return raw file content at a specific commit ref, or None if missing.""" try: meta = self._get( f"{GITHUB_API}/repos/{owner}/{repo}/contents/{path}", params={"ref": ref}, ) download_url: str = meta.get("download_url", "") if not download_url: return None return self.get_raw(download_url) except requests.HTTPError: return None # --------------------------------------------------------------------------- # Filtering # --------------------------------------------------------------------------- def _should_keep(filename: str) -> bool: lower = filename.lower() basename = lower.split("/")[-1] # Explicit ignore patterns take priority for pat in IGNORE_FILENAME_PATTERNS: if lower.endswith(pat): return False # Keep by exact filename match (infra / config files) if basename in KEEP_FILENAMES: return True # Keep by extension for ext in KEEP_EXTENSIONS: if lower.endswith(ext): return True # Ignore by extension for ext in IGNORE_EXTENSIONS: if lower.endswith(ext): return False # Default: skip unknown binary-looking files return False # --------------------------------------------------------------------------- # Pipeline steps # --------------------------------------------------------------------------- def parse_repo_url(url: str) -> tuple[str, str]: """Extract (owner, repo) from a GitHub URL.""" parsed = urlparse(url.rstrip("/")) parts = [p for p in parsed.path.split("/") if p] if len(parts) < 2: raise ValueError(f"Cannot parse owner/repo from URL: {url}") owner, repo = parts[0], parts[1] if repo.endswith(".git"): repo = repo[:-4] return owner, repo def fetch_commit_context( client: GitHubClient, owner: str, repo: str, ) -> CommitContext: """Steps 2–6: fetch SHAs, comparison, file contents.""" # Step 2 — latest two SHAs old_sha, new_sha = client.latest_two_shas(owner, repo) # Step 3 — comparison comparison = client.compare(owner, repo, old_sha, new_sha) commit_meta = comparison["commits"][-1]["commit"] message: str = commit_meta["message"] author: str = commit_meta["author"]["name"] timestamp: str = commit_meta["author"]["date"] stats = comparison.get("stats", {}) total_additions: int = stats.get("additions", 0) total_deletions: int = stats.get("deletions", 0) raw_files: list[dict] = comparison.get("files", []) total_changed_files: int = len(raw_files) # Step 4 — filter files filtered: list[dict] = [f for f in raw_files if _should_keep(f["filename"])] # --- NEW: Sort by total changes (additions + deletions) descending and take top 2 --- filtered = sorted( filtered, key=lambda x: x.get("additions", 0) + x.get("deletions", 0), reverse=True )[:2] # Step 5 + 6 — build FileContext, fetch before/after content file_contexts: list[FileContext] = [] for f in filtered: filename: str = f["filename"] status: str = f.get("status", "modified") fc = FileContext( filename=filename, status=status, additions=f.get("additions", 0), deletions=f.get("deletions", 0), patch=f.get("patch"), ) # Fetch full file content for semantic context (Step 6) if status != "added": fc.before_content = client.file_content_at(owner, repo, filename, old_sha) if status != "removed": fc.after_content = client.file_content_at(owner, repo, filename, new_sha) file_contexts.append(fc) return CommitContext( message=message, author=author, timestamp=timestamp, old_sha=old_sha, new_sha=new_sha, total_additions=total_additions, total_deletions=total_deletions, total_changed_files=total_changed_files, files=file_contexts, ) # --------------------------------------------------------------------------- # Step 7 — Build per-file prompts # --------------------------------------------------------------------------- def build_prompts(ctx: CommitContext) -> list[str]: """ Return one structured prompt string per changed file. Each prompt contains: - commit-level header (message, author, timestamp, stats) - file-specific info (status, additions/deletions) - before/after content (if available) - the diff patch """ prompts: list[str] = [] commit_header = ( "=== COMMIT METADATA ===\n" f"Message : {ctx.message}\n" f"Author : {ctx.author}\n" ) for fc in ctx.files: sections: list[str] = [commit_header] # File identity sections.append( "=== FILE ===\n" f"Filename : {fc.filename}\n" ) # # Before content # if fc.before_content is not None: # sections.append( # "=== BEFORE CODE ===\n" # f"{fc.before_content}\n" # ) # else: # sections.append("=== BEFORE CODE ===\n(file did not exist)\n") # # After content # if fc.after_content is not None: # sections.append( # "=== AFTER CODE ===\n" # f"{fc.after_content}\n" # ) # else: # sections.append("=== AFTER CODE ===\n(file was deleted)\n") # Diff patch if fc.patch: sections.append( "=== DIFF ===\n" f"{fc.patch}\n" ) else: sections.append("=== DIFF ===\n(no patch available)\n") prompts.append("\n".join(sections)) return prompts # --------------------------------------------------------------------------- # Public entry point # --------------------------------------------------------------------------- def run_pipeline( repo_url: str, token: Optional[str] = None, ) -> list[str]: """ Full CommitLens pipeline. Parameters ---------- repo_url : str GitHub repository URL, e.g. ``https://github.com/owner/repo`` token : str, optional GitHub personal access token for authenticated requests (higher rate limits, private repos). Returns ------- list[str] One prompt string per changed source-code file. """ owner, repo = parse_repo_url(repo_url) client = GitHubClient(token=token) ctx = fetch_commit_context(client, owner, repo) return build_prompts(ctx) # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def _cli() -> None: parser = argparse.ArgumentParser( description="CommitLens: extract per-file commit prompts for Melum 2", ) parser.add_argument("repo_url", help="GitHub repository URL") parser.add_argument( "--token", "-t", default=None, help="GitHub Personal Access Token (optional but recommended)", ) parser.add_argument( "--print-prompts", "-p", action="store_true", help="Print all generated prompts to stdout", ) args = parser.parse_args() try: prompts = run_pipeline(args.repo_url, token=args.token) except Exception as exc: print(f"[ERROR] {exc}", file=sys.stderr) sys.exit(1) print(f"\n[CommitLens] Generated {len(prompts)} prompt(s).\n") if args.print_prompts: for i, prompt in enumerate(prompts, 1): print(f"{'='*60}") print(f"PROMPT {i} / {len(prompts)}") print(f"{'='*60}") print(prompt) print() else: for i, prompt in enumerate(prompts, 1): # Print just the file header so the caller sees what was captured first_line = [ line for line in prompt.splitlines() if line.startswith("Filename :") ] label = first_line[0] if first_line else f"File {i}" print(f" [{i}] {label}") # Return value is available when imported as a module return prompts if __name__ == "__main__": _cli()