|
|
import io
import os
import zipfile
from pathlib import Path
from typing import Callable, Dict, Optional
from urllib.parse import quote, urlparse

import requests
from structlog import get_logger
|
|
|
|
|
|
|
|
# Module-level structlog logger, named after this module.
logger = get_logger(__name__)
|
|
|
|
|
class GitHubRepoDownloader:
    """Download one branch of a GitHub repository as a ZIP and read its files.

    The archive is fetched once and cached on disk under ``cache_dir``; files
    are then read straight out of the cached ZIP without extracting it.
    """

    # Seconds to wait on GitHub before giving up. ``requests`` has no default
    # timeout, so without this a stalled connection would block forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, repo_url: str, branch: str = "main", cache_dir: str = ".cache"):
        """
        Initialize downloader with a GitHub repo URL.

        Args:
            repo_url: Full GitHub repo URL (e.g., https://github.com/owner/repo)
            branch: Branch name to download (default: main)
            cache_dir: Directory to cache downloaded files

        Raises:
            ValueError: If ``repo_url`` is not a usable GitHub repository URL
                or the branch does not exist in the repository.
            requests.RequestException: If the branch validation request fails.
        """
        self.owner, self.repo = self._parse_repo_url(repo_url)
        self.branch = branch
        self.cache_dir = Path(cache_dir)
        # parents=True so a nested cache location such as "var/cache/repos"
        # works even when the intermediate directories do not exist yet.
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self._validate_branch()

    def _parse_repo_url(self, repo_url: str) -> tuple[str, str]:
        """Extract owner and repo name from GitHub URL.

        Accepts ``https://github.com/owner/repo`` (optionally with a ``.git``
        suffix, trailing slash, or extra path such as ``/tree/main``), the
        scheme-less form ``github.com/owner/repo`` and the SSH form
        ``git@github.com:owner/repo.git``.

        Args:
            repo_url: The repository URL supplied by the caller.

        Returns:
            A ``(owner, repo)`` tuple.

        Raises:
            ValueError: If the host is not github.com or the path does not
                contain both an owner and a repository name.
        """
        cleaned = repo_url.strip().rstrip("/")
        if cleaned.startswith("git@"):
            # scp-style SSH URL: turn "git@host:owner/repo" into a real URL
            # so urlparse can split host and path.
            cleaned = "ssh://" + cleaned.replace(":", "/", 1)
        elif "://" not in cleaned:
            # Scheme-less input would otherwise be parsed entirely as a path.
            cleaned = "https://" + cleaned

        parsed = urlparse(cleaned)
        # Compare the actual host rather than searching the whole string, so
        # "https://example.com/github.com/x" is not mistaken for GitHub.
        host = (parsed.hostname or "").lower()
        if host not in ("github.com", "www.github.com"):
            message = f"Not a GitHub URL: {repo_url}"
            logger.error(message)
            raise ValueError(message)

        segments = [part for part in parsed.path.split("/") if part]
        owner = segments[0] if segments else ""
        # Strip only a trailing ".git"; a blanket replace would corrupt names
        # such as "user.github.io".
        repo = segments[1].removesuffix(".git") if len(segments) > 1 else ""
        if not owner or not repo:
            message = f"Invalid GitHub URL format: {repo_url}"
            logger.error(message)
            raise ValueError(message)

        return owner, repo

    def _validate_branch(self) -> None:
        """Validate that the branch exists in the repository.

        Raises:
            ValueError: If the GitHub API answers 404 for the branch.
            requests.RequestException: On any other HTTP or network failure.
        """
        # Keep "/" literal (branch names may contain it) but escape characters
        # like "#" or "?" that would otherwise change the meaning of the URL.
        branch_in_url = quote(self.branch, safe="/")
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/branches/{branch_in_url}"
        logger.info(f"Validating branch: {self.branch}")

        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        if response.status_code == 404:
            message = f"Branch '{self.branch}' not found in {self.owner}/{self.repo}"
            logger.error(message)
            raise ValueError(message)
        response.raise_for_status()

    def _get_cache_path(self) -> Path:
        """Get the cache file path for this repo.

        Returns:
            Path of the cached ZIP inside ``cache_dir``. The branch name is
            percent-encoded so a branch like ``feature/x`` maps to a single
            file name instead of a path into a non-existent subdirectory.
        """
        safe_branch = quote(self.branch, safe="")
        return self.cache_dir / f"{self.owner}_{self.repo}_{safe_branch}.zip"

    def _download_zip(self) -> Path:
        """Download repo ZIP to cache.

        Returns:
            Path of the cached archive (downloaded now or reused).

        Raises:
            requests.RequestException: If the download fails.
        """
        cache_path = self._get_cache_path()

        if cache_path.exists():
            if zipfile.is_zipfile(cache_path):
                logger.info(f"Using cached file: {cache_path}")
                return cache_path
            # A truncated or otherwise corrupt file must not be served from
            # the cache forever; fall through and fetch a fresh copy.
            logger.warning(f"Cached file is not a valid ZIP, re-downloading: {cache_path}")

        branch_in_url = quote(self.branch, safe="/")
        url = f"https://github.com/{self.owner}/{self.repo}/archive/refs/heads/{branch_in_url}.zip"
        logger.info(f"Downloading {self.owner}/{self.repo} (branch: {self.branch})...")

        # Stream into a temporary sibling file and rename it into place only
        # after the download completed, so an interrupted transfer never
        # leaves a partial archive at the cache path.
        partial_path = cache_path.with_name(cache_path.name + ".part")
        try:
            with requests.get(url, stream=True, timeout=self.REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                with open(partial_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=64 * 1024):
                        if chunk:
                            f.write(chunk)
            os.replace(partial_path, cache_path)
        finally:
            # No-op after a successful rename; removes the leftover on failure.
            partial_path.unlink(missing_ok=True)

        logger.info(f"Saved to cache: {cache_path}")
        return cache_path

    def read_files(self, file_filter: Optional[Callable[[str], bool]] = None) -> Dict[str, str]:
        """
        Read files from the repo without extracting.

        Args:
            file_filter: Optional function to filter files (e.g., lambda path: path.endswith('.py'))

        Returns:
            Dictionary mapping file paths to their contents. Paths are relative
            to the repository root (the archive's top-level folder is dropped).
            Contents are decoded as UTF-8 with undecodable bytes dropped, and
            files that cannot be read are logged and left out of the result.
        """
        cache_path = self._download_zip()

        files_content: Dict[str, str] = {}

        with zipfile.ZipFile(cache_path) as zip_file:
            for filename in zip_file.namelist():
                # Directory entries carry no content.
                if filename.endswith('/'):
                    continue

                # Drop the leading "<repo>-<branch>/" folder of the archive.
                clean_path = filename.partition('/')[2]
                if not clean_path:
                    continue

                if file_filter and not file_filter(clean_path):
                    continue

                logger.info(f"Reading: {clean_path}")

                # Best effort: one unreadable member must not abort the run.
                try:
                    content = zip_file.read(filename).decode('utf-8', errors='ignore')
                    files_content[clean_path] = content
                except Exception as e:
                    logger.exception(f"⚠️ Error reading {clean_path}: {e}")

        return files_content
|
|
|