Spaces:
Sleeping
Sleeping
File size: 6,771 Bytes
087ce88 b4ae3b7 087ce88 b4ae3b7 087ce88 b4ae3b7 087ce88 b4ae3b7 087ce88 b4ae3b7 087ce88 b4ae3b7 087ce88 b4ae3b7 087ce88 b4ae3b7 087ce88 b4ae3b7 087ce88 b4ae3b7 087ce88 b4ae3b7 087ce88 b4ae3b7 087ce88 b4ae3b7 087ce88 b4ae3b7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
from typing import Dict, List, Optional
import logging
from datetime import datetime, timedelta
from .model_manager import ModelManager
from .config import Config
import gc
logger = logging.getLogger(__name__)
class CodeReview:
def __init__(self, code: str, language: str, review_id: str):
self.code = code
self.language = language
self.review_id = review_id
self.timestamp = datetime.now()
self.suggestions: List[Dict] = []
self.metrics: Dict = {}
class CodeReviewer:
def __init__(self, model_manager: ModelManager):
self.model_manager = model_manager
self.review_history: List[CodeReview] = []
self._last_cleanup = datetime.now()
def _create_review_prompt(self, code: str, language: str) -> str:
"""Create a structured prompt for code review."""
# More concise prompt to reduce token usage
return f"""Review this {language} code. List specific points in these sections:
Issues:
Improvements:
Best Practices:
Security:
Code:
```{language}
{code}
```"""
def review_code(self, code: str, language: str, review_id: str) -> CodeReview:
"""Perform code review using the LLM."""
try:
start_time = datetime.now()
# Clean up old reviews periodically
self._cleanup_old_reviews()
# Create review instance
review = CodeReview(code, language, review_id)
# Truncate code if too long
max_code_length = Config.MAX_INPUT_LENGTH - 200 # Reserve tokens for prompt
if len(code) > max_code_length:
code = code[:max_code_length] + "\n# ... (code truncated for length)"
# Generate review prompt
prompt = self._create_review_prompt(code, language)
# Get model response
response = self.model_manager.generate_text(
prompt,
max_new_tokens=Config.MAX_OUTPUT_LENGTH
)
# Parse and structure the response
sections = self._parse_review_response(response)
# Store suggestions
review.suggestions = sections
# Calculate metrics
end_time = datetime.now()
review.metrics = {
'response_time': (end_time - start_time).total_seconds(),
'code_length': len(code),
'suggestion_count': sum(len(section['items']) for section in sections)
}
# Store review in history
self._add_to_history(review)
# Force garbage collection
gc.collect()
return review
except Exception as e:
logger.error(f"Error during code review: {str(e)}")
raise
def _parse_review_response(self, response: str) -> List[Dict]:
"""Parse the LLM response into structured sections."""
sections = []
current_section = None
required_sections = ['Issues', 'Improvements', 'Best Practices', 'Security']
try:
# Split response into lines and process each line
lines = response.split('\n')
for line in lines:
line = line.strip()
if not line:
continue
# Check for section headers
for section in required_sections:
if line.lower().startswith(section.lower()):
current_section = {
'type': section,
'items': []
}
sections.append(current_section)
break
# Add items to current section if not a section header
if current_section and line.strip('-* ') and not any(
line.lower().startswith(s.lower()) for s in required_sections
):
item = line.strip('-* ')
if item and not any(item == existing for existing in current_section['items']):
current_section['items'].append(item)
except Exception as e:
logger.error(f"Error parsing response: {str(e)}")
# Ensure all required sections exist
result = []
for section_type in required_sections:
found_section = next((s for s in sections if s['type'] == section_type), None)
if found_section:
result.append(found_section)
else:
result.append({
'type': section_type,
'items': []
})
return result
def _add_to_history(self, review: CodeReview):
"""Add review to history and maintain size limit."""
self.review_history.append(review)
while len(self.review_history) > Config.MAX_HISTORY_ITEMS:
self.review_history.pop(0)
def _cleanup_old_reviews(self):
"""Clean up reviews older than retention period."""
if (datetime.now() - self._last_cleanup) > timedelta(hours=1):
cutoff_date = datetime.now() - timedelta(days=Config.HISTORY_RETENTION_DAYS)
self.review_history = [r for r in self.review_history if r.timestamp > cutoff_date]
self._last_cleanup = datetime.now()
gc.collect()
def get_review_metrics(self) -> Dict:
"""Calculate aggregate metrics from review history."""
if not self.review_history:
return {
'total_reviews': 0,
'avg_response_time': 0.0,
'avg_suggestions': 0.0,
'reviews_today': 0
}
total_reviews = len(self.review_history)
avg_response_time = sum(r.metrics['response_time'] for r in self.review_history) / total_reviews
avg_suggestions = sum(r.metrics['suggestion_count'] for r in self.review_history) / total_reviews
return {
'total_reviews': total_reviews,
'avg_response_time': avg_response_time,
'avg_suggestions': avg_suggestions,
'reviews_today': sum(1 for r in self.review_history if r.timestamp.date() == datetime.now().date())
}
def get_review_history(self, limit: Optional[int] = None) -> List[CodeReview]:
"""Get review history with optional limit."""
if limit:
return self.review_history[-limit:]
return self.review_history.copy() # Return copy to prevent external modifications
|