#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Code Analyzer Service

This module provides functionality for analyzing code quality across different languages.
"""
import os
import subprocess
import logging
import json
import tempfile
from collections import defaultdict

logger = logging.getLogger(__name__)


class CodeAnalyzer:
    """
    Service for analyzing code quality across different languages.
    """

    def __init__(self):
        """
        Initialize the CodeAnalyzer.
        """
        logger.info("Initialized CodeAnalyzer")
        self.analyzers = {
            'Python': self._analyze_python,
            'JavaScript': self._analyze_javascript,
            'TypeScript': self._analyze_typescript,
            'Java': self._analyze_java,
            'Go': self._analyze_go,
            'Rust': self._analyze_rust,
        }
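        # To support another language, register an entry above and implement
        # the matching method. Hypothetical sketch (neither the 'Ruby' key nor
        # _analyze_ruby exists in this module):
        #     'Ruby': self._analyze_ruby,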

    def analyze_repository(self, repo_path, languages):
        """
        Analyze code quality in a repository for the specified languages.

        Args:
            repo_path (str): The path to the repository.
            languages (list): A list of programming languages to analyze.

        Returns:
            dict: A dictionary containing analysis results for each language.
        """
        logger.info(f"Analyzing repository at {repo_path} for languages: {languages}")
        results = {}
        for language in languages:
            if language in self.analyzers:
                try:
                    logger.info(f"Analyzing {language} code in {repo_path}")
                    results[language] = self.analyzers[language](repo_path)
                except Exception as e:
                    logger.error(f"Error analyzing {language} code: {e}")
                    results[language] = {
                        'status': 'error',
                        'error': str(e),
                        'issues': [],
                    }
            else:
                logger.warning(f"No analyzer available for {language}")
                results[language] = {
                    'status': 'not_supported',
                    'message': f"Analysis for {language} is not supported yet.",
                    'issues': [],
                }
        return results
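
    # Illustrative call and result shape (a sketch; the '/tmp/repo' path is
    # hypothetical):
    #     analyzer = CodeAnalyzer()
    #     results = analyzer.analyze_repository('/tmp/repo', ['Python'])
    #     results['Python']  # -> {'status': 'success', 'issues': [...],
    #                        #     'issues_by_type': {...}, 'issue_count': N,
    #                        #     'files_analyzed': M}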

    def _analyze_python(self, repo_path):
        """
        Analyze Python code using pylint.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing Python code in {repo_path}")
        # Find Python files
        python_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith('.py'):
                    python_files.append(os.path.join(root, file))
        if not python_files:
            return {
                'status': 'no_files',
                'message': 'No Python files found in the repository.',
                'issues': [],
            }
        try:
            # Run pylint with the JSON reporter; results are read from stdout,
            # so no intermediate output file is needed.
            cmd = [
                'pylint',
                '--output-format=json',
                '--reports=n',
            ] + python_files
            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,  # pylint exits non-zero when it finds issues
            )
            # Parse pylint output
            if process.stdout.strip():
                try:
                    issues = json.loads(process.stdout)
                except json.JSONDecodeError:
                    logger.error(f"Error parsing pylint output: {process.stdout}")
                    issues = []
            else:
                issues = []
            # Group issues by type
            issues_by_type = defaultdict(list)
            for issue in issues:
                issue_type = issue.get('type', 'unknown')
                issues_by_type[issue_type].append(issue)
            return {
                'status': 'success',
                'issues': issues,
                'issues_by_type': dict(issues_by_type),
                'issue_count': len(issues),
                'files_analyzed': len(python_files),
            }
        except Exception as e:
            logger.error(f"Error running pylint: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }
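
    # A single pylint JSON record looks roughly like the sketch below (values
    # are hypothetical; see pylint's JSON reporter docs for the full field set):
    #     {"type": "convention", "module": "app", "path": "app.py",
    #      "line": 3, "column": 0, "symbol": "missing-module-docstring",
    #      "message": "Missing module docstring", "message-id": "C0114"}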

    def _analyze_javascript(self, repo_path):
        """
        Analyze JavaScript code using ESLint.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing JavaScript code in {repo_path}")
        # Find JavaScript files, skipping installed dependencies
        js_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith(('.js', '.jsx')) and 'node_modules' not in root:
                    js_files.append(os.path.join(root, file))
        if not js_files:
            return {
                'status': 'no_files',
                'message': 'No JavaScript files found in the repository.',
                'issues': [],
            }
        # Create a temporary ESLint configuration file
        eslint_config = {
            "env": {
                "browser": True,
                "es2021": True,
                "node": True
            },
            "extends": "eslint:recommended",
            "parserOptions": {
                "ecmaVersion": 12,
                "sourceType": "module",
                "ecmaFeatures": {
                    "jsx": True
                }
            },
            "rules": {}
        }
        # Open in text mode: json.dump writes str, not bytes
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_config:
            json.dump(eslint_config, temp_config)
            temp_config_path = temp_config.name
        try:
            # Run ESLint with the JSON formatter
            cmd = [
                'npx',
                'eslint',
                '--config', temp_config_path,
                '--format', 'json',
            ] + js_files
            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,  # ESLint exits non-zero when it finds issues
            )
            # Parse ESLint output
            issues = []
            if process.stdout.strip():
                try:
                    eslint_results = json.loads(process.stdout)
                    # Extract issues from ESLint results
                    for result in eslint_results:
                        file_path = result.get('filePath', '')
                        for message in result.get('messages', []):
                            issues.append({
                                'path': file_path,
                                'line': message.get('line', 0),
                                'column': message.get('column', 0),
                                'message': message.get('message', ''),
                                'severity': message.get('severity', 0),
                                'ruleId': message.get('ruleId', ''),
                            })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing ESLint output: {process.stdout}")
                    issues = []
            # Group issues by severity (ESLint uses 1 for warnings, 2 for errors)
            issues_by_severity = defaultdict(list)
            for issue in issues:
                severity = issue.get('severity', 0)
                severity_name = {0: 'off', 1: 'warning', 2: 'error'}.get(severity, 'unknown')
                issues_by_severity[severity_name].append(issue)
            return {
                'status': 'success',
                'issues': issues,
                'issues_by_severity': dict(issues_by_severity),
                'issue_count': len(issues),
                'files_analyzed': len(js_files),
            }
        except Exception as e:
            logger.error(f"Error running ESLint: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }
        finally:
            # Clean up the temporary configuration file
            if os.path.exists(temp_config_path):
                os.unlink(temp_config_path)
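
    # ESLint's JSON formatter emits one record per file, roughly as sketched
    # below (values are hypothetical):
    #     [{"filePath": "/repo/app.js",
    #       "messages": [{"ruleId": "no-unused-vars", "severity": 2,
    #                     "message": "'x' is defined but never used.",
    #                     "line": 1, "column": 7}],
    #       "errorCount": 1, "warningCount": 0}]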

    def _analyze_typescript(self, repo_path):
        """
        Analyze TypeScript code using ESLint and the TypeScript compiler (tsc).

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing TypeScript code in {repo_path}")
        # Find TypeScript files, skipping installed dependencies
        ts_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith(('.ts', '.tsx')) and 'node_modules' not in root:
                    ts_files.append(os.path.join(root, file))
        if not ts_files:
            return {
                'status': 'no_files',
                'message': 'No TypeScript files found in the repository.',
                'issues': [],
            }
        # Create a temporary ESLint configuration file for TypeScript
        eslint_config = {
            "env": {
                "browser": True,
                "es2021": True,
                "node": True
            },
            "extends": [
                "eslint:recommended",
                "plugin:@typescript-eslint/recommended"
            ],
            "parser": "@typescript-eslint/parser",
            "parserOptions": {
                "ecmaVersion": 12,
                "sourceType": "module",
                "ecmaFeatures": {
                    "jsx": True
                }
            },
            "plugins": [
                "@typescript-eslint"
            ],
            "rules": {}
        }
        # Open in text mode: json.dump writes str, not bytes
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_config:
            json.dump(eslint_config, temp_config)
            temp_config_path = temp_config.name
        # Create a temporary tsconfig.json file
        tsconfig = {
            "compilerOptions": {
                "target": "es2020",
                "module": "commonjs",
                "strict": True,
                "esModuleInterop": True,
                "skipLibCheck": True,
                "forceConsistentCasingInFileNames": True,
                "noEmit": True
            },
            "include": ts_files
        }
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_tsconfig:
            json.dump(tsconfig, temp_tsconfig)
            temp_tsconfig_path = temp_tsconfig.name
        try:
            # Run ESLint with TypeScript support
            eslint_cmd = [
                'npx',
                'eslint',
                '--config', temp_config_path,
                '--format', 'json',
                '--ext', '.ts,.tsx',
            ] + ts_files
            eslint_process = subprocess.run(
                eslint_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
            )
            # Parse ESLint output
            eslint_issues = []
            if eslint_process.stdout.strip():
                try:
                    eslint_results = json.loads(eslint_process.stdout)
                    # Extract issues from ESLint results
                    for result in eslint_results:
                        file_path = result.get('filePath', '')
                        for message in result.get('messages', []):
                            eslint_issues.append({
                                'path': file_path,
                                'line': message.get('line', 0),
                                'column': message.get('column', 0),
                                'message': message.get('message', ''),
                                'severity': message.get('severity', 0),
                                'ruleId': message.get('ruleId', ''),
                                'source': 'eslint',
                            })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing ESLint output: {eslint_process.stdout}")
            # Run the TypeScript compiler for type checking
            tsc_cmd = [
                'npx',
                'tsc',
                '--project', temp_tsconfig_path,
                '--noEmit',
            ]
            tsc_process = subprocess.run(
                tsc_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
            )
            # Parse tsc output. tsc normally prints diagnostics to stdout, so
            # scan both streams rather than stderr alone.
            tsc_issues = []
            tsc_output = tsc_process.stdout + tsc_process.stderr
            if tsc_output.strip():
                # tsc diagnostic format: file.ts(line,col): error TS2551: message
                # e.g. "src/app.ts(12,5): error TS2551: Property 'foo' does not
                # exist." splits into "src/app.ts(12,5)" and the error text.
                for line in tsc_output.splitlines():
                    if ': error ' in line or ': warning ' in line:
                        try:
                            file_info, error_info = line.split(':', 1)
                            file_path, line_col = file_info.rsplit('(', 1)
                            line_num, col_num = line_col.rstrip(')').split(',')
                            error_type, error_message = error_info.split(':', 1)
                            error_type = error_type.strip()
                            error_message = error_message.strip()
                            tsc_issues.append({
                                'path': file_path,
                                'line': int(line_num),
                                'column': int(col_num),
                                'message': error_message,
                                'severity': 2 if 'error' in error_type else 1,
                                'ruleId': error_type,
                                'source': 'tsc',
                            })
                        except Exception as e:
                            logger.warning(f"Error parsing TSC output line: {line}, error: {e}")
            # Combine issues from both tools
            all_issues = eslint_issues + tsc_issues
            # Group issues by source and severity
            issues_by_source = defaultdict(list)
            issues_by_severity = defaultdict(list)
            for issue in all_issues:
                source = issue.get('source', 'unknown')
                issues_by_source[source].append(issue)
                severity = issue.get('severity', 0)
                severity_name = {0: 'off', 1: 'warning', 2: 'error'}.get(severity, 'unknown')
                issues_by_severity[severity_name].append(issue)
            return {
                'status': 'success',
                'issues': all_issues,
                'issues_by_source': dict(issues_by_source),
                'issues_by_severity': dict(issues_by_severity),
                'issue_count': len(all_issues),
                'files_analyzed': len(ts_files),
            }
        except Exception as e:
            logger.error(f"Error analyzing TypeScript code: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }
        finally:
            # Clean up temporary files
            for temp_file in [temp_config_path, temp_tsconfig_path]:
                if os.path.exists(temp_file):
                    os.unlink(temp_file)

    def _analyze_java(self, repo_path):
        """
        Analyze Java code using PMD.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing Java code in {repo_path}")
        # Find Java files
        java_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith('.java'):
                    java_files.append(os.path.join(root, file))
        if not java_files:
            return {
                'status': 'no_files',
                'message': 'No Java files found in the repository.',
                'issues': [],
            }
        try:
            # Run PMD with the JSON renderer; results are read from stdout,
            # so no intermediate output file is needed.
            cmd = [
                'pmd',
                'check',
                '--dir', repo_path,
                '--format', 'json',
                '--rulesets', 'category/java/bestpractices.xml,category/java/codestyle.xml,category/java/design.xml,category/java/errorprone.xml,category/java/multithreading.xml,category/java/performance.xml,category/java/security.xml',
            ]
            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,  # PMD exits non-zero when it finds violations
            )
            # Parse PMD output
            if process.stdout.strip():
                try:
                    pmd_results = json.loads(process.stdout)
                    # Extract issues from PMD results
                    issues = []
                    for file_result in pmd_results.get('files', []):
                        file_path = file_result.get('filename', '')
                        for violation in file_result.get('violations', []):
                            issues.append({
                                'path': file_path,
                                'line': violation.get('beginline', 0),
                                'endLine': violation.get('endline', 0),
                                'column': violation.get('begincolumn', 0),
                                'endColumn': violation.get('endcolumn', 0),
                                'message': violation.get('description', ''),
                                'rule': violation.get('rule', ''),
                                'ruleset': violation.get('ruleset', ''),
                                'priority': violation.get('priority', 0),
                            })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing PMD output: {process.stdout}")
                    issues = []
            else:
                issues = []
            # Group issues by ruleset
            issues_by_ruleset = defaultdict(list)
            for issue in issues:
                ruleset = issue.get('ruleset', 'unknown')
                issues_by_ruleset[ruleset].append(issue)
            return {
                'status': 'success',
                'issues': issues,
                'issues_by_ruleset': dict(issues_by_ruleset),
                'issue_count': len(issues),
                'files_analyzed': len(java_files),
            }
        except Exception as e:
            logger.error(f"Error running PMD: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }

    def _analyze_go(self, repo_path):
        """
        Analyze Go code using golangci-lint.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing Go code in {repo_path}")
        # Find Go files
        go_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith('.go'):
                    go_files.append(os.path.join(root, file))
        if not go_files:
            return {
                'status': 'no_files',
                'message': 'No Go files found in the repository.',
                'issues': [],
            }
        try:
            # Run golangci-lint with JSON output against all packages under
            # the repository root ('./...' resolves relative to cwd below)
            cmd = [
                'golangci-lint',
                'run',
                '--out-format=json',
                './...',
            ]
            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
                cwd=repo_path,  # Run in the repository directory
            )
            # Parse golangci-lint output
            if process.stdout.strip():
                try:
                    lint_results = json.loads(process.stdout)
                    # Extract issues from golangci-lint results
                    # ('Issues' may be null when the run is clean)
                    issues = []
                    for issue in lint_results.get('Issues') or []:
                        issues.append({
                            'path': issue.get('Pos', {}).get('Filename', ''),
                            'line': issue.get('Pos', {}).get('Line', 0),
                            'column': issue.get('Pos', {}).get('Column', 0),
                            'message': issue.get('Text', ''),
                            'linter': issue.get('FromLinter', ''),
                            'severity': 'error' if issue.get('Severity', '') == 'error' else 'warning',
                        })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing golangci-lint output: {process.stdout}")
                    issues = []
            else:
                issues = []
            # Group issues by linter
            issues_by_linter = defaultdict(list)
            for issue in issues:
                linter = issue.get('linter', 'unknown')
                issues_by_linter[linter].append(issue)
            return {
                'status': 'success',
                'issues': issues,
                'issues_by_linter': dict(issues_by_linter),
                'issue_count': len(issues),
                'files_analyzed': len(go_files),
            }
        except Exception as e:
            logger.error(f"Error running golangci-lint: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }

    def _analyze_rust(self, repo_path):
        """
        Analyze Rust code using clippy.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing Rust code in {repo_path}")
        # Find Rust files
        rust_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith('.rs'):
                    rust_files.append(os.path.join(root, file))
        if not rust_files:
            return {
                'status': 'no_files',
                'message': 'No Rust files found in the repository.',
                'issues': [],
            }
        try:
            # Run clippy with JSON output (one JSON object per line)
            cmd = [
                'cargo',
                'clippy',
                '--message-format=json',
            ]
            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
                cwd=repo_path,  # Run in the repository directory
            )
            # Parse clippy output
            issues = []
            if process.stdout.strip():
                for line in process.stdout.splitlines():
                    try:
                        message = json.loads(line)
                        if message.get('reason') == 'compiler-message':
                            msg = message.get('message', {})
                            spans = msg.get('spans', [])
                            if spans:
                                primary_span = next((s for s in spans if s.get('is_primary')), spans[0])
                                file_path = primary_span.get('file_name', '')
                                line_num = primary_span.get('line_start', 0)
                                column = primary_span.get('column_start', 0)
                                issues.append({
                                    'path': file_path,
                                    'line': line_num,
                                    'column': column,
                                    'message': msg.get('message', ''),
                                    'level': msg.get('level', ''),
                                    # 'code' can be null in cargo's JSON output
                                    'code': (msg.get('code') or {}).get('code', ''),
                                })
                    except json.JSONDecodeError:
                        continue
            # Group issues by level
            issues_by_level = defaultdict(list)
            for issue in issues:
                level = issue.get('level', 'unknown')
                issues_by_level[level].append(issue)
            return {
                'status': 'success',
                'issues': issues,
                'issues_by_level': dict(issues_by_level),
                'issue_count': len(issues),
                'files_analyzed': len(rust_files),
            }
        except Exception as e:
            logger.error(f"Error running clippy: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }
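

# Minimal usage sketch (an illustration, not part of the service proper).
# Assumes pylint is available on PATH; the demo file content below is
# hypothetical. 'Ruby' exercises the not_supported branch.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    with tempfile.TemporaryDirectory() as demo_repo:
        with open(os.path.join(demo_repo, 'demo.py'), 'w') as f:
            f.write("x=1\n")
        analyzer = CodeAnalyzer()
        results = analyzer.analyze_repository(demo_repo, ['Python', 'Ruby'])
        for lang, result in results.items():
            print(lang, result.get('status'), result.get('issue_count', 0))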