Spaces:
Running on Zero
Running on Zero
| """ | |
| CommitLens — Information Extraction Pipeline | |
| ============================================= | |
| Fetches the two latest commits from a GitHub repository, diffs them, | |
| and returns a list of per-file structured prompts ready for Melum 2. | |
| Usage: | |
| python commitlens.py <github_repo_url> [--token <PAT>] | |
| Example: | |
| python commitlens.py https://github.com/psf/requests --token ghp_xxx | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import sys | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
| from urllib.parse import urlparse | |
| import requests # pip install requests | |
| # --------------------------------------------------------------------------- | |
| # Configuration | |
| # --------------------------------------------------------------------------- | |
| GITHUB_API = "https://api.github.com" | |
| # Source-code extensions to keep (lower-cased, with leading dot) | |
| KEEP_EXTENSIONS: set[str] = { | |
| ".py", ".js", ".ts", ".tsx", ".jsx", | |
| ".java", ".cpp", ".c", ".h", ".hpp", | |
| ".go", ".rs", ".php", ".rb", ".cs", | |
| ".swift", ".kt", ".scala", ".r", ".m", | |
| ".sh", ".bash", ".zsh", | |
| } | |
| # Config / infra filenames to keep regardless of extension | |
| KEEP_FILENAMES: set[str] = { | |
| "dockerfile", | |
| "docker-compose.yml", | |
| "docker-compose.yaml", | |
| "requirements.txt", | |
| "package.json", | |
| "pyproject.toml", | |
| "setup.py", | |
| "setup.cfg", | |
| "cargo.toml", | |
| "go.mod", | |
| ".env.example", | |
| "makefile", | |
| } | |
| # Extensions/patterns to ignore explicitly | |
| IGNORE_EXTENSIONS: set[str] = { | |
| ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", ".webp", | |
| ".mp4", ".mov", ".avi", ".mkv", ".webm", | |
| ".pdf", ".docx", ".xlsx", | |
| ".zip", ".tar", ".gz", ".bz2", ".7z", | |
| ".exe", ".dll", ".so", ".dylib", | |
| ".woff", ".woff2", ".ttf", ".eot", | |
| ".lock", # package-lock.json, yarn.lock, Cargo.lock, etc. | |
| ".map", # source maps | |
| ".min.js", # minified assets (handled via endswith below) | |
| } | |
| IGNORE_FILENAME_PATTERNS: tuple[str, ...] = ( | |
| ".min.js", | |
| ".min.css", | |
| "-lock.json", | |
| ".lock", | |
| ".pb", # protobuf binaries | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Data classes | |
| # --------------------------------------------------------------------------- | |
| class FileContext: | |
| filename: str | |
| status: str # added | modified | removed | renamed | |
| additions: int | |
| deletions: int | |
| patch: Optional[str] | |
| before_content: Optional[str] = None # full file at old commit | |
| after_content: Optional[str] = None # full file at new commit | |
| class CommitContext: | |
| message: str | |
| author: str | |
| timestamp: str | |
| old_sha: str | |
| new_sha: str | |
| total_additions: int | |
| total_deletions: int | |
| total_changed_files: int | |
| files: list[FileContext] = field(default_factory=list) | |
| # --------------------------------------------------------------------------- | |
| # GitHub helpers | |
| # --------------------------------------------------------------------------- | |
| class GitHubClient: | |
| def __init__(self, token: Optional[str] = None): | |
| self.session = requests.Session() | |
| self.session.headers["Accept"] = "application/vnd.github+json" | |
| if token: | |
| self.session.headers["Authorization"] = f"Bearer {token}" | |
| def _get(self, url: str, params: Optional[dict] = None) -> dict | list: | |
| resp = self.session.get(url, params=params, timeout=30) | |
| resp.raise_for_status() | |
| return resp.json() | |
| def get_raw(self, url: str) -> str: | |
| resp = self.session.get(url, timeout=30) | |
| resp.raise_for_status() | |
| return resp.text | |
| # ---- domain methods --------------------------------------------------- | |
| def latest_two_shas(self, owner: str, repo: str) -> tuple[str, str]: | |
| """Return (old_sha, new_sha) for the two most recent commits.""" | |
| commits = self._get( | |
| f"{GITHUB_API}/repos/{owner}/{repo}/commits", | |
| params={"per_page": 2}, | |
| ) | |
| if len(commits) < 2: | |
| raise ValueError("Repository has fewer than 2 commits.") | |
| new_sha: str = commits[0]["sha"] | |
| old_sha: str = commits[1]["sha"] | |
| return old_sha, new_sha | |
| def compare(self, owner: str, repo: str, old: str, new: str) -> dict: | |
| return self._get( | |
| f"{GITHUB_API}/repos/{owner}/{repo}/compare/{old}...{new}" | |
| ) | |
| def file_content_at(self, owner: str, repo: str, path: str, ref: str) -> Optional[str]: | |
| """Return raw file content at a specific commit ref, or None if missing.""" | |
| try: | |
| meta = self._get( | |
| f"{GITHUB_API}/repos/{owner}/{repo}/contents/{path}", | |
| params={"ref": ref}, | |
| ) | |
| download_url: str = meta.get("download_url", "") | |
| if not download_url: | |
| return None | |
| return self.get_raw(download_url) | |
| except requests.HTTPError: | |
| return None | |
| # --------------------------------------------------------------------------- | |
| # Filtering | |
| # --------------------------------------------------------------------------- | |
| def _should_keep(filename: str) -> bool: | |
| lower = filename.lower() | |
| basename = lower.split("/")[-1] | |
| # Explicit ignore patterns take priority | |
| for pat in IGNORE_FILENAME_PATTERNS: | |
| if lower.endswith(pat): | |
| return False | |
| # Keep by exact filename match (infra / config files) | |
| if basename in KEEP_FILENAMES: | |
| return True | |
| # Keep by extension | |
| for ext in KEEP_EXTENSIONS: | |
| if lower.endswith(ext): | |
| return True | |
| # Ignore by extension | |
| for ext in IGNORE_EXTENSIONS: | |
| if lower.endswith(ext): | |
| return False | |
| # Default: skip unknown binary-looking files | |
| return False | |
| # --------------------------------------------------------------------------- | |
| # Pipeline steps | |
| # --------------------------------------------------------------------------- | |
| def parse_repo_url(url: str) -> tuple[str, str]: | |
| """Extract (owner, repo) from a GitHub URL.""" | |
| parsed = urlparse(url.rstrip("/")) | |
| parts = [p for p in parsed.path.split("/") if p] | |
| if len(parts) < 2: | |
| raise ValueError(f"Cannot parse owner/repo from URL: {url}") | |
| owner, repo = parts[0], parts[1] | |
| if repo.endswith(".git"): | |
| repo = repo[:-4] | |
| return owner, repo | |
| def fetch_commit_context( | |
| client: GitHubClient, | |
| owner: str, | |
| repo: str, | |
| ) -> CommitContext: | |
| """Steps 2–6: fetch SHAs, comparison, file contents.""" | |
| # Step 2 — latest two SHAs | |
| old_sha, new_sha = client.latest_two_shas(owner, repo) | |
| # Step 3 — comparison | |
| comparison = client.compare(owner, repo, old_sha, new_sha) | |
| commit_meta = comparison["commits"][-1]["commit"] | |
| message: str = commit_meta["message"] | |
| author: str = commit_meta["author"]["name"] | |
| timestamp: str = commit_meta["author"]["date"] | |
| stats = comparison.get("stats", {}) | |
| total_additions: int = stats.get("additions", 0) | |
| total_deletions: int = stats.get("deletions", 0) | |
| raw_files: list[dict] = comparison.get("files", []) | |
| total_changed_files: int = len(raw_files) | |
| # Step 4 — filter files | |
| filtered: list[dict] = [f for f in raw_files if _should_keep(f["filename"])] | |
| # --- NEW: Sort by total changes (additions + deletions) descending and take top 2 --- | |
| filtered = sorted( | |
| filtered, | |
| key=lambda x: x.get("additions", 0) + x.get("deletions", 0), | |
| reverse=True | |
| )[:2] | |
| # Step 5 + 6 — build FileContext, fetch before/after content | |
| file_contexts: list[FileContext] = [] | |
| for f in filtered: | |
| filename: str = f["filename"] | |
| status: str = f.get("status", "modified") | |
| fc = FileContext( | |
| filename=filename, | |
| status=status, | |
| additions=f.get("additions", 0), | |
| deletions=f.get("deletions", 0), | |
| patch=f.get("patch"), | |
| ) | |
| # Fetch full file content for semantic context (Step 6) | |
| if status != "added": | |
| fc.before_content = client.file_content_at(owner, repo, filename, old_sha) | |
| if status != "removed": | |
| fc.after_content = client.file_content_at(owner, repo, filename, new_sha) | |
| file_contexts.append(fc) | |
| return CommitContext( | |
| message=message, | |
| author=author, | |
| timestamp=timestamp, | |
| old_sha=old_sha, | |
| new_sha=new_sha, | |
| total_additions=total_additions, | |
| total_deletions=total_deletions, | |
| total_changed_files=total_changed_files, | |
| files=file_contexts, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Step 7 — Build per-file prompts | |
| # --------------------------------------------------------------------------- | |
| def build_prompts(ctx: CommitContext) -> list[str]: | |
| """ | |
| Return one structured prompt string per changed file. | |
| Each prompt contains: | |
| - commit-level header (message, author, timestamp, stats) | |
| - file-specific info (status, additions/deletions) | |
| - before/after content (if available) | |
| - the diff patch | |
| """ | |
| prompts: list[str] = [] | |
| commit_header = ( | |
| "=== COMMIT METADATA ===\n" | |
| f"Message : {ctx.message}\n" | |
| f"Author : {ctx.author}\n" | |
| ) | |
| for fc in ctx.files: | |
| sections: list[str] = [commit_header] | |
| # File identity | |
| sections.append( | |
| "=== FILE ===\n" | |
| f"Filename : {fc.filename}\n" | |
| ) | |
| # # Before content | |
| # if fc.before_content is not None: | |
| # sections.append( | |
| # "=== BEFORE CODE ===\n" | |
| # f"{fc.before_content}\n" | |
| # ) | |
| # else: | |
| # sections.append("=== BEFORE CODE ===\n(file did not exist)\n") | |
| # # After content | |
| # if fc.after_content is not None: | |
| # sections.append( | |
| # "=== AFTER CODE ===\n" | |
| # f"{fc.after_content}\n" | |
| # ) | |
| # else: | |
| # sections.append("=== AFTER CODE ===\n(file was deleted)\n") | |
| # Diff patch | |
| if fc.patch: | |
| sections.append( | |
| "=== DIFF ===\n" | |
| f"{fc.patch}\n" | |
| ) | |
| else: | |
| sections.append("=== DIFF ===\n(no patch available)\n") | |
| prompts.append("\n".join(sections)) | |
| return prompts | |
| # --------------------------------------------------------------------------- | |
| # Public entry point | |
| # --------------------------------------------------------------------------- | |
| def run_pipeline( | |
| repo_url: str, | |
| token: Optional[str] = None, | |
| ) -> list[str]: | |
| """ | |
| Full CommitLens pipeline. | |
| Parameters | |
| ---------- | |
| repo_url : str | |
| GitHub repository URL, e.g. ``https://github.com/owner/repo`` | |
| token : str, optional | |
| GitHub personal access token for authenticated requests (higher rate | |
| limits, private repos). | |
| Returns | |
| ------- | |
| list[str] | |
| One prompt string per changed source-code file. | |
| """ | |
| owner, repo = parse_repo_url(repo_url) | |
| client = GitHubClient(token=token) | |
| ctx = fetch_commit_context(client, owner, repo) | |
| return build_prompts(ctx) | |
| # --------------------------------------------------------------------------- | |
| # CLI | |
| # --------------------------------------------------------------------------- | |
| def _cli() -> None: | |
| parser = argparse.ArgumentParser( | |
| description="CommitLens: extract per-file commit prompts for Melum 2", | |
| ) | |
| parser.add_argument("repo_url", help="GitHub repository URL") | |
| parser.add_argument( | |
| "--token", "-t", | |
| default=None, | |
| help="GitHub Personal Access Token (optional but recommended)", | |
| ) | |
| parser.add_argument( | |
| "--print-prompts", "-p", | |
| action="store_true", | |
| help="Print all generated prompts to stdout", | |
| ) | |
| args = parser.parse_args() | |
| try: | |
| prompts = run_pipeline(args.repo_url, token=args.token) | |
| except Exception as exc: | |
| print(f"[ERROR] {exc}", file=sys.stderr) | |
| sys.exit(1) | |
| print(f"\n[CommitLens] Generated {len(prompts)} prompt(s).\n") | |
| if args.print_prompts: | |
| for i, prompt in enumerate(prompts, 1): | |
| print(f"{'='*60}") | |
| print(f"PROMPT {i} / {len(prompts)}") | |
| print(f"{'='*60}") | |
| print(prompt) | |
| print() | |
| else: | |
| for i, prompt in enumerate(prompts, 1): | |
| # Print just the file header so the caller sees what was captured | |
| first_line = [ | |
| line for line in prompt.splitlines() | |
| if line.startswith("Filename :") | |
| ] | |
| label = first_line[0] if first_line else f"File {i}" | |
| print(f" [{i}] {label}") | |
| # Return value is available when imported as a module | |
| return prompts | |
| if __name__ == "__main__": | |
| _cli() | |