diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,64 +1,2354 @@ import gradio as gr -from huggingface_hub import InferenceClient +import google.generativeai as genai +import os +from dotenv import load_dotenv +from github import Github, RateLimitExceededException, GithubException +import json +from pathlib import Path +from datetime import datetime, timedelta +from collections import defaultdict +import base64 +from typing import Dict, List, Any, Optional, Tuple +import tempfile +from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type +import asyncio +import aiohttp +import re +import ast +from concurrent.futures import ThreadPoolExecutor +import pandas as pd +import matplotlib.pyplot as plt +import seaborn as sns +from packaging import version +import requests +from bs4 import BeautifulSoup +import networkx as nx +import math +import logging +import time -""" -For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference -""" -client = InferenceClient("HuggingFaceH4/zephyr-7b-beta") +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Load environment variables (consider handling missing .env) +load_dotenv() + +# --- Constants and Global Variables --- + +# Store API tokens globally +GITHUB_TOKEN = os.getenv("GITHUB_TOKEN") #getting github token using os +GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") #getting gemini api key using os + +# Constants for rate limiting - make them configurable if needed +MIN_RATE_LIMIT_BUFFER = 50 # Keep a buffer to avoid hitting the limit +INITIAL_BACKOFF = 60 # Initial backoff time in seconds + +# Enhanced relevant file extensions +RELEVANT_EXTENSIONS = { + ".py": "Python", + ".js": "JavaScript", + ".ts": "TypeScript", + ".jsx": "React", + ".tsx": "React TypeScript", + ".java": "Java", + ".cpp": "C++", + ".c": "C", + ".h": "C Header", + ".hpp": "C++ Header", + ".rb": "Ruby", + ".php": "PHP", + ".go": "Go", + ".rs": "Rust", + ".swift": "Swift", + ".kt": "Kotlin", + ".cs": "C#", + ".scala": "Scala", + ".r": "R", + ".dart": "Dart", + ".lua": "Lua", + ".sql": "SQL", + ".sh": "Shell", + ".md": "Markdown", # Include Markdown for documentation analysis + ".txt": "Text", + ".json": "JSON", + ".yml": "YAML", + ".yaml": "YAML", + ".xml": "XML", + ".html": "HTML", + ".css": "CSS" +} + + +# --- Initialization and Validation --- + +def validate_github_token(token: str) -> Tuple[bool, str]: + """ + Validate GitHub token before proceeding with analysis. + Returns (is_valid: bool, message: str) + """ + if not token: + return False, "GitHub token is missing." # Check for missing + + try: + gh = Github(token) + user = gh.get_user() + username = user.login #important: accessing properties for validation + rate_limit = gh.get_rate_limit() + remaining = rate_limit.core.remaining + + + if remaining == 0: #using remaining + reset_time = rate_limit.core.reset.strftime("%Y-%m-%d %H:%M:%S UTC") + return False, f"Rate limit exceeded. 
Resets at {reset_time}" + + return True, f"Token validated successfully (authenticated as {username})" + + + except GithubException as e: + if e.status == 401: + return False, "Invalid token - authentication failed" + elif e.status == 403: + return False, "Token lacks required permissions or rate limit exceeded" #more specific 403 message + elif e.status == 404: + return False, "Invalid token or API endpoint not found" # More specific 404 message + else: + + return False, f"GitHub error (status {e.status}): {e.data.get('message', str(e))}" + except Exception as e: # General exception handling as a fallback. + return False, f"Error validating token: {str(e)}" + + +def initialize_tokens(github_token: str, gemini_key: str) -> str: + """Initialize API tokens globally with enhanced validation (using env vars now).""" + global GITHUB_TOKEN, GEMINI_API_KEY + + if not github_token or not gemini_key: + return "❌ Both GitHub and Gemini API keys are required." + + is_valid, message = validate_github_token(github_token) + if not is_valid: + return f"❌ GitHub token validation failed: {message}" + + try: + genai.configure(api_key=gemini_key) + model = genai.GenerativeModel('gemini-1.0-pro') + response = model.generate_content("Test") + if response.text is None : # important check. + return "❌ Invalid Gemini API key (no response)" #More informative. + # else: + # return "Invalid" + except Exception as e: + return f"❌ Gemini API key validation failed: {str(e)}" + + GITHUB_TOKEN = github_token # Overwrite with validated tokens + GEMINI_API_KEY = gemini_key + + return "✅ All tokens validated and initialized successfully!" + + +# --- Classes --- + +class GitHubAPIHandler: + """Enhanced GitHub API handler with minimal authentication checks and robust error handling.""" + + def __init__(self, token: Optional[str] = None): + self.logger = logging.getLogger(__name__) + self.token = token + self._min_rate_limit_buffer = MIN_RATE_LIMIT_BUFFER + self._initial_backoff = INITIAL_BACKOFF + if not self.token: + raise ValueError("GitHub token not provided") + # Create the GitHub client *within* the class + self.gh = self._create_github_client() + + def _create_github_client(self) -> Github: + """Create GitHub client with enhanced error handling""" + try: + # Create Github instance with basic configuration + gh = Github( + self.token, + retry=3, # Number of retries for failed requests + timeout=30, # Timeout in seconds + per_page=100 # Maximum items per page + ) + + # Verify authentication + try: + user = gh.get_user() + self.logger.info(f"Authenticated as: {user.login}") + except GithubException as e: + if e.status == 401: + raise ValueError("Invalid GitHub token - authentication failed") + elif e.status == 403: + raise ValueError("GitHub token lacks required permissions or rate limit exceeded") + else: + raise ValueError(f"GitHub initialization failed: {str(e)}") + + return gh # Return the authenticated client + except Exception as e: + raise ValueError(f"Failed to initialize GitHub client: {str(e)}") # More informative error + + + @retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=4, max=60), + retry=retry_if_exception_type((RateLimitExceededException, GithubException)), + before_sleep=lambda retry_state: logging.info( + f"Rate limited, retrying in {retry_state.next_action.sleep} seconds..."), + ) + def get_repository(self, repo_url: str) -> Any: + """Get repository object using PyGithub, with error handling and validation.""" + try: + parts = repo_url.rstrip('/').split('/') + if 
len(parts) < 2: + raise ValueError(f"Invalid repository URL format: {repo_url}") + + owner = parts[-2] + repo_name = parts[-1] + + # Using PyGithub's get_repo method + repo = self.gh.get_repo(f"{owner}/{repo_name}") + return repo # Return the repo object + + except GithubException as e: # Specifically handle Github exceptions + if e.status == 404: + raise ValueError(f"Repository not found: {owner}/{repo_name}") + elif e.status == 403: + self._handle_forbidden_error() # Handle forbidden access (rate limits, etc.) + raise #Re raise the exception so program doesn't continue + + else: + raise ValueError(f"Failed to access repository: {str(e)}") + except Exception as e: #catch all other exception. + raise ValueError(f"Failed to access repository(An unexpected error occurred):{str(e)}") + + def _check_rate_limits(self): + """Enhanced rate limit checking with predictive waiting.""" + try: + rate_limit = self.gh.get_rate_limit() + remaining = rate_limit.core.remaining + reset_time = rate_limit.core.reset.timestamp() + + self.logger.info(f"Rate limit - Remaining: {remaining}, Reset: {datetime.fromtimestamp(reset_time)}") + + if remaining < self._min_rate_limit_buffer: + wait_time = self._get_rate_limit_wait_time() + if wait_time > 0: # Only log if there's a wait. + self.logger.warning(f"Approaching rate limit. Waiting {wait_time:.2f} seconds.") + time.sleep(wait_time) # Wait before hitting the limit + + except GithubException as e: # Be specific about the exceptions you handle + self.logger.error(f"Error checking rate limits: {str(e)}") + time.sleep(60) # Wait a reasonable amount of time even if you cannot check + except Exception as e: # Always have general exception to handle + self.logger.error(f"Unexpected Error: {str(e)}") #General unexpected Error handle. + time.sleep(60) + + def _get_rate_limit_wait_time(self) -> float: + """Calculate the time to wait until the rate limit resets.""" + try: + rate_limit = self.gh.get_rate_limit() + reset_time = rate_limit.core.reset.timestamp() + current_time = time.time() + return max(0, reset_time - current_time + 1) # Add 1 second buffer + except Exception: + return self._initial_backoff # Fallback on any error in getting rate limits + + def _handle_forbidden_error(self): + """Handle a 403 Forbidden error from the GitHub API.""" + try: + # Check if it's a rate limit issue. + rate_limit = self.gh.get_rate_limit() + if rate_limit.core.remaining == 0: + wait_time = self._get_rate_limit_wait_time() + self.logger.warning(f"Rate limit exceeded. Waiting {wait_time:.2f} seconds.") + time.sleep(wait_time) + else: + # If not rate limited, then likely a permissions issue + self.logger.error("Access forbidden. Token may lack required permissions.") + + except Exception as e: #handling other errors. + self.logger.error(f"Error handling forbidden response: {str(e)}") + + @retry( + stop=stop_after_attempt(3), # Maximum 3 retries + wait=wait_exponential(multiplier=1, min=4, max=10), #exponential backoff. + reraise=True # Reraise exception after retries. + ) + def get_file_content(self, repo: Any, path: str) -> Optional[str]: + """Get content of a file, with retries, rate limit check and error handling.""" + + try: + self._check_rate_limits() # Check rate limits *before* each attempt. + content = repo.get_contents(path) + return content + except GithubException as e: + if e.status == 404: + self.logger.warning(f"File not found: {path}") # 404 is not critical. 
+ return None # explicitly return None + elif e.status == 403: # Explicitly handle forbidden + self._handle_forbidden_error() # Rate limiting or other access problem + raise # Raise after handling (waiting, logging). + # Any other GitHub error is an issue - log and re-raise + self.logger.error(f"Error getting file content: {str(e)}") #handle + raise #re-raise after loggng + except Exception as e: # General exception for unexpected issue. + self.logger.error(f"Unexpected Error : {str(e)}") #General exception handelling + raise + +class CodeMetricsAnalyzer: + """Handles detailed code metrics analysis with proper error handling.""" + + def __init__(self): + self.logger = logging.getLogger(__name__) + self.size_metrics_cache = {} # Consider if needed with parallelization + + def calculate_halstead_metrics(self, content: str, language: str = "Unknown") -> Dict[str, float]: + """ + Calculate Halstead complexity metrics for code. + """ + try: + # Define language-specific operators (more comprehensive) + operators = { + "Python": set([ + '+', '-', '*', '/', '//', '**', '%', '==', '!=', '>', '<', '>=', '<=', + 'and', 'or', 'not', 'is', 'in', '+=', '-=', '*=', '/=', '=', + 'if', 'elif', 'else', 'for', 'while', 'def', 'class', 'return', + 'yield', 'raise', 'break', 'continue', 'pass', 'assert', + 'import', 'from', 'as', 'try', 'except', 'finally', 'with', 'async', 'await' + ]), + "JavaScript": set([ + '+', '-', '*', '/', '%', '**', '==', '===', '!=', '!==', '>', '<', + '>=', '<=', '&&', '||', '!', '=', '+=', '-=', '*=', '/=', + 'if', 'else', 'for', 'while', 'function', 'return', 'class', + 'new', 'delete', 'typeof', 'instanceof', 'void', 'try', 'catch', + 'finally', 'throw', 'break', 'continue', 'default', 'case', 'async', 'await' + ]), + "Java": set([ # Added Java operators + '+', '-', '*', '/', '%', '++', '--', '==', '!=', '>', '<', '>=', '<=', + '&&', '||', '!', '=', '+=', '-=', '*=', '/=', '%=', + 'if', 'else', 'for', 'while', 'do', 'switch', 'case', 'default', + 'break', 'continue', 'return', 'try', 'catch', 'finally', 'throw', 'throws', + 'class', 'interface', 'extends', 'implements', 'new', 'instanceof', 'this', 'super' + ]), + + }.get(language, set(['+', '-', '*', '/', '=', '==', '>', '<', '>=', '<='])) + + unique_operators = set() + unique_operands = set() + total_operators = 0 + total_operands = 0 + + lines = content.splitlines() + for line in lines: + line = line.strip() + if line.startswith(('#', '//', '/*', '*')): # Handle comments + continue + + for operator in operators: + if operator in line: + unique_operators.add(operator) + total_operators += line.count(operator) + + # Improved operand counting (numbers, strings, identifiers) + numbers = re.findall(r'\b\d+(?:\.\d+)?\b', line) + unique_operands.update(numbers) + total_operands += len(numbers) + + strings = re.findall(r'["\'][^"\']*["\']', line) + unique_operands.update(strings) + total_operands += len(strings) + + identifiers = re.findall(r'\b[a-zA-Z_]\w*\b', line) + for ident in identifiers: + if ident not in operators: + unique_operands.add(ident) + total_operands += 1 + + n1 = len(unique_operators) + n2 = len(unique_operands) + N1 = total_operators + N2 = total_operands + + # Handle edge cases to avoid division by zero + if n1 > 0 and n2 > 0: + program_length = N1 + N2 + vocabulary = n1 + n2 + volume = program_length * (math.log2(vocabulary) if vocabulary > 0 else 0) + difficulty = (n1 * N2) / (2 * n2) if n2 > 0 else 0 + effort = volume * difficulty + time = effort / 18 # Standard Halstead time estimation + else: + program_length 
= vocabulary = volume = difficulty = effort = time = 0 + + return { + "halstead_unique_operators": n1, + "halstead_unique_operands": n2, + "halstead_total_operators": N1, + "halstead_total_operands": N2, + "halstead_program_length": program_length, + "halstead_vocabulary": vocabulary, + "halstead_volume": volume, + "halstead_difficulty": difficulty, + "halstead_effort": effort, + "halstead_time": time + } + + except Exception as e: + self.logger.error(f"Error calculating Halstead metrics: {str(e)}") + # Return default 0 values for all metrics on error + return {metric: 0 for metric in [ + "halstead_unique_operators", "halstead_unique_operands", + "halstead_total_operators", "halstead_total_operands", + "halstead_program_length", "halstead_vocabulary", + "halstead_volume", "halstead_difficulty", "halstead_effort", "halstead_time" + ]} + + def calculate_comment_density(self, content: str, language: str = "Unknown") -> Dict[str, Any]: + + try: + metrics = { + "comment_lines": 0, + "code_lines": 0, + "blank_lines": 0, + "comment_density": 0.0, + "docstring_lines": 0, # Docstrings (Python) + "total_lines": 0, #Total no of line. + "inline_comments": 0 + } + + patterns = { + "Python": { + "single_line": ["#"], + "multi_start": ['"""', "'''"], + "multi_end": ['"""', "'''"], + "inline_start": "#" + }, + "JavaScript": { + "single_line": ["//"], + "multi_start": ["/*"], + "multi_end": ["*/"], + "inline_start": "//" + }, + "Java": { # Added Java comment patterns + "single_line": ["//"], + "multi_start": ["/*"], + "multi_end": ["*/"], + "inline_start": "//" + } + }.get(language, { + "single_line": ["//", "#"], + "multi_start": ["/*", '"""', "'''"], + "multi_end": ["*/", '"""', "'''"], + "inline_start": ["//", "#"] + }) + + lines = content.splitlines() + in_multiline_comment = False + current_multiline_delimiter = None + + for line in lines: + stripped = line.strip() + metrics["total_lines"] += 1 + + if not stripped: + metrics["blank_lines"] += 1 + continue + + if not in_multiline_comment: + for delimiter in patterns["multi_start"]: + if stripped.startswith(delimiter): + in_multiline_comment = True + current_multiline_delimiter = delimiter + metrics["comment_lines"] += 1 + if delimiter in ['"""', "'''"]: + metrics["docstring_lines"] += 1 + break + elif delimiter in stripped: # Handle same-line multi-line comments + end_delimiter = "*/" if delimiter == "/*" else delimiter + if end_delimiter in stripped[stripped.index(delimiter) + len(delimiter):]: + metrics["comment_lines"] += 1 + if delimiter in ['"""', "'''"]: + metrics["docstring_lines"] += 1 + break + + + if not in_multiline_comment: + is_comment = False + for prefix in patterns["single_line"]: + if stripped.startswith(prefix): + metrics["comment_lines"] += 1 + is_comment = True + break + elif prefix in stripped: # Count inline comments + metrics["inline_comments"] += 1 + break + + + if not is_comment: + metrics["code_lines"] += 1 + + else: + metrics["comment_lines"] += 1 + if current_multiline_delimiter in ['"""', "'''"]: + metrics["docstring_lines"] += 1 + #checking current multi line delimeter stripped + if current_multiline_delimiter in stripped: + # Handle triple quotes properly + if current_multiline_delimiter in ['"""', "'''"] and \ + stripped.count(current_multiline_delimiter) == 1: + continue # + in_multiline_comment = False + current_multiline_delimiter = None + + + non_blank_lines = metrics["total_lines"] - metrics["blank_lines"] #non blank lines calculating. 
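+            # Worked example with assumed figures: 100 total lines, 20 of them blank,
+            # gives non_blank_lines = 80; with 18 full-line comments and 2 inline
+            # comments, comment_density = (18 + 2) / 80 * 100 = 25.0 (a percentage, not a ratio).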
+ if non_blank_lines > 0: + metrics["comment_density"] = (metrics["comment_lines"] + metrics["inline_comments"]) / non_blank_lines * 100 + metrics["docstring_density"] = metrics["docstring_lines"] / non_blank_lines * 100 + + if language == "Python": + # Check for module-level docstring + if len(lines) > 0 and (lines[0].strip().startswith('"""') or lines[0].strip().startswith("'''")): + metrics["has_module_docstring"] = True + metrics["module_docstring_lines"] = sum(1 for line in lines + if '"""' not in line and "'''" not in line + and bool(line.strip()))#counts the number of lines within a module-level docstring that are not the delimiters themselves and contain actual text + else: + metrics["has_module_docstring"] = False + metrics["module_docstring_lines"] = 0 + + return metrics + + except Exception as e: + self.logger.error(f"Error calculating comment density: {str(e)}") + # Return 0s for all density metrics on error + return { + "comment_lines": 0, + "code_lines": 0, + "blank_lines": 0, + "comment_density": 0.0, + "docstring_lines": 0, + "total_lines": 0, + "inline_comments": 0, + "error": str(e) # Include the error message + } + + def calculate_cyclomatic_complexity(self, content: str, language: str = "Unknown") -> Dict[str, Any]: + """Calculate cyclomatic complexity metrics for code with language-specific handling.""" + metrics = { + "complexity": 1, # Base complexity (always start at 1) + "cognitive_complexity": 0, + "max_nesting_depth": 0 + } + + try: + lines = content.splitlines() + current_depth = 0 + + # Language-specific complexity indicators (expanded) + complexity_keywords = { + "Python": { + "if", "else", "elif", "for", "while", "try", "except", "with", + "async for", "async with", "break", "continue" + }, + "JavaScript": { + "if", "else", "for", "while", "try", "catch", "switch", "case", + "break", "continue", "&&", "||", "?", "async", "await" # Add async/await + }, + "Java": { # Added Java keywords + "if", "else", "for", "while", "do", "switch", "case", "default", + "break", "continue", "try", "catch", "finally" + } + # Add more language-specific keywords as needed + }.get(language, { + # Default keywords for unknown languages + "if", "else", "elif", "for", "while", "try", "catch", "case", "switch", + "&&", "||", "?", "except", "finally", "with" + }) + + + for line in lines: + # Calculate nesting depth + opens = line.count('{') - line.count('}') + current_depth += opens + metrics["max_nesting_depth"] = max(metrics["max_nesting_depth"], current_depth) + + # Increment complexity for control structures + stripped_line = line.strip() + for keyword in complexity_keywords: + if keyword in stripped_line and not stripped_line.startswith(("//", "#", "/*", "*")): # Exclude comments + metrics["complexity"] += 1 + metrics["cognitive_complexity"] += (1 + current_depth) # Cognitive complexity increase + + + if language == "Python": + # Add complexity for list/dict comprehensions + if "for" in stripped_line and ("[" in stripped_line or "{" in stripped_line): + metrics["complexity"] += 1 + metrics["cognitive_complexity"] += 1 # Also add to cognitive + + return metrics + + except Exception as e: + self.logger.error(f"Error calculating complexity: {str(e)}") + # Return defaults, not just an error string, but also include 1 as base. 
+ return { + "complexity": 1, # Ensure baseline complexity + "cognitive_complexity": 0, + "max_nesting_depth": 0 + } + + def detect_code_duplication(self, content: str, min_lines: int = 6) -> Dict[str, Any]: + """Detect code duplication within the content""" + + try: + metrics = { + "duplicate_blocks": 0, + "duplicate_lines": 0, + "duplication_percentage": 0.0 + } + + lines = content.splitlines() + total_lines = len(lines) + + # Return early if there are not enough lines + if total_lines < min_lines: + return metrics + + blocks = {} + for i in range(total_lines - min_lines + 1): + block = '\n'.join(lines[i:i + min_lines]) + normalized_block = self._normalize_code_block(block) + if normalized_block.strip(): # Ignore all-whitespace blocks + if normalized_block in blocks: + blocks[normalized_block].append(i) + else: + blocks[normalized_block] = [i] + + duplicate_line_set = set() # Track duplicate line indices using a *set* + for block, positions in blocks.items(): + if len(positions) > 1: + metrics["duplicate_blocks"] += 1 # Count duplicate blocks + for pos in positions: + for i in range(pos, pos + min_lines): # Add all lines in duplicate block + duplicate_line_set.add(i) + + metrics["duplicate_lines"] = len(duplicate_line_set) # Total count of duplicated lines + + if total_lines > 0: + metrics["duplication_percentage"] = (metrics["duplicate_lines"] / total_lines) * 100 # Duplication metrics calcutation. + + return metrics + + except Exception as e: + self.logger.error(f"Error detecting code duplication: {str(e)}") + # Return 0 for all duplication metrics in case of error + return { + "duplicate_blocks": 0, + "duplicate_lines": 0, + "duplication_percentage": 0.0 + } + def _normalize_code_block(self, block: str) -> str: + """Normalize a block of code for comparison by removing comments, whitespace, etc.""" + lines = [] + for line in block.splitlines(): + # Remove comments (handle both Python and JavaScript/Java comments) + line = re.sub(r'#.*$', '', line) # Python comments + line = re.sub(r'//.*$', '', line) # JavaScript comments + line = re.sub(r'/\*.*?\*/', '', line) # Multi-line comments + + # Normalize whitespace + line = re.sub(r'\s+', ' ', line.strip()) + + if line: # Add non-empty lines + lines.append(line) + + return '\n'.join(lines) + + def calculate_size_metrics(self, content: str, language: str = "Unknown") -> Dict[str, Any]: + + try: + metrics = { + "size_bytes": len(content), + "total_lines": 0, + "code_lines": 0, + "blank_lines": 0, + "comment_lines": 0, + "avg_line_length": 0, + "max_line_length": 0, + "file_entropy": 0, # Added file entropy. + } + + comments = { # handling diff comments. 
+ "Python": { + "line_comment": "#", + "block_start": ['"""', "'''"], + "block_end": ['"""', "'''"] + }, + "JavaScript": { + "line_comment": "//", + "block_start": ["/*"], + "block_end": ["*/"] + }, + "Java": { # Added Java comment definitions + "line_comment": "//", + "block_start": ["/*"], + "block_end": ["*/"] + } + }.get(language, { + "line_comment": "#", + "block_start": ["/*", '"""', "'''"], + "block_end": ["*/", '"""', "'''"] + }) + + lines = content.splitlines() + total_length = 0 # Track the total character count of all lines + char_counts = {} #count the occurance of characters in file + in_block_comment = False + + for line in lines: + metrics["total_lines"] += 1 + line_length = len(line) #length of lines + total_length += line_length + metrics["max_line_length"] = max(metrics["max_line_length"], line_length) + + + for char in line: + char_counts[char] = char_counts.get(char, 0) + 1 + + stripped = line.strip() # Remove the strip function here. + + if not stripped: + metrics["blank_lines"] += 1 + continue + + if not in_block_comment: + is_comment = False + for start in comments["block_start"]: + if stripped.startswith(start): # Use startswith on the stripped line. + in_block_comment = True + metrics["comment_lines"] += 1 + is_comment = True # + break #must add break otherwise count may vary. + if not is_comment: # Out of block_start scope so we have more appropriate behaviour. + if stripped.startswith(comments["line_comment"]): # check if line is comment or code. + metrics["comment_lines"] += 1 + else: + metrics["code_lines"] += 1 + else: + metrics["comment_lines"] += 1 #comment lines + for end in comments["block_end"]: # Block end condition. + if end in stripped: # check comment block ends + in_block_comment = False # + break # + + if metrics["total_lines"] > 0: + metrics["avg_line_length"] = total_length / metrics["total_lines"] + + # Calculate entropy. + total_chars = sum(char_counts.values()) + if total_chars > 0: + entropy = 0 + for count in char_counts.values(): + prob = count / total_chars + entropy -= prob * math.log2(prob) + metrics["file_entropy"] = entropy + + # These aren't always in 'comment_density', so calculate here. + metrics["source_lines"] = metrics["code_lines"] + metrics["comment_lines"] + metrics["comment_ratio"] = (metrics["comment_lines"] / metrics["source_lines"] * 100 + if metrics["source_lines"] > 0 else 0) # Handle potential division by zero. + return metrics + + except Exception as e: + self.logger.error(f"Error calculating size metrics: {str(e)}") + # Return 0s and basic size info on error. Still provide content length + return { + "size_bytes": len(content) if content else 0, # File Size is valuable,even in error. + "total_lines": 0, + "code_lines": 0, + "blank_lines": 0, + "comment_lines": 0, + "avg_line_length": 0, + "max_line_length": 0, + "file_entropy": 0, # file_entropy added to default values. + "source_lines": 0, # return metrics initialized 0 for other metrices. 
+ "comment_ratio": 0 #Return default values on errors + } + + + def analyze_function_metrics(self, content: str, language: str = "Unknown") -> Dict[str, Any]: + + try: + metrics = { + "total_functions": 0, + "avg_function_length": 0, + "max_function_length": 0, + "avg_function_complexity": 0, + "max_function_complexity": 0, + "documented_functions": 0, + "function_lengths": [], # Collect all lengths + "function_complexities": [], # Collect all complexities + "function_details": [] # Store details of each function + } + + # Language-specific function patterns + patterns = { + "Python": r"(?:async\s+)?def\s+(\w+)\s*\([^)]*\)\s*(?:->.*?)?:", + "JavaScript": r"(?:async\s+)?function\s+(\w+)\s*\([^)]*\)|(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>", + "TypeScript": r"(?:async\s+)?function\s+(\w+)\s*\([^)]*\)|(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>", + "Java": r"(?:public|private|protected|\s)\s+(?:static\s+)?[a-zA-Z_<>[\]]+\s+(\w+)\s*\([^)]*\)\s*(?:throws\s+[^{]+)?\s*\{", + "C#": r"(?:public|private|protected|\s)\s+(?:static\s+)?[a-zA-Z_<>[\]]+\s+(\w+)\s*\([^)]*\)\s*(?:where\s+[^{]+)?\s*\{", + }.get(language, r"function\s+(\w+)\s*\([^)]*\)") + + lines = content.splitlines() + current_function = None + function_start = 0 + in_function = False + function_content = [] + + brace_count = 0 #for count braces. + + for i, line in enumerate(lines): + stripped = line.strip() + + if not stripped or stripped.startswith(('/', '#')): #handle empty lines + continue + + if re.search(patterns, line): + current_function = { + "name": re.search(patterns, line).group(1), # Extract function name + "start_line": i + 1, # 1-based line numbers + "has_docstring": False, + "complexity": 1, #base complexity is one. + "nested_depth": 0, + "parameters": len(re.findall(r',', line)) + 1 if '(' in line else 0 # Count parameters + } + function_start = i #starting function line number. + in_function = True + function_content = [line] # Start collecting content + continue + + if in_function: + function_content.append(line) #add the functions to function content. + brace_count += line.count('{') - line.count('}') + + if language == "Python" and i == function_start + 1: # Check for docstring right after def + if stripped.startswith('"""') or stripped.startswith("'''"): + current_function["has_docstring"] = True + + # More robust function end detection + if (language in ["Python"] and brace_count == 0 and not line.startswith(' ')) or \ + (language not in ["Python"] and brace_count == 0 and line.rstrip().endswith('}')): #Robust function end check + + func_content = '\n'.join(function_content) #join content function for metrics + current_function["length"] = len(function_content) # lines of function + complexity_metrics = self.calculate_cyclomatic_complexity(func_content, language) + current_function["complexity"] = complexity_metrics["complexity"] # Cyclomatic complexity + + metrics["total_functions"] += 1 # Total Number of functions count. + metrics["function_lengths"].append(current_function["length"]) + metrics["function_complexities"].append(current_function["complexity"]) + metrics["max_function_length"] = max(metrics["max_function_length"],current_function["length"])# Compare current max value and store greater one. 
+ metrics["max_function_complexity"] = max(metrics["max_function_complexity"], + current_function["complexity"]) # compare and find the max + + if current_function["has_docstring"]: + metrics["documented_functions"] += 1 # count Document function + + metrics["function_details"].append(current_function) + in_function = False + current_function = None + function_content = [] # Clear all collected datas. + + + if metrics["total_functions"] > 0: + metrics["avg_function_length"] = sum(metrics["function_lengths"]) / metrics["total_functions"] + metrics["avg_function_complexity"] = sum(metrics["function_complexities"]) / metrics["total_functions"] + metrics["documentation_ratio"] = metrics["documented_functions"] / metrics["total_functions"] + + return metrics + + except Exception as e: + self.logger.error(f"Error analyzing function metrics: {str(e)}") + # Return default values for all metrics in case of error. + return { + "total_functions": 0, + "avg_function_length": 0, + "max_function_length": 0, + "avg_function_complexity": 0, + "max_function_complexity": 0, + "documented_functions": 0, + "function_lengths": [], + "function_complexities": [], + "function_details": [], + "error": str(e) # Include the error for debugging. + } + + def _analyze_file_metrics(self, file_content) -> Optional[Dict[str, Any]]: + """Analyze metrics for a single file with proper error handling.""" + try: + # Decode the file content (assuming it's base64 encoded) + content = base64.b64decode(file_content.content).decode('utf-8') + language = RELEVANT_EXTENSIONS.get(Path(file_content.path).suffix.lower(), "Unknown") + + metrics = { + "path": file_content.path, + "metrics": {} + } + + # Size metrics (always calculated) + try: + size_metrics = self.calculate_size_metrics(content, language) + metrics["metrics"].update(size_metrics) # Store results, handling None. + except Exception as e: + self.logger.error(f"Error calculating size metrics for {file_content.path}: {str(e)}") + # Provide default values even if there is error + metrics["metrics"].update({ + "size_bytes": len(content), #we have this data even in errors. + "total_lines": len(content.splitlines()), + "code_lines": 0, + "blank_lines": 0, + "comment_lines": 0 + }) + + + # Complexity metrics (only for supported languages) + if language != "Unknown": + try: + complexity = self.calculate_cyclomatic_complexity(content, language) + metrics["metrics"]["complexity"] = complexity.get("complexity", 0) + metrics["metrics"]["cognitive_complexity"] = complexity.get("cognitive_complexity", 0) # Store cognitive. + except Exception as e: + self.logger.error(f"Error calculating complexity for {file_content.path}: {str(e)}") + metrics["metrics"].update({ + "complexity": 0, + "cognitive_complexity": 0 # Default to 0 if error. + }) + + # Halstead metrics (for supported languages) + if language in ["Python", "JavaScript", "Java"]: # Check if language is supported + try: + halstead = self.calculate_halstead_metrics(content, language) + metrics["metrics"].update(halstead) # Add the results to file data. + except Exception as e: + self.logger.error(f"Error calculating Halstead metrics for {file_content.path}: {str(e)}") + # No defaults needed, halstead already returns 0s. 
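+
+            # Note: detect_code_duplication() (defined above) reports
+            # "duplicate_blocks", "duplicate_lines" and "duplication_percentage";
+            # it does not emit a "duplicate_segments" key, so the .get() lookup
+            # below falls back to an empty list.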
+ + # Duplication metrics (always calculate) + try: + duplication = self.detect_code_duplication(content) + metrics["metrics"]["duplicate_segments"] = len(duplication.get("duplicate_segments", [])) + except Exception as e: + self.logger.error(f"Error detecting duplication for {file_content.path}: {str(e)}") + metrics["metrics"]["duplicate_segments"] = 0 # Set to 0 on error + + + # Function-level metrics (for supported languages). + if language != "Unknown": + try: + function_metrics = self.analyze_function_metrics(content, language) + if function_metrics and "error" not in function_metrics: # Check for None AND no error + metrics["metrics"].update(function_metrics) # + except Exception as e: + self.logger.error(f"Error analyzing functions for {file_content.path}: {str(e)}") + # no default to add as function metrics handles defaults. + + # Comment density (always calculated). + try: + comment_metrics = self.calculate_comment_density(content, language) + metrics["metrics"].update(comment_metrics) # Merge + except Exception as e: + self.logger.error(f"Error calculating comment density for {file_content.path}: {str(e)}") + metrics["metrics"].update({ + "comment_density": 0, # Defaults on error + "docstring_lines": 0 # Add other relevant metrics + }) + + + return metrics #Returns calculated data + + except Exception as e: # General Exception to prevent crash. + self.logger.error(f"Error analyzing file {file_content.path}: {str(e)}") + # Return minimal error metrics (important) + return { + "path": file_content.path, + "metrics": { + "size_bytes": 0, # Important basic metric, try to preserve. + "total_lines": 0, # and total lines + "error": str(e) + } + } +class DependencyAnalyzer: + """Handles dependency analysis with improved error handling.""" + + def __init__(self, repo): + self.repo = repo + self.logger = logging.getLogger(__name__) + self.dependency_files = { + "python": ["requirements.txt", "setup.py", "Pipfile", "pyproject.toml"], + "javascript": ["package.json", "yarn.lock", "package-lock.json"], + "java": ["pom.xml", "build.gradle"], + "ruby": ["Gemfile"], + "php": ["composer.json"], + "go": ["go.mod"], + "rust": ["Cargo.toml"], + "dotnet": ["*.csproj", "*.fsproj", "*.vbproj"] # .NET project files + } + + async def analyze_dependencies(self) -> Dict[str, Any]: + """Analyze project dependencies (async for aiohttp).""" + results = { + "dependency_files": [], # Files that specify the dependencies. + "dependencies": defaultdict(list), # Parsed dependencies. + "dependency_graph": defaultdict(list), # Relationship b/w Dependencies. + "outdated_dependencies": [], # + "security_alerts": [] # Placeholder for future security checks + } + + try: + contents = self.repo.get_contents("") + while contents: + file_content = contents.pop(0) + if file_content.type == "dir": + contents.extend(self.repo.get_contents(file_content.path)) + else: + for lang, patterns in self.dependency_files.items(): + if any(self._matches_pattern(file_content.path, pattern) for pattern in patterns): # + try: + file_text = base64.b64decode(file_content.content).decode('utf-8') # + deps = await self._parse_dependency_file(file_content.path, file_text) #parsing the files to find dependency. + if deps: #check deps is not none. + results["dependencies"][file_content.path] = deps + results["dependency_files"].append(file_content.path) # add current file to list of dependency files. 
+ except Exception as e: + self.logger.error(f"Error parsing {file_content.path}: {str(e)}") + + + results["outdated_dependencies"] = await self._check_outdated_dependencies(results["dependencies"])# + results["dependency_graph"] = self._build_dependency_graph(results["dependencies"]) + + except Exception as e: + self.logger.error(f"Error analyzing dependencies: {str(e)}") + # No need to return default values here, as the initialized 'results' dict is sufficient + + return results + + def _matches_pattern(self, filename: str, pattern: str) -> bool: + """Check if a filename matches a given pattern (supports wildcards).""" + if pattern.startswith("*"): + return filename.endswith(pattern[1:]) # Simple wildcard match + return filename.endswith(pattern) + + async def _parse_dependency_file(self, filepath: str, content: str) -> List[Dict[str, str]]: + """Parse different dependency file formats and extract dependencies.""" + deps = [] # Initialize an empty list to hold dependencies + try: + if filepath.endswith(('requirements.txt', 'Pipfile')): #requirements.txt or pipfile + for line in content.split('\n'): + if '==' in line: + name, version = line.strip().split('==') + deps.append({"name": name, "version": version, "type": "python"}) + + elif filepath.endswith('package.json'): #package.json + data = json.loads(content) + for dep_type in ['dependencies', 'devDependencies']: # Check both dependencies and devDependencies + if dep_type in data: + for name, version in data[dep_type].items(): + # Remove semver characters like ^ and ~ for accurate comparisons + deps.append({ + "name": name, + "version": version.replace('^', '').replace('~', ''), # Remove ^ and ~ + "type": "npm" + }) + + # Add more file type parsing as needed (e.g., pom.xml for Java, Gemfile for Ruby) + + except Exception as e: + self.logger.error(f"Error parsing {filepath}: {str(e)}") + # Don't add any dependencies if parsing fails + + return deps # Always return the list, even if empty + + + async def _check_outdated_dependencies(self, dependencies: Dict[str, List[Dict[str, str]]]) -> List[Dict[str, Any]]: + """Check for outdated dependencies using respective package registries (async).""" + outdated = [] + + async with aiohttp.ClientSession() as session: #use aiotthp for faster http requests. + for filepath, deps in dependencies.items(): + for dep in deps: + try: + if dep["type"] == "python": + async with session.get(f"https://pypi.org/pypi/{dep['name']}/json") as response: + if response.status == 200: + data = await response.json() + latest_version = data["info"]["version"] + # Use packaging.version for robust version comparison + if version.parse(latest_version) > version.parse(dep["version"]): + outdated.append({ + "name": dep["name"], + "current_version": dep["version"], + "latest_version": latest_version, + "type": "python" + }) + elif dep["type"] == "npm": + # Use npm registry API + async with session.get(f"https://registry.npmjs.org/{dep['name']}") as response: + if response.status == 200: + data = await response.json() + latest_version = data["dist-tags"]["latest"] + if version.parse(latest_version) > version.parse(dep['version']): + outdated.append({ + "name": dep['name'], + "current_version": dep["version"], + "latest_version": latest_version, + "type": "npm" + }) + # Add checks for other package types (Java, Ruby, etc.) 
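+                        # packaging.version gives PEP 440-aware comparisons for the
+                        # checks above, e.g. version.parse("1.10.0") > version.parse("1.9.2")
+                        # is True, whereas a plain string comparison would order them
+                        # the other way round.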
+ + except Exception as e: + self.logger.error(f"Error checking version for {dep['name']}: {str(e)}") + # Continue checking other dependencies even if one fails + + return outdated # Return the list, even if empty + + def _build_dependency_graph(self, dependencies: Dict[str, List[Dict[str, str]]]) -> Dict[str, List[str]]: + """Build a dependency graph to visualize relationships (using networkx).""" + + graph = nx.DiGraph() # directed graph. + + try: + for dep_file, deps in dependencies.items(): + for dep in deps: + # Add edges to represent dependencies + graph.add_edge(dep_file, dep["name"]) # Dep file depends on individual libraries. + + # Convert to a dictionary of lists for easier handling + return nx.to_dict_of_lists(graph) + + except Exception as e: + self.logger.error(f"Error building dependency graph: {str(e)}") + return defaultdict(list) # Return an empty graph in case of error +class TestAnalyzer: + """Handles test analysis.""" + + def __init__(self, repo): + self.repo = repo + self.logger = logging.getLogger(__name__) # Add logger + self.test_patterns = { + "python": ["test_*.py", "*_test.py", "tests/*.py"], + "javascript": ["*.test.js", "*.spec.js", "__tests__/*.js"], + "java": ["*Test.java", "*Tests.java"], + "ruby": ["*_test.rb", "*_spec.rb"], + "go": ["*_test.go"] + } + + def analyze_tests(self) -> Dict[str, Any]: + """Analyze test files, test counts, and (if possible) coverage information.""" + results = { + "test_files": [], + "test_count": 0, + "coverage_data": {}, # Dictionary to hold any parsed coverage information. + "test_patterns": defaultdict(list) # Store the information about diff. testing pattern. + } + + try: + contents = self.repo.get_contents("") + while contents: + content = contents.pop(0) + if content.type == "dir": + contents.extend(self.repo.get_contents(content.path)) + elif self._is_test_file(content.path): + results["test_files"].append(content.path) + test_metrics = self._analyze_test_file(content) #metrics of single files. + results["test_patterns"][content.path] = test_metrics # Store results. + results["test_count"] += test_metrics.get("test_count", 0) # Safely get test_count + + results["coverage_data"] = self._find_coverage_data() # Get any coverage. + + except Exception as e: + self.logger.error(f"Error analyzing tests: {str(e)}") # Use logger + + return results # Always return results + + + def _is_test_file(self, filepath: str) -> bool: + """Check if a file is likely to be a test file, based on common patterns.""" + for patterns in self.test_patterns.values(): + for pattern in patterns: + if Path(filepath).match(pattern): # Use Path.match for wildcard matching + return True + return False + + def _analyze_test_file(self, file_content) -> Dict[str, Any]: + """Analyze an individual test file to count tests, assertions, etc.""" + try: + content = base64.b64decode(file_content.content).decode('utf-8') + metrics = { + "test_count": 0, + "assertions": 0, + "test_classes": 0 # If using class-based tests + } + + # Count test cases (using regex for common patterns) + metrics["test_count"] += len(re.findall(r'def test_', content)) # Python + metrics["test_count"] += len(re.findall(r'it\s*\([\'""]', content)) # JavaScript (Jest/Mocha) + metrics["assertions"] += len(re.findall(r'assert', content)) # General assertions + metrics["test_classes"] += len(re.findall(r'class\s+\w+Test', content)) # test class patterns. 
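+            # These regex counts are heuristics: e.g. "def test_login(self):" and
+            # "it('renders correctly', ...)" each add one to test_count, and every
+            # occurrence of "assert" (including assertEqual, assert_called_with, ...)
+            # counts as an assertion.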
+ + + return metrics + + except Exception as e: + self.logger.error(f"Error analyzing test file: {str(e)}") # Use logger + return {} # Return empty dict on error + + def _find_coverage_data(self) -> Dict[str, Any]: + """Try to find coverage information (if available, e.g., from coverage reports).""" + coverage_data = { + "total_coverage": None, + "file_coverage": {}, # If file-level data available. + "coverage_report_found": False # for indicating we find coverage files. + } + try: + # Look for common coverage report files + coverage_files = [ + ".coverage", # Python coverage.py + "coverage.xml", # Cobertura (Python, Java) + "coverage.json", # Jest, other JavaScript + "coverage/lcov.info", # LCOV (C/C++, others) + "coverage/coverage-final.json" # Istanbul (JavaScript) + ] -def respond( - message, - history: list[tuple[str, str]], - system_message, - max_tokens, - temperature, - top_p, -): - messages = [{"role": "system", "content": system_message}] + contents = self.repo.get_contents("") + while contents: + content = contents.pop(0) + if content.type == "dir": + contents.extend(self.repo.get_contents(content.path)) + elif any(content.path.endswith(f) for f in coverage_files): + coverage_data["coverage_report_found"] = True # set covarage to True, Indicate report present. + parsed_coverage = self._parse_coverage_file(content) # Try to parse. + if parsed_coverage: #check parse_coverage is present + coverage_data.update(parsed_coverage) # Merge into result - for val in history: - if val[0]: - messages.append({"role": "user", "content": val[0]}) - if val[1]: - messages.append({"role": "assistant", "content": val[1]}) + except Exception as e: + self.logger.error(f"Error finding coverage data: {str(e)}") - messages.append({"role": "user", "content": message}) + return coverage_data - response = "" + def _parse_coverage_file(self, file_content) -> Dict[str, Any]: + """Parse a coverage report file (handles multiple formats).""" + try: + content = base64.b64decode(file_content.content).decode('utf-8') - for message in client.chat_completion( - messages, - max_tokens=max_tokens, - stream=True, - temperature=temperature, - top_p=top_p, - ): - token = message.choices[0].delta.content + if file_content.path.endswith('.json'): + data = json.loads(content) + # Handle different JSON formats (e.g., coverage.py, Istanbul) + if 'total' in data: # coverage.py format + return { + 'total_coverage': data['total'].get('lines', {}).get('percent', 0), + 'file_coverage': { + file: stats.get('lines', {}).get('percent', 0) + for file, stats in data.get('files', {}).items() + } + } + # Add handling for other JSON formats (e.g., Istanbul) as needed - response += token - yield response + elif file_content.path.endswith('.xml'): + # Parse XML (Cobertura format) + from xml.etree import ElementTree #for parse XML format + root = ElementTree.fromstring(content) + total = float(root.get('line-rate', 0)) * 100 # Overall coverage + file_coverage = {} + # Extract coverage per class/file + for class_elem in root.findall('.//class'): + filename = class_elem.get('filename', '') + line_rate = float(class_elem.get('line-rate', 0)) * 100 + file_coverage[filename] = line_rate + + return { + 'total_coverage': total, + 'file_coverage': file_coverage + } + + elif file_content.path.endswith('lcov.info'): + # Parse LCOV format + total_lines = 0 + covered_lines = 0 + current_file = None + file_coverage = {} + + for line in content.split('\n'): + if line.startswith('SF:'): # Source file + current_file = line[3:].strip() + elif 
line.startswith('LH:'): # Lines hit + covered = int(line[3:]) + covered_lines += covered + elif line.startswith('LF:'): # Lines found + total = int(line[3:]) + total_lines += total + if current_file and total > 0: # calculate coverage. + file_coverage[current_file] = (covered / total) * 100 + + return { + 'total_coverage': (covered_lines / total_lines * 100) if total_lines > 0 else 0, # handle Total lines may be 0 + 'file_coverage': file_coverage + } + + except Exception as e: + self.logger.error(f"Error parsing coverage file: {str(e)}") + + return {} # Return empty dict on error + + def analyze_test_quality(self, content: str) -> Dict[str, Any]: + """ + Analyze the quality of the tests themselves. + """ + try: + metrics = { + "assertion_density": 0, # Assertions per line of test code + "test_setup_complexity": 0, # How complex is the test setup? + "mock_usage": 0, # How frequently are mocks used? + "test_patterns": [], # List of identified test patterns and best practices. + "anti_patterns": [] # list of identified Anti patterns + } + + lines = content.splitlines() + assertion_count = sum(1 for line in lines if 'assert' in line) # check assertion present. + metrics["assertion_density"] = assertion_count / len(lines) if lines else 0 + + setup_lines = [] + in_setup = False + for line in lines: + if 'def setUp' in line or 'def setup' in line: + in_setup = True + elif in_setup and line.strip() and not line.startswith(' '): # if present it has any leading space of not. + in_setup = False + if in_setup: + setup_lines.append(line) + + metrics["test_setup_complexity"] = len(setup_lines) + + mock_count = sum(1 for line in lines if 'mock' in line.lower()) # count mock if present + metrics["mock_usage"] = mock_count + + #detect patterns. + if any('parameterized' in line for line in lines): + metrics["test_patterns"].append("parameterized_tests") # + if any('fixture' in line for line in lines): + metrics["test_patterns"].append("fixture_usage")# + + # Identify potential anti-patterns + if any('time.sleep' in line for line in lines): + metrics["anti_patterns"].append("sleep_in_tests") + if any('test' not in line.lower() for line in lines if line.strip().startswith('def')): # all method related to test or not. + metrics["anti_patterns"].append("non_test_methods") # anti_patterns if other extra methods there. + + return metrics + except Exception as e: + self.logger.error(f"Error analyzing test quality: {str(e)}") + return { # Return default 0 values on error. + "assertion_density": 0, + "test_setup_complexity": 0, + "mock_usage": 0, + "test_patterns": [], + "anti_patterns": [] + } +class DocumentationAnalyzer: + """Handles documentation analysis.""" + + def __init__(self, repo): + self.repo = repo + self.logger = logging.getLogger(__name__) # Add logger + self.doc_patterns = [ + "README.md", + "CONTRIBUTING.md", + "CHANGELOG.md", + "LICENSE", + "docs/", # Common documentation directories + "documentation/", + "wiki/" # Consider wiki as documentation + ] + + def analyze_documentation(self) -> Dict[str, Any]: + """Analyze repository documentation (README, CONTRIBUTING, API docs, etc.).""" + results = { + "readme_analysis": None, + "contributing_guidelines": None, + "api_documentation": None, # Placeholder - can be expanded + "documentation_files": [], # All documantation. 
+ "wiki_pages": [], # If the repo has a wiki + "documentation_coverage": 0.0 # Overall score + } + + try: + # Analyze README + readme = self._get_file_content("README.md") + if readme: + results["readme_analysis"] = self._analyze_readme(readme) + + # Check contributing guidelines + contributing = self._get_file_content("CONTRIBUTING.md") + if contributing: + results["contributing_guidelines"] = self._analyze_contributing(contributing) + + + contents = self.repo.get_contents("") + while contents: + content = contents.pop(0) + if content.type == "dir": + # Check for dedicated documentation directories + if content.path.lower() in ["docs", "documentation"]: + results["documentation_files"].extend(self._analyze_doc_directory(content.path)) + contents.extend(self.repo.get_contents(content.path)) + + # Check for specific documentation files + elif any(content.path.endswith(pattern) for pattern in self.doc_patterns): + results["documentation_files"].append(content.path) + + results["documentation_coverage"] = self._calculate_doc_coverage() + + # Get wiki pages if available + try: + wiki_pages = self.repo.get_wiki_pages() # Requires PyGithub 2.x + results["wiki_pages"] = [page.title for page in wiki_pages] + except: # GitHub API might raise an exception if no wiki + pass + + except Exception as e: + self.logger.error(f"Error analyzing documentation: {str(e)}") # Use logger + + return results # Always return results + + def _get_file_content(self, filepath: str) -> Optional[str]: + """Helper to get the content of a specific file (handles not found).""" + try: + content = self.repo.get_contents(filepath) + return base64.b64decode(content.content).decode('utf-8') + except: + return None # File not found + + def _analyze_readme(self, content: str) -> Dict[str, Any]: + """Analyze the README content for completeness and key information.""" + analysis = { + "sections": [], # List of identified sections (e.g., from headings) + "has_quickstart": False, # Quick start guide + "has_installation": False, # Installation instructions + "has_usage": False, # Basic usage examples + "has_api_docs": False, # Link to API docs? + "has_examples": False, # Code examples + "word_count": len(content.split()), + "completeness_score": 0.0 + } + + # Extract sections (using regex for headings) + sections = re.findall(r'^#+\s+(.+)$', content, re.MULTILINE) # match and return the content. + analysis["sections"] = sections + + # Check for key components (using regex for robustness) + analysis["has_quickstart"] = bool(re.search(r'quick\s*start', content, re.I)) # Case-insensitive + analysis["has_installation"] = bool(re.search(r'install|setup', content, re.I)) + analysis["has_usage"] = bool(re.search(r'usage|how\s+to\s+use', content, re.I)) # More flexible matching. + analysis["has_api_docs"] = bool(re.search(r'api|documentation', content, re.I)) + analysis["has_examples"] = bool(re.search(r'example|demo', content, re.I)) # Broader example terms + + # Calculate a simple completeness score + key_elements = [ + analysis["has_quickstart"], + analysis["has_installation"], + analysis["has_usage"], + analysis["has_api_docs"], + analysis["has_examples"] + ] + analysis["completeness_score"] = sum(key_elements) / len(key_elements) * 100 + + return analysis + + def _analyze_contributing(self, content: str) -> Dict[str, Any]: + """Analyze CONTRIBUTING.md for guidelines.""" + analysis = { + "has_code_style": False, # Code Style Guide + "has_pr_process": False, # How to make PR + "has_issue_guidelines": False, #Guidelines for reporting issue. 
+ "has_setup_instructions": False, # setup environment Instructions. + "completeness_score": 0.0 + } + analysis["has_code_style"] = bool(re.search(r'code\s+style|coding\s+standards', content, re.I)) + analysis["has_pr_process"] = bool(re.search(r'pull\s+request|PR', content, re.I)) # checking pull request + analysis["has_issue_guidelines"] = bool(re.search(r'issue|bug\s+report', content, re.I)) #issue and bug report. + analysis["has_setup_instructions"] = bool(re.search(r'setup|getting\s+started', content, re.I))# Setup. + + key_elements = [ #key components present or not. + analysis["has_code_style"], + analysis["has_pr_process"], + analysis["has_issue_guidelines"], + analysis["has_setup_instructions"] + ] + analysis["completeness_score"] = sum(key_elements) / len(key_elements) * 100 # calculate + return analysis + + def _analyze_doc_directory(self, directory: str) -> List[str]: + """Analyze a dedicated documentation directory (if present).""" + doc_files = [] + try: + contents = self.repo.get_contents(directory) + for content in contents: + if content.type == "file": + doc_files.append(content.path) + except Exception as e: + self.logger.error(f"Error analyzing doc directory: {str(e)}") # Use logger + return doc_files + + def _calculate_doc_coverage(self) -> float: + """Calculate an overall documentation coverage score (heuristic).""" + # This is a simplified scoring system and should be customized + score = 0.0 + total_points = 0 + + # Check README presence and quality + readme = self._get_file_content("README.md") + if readme: + readme_analysis = self._analyze_readme(readme) + score += readme_analysis["completeness_score"] / 100 * 40 # README is worth 40% + total_points += 40 + + # Check contributing guidelines + contributing = self._get_file_content("CONTRIBUTING.md") + if contributing: + contributing_analysis = self._analyze_contributing(contributing) + score += contributing_analysis["completeness_score"] / 100 * 20 # Contributing is worth 20% + total_points += 20 + + # Check API documentation (basic presence check) + if any(f.endswith(('.md', '.rst')) for f in self.doc_patterns): + score += 20 # API docs are worth 20% + total_points += 20 + + # Check for examples (this is simplified - could be improved) + if any('example' in f.lower() for f in self.doc_patterns): # Case-insensitive check + score += 20 # Examples are worth 20% + total_points += 20 + + return (score / total_points * 100) if total_points > 0 else 0.0 # Avoid division by 0 + +class CommunityAnalyzer: + """Handles community metrics analysis.""" + + def __init__(self, repo): + self.repo = repo + self.logger = logging.getLogger(__name__) # Add logger + + async def analyze_community(self) -> Dict[str, Any]: + """Analyze community engagement, health, and contribution patterns.""" + results = { + "engagement_metrics": await self._get_engagement_metrics(), # Await async calls + "issue_metrics": await self._analyze_issues(), # Await for analysis + "pr_metrics": await self._analyze_pull_requests(), # Await for PR + "contributor_metrics": self._analyze_contributors(), + "discussion_metrics": await self._analyze_discussions() # If discussions are enabled + } + + return results # Returns Calculated community metrics. 
+ + async def _get_engagement_metrics(self) -> Dict[str, Any]: + """Get basic repository engagement metrics (stars, forks, watchers).""" + metrics = { + "stars": self.repo.stargazers_count, + "forks": self.repo.forks_count, + "watchers": self.repo.subscribers_count, + "star_history": [], # Historical star data + "fork_history": [] # Historical fork data + } + + try: + # Get star history (last 100 stars for efficiency) + stargazers = self.repo.get_stargazers_with_dates() + metrics["star_history"] = [ + {"date": star.starred_at.isoformat(), "count": i + 1} # count: i+1 to show progression. + for i, star in enumerate(stargazers) + ] + + # Get fork history + forks = self.repo.get_forks() # No need for with_date. + metrics["fork_history"] = [ + {"date": fork.created_at.isoformat(), "count": i + 1} + for i, fork in enumerate(forks) + ] + except Exception as e: + self.logger.error(f"Error getting engagement metrics: {str(e)}") # Use logger + + return metrics # Return calculated metrics data. + + async def _analyze_issues(self) -> Dict[str, Any]: + """Analyze repository issues (open, closed, response times, labels).""" + metrics = { + "total_issues": 0, + "open_issues": 0, + "closed_issues": 0, + "avg_time_to_close": None, # Average time to close an issue + "issue_categories": defaultdict(int), # Categorize issues by label + "response_times": [] # List of response times + } + + try: + issues = self.repo.get_issues(state='all') # Get all issues (open and closed) + for issue in issues: + metrics["total_issues"] += 1 + if issue.state == 'open': + metrics["open_issues"] += 1 + else: + metrics["closed_issues"] += 1 + # Calculate time to close (if closed_at is available) + if issue.closed_at and issue.created_at: #Calculate time,if issue closed. + time_to_close = (issue.closed_at - issue.created_at).total_seconds() + metrics["response_times"].append(time_to_close) + + # Categorize issues by labels + for label in issue.labels: + metrics["issue_categories"][label.name] += 1 + + # Calculate average response time + if metrics["response_times"]: # Calculate Avg_response only if any time available. + metrics["avg_time_to_close"] = sum(metrics["response_times"]) / len(metrics["response_times"]) #avg = tot / no. 
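+            # avg_time_to_close is in seconds (from total_seconds()); e.g. a mean of
+            # 172800 corresponds to two days.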
+ + except Exception as e: + self.logger.error(f"Error analyzing issues: {str(e)}") # Use logger + + return metrics + + async def _analyze_pull_requests(self) -> Dict[str, Any]: + """Analyze pull requests (open, closed, merged, review times, sizes).""" + metrics = { + "total_prs": 0, + "open_prs": 0, + "merged_prs": 0, + "closed_prs": 0, + "avg_time_to_merge": None, # Average time to merge a PR + "pr_sizes": defaultdict(int), # Categorize PRs by size (lines of code) + "review_times": [] # List of review times + } + + try: + pulls = self.repo.get_pulls(state='all') # Get all PRs (open, closed, merged) + for pr in pulls: + metrics["total_prs"] += 1 + if pr.state == 'open': + metrics["open_prs"] += 1 + elif pr.merged: + metrics["merged_prs"] += 1 + # Calculate time to merge + if pr.merged_at and pr.created_at: + time_to_merge = (pr.merged_at - pr.created_at).total_seconds() + metrics["review_times"].append(time_to_merge) #store calculated value + else: + metrics["closed_prs"] += 1 # + + # Categorize PR sizes (simplified, based on additions + deletions) + if pr.additions + pr.deletions < 10: + metrics["pr_sizes"]["xs"] += 1 # Extra small + elif pr.additions + pr.deletions < 50: + metrics["pr_sizes"]["s"] += 1 # Small + elif pr.additions + pr.deletions < 250: + metrics["pr_sizes"]["m"] += 1 # Medium + elif pr.additions + pr.deletions < 1000: + metrics["pr_sizes"]["l"] += 1 # Large + else: + metrics["pr_sizes"]["xl"] += 1 # Extra large + + # Calculate average review time + if metrics["review_times"]: #calculate Avg_time to merge if review times available. + metrics["avg_time_to_merge"] = sum(metrics["review_times"]) / len(metrics["review_times"]) #calculate Average. + + except Exception as e: + self.logger.error(f"Error analyzing pull requests: {str(e)}") # Use logger + + return metrics # retrun calculated metrics value. + + def _analyze_contributors(self) -> Dict[str, Any]: + """Analyze contributor patterns and engagement.""" + metrics = { + "total_contributors": 0, + "active_contributors": 0, # Contributors active in the last 90 days + "contributor_types": defaultdict(int), # User, Organization, Bot + "contribution_frequency": defaultdict(int), # High, medium, low + "core_contributors": [] # List of core contributors (e.g., top 10%) + } + + try: + contributors = self.repo.get_contributors() + for contributor in contributors: + metrics["total_contributors"] += 1 + + # Check for recent activity (last 90 days) + recent_commits = contributor.get_commits(since=datetime.now() - timedelta(days=90)) # active since + if recent_commits.totalCount > 0: + metrics["active_contributors"] += 1 + + # Categorize contributor types + metrics["contributor_types"][contributor.type] += 1 # increment by type. + + # Analyze contribution frequency (simplified) + if contributor.contributions > 100: #Contribution level checking. + metrics["contribution_frequency"]["high"] += 1 + # Consider contributors with >100 contributions as "core" + metrics["core_contributors"].append({ + "login": contributor.login, + "contributions": contributor.contributions, # store + "type": contributor.type #Store. + }) + elif contributor.contributions > 20: + metrics["contribution_frequency"]["medium"] += 1 # store in medium if condition satisfy. 
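+                    # Clarifying note: the bucketing here is heuristic -- more than 100
+                    # contributions counts as "high" (and is also listed in core_contributors),
+                    # 21-100 counts as "medium", and anything else falls through to "low" in
+                    # the else branch below.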
+ else: + metrics["contribution_frequency"]["low"] += 1# + + except Exception as e: + self.logger.error(f"Error analyzing contributors: {str(e)}") # Use logger + + return metrics #return Calculated Contributer metrics + + async def _analyze_discussions(self) -> Dict[str, Any]: + """Analyze repository discussions (if enabled).""" + metrics = { + "total_discussions": 0, + "active_discussions": 0, # Discussions with recent activity + "categories": defaultdict(int), # Discussion categories + "avg_responses": 0, # Average number of responses per discussion + "response_times": [] # List of response times + } + + try: + # Check if discussions are enabled + if self.repo.has_discussions: # first check for discussion enabled. + discussions = self.repo.get_discussions() # retrive all the discussion using get_discussions. + total_responses = 0 + + for discussion in discussions: + metrics["total_discussions"] += 1 + # Check for active discussions (simplified: any comments = active) + if discussion.comments > 0: + metrics["active_discussions"] += 1 + total_responses += discussion.comments # Calculate Total no of comments. + + # Categorize discussions + metrics["categories"][discussion.category.name] += 1 + + # Calculate response times (time to first response) + if discussion.comments > 0: + first_response = discussion.get_comments().reversed[0] # Get first comment + response_time = (first_response.created_at - discussion.created_at).total_seconds() # time calcualtion. + metrics["response_times"].append(response_time) # append that. + + # Calculate average responses per discussion + if metrics["active_discussions"] > 0: # Calculate only if value present. + metrics["avg_responses"] = total_responses / metrics["active_discussions"] + except Exception as e: + self.logger.error(f"Error analyzing discussions: {str(e)}") # Use logger + + return metrics +class RepositoryAnalyzer: + """Main class to analyze a GitHub repository.""" + + def __init__(self, repo_url: str, github_token: str): + self.logger = logging.getLogger(__name__) + self.gh = Github(github_token) # Keep for some top-level calls + self.gh_handler = GitHubAPIHandler(github_token) # Use the handler + self.code_metrics = CodeMetricsAnalyzer() + + parts = repo_url.rstrip('/').split('/') + if len(parts) < 2: + raise ValueError("Invalid repository URL format") + + self.repo_name = parts[-1] + self.owner = parts[-2] + self.analysis_data = { # Initialize data here + "basic_info": {}, + "structure": {}, + "code_metrics": {}, + "dependencies": {}, + "tests": {}, + "documentation": {}, + "community": {}, + "visualizations": {} + } + + try: + self.repo = self.gh_handler.get_repository(repo_url) # Use handler + + # Initialize other analyzers *after* successfully getting the repo + self.dependency_analyzer = DependencyAnalyzer(self.repo) + self.test_analyzer = TestAnalyzer(self.repo) + self.doc_analyzer = DocumentationAnalyzer(self.repo) + self.community_analyzer = CommunityAnalyzer(self.repo) + except Exception as e: + self.logger.error(f"Failed to initialize repository analyzer: {str(e)}") + raise + + async def analyze(self) -> Dict[str, Any]: + """Perform the full repository analysis.""" + try: + # Basic repository information + self.analysis_data["basic_info"] = { + "name": self.repo.name, + "owner": self.repo.owner.login, + "description": self.repo.description or "No description available", # Handle None + "stars": self.repo.stargazers_count, + "forks": self.repo.forks_count, + "created_at": self.repo.created_at.isoformat(), # Use isoformat() + 
"last_updated": self.repo.updated_at.isoformat(), + "primary_language": self.repo.language or "Not specified", + } + + # Analyze repository structure with sampling + self.analysis_data["structure"] = await self._analyze_structure() + + # Analyze code patterns and metrics + self.analysis_data["code_metrics"] = await self._analyze_code_metrics() + + # Analyze dependencies + self.analysis_data["dependencies"] = await self.dependency_analyzer.analyze_dependencies() + + # Analyze tests and coverage + self.analysis_data["tests"] = self.test_analyzer.analyze_tests() + + # Analyze documentation + self.analysis_data["documentation"] = self.doc_analyzer.analyze_documentation() + + # Analyze community health + self.analysis_data["community"] = await self.community_analyzer.analyze_community() + + # Generate visualizations + self.analysis_data["visualizations"] = await self._generate_visualizations() + + return self.analysis_data # Return the populated dict + + except Exception as e: + self.logger.error(f"Error during analysis: {str(e)}") + raise + async def _analyze_structure(self) -> Dict[str, Any]: + """Analyze the repository's file and directory structure, with sampling.""" + structure = { + "files": defaultdict(int), # File type counts (e.g., .py, .js) + "directories": set(), # Unique directory paths + "total_size": 0, # Total size in bytes + "directory_tree": defaultdict(list), # Parent -> [children] + "file_samples": [] # Sample files for detailed analysis + } + + try: + all_files = [] # Store all relevant files first + contents = self.repo.get_contents("") + + while contents: + content = contents.pop(0) + if content.type == "dir": + structure["directories"].add(content.path) + # Build directory tree structure + structure["directory_tree"][os.path.dirname(content.path)].append(content.path) #correct way + contents.extend(self.repo.get_contents(content.path)) + else: + ext = Path(content.path).suffix.lower() # Get lowercase extension + # Only consider relevant files + if ext in RELEVANT_EXTENSIONS: + structure["files"][ext] += 1 # Increment count for the file type + structure["total_size"] += content.size + all_files.append(content) + + # Smart sampling of files + if all_files: + # Stratified sampling based on file types + samples_per_type = min(5, max(1, len(all_files) // len(structure["files"]) if structure["files"] else 1)) # At least one sample + for ext in structure["files"].keys(): + ext_files = [f for f in all_files if f.path.endswith(ext)] #select the all file + if ext_files: + # Sort by size, and select a diverse sample + ext_files.sort(key=lambda x: x.size) + total_samples = min(samples_per_type, len(ext_files)) + # Take samples evenly across the size range + step = max(1, len(ext_files) // total_samples) + for i in range(0, len(ext_files), step)[:total_samples]:# Select diverse files from list. + structure["file_samples"].append({ + "path": ext_files[i].path, + "size": ext_files[i].size, + "type": RELEVANT_EXTENSIONS.get(ext, "Unknown") # Get language + }) + + + except Exception as e: + self.logger.error(f"Error analyzing structure: {str(e)}") + # Don't need to return defaults if 'structure' dict is initialized. 
+ + return { + "file_types": dict(structure["files"]), # Convert defaultdict to dict + "directory_count": len(structure["directories"]), + "total_size": structure["total_size"], + "file_count": sum(structure["files"].values()), # Total relevant files + "directory_tree": dict(structure["directory_tree"]), # convert + "file_samples": structure["file_samples"] + } + + async def _analyze_code_metrics(self) -> Dict[str, Any]: + """Analyze code metrics for a sample of files, with parallel processing.""" + metrics = { + "complexity_metrics": defaultdict(list), # Cyclomatic/cognitive, nesting + "duplication_metrics": defaultdict(list), + "function_metrics": defaultdict(list), # From function analysis + "comment_metrics": defaultdict(list), # Comment density + "language_metrics": defaultdict(dict) # Aggregate by language + } + + try: + # Get all relevant files + contents = self.repo.get_contents("") + files_to_analyze = [] + + while contents: + content = contents.pop(0) + if content.type == "dir": + contents.extend(self.repo.get_contents(content.path)) + elif Path(content.path).suffix.lower() in RELEVANT_EXTENSIONS: # Check file. + files_to_analyze.append(content) + + # Use parallel processing for file analysis + with ThreadPoolExecutor(max_workers=min(10, len(files_to_analyze))) as executor: # Limit max worker upto 10. + futures = [] + for file_content in files_to_analyze: + futures.append(executor.submit(self.code_metrics._analyze_file_metrics, file_content)) # passing arguments + + for future in futures: # + try: + file_metrics = future.result() # Collect the results from the File Analysis + if file_metrics: + language = RELEVANT_EXTENSIONS.get(Path(file_metrics["path"]).suffix.lower(), "Unknown") + + # Aggregate metrics (by language, for example) + # Correctly handle string keys for metrics + for metric_type, value in file_metrics["metrics"].items(): + if isinstance(value, (int, float)): + metrics.setdefault(f"{metric_type}_metrics", defaultdict(list))[language].append(value) # store + + # Update language-specific metrics + if language not in metrics["language_metrics"]: + metrics["language_metrics"][language] = { + "file_count": 0, + "total_lines": 0, + "total_complexity": 0 + } + lang_metrics = metrics["language_metrics"][language] #get value based on language. + lang_metrics["file_count"] += 1 + lang_metrics["total_lines"] += file_metrics["metrics"].get("total_lines", 0) # Total lines addition. 
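+                            # Aggregation note (clarifying comment): numeric per-file metrics are
+                            # bucketed by language, e.g. a per-file value {"complexity": 7} from a
+                            # Python file is appended to metrics["complexity_metrics"]["Python"]
+                            # above, while language_metrics keeps simple running totals
+                            # (file_count, total_lines and, on the next line, total_complexity).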
+ lang_metrics["total_complexity"] += file_metrics["metrics"].get("complexity", 0) #complexity count + + + except Exception as e: + self.logger.error(f"Error processing file metrics: {str(e)}") + + return metrics # return aggregated + + except Exception as e: + self.logger.error(f"Error analyzing code metrics: {str(e)}") + return metrics # Return the initialized dict (possibly empty) + + + async def _generate_visualizations(self) -> Dict[str, Any]: + """Generate visualizations from the analyzed data (using matplotlib, seaborn, etc.).""" + visualizations = {} + + try: + # Language distribution pie chart + if self.analysis_data.get("structure", {}).get("file_types"): + fig, ax = plt.subplots() + languages = self.analysis_data["structure"]["file_types"] + plt.pie(languages.values(), labels=languages.keys(), autopct='%1.1f%%') + plt.title("Language Distribution") + from io import BytesIO + buffer = BytesIO() # convert bytes + plt.savefig(buffer, format='png') + visualizations["language_distribution"] = base64.b64encode(buffer.getvalue()).decode() + plt.close() + + + # Code complexity heatmap (example using average complexity) + if self.analysis_data.get("code_metrics", {}).get("complexity_metrics"): + complexity_data = [] + for lang, values in self.analysis_data["code_metrics"]["complexity_metrics"].items(): + if values: # Ensure there are values to average + complexity_data.append({ + "language": lang, + "avg_complexity": sum(values) / len(values) + }) + + if complexity_data: # If Data present generate graph. + df = pd.DataFrame(complexity_data) + plt.figure(figsize=(10, 6)) + sns.barplot(data=df, x="language", y="avg_complexity") + plt.title("Average Code Complexity by Language") + plt.xticks(rotation=45) # Rotate x-axis labels + buffer = BytesIO() + plt.savefig(buffer, format='png', bbox_inches='tight') # Improve layout + visualizations["complexity_distribution"] = base64.b64encode(buffer.getvalue()).decode() + plt.close() + + # Commit activity heatmap (example) + if self.analysis_data.get("community", {}).get("commit_history"): #check whether community & commit-history metrics + commit_data = self.analysis_data["community"]["commit_history"] + df = pd.DataFrame(commit_data) + df['date'] = pd.to_datetime(df['date']) # change into date time for visualization + df = df.set_index('date') + # Resample to daily counts + df = df.resample('D').count() + + plt.figure(figsize=(12, 4)) # fixed size. + sns.heatmap(df.pivot_table(index=df.index.dayofweek, columns=df.index.month, values='count', aggfunc='sum')) # cretae heat map + plt.title("Commit Activity Heatmap") #tile. + buffer = BytesIO() # + plt.savefig(buffer, format='png', bbox_inches='tight') + visualizations["commit_heatmap"] = base64.b64encode(buffer.getvalue()).decode() # + plt.close() # + + # Add more visualizations as needed (e.g., dependency graph, test coverage) + + except Exception as e: + self.logger.error(f"Error generating visualizations: {str(e)}") + + return visualizations # Even if empty + + +# --- Prompt Creation and LLM Interaction --- + +def create_enhanced_analysis_prompt(analysis_data: Dict[str, Any]) -> str: + """Create an enhanced prompt for the LLM analysis.""" + return f"""You are an expert code analyst with deep experience in software architecture, development practices, and team dynamics. 
+Analyze the provided repository data and create a detailed, insightful analysis using the following sections: + +# Repository Analysis for {analysis_data['basic_info']['name']} + +## 📊 Project Overview +[Analyze the basic repository information, including: +- Project purpose and description +- Repository age and activity level +- Key metrics (stars, forks, contributors) +- Primary technologies used +- Overall project health indicators] + +## 🏗️ Architecture and Code Organization +[Analyze the repository structure and code organization: +- Directory structure and organization patterns +- Code distribution across languages +- File organization and modularity +- Architectural patterns +- Development standards and practices +- Code complexity distribution +- Potential architectural improvements] + +## 💻 Code Quality and Metrics +[Provide detailed analysis of code quality metrics: +- Cyclomatic complexity trends +- Code duplication patterns +- Function length and complexity +- Comment density and documentation quality +- Test coverage and quality +- Areas for potential improvement] + +## 📦 Dependencies and Security +[Analyze the project's dependencies: +- Major dependencies and their versions +- Outdated dependencies +- Security vulnerabilities +- Dependency graph complexity +- Licensing considerations] + +## 📚 Documentation Assessment +[Evaluate the project's documentation: +- README completeness and quality +- API documentation coverage +- Contributing guidelines +- Code comments and inline documentation +- Examples and tutorials +- Documentation maintenance status] + +## 🧪 Testing and Quality Assurance +[Analyze testing practices: +- Test coverage metrics +- Testing patterns and approaches +- CI/CD implementation +- Quality assurance processes +- Areas needing additional testing] + +## 👥 Community Health and Engagement +[Evaluate community aspects: +- Contributor demographics and activity +- Issue and PR response times +- Community engagement metrics +- Communication patterns +- Governance model] + +## 📈 Development Trends +[Analyze development patterns: +- Commit frequency and distribution +- Code change patterns +- Release cycle analysis +- Development velocity +- Team collaboration patterns] + +## 🚀 Performance and Scalability +[Assess technical characteristics: +- Code performance indicators +- Scalability considerations +- Resource usage patterns +- Technical debt indicators +- Optimization opportunities] + +## 💡 Key Insights +[Summarize the most important findings: +- Top 3 strengths +- Top 3 areas for improvement +- Unique characteristics +- Notable patterns or practices +- Risk factors] + +## 📋 Recommendations +[Provide actionable recommendations: +- Immediate improvement opportunities +- Long-term strategic suggestions +- Specific tools or practices to consider +- Priority areas for focus +- Resource allocation suggestions] + +Please analyze the following repository data thoroughly and provide detailed insights for each section: + +{json.dumps(analysis_data, indent=2)} """ -For information on how to customize the ChatInterface, peruse the gradio docs: https://www.gradio.app/docs/chatinterface + + +async def analyze_repository(repo_url: str, github_token: str, gemini_key: str, progress=gr.Progress()) -> Tuple[str, str, str]: + """Analyze repository and generate LLM summary (async, with progress).""" + try: + # Re-initialize tokens each time + initialize_tokens(github_token, gemini_key) # Ensure fresh tokens + + progress(0, desc="Initializing repository analysis...") + analyzer = 
RepositoryAnalyzer(repo_url, github_token) + + progress(0.3, desc="Analyzing repository structure and patterns...") + analysis_data = await analyzer.analyze() # Await the analysis + + progress(0.7, desc="Generating comprehensive analysis...") + + # Use the more powerful Gemini 1.5 Pro model + model = genai.GenerativeModel( + model_name="gemini-1.5-pro", # Use 1.5 Pro + generation_config={ + "temperature": 0.7, + "top_p": 0.95, # Use nucleus sampling + "top_k": 40, + "max_output_tokens": 8192, # Increased token limit + } + ) + + prompt = create_enhanced_analysis_prompt(analysis_data) # Use a better, sectioned prompt. + + # Use streaming for a better user experience + chat = model.start_chat(history=[]) # Start fresh + response = chat.send_message(prompt) + + + progress(0.9, desc="Saving analysis results...") + # Save analysis data to a temporary file (for follow-up Q&A) + with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: + json.dump(analysis_data, f, indent=2) + analysis_file = f.name + + progress(1.0, desc="Analysis complete!") + return response.text, analysis_file, "✅ Analysis completed successfully!" + + except Exception as e: + error_message = f"❌ Error analyzing repository: {str(e)}" + return "", "", error_message # Return empty strings for Markdown and file + + +async def ask_question(question: str, analysis_file: str, chat_history: List[Tuple[str, str]]) -> List[Tuple[str, str]]: + """Process a follow-up question about the analysis with enhanced context.""" + if not analysis_file: + return chat_history + [(question, "Please analyze a repository first before asking questions.")] + + try: + with open(analysis_file, 'r') as f: + analysis_data = json.load(f) + + + # Initialize chat with system prompt and history + model = genai.GenerativeModel( + model_name="gemini-1.5-pro", # Use 1.5 Pro + generation_config={ + "temperature": 0.7, + "top_p": 0.8, # More focused sampling + "top_k": 40, + "max_output_tokens": 4096, # Increased token limit + } + ) + + # Build the context + context = """You are an expert code analyst helping users understand repository analysis results. +Provide detailed, technical, and actionable insights based on the analysis data. When appropriate, +reference specific metrics and patterns from the analysis. If making recommendations, be specific +and explain the reasoning behind them. + +Repository Analysis Data: """ -demo = gr.ChatInterface( - respond, - additional_inputs=[ - gr.Textbox(value="You are a friendly Chatbot.", label="System message"), - gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"), - gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"), - gr.Slider( - minimum=0.1, - maximum=1.0, - value=0.95, - step=0.05, - label="Top-p (nucleus sampling)", - ), - ], -) + context += json.dumps(analysis_data, indent=2) + "\n\n" + + if chat_history: # Previous Chat history if have any. + context += "Previous conversation:\n" + for user_msg, assistant_msg in chat_history[-3:]: # Only include last 3 exchanges for relevance. + context += f"User: {user_msg}\nAssistant: {assistant_msg}\n" + + prompt = f"""{context} + +User's Question: {question} + +Please provide a detailed analysis that: +1. Directly addresses the user's question +2. References relevant metrics and data from the analysis +3. Provides context and explanations for technical concepts +4. Suggests actionable next steps or recommendations when appropriate +5. 
Maintains technical accuracy while being clear and understandable + +Your response:""" + + chat = model.start_chat(history=[]) # Start a new chat + response = chat.send_message(prompt) + + + return chat_history + [(question, response.text)] # Store new + + except Exception as e: + error_message = f"Error processing question: {str(e)}" + return chat_history + [(question, error_message)] + + + +# --- Gradio Interface --- + +def create_interface(): + with gr.Blocks(theme=gr.themes.Soft()) as app: # Use a theme + gr.Markdown(""" + # 🔍 GitHub Repository Analyzer (Colab Version) + + Analyze any public GitHub repository using AI. + """) + + # API tokens + with gr.Row(): + github_token = gr.Textbox( + label="GitHub Token", + type="password", + placeholder="Enter your GitHub token" + ) + gemini_key = gr.Textbox( + label="Gemini API Key", + type="password", + placeholder="Enter your Gemini API key" + ) + init_btn = gr.Button("Initialize Tokens", variant="secondary") + + # Repository URL and analysis button + with gr.Row(): + repo_url = gr.Textbox( + label="GitHub Repository URL", + placeholder="https://github.com/owner/repo", + scale=4 # Larger input box + ) + analyze_btn = gr.Button("🔍 Analyze", variant="primary", scale=1) + + # Status message + status_msg = gr.Markdown("") # Display Error Status. + + # Analysis results + with gr.Tabs(): + with gr.Tab("📝 Analysis Report"): # report Analysis. + summary = gr.Markdown("") # output report. + + with gr.Tab("💭 Q&A"): # Improved label + chatbot = gr.Chatbot( + [], + label="Ask questions about the analysis", + height=400 + ) + with gr.Row(): + question = gr.Textbox( + label="Your Question", + placeholder="Ask about specific aspects of the analysis...", + scale=4 + ) + ask_btn = gr.Button("Ask", scale=1) + clear_btn = gr.Button("Clear", scale=1) + + + # Hidden state to store the analysis data file path + analysis_file = gr.State("") + + + async def safe_analyze(repo_url: str, github_token: str, gemini_key: str): + """Wrapper function to handle analysis and errors gracefully.""" + try: + if not repo_url: + return None, None, "❌ Please enter a GitHub repository URL" + + if not github_token or not gemini_key: + return None, None, "❌ Please initialize tokens first" + + if not re.match(r'https?://github\.com/[\w-]+/[\w-]+/?$', repo_url): + return None, None, "❌ Invalid GitHub repository URL format" + + summary, analysis_file, status = await analyze_repository(repo_url, github_token, gemini_key) + return summary, analysis_file, status + + except Exception as e: + return None, None, f"❌ Analysis failed: {str(e)}" + + + # Event handlers + init_btn.click( + initialize_tokens, + inputs=[github_token, gemini_key], + outputs=status_msg + ) + + analyze_btn.click( + fn=lambda: "⏳ Analysis in progress...", # Immediate feedback + inputs=None, + outputs=status_msg, + queue=False # Don't queue this click + ).then( + safe_analyze, # Call the wrapper + inputs=[repo_url, github_token, gemini_key], + outputs=[summary, analysis_file, status_msg] + ) + + ask_btn.click( + ask_question, + inputs=[question, analysis_file, chatbot], # Include chatbot history + outputs=[chatbot] + ).then( + lambda: "", # Clear the question box after asking + None, + question, + queue=False + ) + clear_btn.click( + lambda: ([], ""), # Clear chatbot and question + outputs=[chatbot, question] + ) + return app +# Run the interface if __name__ == "__main__": - demo.launch() + app = create_interface() + app.launch(debug=True, share=True) \ No newline at end of file