|
import os
|
|
import git
|
|
from pathlib import Path
|
|
from openai import OpenAI
|
|
from anthropic import Anthropic
|
|
from dotenv import load_dotenv
|
|
from pydantic_model import ImpactAnalysis
|
|
import tiktoken
|
|
import json
|
|
from typing import List, Tuple, Dict, Any
|
|
|
|
|
|
# Load environment variables (API keys) from a local .env file, if present.
load_dotenv()


# Module-level API clients, created at import time. Either key may be
# missing (getenv returns None); check_api_keys() below reports which
# providers are actually configured.
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

anthropic_client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
|
|
|
|
def clone_repository(repo_url, temp_dir):
    """Clone the git repository at *repo_url* into *temp_dir*.

    Returns a ``(success, error)`` pair: ``(True, None)`` when the clone
    completes, or ``(False, message)`` with the stringified exception when
    anything goes wrong (bad URL, auth failure, missing git binary, ...).
    """
    try:
        git.Repo.clone_from(repo_url, temp_dir)
    except Exception as err:
        # Best-effort wrapper: callers branch on the boolean rather than
        # handling GitPython's exception hierarchy themselves.
        return False, str(err)
    return True, None
|
|
|
|
def read_code_files(directory):
    """Recursively collect source files under *directory*.

    Only files whose extension is in the known code/config set are read.
    Returns ``(code_files, warnings)`` where *code_files* is a list of
    ``{'path': <relative path>, 'content': <text>}`` dicts and *warnings*
    lists human-readable messages for files that could not be read
    (binary content, bad encoding, permissions, ...).
    """
    allowed_suffixes = {'.py', '.js', '.jsx', '.ts', '.tsx', '.java', '.cpp', '.c', '.cs', '.go', '.rb', '.php', '.cls', '.object','.page'}
    collected = []
    warnings = []

    for root, _, names in os.walk(directory):
        for name in names:
            if Path(name).suffix not in allowed_suffixes:
                continue
            file_path = os.path.join(root, name)
            try:
                with open(file_path, 'r', encoding='utf-8') as fh:
                    text = fh.read()
                collected.append({
                    'path': os.path.relpath(file_path, directory),
                    'content': text,
                })
            except Exception as e:
                # Unreadable files are reported, not fatal.
                warnings.append(f"Could not read file {file_path}: {str(e)}")

    return collected, warnings
