import requests
import base64
import hashlib
import json
import tempfile
import zipfile
import io
import os
from collections import deque
from typing import List, Dict, Tuple, Optional
from pathlib import Path
import re
from utils import matches_patterns, is_binary_file, format_file_size
from config import GITHUB_API_BASE, HF_API_BASE


def process_github_repo(
    repo_path: str,
    token: str,
    include_patterns: List[str],
    exclude_patterns: List[str],
    max_file_size: int,
) -> Tuple[List[Tuple[str, str, int]], Dict]:
    """Walk a GitHub repository via the contents API and collect text files.

    Args:
        repo_path: "owner/repo" slug.
        token: personal access token; empty/falsy for anonymous access.
        include_patterns: glob patterns a path must match to be kept.
        exclude_patterns: glob patterns that disqualify a path.
        max_file_size: files larger than this many bytes are skipped.

    Returns:
        (files_data, repo_info) where files_data is a list of
        (path, decoded_text, size_in_bytes) tuples and repo_info is the
        repository metadata dict returned by the GitHub API.

    Raises:
        Exception: if the repository metadata request does not return 200.
    """
    headers = {}
    if token:
        headers['Authorization'] = f'token {token}'

    # Fetch repository metadata first; a failure here aborts the whole walk.
    repo_url = f"{GITHUB_API_BASE}/repos/{repo_path}"
    repo_response = requests.get(repo_url, headers=headers)
    if repo_response.status_code != 200:
        raise Exception(f"Failed to fetch repository info: {repo_response.json().get('message', 'Unknown error')}")
    repo_info = repo_response.json()

    files_data = []
    # BFS over directories. deque.popleft() is O(1); the original
    # list.pop(0) was O(n) per dequeue.
    contents_queue = deque([""])

    while contents_queue:
        current_path = contents_queue.popleft()

        contents_url = f"{GITHUB_API_BASE}/repos/{repo_path}/contents/{current_path}"
        contents_response = requests.get(contents_url, headers=headers)
        if contents_response.status_code != 200:
            # Best-effort: skip unreadable directories instead of aborting.
            continue

        contents = contents_response.json()
        if isinstance(contents, dict):
            # The API returns a bare object (not a list) for a single file.
            contents = [contents]

        for item in contents:
            item_path = f"{current_path}/{item['name']}" if current_path else item['name']

            if item['type'] == 'dir':
                contents_queue.append(item_path)
            elif item['type'] == 'file':
                if not matches_patterns(item_path, include_patterns, exclude_patterns):
                    continue
                if item['size'] > max_file_size:
                    continue

                try:
                    file_response = requests.get(item['url'], headers=headers)
                    if file_response.status_code == 200:
                        file_data = file_response.json()
                        # Contents API delivers file bodies base64-encoded.
                        content = base64.b64decode(file_data['content']).decode('utf-8', errors='ignore')
                        if is_binary_file(content, item_path):
                            continue
                        files_data.append((item_path, content, item['size']))
                except Exception as e:
                    # Log and keep going: one bad file should not sink the walk.
                    print(f"Error processing file {item_path}: {e}")
                    continue

    return files_data, repo_info


def process_huggingface_repo(
    repo_path: str,
    token: str,
    include_patterns: List[str],
    exclude_patterns: List[str],
    max_file_size: int,
) -> Tuple[List[Tuple[str, str, int]], Dict]:
    """Walk a Hugging Face model repository and collect text files.

    Mirrors process_github_repo but uses the Hub tree API and raw file
    endpoint; the 'main' revision is assumed.

    Args:
        repo_path: "owner/repo" slug on the Hub.
        token: Hub access token; empty/falsy for anonymous access.
        include_patterns: glob patterns a path must match to be kept.
        exclude_patterns: glob patterns that disqualify a path.
        max_file_size: files larger than this many bytes are skipped.

    Returns:
        (files_data, repo_info) where files_data holds
        (path, text, len(text)) tuples and repo_info is the model metadata
        dict from the Hub API.

    Raises:
        Exception: if the metadata or tree request does not return 200.
    """
    headers = {}
    if token:
        headers['Authorization'] = f'Bearer {token}'

    repo_url = f"{HF_API_BASE}/api/models/{repo_path}"
    repo_response = requests.get(repo_url, headers=headers)
    if repo_response.status_code != 200:
        raise Exception(f"Failed to fetch repository info: {repo_response.json().get('error', 'Unknown error')}")
    repo_info = repo_response.json()

    tree_url = f"{HF_API_BASE}/api/models/{repo_path}/tree/main"
    tree_response = requests.get(tree_url, headers=headers)
    if tree_response.status_code != 200:
        raise Exception(f"Failed to fetch repository tree: {tree_response.json().get('error', 'Unknown error')}")
    tree_data = tree_response.json()

    files_data = []

    def process_tree_item(item, current_path=""):
        # A list is a directory listing: fan out over its entries.
        if isinstance(item, list):
            for subitem in item:
                process_tree_item(subitem, current_path)
        elif isinstance(item, dict):
            item_path = f"{current_path}/{item['path']}" if current_path else item['path']

            if item['type'] == 'directory':
                # Subdirectories require their own tree request.
                dir_url = f"{HF_API_BASE}/api/models/{repo_path}/tree/main/{item_path}"
                dir_response = requests.get(dir_url, headers=headers)
                if dir_response.status_code == 200:
                    process_tree_item(dir_response.json(), item_path)
            elif item['type'] == 'file':
                if not matches_patterns(item_path, include_patterns, exclude_patterns):
                    return
                # Tree entries may omit 'size'; treat missing as 0 (keep).
                if item.get('size', 0) > max_file_size:
                    return

                try:
                    raw_url = f"https://huggingface.co/{repo_path}/raw/main/{item_path}"
                    file_response = requests.get(raw_url, headers=headers)
                    if file_response.status_code == 200:
                        content = file_response.text
                        if is_binary_file(content, item_path):
                            return
                        # NOTE: size recorded is len(text), not bytes on disk.
                        files_data.append((item_path, content, len(content)))
                except Exception as e:
                    print(f"Error processing file {item_path}: {e}")
                    return

    process_tree_item(tree_data)
    return files_data, repo_info


