Spaces:
Sleeping
Sleeping
from typing import Dict, List, Optional | |
import logging | |
from datetime import datetime, timedelta | |
from .model_manager import ModelManager | |
from .config import Config | |
import gc | |
logger = logging.getLogger(__name__) | |
class CodeReview:
    """Record of one review request together with the results attached to it.

    Attributes:
        code: Source text that was submitted for review.
        language: Language label supplied with the code.
        review_id: Caller-supplied identifier for this review.
        timestamp: Naive local time at which the record was created.
        suggestions: Review sections; empty until a reviewer fills them in.
        metrics: Measurements about the review; empty until filled in.
    """

    def __init__(self, code: str, language: str, review_id: str):
        """Store the request data and start with empty results."""
        # Request data is kept exactly as given.
        self.code, self.language, self.review_id = code, language, review_id
        # Creation time doubles as the review's age for history handling.
        self.timestamp = datetime.now()
        # Fresh containers per instance so records never share results.
        self.suggestions: List[Dict] = list()
        self.metrics: Dict = dict()
class CodeReviewer:
    """Run LLM-backed code reviews and keep a bounded in-memory history of them.

    A review builds a prompt from the submitted code, asks the model manager
    for a completion, and parses the completion into four fixed sections
    (Issues, Improvements, Best Practices, Security).
    """

    # Section headings requested by the prompt, in the order results are reported.
    _SECTION_TYPES = ('Issues', 'Improvements', 'Best Practices', 'Security')
    # Markdown bold markers a model may wrap a heading in.
    _BOLD_MARKERS = ('**', '__')
    # Characters trimmed from both ends of a line to turn it into an item.
    _ITEM_STRIP_CHARS = '-* '
    # Part of the input budget held back for the prompt text around the code.
    _PROMPT_RESERVE = 200

    def __init__(self, model_manager: "ModelManager"):
        """Create a reviewer that generates text through ``model_manager``.

        Args:
            model_manager: Object whose ``generate_text(prompt, max_new_tokens=...)``
                method produces the review text.
        """
        self.model_manager = model_manager
        self.review_history: List["CodeReview"] = []
        # Time of the last retention sweep; sweeps run at most once per hour.
        self._last_cleanup = datetime.now()

    def _create_review_prompt(self, code: str, language: str) -> str:
        """Build the review prompt for ``code`` written in ``language``.

        The prompt is kept deliberately short and names the four sections the
        parser looks for, followed by the code in a fenced block.
        """
        return (
            f"Review this {language} code. List specific points in these sections:\n"
            "Issues:\n"
            "Improvements:\n"
            "Best Practices:\n"
            "Security:\n"
            "Code:\n"
            f"```{language}\n"
            f"{code}\n"
            "```"
        )

    def review_code(self, code: str, language: str, review_id: str) -> "CodeReview":
        """Review ``code`` with the model and record the result in the history.

        Args:
            code: Source text to review. The stored record keeps it unmodified;
                only the copy placed in the prompt is truncated.
            language: Language label used in the prompt.
            review_id: Caller-supplied identifier stored on the record.

        Returns:
            The populated ``CodeReview`` (``suggestions`` and ``metrics`` set).

        Raises:
            Exception: Any error raised while reviewing is logged and re-raised.
        """
        try:
            start_time = datetime.now()
            self._cleanup_old_reviews()

            review = CodeReview(code, language, review_id)

            # The limit is applied to characters. Clamp at zero so a small
            # configured limit cannot turn into a negative slice index, which
            # would cut from the end instead of limiting the length.
            max_code_length = max(Config.MAX_INPUT_LENGTH - self._PROMPT_RESERVE, 0)
            if len(code) > max_code_length:
                code = code[:max_code_length] + "\n# ... (code truncated for length)"

            prompt = self._create_review_prompt(code, language)
            response = self.model_manager.generate_text(
                prompt,
                max_new_tokens=Config.MAX_OUTPUT_LENGTH
            )

            sections = self._parse_review_response(response)
            review.suggestions = sections

            end_time = datetime.now()
            review.metrics = {
                'response_time': (end_time - start_time).total_seconds(),
                # Length of the text placed in the prompt (after any truncation).
                'code_length': len(code),
                'suggestion_count': sum(len(section['items']) for section in sections)
            }

            self._add_to_history(review)
            # Prompt and response strings can be large; reclaim them promptly.
            gc.collect()
            return review
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception("Error during code review: %s", e)
            raise

    @classmethod
    def _split_section_header(cls, line: str) -> Optional[tuple]:
        """Return ``(section_name, inline_text)`` if ``line`` is a section heading.

        A heading is a line that starts, case-insensitively, with one of the
        section names, optionally preceded by Markdown heading marks (``#``)
        or a bold marker (``**`` / ``__``). ``inline_text`` is whatever follows
        the first colon after the name ('' when there is none). Returns
        ``None`` for any other line. ``line`` must already be stripped.
        """
        marker = ''
        body = line
        if line.startswith('#'):
            body = line.lstrip('#').lstrip()
        else:
            for candidate in cls._BOLD_MARKERS:
                if line.startswith(candidate):
                    marker = candidate
                    body = line[len(candidate):].lstrip()
                    break

        for name in cls._SECTION_TYPES:
            if body[:len(name)].lower() != name.lower():
                continue
            _, colon, inline = body[len(name):].partition(':')
            inline = inline.strip() if colon else ''
            # "**Security:** text" leaves the closing marker in front of the text.
            if marker and inline.startswith(marker):
                inline = inline[len(marker):].lstrip()
            return name, inline
        return None

    def _parse_review_response(self, response: str) -> List[Dict]:
        """Parse the model output into the four fixed review sections.

        Lines before the first heading are ignored. Every later non-heading
        line becomes an item of the most recent heading after leading and
        trailing ``-``, ``*`` and spaces are removed. Text that follows a
        heading on the same line (``Security: sanitize input``) is kept as an
        item, and a heading that appears more than once adds to the same
        section. Duplicate items within a section are dropped.

        Args:
            response: Raw model output. A non-string value is logged and
                treated as an empty response.

        Returns:
            One ``{'type': name, 'items': [...]}`` dict per section, always in
            the order Issues, Improvements, Best Practices, Security.
        """
        items_by_section: Dict[str, List[str]] = {name: [] for name in self._SECTION_TYPES}

        if not isinstance(response, str):
            logger.error(
                "Error parsing response: expected str, got %s",
                type(response).__name__,
            )
            response = ''

        current_items = None
        for raw_line in response.split('\n'):
            line = raw_line.strip()
            if not line:
                continue

            header = self._split_section_header(line)
            if header is not None:
                section_name, candidate = header
                current_items = items_by_section[section_name]
            elif current_items is None:
                # Preamble before the first heading belongs to no section.
                continue
            else:
                candidate = line

            item = candidate.strip(self._ITEM_STRIP_CHARS)
            if item and item not in current_items:
                current_items.append(item)

        return [
            {'type': name, 'items': items}
            for name, items in items_by_section.items()
        ]

    def _add_to_history(self, review: "CodeReview"):
        """Append ``review`` and drop the oldest entries beyond the size limit."""
        self.review_history.append(review)
        # A non-positive configured limit keeps nothing rather than raising.
        overflow = len(self.review_history) - max(Config.MAX_HISTORY_ITEMS, 0)
        if overflow > 0:
            del self.review_history[:overflow]

    def _cleanup_old_reviews(self):
        """Drop reviews older than the retention period, at most once per hour."""
        now = datetime.now()
        if now - self._last_cleanup <= timedelta(hours=1):
            return
        cutoff_date = now - timedelta(days=Config.HISTORY_RETENTION_DAYS)
        self.review_history = [r for r in self.review_history if r.timestamp > cutoff_date]
        self._last_cleanup = now
        gc.collect()

    def get_review_metrics(self) -> Dict:
        """Return aggregate metrics over the current review history.

        Returns:
            Dict with ``total_reviews``, ``avg_response_time``,
            ``avg_suggestions`` and ``reviews_today``. All values are zero
            when the history is empty. A review that has no recorded value
            for a metric counts as zero for that metric.
        """
        history = self.review_history
        if not history:
            return {
                'total_reviews': 0,
                'avg_response_time': 0.0,
                'avg_suggestions': 0.0,
                'reviews_today': 0
            }

        total_reviews = len(history)
        # Evaluate "today" once so every review is compared to the same date.
        today = datetime.now().date()
        total_time = sum(r.metrics.get('response_time', 0.0) for r in history)
        total_suggestions = sum(r.metrics.get('suggestion_count', 0) for r in history)
        return {
            'total_reviews': total_reviews,
            'avg_response_time': total_time / total_reviews,
            'avg_suggestions': total_suggestions / total_reviews,
            'reviews_today': sum(1 for r in history if r.timestamp.date() == today)
        }

    def get_review_history(self, limit: Optional[int] = None) -> List["CodeReview"]:
        """Return a copy of the review history, oldest first.

        Args:
            limit: Maximum number of most recent reviews to return. ``None``
                returns the whole history; zero or a negative value returns
                an empty list.

        Returns:
            A new list, so callers cannot modify the internal history.
        """
        if limit is None:
            return self.review_history.copy()
        if limit <= 0:
            return []
        return self.review_history[-limit:]