|
|
|
|
def count_tokens(text: str, model: str = "gpt-4") -> int:
    """Count the number of tokens in *text* for the given model.

    Bug fix: chunk_files() forwards the caller's model label verbatim,
    and labels tiktoken does not recognize (e.g. "claude-sonnet") made
    ``encoding_for_model`` raise ``KeyError``, aborting the whole
    analysis. Unknown models now fall back to the ``cl100k_base``
    encoding, which gives a close-enough estimate for chunk budgeting.
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Not a model tiktoken knows — use the GPT-4-family base encoding
        # as an approximation rather than crashing.
        encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(text))
|
|
|
|
def chunk_files(code_files: List[Dict[str, str]], model: str = "gpt-4", max_tokens: int = 120000) -> List[List[Dict[str, str]]]:
    """Greedily pack files into chunks that fit the model's context window.

    Files are taken in order; a new chunk is started whenever adding the
    next file would push the running token total past *max_tokens*. Any
    single file that alone exceeds the budget is skipped with a console
    warning. Returns a list of chunks, each a list of file dicts.
    """
    chunks: List[List[Dict[str, str]]] = []
    batch: List[Dict[str, str]] = []
    batch_tokens = 0

    for file in code_files:
        rendered = f"File: {file['path']}\nContent:\n{file['content']}\n"
        tokens = count_tokens(rendered, model)

        # An oversized file can never fit in any chunk — skip it entirely.
        if tokens > max_tokens:
            print(f"Warning: File {file['path']} is too large ({tokens} tokens) and will be skipped")
            continue

        if batch_tokens + tokens > max_tokens:
            # Close the current batch and start a fresh one with this file.
            if batch:
                chunks.append(batch)
            batch = [file]
            batch_tokens = tokens
        else:
            batch.append(file)
            batch_tokens += tokens

    # Flush the final, partially-filled batch.
    if batch:
        chunks.append(batch)

    return chunks
|
|
|
|
def analyze_code_chunk(chunk: List[Dict[str, str]], prompt: str, model: str) -> Tuple[str, str]:
    """Analyze one chunk of code files with the selected LLM.

    Builds a single context string from every file in *chunk*, sends it
    together with the user's change description (*prompt*) to either
    OpenAI (when ``model == "gpt-4"``) or Anthropic (any other value),
    and returns ``(analysis_text, error)`` — exactly one of the two is
    non-empty.

    NOTE(review): the two branches request different output formats.
    Only the OpenAI branch asks for JSON matching the ImpactAnalysis
    schema (which analyze_code() later json.loads()); the Claude branch
    asks for a prose table — confirm that is intentional.
    """
    try:

        # Concatenate every file into one fenced-code context blob.
        context = "Here are the relevant code files:\n\n"
        for file in chunk:
            context += f"File: {file['path']}\n```\n{file['content']}\n```\n"

        if model == "gpt-4":
            # Embed the pydantic schema in the prompt; the schema is not
            # enforced by the API itself, only requested in text.
            json_schema = ImpactAnalysis.model_json_schema()
            messages = [
                {"role": "system", "content": "You are a code analysis expert. Analyze the provided code based on the user's prompt."},
                {"role": "user", "content": f"Please check the impact of performing the below code/configuration changes on the above codebase. Provide only the summary of the impact in a table with aggregate analysis that outputs a JSON object with the following schema : {json_schema} . Pls note : Do not add the characters ``` json anywhere in the response. Do not respond with messages like 'Here is the response in the required JSON format:'.\n\nCode or configuration changes: {prompt}\n\n{context}"}
            ]

            # NOTE(review): the UI label is "gpt-4" but the actual API
            # model requested is "gpt-4o" — confirm this mapping is intended.
            response = openai_client.chat.completions.create(
                model="gpt-4o",
                messages=messages,
                temperature=0.7,
                max_tokens=2000
            )
            return response.choices[0].message.content, ""
        else:

            # Claude path: severity thresholds are spelled out in prose
            # instead of a JSON schema.
            system_message = "You are a code analysis expert. Analyze the provided code based on the user's prompt."
            user_message = f"Please check the impact of performing the below code/configuration changes on the above codebase. Provide only the summary of the impact in a table with aggregate analysis that includes 1) List of files impacted. 2) No of files impacted 3) Impactd etail on each file impacted . Surface a 'Severity Level' at the top of table with possible values: Low, Medium, High based on the 'Number of impacted files' impacted. E.g. if 'Number of impacted files' > 0 but < 3 then LOW, if 'Number of impacted files' > 3 but < 8 then MEDIUM, if 'Number of impacted files' > 8 then HIGH.\n\nCode or configuration changes: {prompt}\n\n{context}"

            response = anthropic_client.messages.create(
                model="claude-3-7-sonnet-20250219",
                max_tokens=2000,
                temperature=0.7,
                system=system_message,
                messages=[{"role": "user", "content": user_message}]
            )
            return response.content[0].text, ""
    except Exception as e:
        # Any API/network failure is surfaced to the caller as a string.
        return "", str(e)
|
|
|
|
def analyze_code(code_files: List[Dict[str, str]], prompt: str, model: str) -> Tuple[str, str]:
    """Analyze code files with chunking to handle large codebases.

    Splits *code_files* into context-window-sized chunks, analyzes each
    with analyze_code_chunk(), and merges the per-chunk JSON results.
    Returns ``(analysis_json, error)`` — exactly one is non-empty. On
    success *analysis_json* is a JSON string with keys
    ``severity_level``, ``number_of_files_impacted`` and
    ``files_impacted``.
    """
    try:
        chunks = chunk_files(code_files, model)
        if not chunks:
            return "", "No valid files to analyze"

        all_analyses = []
        for i, chunk in enumerate(chunks):
            analysis, error = analyze_code_chunk(chunk, prompt, model)
            if error:
                # Fail fast: one broken chunk invalidates the aggregate.
                return "", f"Error analyzing chunk {i+1}: {error}"
            if analysis:
                all_analyses.append(analysis)

        if not all_analyses:
            return "", "No analysis results generated"

        combined_analysis = {
            "severity_level": "LOW",
            "number_of_files_impacted": 0,
            "files_impacted": []
        }
        severity_map = {"LOW": 1, "MEDIUM": 2, "HIGH": 3}

        for analysis in all_analyses:
            try:
                chunk_data = json.loads(analysis)
            except json.JSONDecodeError:
                # The model occasionally returns non-JSON text; skip that
                # chunk rather than failing the whole merge.
                continue
            combined_analysis["number_of_files_impacted"] += chunk_data.get("number_of_files_impacted", 0)
            combined_analysis["files_impacted"].extend(chunk_data.get("files_impacted", []))

            # Keep the highest severity reported by any single chunk.
            current_severity = severity_map.get(combined_analysis["severity_level"], 0)
            chunk_severity = severity_map.get(chunk_data.get("severity_level", "LOW"), 0)
            if chunk_severity > current_severity:
                combined_analysis["severity_level"] = chunk_data["severity_level"]

        # Bug fix: the per-chunk maximum can understate severity once the
        # counts are summed — e.g. three chunks of 3 impacted files each
        # are LOW individually but 9 files overall, which is HIGH by the
        # thresholds this module's prompts define (>8 HIGH, >3 MEDIUM).
        # Recompute from the combined total and keep whichever is higher.
        total = combined_analysis["number_of_files_impacted"]
        if total > 8:
            derived = "HIGH"
        elif total > 3:
            derived = "MEDIUM"
        else:
            derived = "LOW"
        if severity_map.get(derived, 0) > severity_map.get(combined_analysis["severity_level"], 0):
            combined_analysis["severity_level"] = derived

        return json.dumps(combined_analysis), ""

    except Exception as e:
        return "", str(e)
|
|
|
|
def check_api_keys():
    """Report which LLM providers have API keys configured.

    Returns a dict mapping the UI model labels ("gpt-4", "claude-sonnet")
    to booleans: True when the corresponding environment variable is set
    (even to an empty string), False when it is absent.
    """
    return {
        "gpt-4": os.getenv("OPENAI_API_KEY") is not None,
        "claude-sonnet": os.getenv("ANTHROPIC_API_KEY") is not None,
    }
|
|
|