def download_repo_as_zip(repo_url: str, token: str) -> str:
    """Download a repository snapshot as a ZIP archive into a temp file.

    Args:
        repo_url: full https URL of a GitHub or Hugging Face repository.
        token: access token; empty/falsy for anonymous download.

    Returns:
        Filesystem path of the downloaded archive.

    Raises:
        ValueError: if the URL is neither GitHub nor Hugging Face.
        Exception: if the download request does not return HTTP 200.
    """
    if "github.com" in repo_url:
        if token:
            # Authenticated downloads go through the REST API zipball endpoint.
            headers = {'Authorization': f'token {token}'}
            zip_url = repo_url.replace("github.com", "api.github.com/repos") + "/zipball/main"
        else:
            # Anonymous downloads can hit codeload directly.
            headers = {}
            zip_url = repo_url.replace("github.com", "codeload.github.com") + "/zip/main"
    elif "huggingface.co" in repo_url:
        headers = {}
        if token:
            headers['Authorization'] = f'Bearer {token}'
        # NOTE(review): the Hub's /resolve/main endpoint serves individual
        # files, and no documented whole-repo ZIP endpoint exists — confirm
        # this URL actually yields an archive. (The original code also did a
        # no-op replace("huggingface.co", "huggingface.co") here; dropped.)
        zip_url = repo_url + "/resolve/main?download=true"
    else:
        raise ValueError("Unsupported repository URL")

    response = requests.get(zip_url, headers=headers, stream=True)
    if response.status_code != 200:
        raise Exception(f"Failed to download repository: {response.status_code}")

    # Deterministic, portable temp path: built-in hash() is salted per
    # process (PYTHONHASHSEED) and can be negative, and "/tmp" is
    # POSIX-only — use an md5 digest under tempfile.gettempdir() instead.
    digest = hashlib.md5(repo_url.encode('utf-8')).hexdigest()
    temp_path = os.path.join(tempfile.gettempdir(), f"repo_{digest}.zip")
    with open(temp_path, 'wb') as f:
        # Stream in chunks so large archives never sit fully in memory.
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)

    return temp_path


def extract_repo_info(repo_url: str, repo_type: str) -> Dict:
    """Extract owner/repo metadata from a repository URL.

    Args:
        repo_url: full repository URL.
        repo_type: "github" or "huggingface".

    Returns:
        A dict with 'owner', 'repo', 'full_name', and 'url' keys when the
        URL matches the expected host; otherwise just {'url': repo_url}.
    """
    # [^/?#]+ stops the repo name at query strings and fragments, which the
    # original [^/]+ wrongly swallowed into the repo name.
    host_patterns = {
        "github": r'github\.com/([^/]+)/([^/?#]+)',
        "huggingface": r'huggingface\.co/([^/]+)/([^/?#]+)',
    }

    pattern = host_patterns.get(repo_type)
    if pattern:
        match = re.search(pattern, repo_url)
        if match:
            owner = match.group(1)
            repo = match.group(2)
            # Normalize clone-style URLs: ".../repo.git" -> "repo".
            if repo.endswith('.git'):
                repo = repo[:-4]
            return {
                'owner': owner,
                'repo': repo,
                'full_name': f"{owner}/{repo}",
                'url': repo_url,
            }

    return {'url': repo_url}