#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Code Analyzer Service

This module provides functionality for analyzing code quality across different languages.
"""
import os
import subprocess
import logging
import json
import tempfile
from collections import defaultdict

logger = logging.getLogger(__name__)


class CodeAnalyzer:
    """
    Service for analyzing code quality across different languages.
    """

    def __init__(self):
        """
        Initialize the CodeAnalyzer with one handler per supported language.
        """
        self.analyzers = {
            'Python': self._analyze_python,
            'JavaScript': self._analyze_javascript,
            'TypeScript': self._analyze_typescript,
            'Java': self._analyze_java,
            'Go': self._analyze_go,
            'Rust': self._analyze_rust,
        }
        logger.info("Initialized CodeAnalyzer")
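    # Supporting another language is a matter of registering one more handler
    # above, e.g. (hypothetical): self.analyzers['Ruby'] = self._analyze_ruby,
    # where the method takes repo_path and returns the same dict shape as the
    # analyzers below.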

    def analyze_repository(self, repo_path, languages):
        """
        Analyze code quality in a repository for the specified languages.

        Args:
            repo_path (str): The path to the repository.
            languages (list): A list of programming languages to analyze.

        Returns:
            dict: A dictionary containing analysis results for each language.
        """
        logger.info(f"Analyzing repository at {repo_path} for languages: {languages}")
        results = {}
        for language in languages:
            if language in self.analyzers:
                try:
                    logger.info(f"Analyzing {language} code in {repo_path}")
                    results[language] = self.analyzers[language](repo_path)
                except Exception as e:
                    logger.error(f"Error analyzing {language} code: {e}")
                    results[language] = {
                        'status': 'error',
                        'error': str(e),
                        'issues': [],
                    }
            else:
                logger.warning(f"No analyzer available for {language}")
                results[language] = {
                    'status': 'not_supported',
                    'message': f"Analysis for {language} is not supported yet.",
                    'issues': [],
                }
        return results
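    # A typical (illustrative) return value:
    #     {'Python': {'status': 'success', 'issues': [...], 'issue_count': 3, ...},
    #      'COBOL': {'status': 'not_supported', 'message': '...', 'issues': []}}
    # Every analyzer, whatever its status, includes an 'issues' list, so callers
    # can iterate results uniformly.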

    def _analyze_python(self, repo_path):
        """
        Analyze Python code using pylint.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing Python code in {repo_path}")
        # Find Python files
        python_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith('.py'):
                    python_files.append(os.path.join(root, file))
        if not python_files:
            return {
                'status': 'no_files',
                'message': 'No Python files found in the repository.',
                'issues': [],
            }
        try:
            # Run pylint with the JSON reporter; the report is written to
            # stdout, so no intermediate file is needed.
            cmd = [
                'pylint',
                '--output-format=json',
                '--reports=n',
            ] + python_files
            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,  # pylint exits non-zero when it finds issues
            )
            # Parse pylint output
            if process.stdout.strip():
                try:
                    issues = json.loads(process.stdout)
                except json.JSONDecodeError:
                    logger.error(f"Error parsing pylint output: {process.stdout}")
                    issues = []
            else:
                issues = []
            # Group issues by type
            issues_by_type = defaultdict(list)
            for issue in issues:
                issue_type = issue.get('type', 'unknown')
                issues_by_type[issue_type].append(issue)
            return {
                'status': 'success',
                'issues': issues,
                'issues_by_type': dict(issues_by_type),
                'issue_count': len(issues),
                'files_analyzed': len(python_files),
            }
        except Exception as e:
            logger.error(f"Error running pylint: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }
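    # Each record in pylint's JSON report is a flat dict; representative fields
    # include 'type' (convention/warning/error/...), 'path', 'line', 'column',
    # 'symbol', 'message', and 'message-id', which is why the grouping above
    # keys on 'type'.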

    def _analyze_javascript(self, repo_path):
        """
        Analyze JavaScript code using ESLint.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing JavaScript code in {repo_path}")
        # Find JavaScript files, skipping anything under node_modules
        js_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith(('.js', '.jsx')) and 'node_modules' not in root:
                    js_files.append(os.path.join(root, file))
        if not js_files:
            return {
                'status': 'no_files',
                'message': 'No JavaScript files found in the repository.',
                'issues': [],
            }
        # Create a temporary ESLint configuration file
        eslint_config = {
            "env": {
                "browser": True,
                "es2021": True,
                "node": True
            },
            "extends": "eslint:recommended",
            "parserOptions": {
                "ecmaVersion": 12,
                "sourceType": "module",
                "ecmaFeatures": {
                    "jsx": True
                }
            },
            "rules": {}
        }
        # Open in text mode: json.dump writes str, and NamedTemporaryFile
        # defaults to binary mode.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_config:
            json.dump(eslint_config, temp_config)
            temp_config_path = temp_config.name
        try:
            # Run ESLint with the JSON formatter
            cmd = [
                'npx',
                'eslint',
                '--config', temp_config_path,
                '--format', 'json',
            ] + js_files
            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,  # ESLint exits non-zero when it finds problems
            )
            # Parse ESLint output
            if process.stdout.strip():
                try:
                    eslint_results = json.loads(process.stdout)
                    # Extract issues from ESLint results
                    issues = []
                    for result in eslint_results:
                        file_path = result.get('filePath', '')
                        for message in result.get('messages', []):
                            issues.append({
                                'path': file_path,
                                'line': message.get('line', 0),
                                'column': message.get('column', 0),
                                'message': message.get('message', ''),
                                'severity': message.get('severity', 0),
                                'ruleId': message.get('ruleId', ''),
                            })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing ESLint output: {process.stdout}")
                    issues = []
            else:
                issues = []
            # Group issues by severity
            issues_by_severity = defaultdict(list)
            for issue in issues:
                severity = issue.get('severity', 0)
                severity_name = {0: 'off', 1: 'warning', 2: 'error'}.get(severity, 'unknown')
                issues_by_severity[severity_name].append(issue)
            return {
                'status': 'success',
                'issues': issues,
                'issues_by_severity': dict(issues_by_severity),
                'issue_count': len(issues),
                'files_analyzed': len(js_files),
            }
        except Exception as e:
            logger.error(f"Error running ESLint: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }
        finally:
            # Clean up the temporary configuration file
            if os.path.exists(temp_config_path):
                os.unlink(temp_config_path)
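    # ESLint's JSON formatter emits one object per file, roughly (illustrative):
    #     [{"filePath": "/repo/src/app.js",
    #       "messages": [{"ruleId": "no-unused-vars", "severity": 2,
    #                     "message": "...", "line": 1, "column": 1}]}]
    # Severity 1 is a warning and 2 is an error, matching the mapping above.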

    def _analyze_typescript(self, repo_path):
        """
        Analyze TypeScript code using ESLint and the TypeScript compiler (tsc).

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing TypeScript code in {repo_path}")
        # Find TypeScript files, skipping anything under node_modules
        ts_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith(('.ts', '.tsx')) and 'node_modules' not in root:
                    ts_files.append(os.path.join(root, file))
        if not ts_files:
            return {
                'status': 'no_files',
                'message': 'No TypeScript files found in the repository.',
                'issues': [],
            }
        # Create a temporary ESLint configuration file for TypeScript
        eslint_config = {
            "env": {
                "browser": True,
                "es2021": True,
                "node": True
            },
            "extends": [
                "eslint:recommended",
                "plugin:@typescript-eslint/recommended"
            ],
            "parser": "@typescript-eslint/parser",
            "parserOptions": {
                "ecmaVersion": 12,
                "sourceType": "module",
                "ecmaFeatures": {
                    "jsx": True
                }
            },
            "plugins": [
                "@typescript-eslint"
            ],
            "rules": {}
        }
        # Open both temp files in text mode: json.dump writes str, and
        # NamedTemporaryFile defaults to binary mode.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_config:
            json.dump(eslint_config, temp_config)
            temp_config_path = temp_config.name
        # Create a temporary tsconfig.json file
        tsconfig = {
            "compilerOptions": {
                "target": "es2020",
                "module": "commonjs",
                "strict": True,
                "esModuleInterop": True,
                "skipLibCheck": True,
                "forceConsistentCasingInFileNames": True,
                "noEmit": True
            },
            "include": ts_files
        }
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_tsconfig:
            json.dump(tsconfig, temp_tsconfig)
            temp_tsconfig_path = temp_tsconfig.name
        try:
            # Run ESLint with TypeScript support
            eslint_cmd = [
                'npx',
                'eslint',
                '--config', temp_config_path,
                '--format', 'json',
                '--ext', '.ts,.tsx',
            ] + ts_files
            eslint_process = subprocess.run(
                eslint_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
            )
            # Parse ESLint output
            eslint_issues = []
            if eslint_process.stdout.strip():
                try:
                    eslint_results = json.loads(eslint_process.stdout)
                    # Extract issues from ESLint results
                    for result in eslint_results:
                        file_path = result.get('filePath', '')
                        for message in result.get('messages', []):
                            eslint_issues.append({
                                'path': file_path,
                                'line': message.get('line', 0),
                                'column': message.get('column', 0),
                                'message': message.get('message', ''),
                                'severity': message.get('severity', 0),
                                'ruleId': message.get('ruleId', ''),
                                'source': 'eslint',
                            })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing ESLint output: {eslint_process.stdout}")
            # Run the TypeScript compiler for type checking
            tsc_cmd = [
                'npx',
                'tsc',
                '--project', temp_tsconfig_path,
                '--noEmit',
            ]
            tsc_process = subprocess.run(
                tsc_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
            )
            # Parse TSC output. tsc prints diagnostics to stdout in the form:
            #     file.ts(line,col): error TS2551: message
            tsc_issues = []
            for line in tsc_process.stdout.splitlines():
                if ': error ' in line or ': warning ' in line:
                    try:
                        file_info, error_info = line.split(':', 1)
                        file_path, line_col = file_info.rsplit('(', 1)
                        line_num, col_num = line_col.rstrip(')').split(',')
                        error_type, error_message = error_info.split(':', 1)
                        error_type = error_type.strip()
                        error_message = error_message.strip()
                        tsc_issues.append({
                            'path': file_path,
                            'line': int(line_num),
                            'column': int(col_num),
                            'message': error_message,
                            'severity': 2 if 'error' in error_type else 1,
                            'ruleId': error_type,
                            'source': 'tsc',
                        })
                    except Exception as e:
                        logger.warning(f"Error parsing TSC output line: {line}, error: {e}")
            # Combine issues from both tools
            all_issues = eslint_issues + tsc_issues
            # Group issues by source and severity
            issues_by_source = defaultdict(list)
            issues_by_severity = defaultdict(list)
            for issue in all_issues:
                source = issue.get('source', 'unknown')
                issues_by_source[source].append(issue)
                severity = issue.get('severity', 0)
                severity_name = {0: 'off', 1: 'warning', 2: 'error'}.get(severity, 'unknown')
                issues_by_severity[severity_name].append(issue)
            return {
                'status': 'success',
                'issues': all_issues,
                'issues_by_source': dict(issues_by_source),
                'issues_by_severity': dict(issues_by_severity),
                'issue_count': len(all_issues),
                'files_analyzed': len(ts_files),
            }
        except Exception as e:
            logger.error(f"Error analyzing TypeScript code: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }
        finally:
            # Clean up temporary files
            for temp_file in [temp_config_path, temp_tsconfig_path]:
                if os.path.exists(temp_file):
                    os.unlink(temp_file)
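    # Worked example of the tsc parse above (illustrative diagnostic):
    #     "src/app.ts(10,5): error TS2551: Property 'fooo' does not exist."
    # The first split yields "src/app.ts(10,5)" and the remainder; rsplit('(')
    # recovers path "src/app.ts" plus line 10 and column 5, and the second
    # split stores "error TS2551" as ruleId with the trailing text as message.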

    def _analyze_java(self, repo_path):
        """
        Analyze Java code using PMD.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing Java code in {repo_path}")
        # Find Java files
        java_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith('.java'):
                    java_files.append(os.path.join(root, file))
        if not java_files:
            return {
                'status': 'no_files',
                'message': 'No Java files found in the repository.',
                'issues': [],
            }
        try:
            # Run PMD with the JSON renderer; without a report file the report
            # goes to stdout, so no intermediate file is needed.
            cmd = [
                'pmd',
                'check',
                '--dir', repo_path,
                '--format', 'json',
                '--rulesets', 'category/java/bestpractices.xml,category/java/codestyle.xml,category/java/design.xml,category/java/errorprone.xml,category/java/multithreading.xml,category/java/performance.xml,category/java/security.xml',
            ]
            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,  # PMD exits non-zero when violations are found
            )
            # Parse PMD output
            if process.stdout.strip():
                try:
                    pmd_results = json.loads(process.stdout)
                    # Extract issues from PMD results
                    issues = []
                    for file_result in pmd_results.get('files', []):
                        file_path = file_result.get('filename', '')
                        for violation in file_result.get('violations', []):
                            issues.append({
                                'path': file_path,
                                'line': violation.get('beginline', 0),
                                'endLine': violation.get('endline', 0),
                                'column': violation.get('begincolumn', 0),
                                'endColumn': violation.get('endcolumn', 0),
                                'message': violation.get('description', ''),
                                'rule': violation.get('rule', ''),
                                'ruleset': violation.get('ruleset', ''),
                                'priority': violation.get('priority', 0),
                            })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing PMD output: {process.stdout}")
                    issues = []
            else:
                issues = []
            # Group issues by ruleset
            issues_by_ruleset = defaultdict(list)
            for issue in issues:
                ruleset = issue.get('ruleset', 'unknown')
                issues_by_ruleset[ruleset].append(issue)
            return {
                'status': 'success',
                'issues': issues,
                'issues_by_ruleset': dict(issues_by_ruleset),
                'issue_count': len(issues),
                'files_analyzed': len(java_files),
            }
        except Exception as e:
            logger.error(f"Error running PMD: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }
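    # Note: unlike the other analyzers, PMD is pointed at the directory and
    # walks it itself; java_files is only used for the no_files short-circuit
    # and the files_analyzed count.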

    def _analyze_go(self, repo_path):
        """
        Analyze Go code using golangci-lint.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing Go code in {repo_path}")
        # Find Go files
        go_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith('.go'):
                    go_files.append(os.path.join(root, file))
        if not go_files:
            return {
                'status': 'no_files',
                'message': 'No Go files found in the repository.',
                'issues': [],
            }
        try:
            # Run golangci-lint with JSON output over every package in the
            # repository; './...' is resolved relative to cwd, set below
            cmd = [
                'golangci-lint',
                'run',
                '--out-format=json',
                './...',
            ]
            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,  # golangci-lint exits non-zero when issues are found
                cwd=repo_path,  # Run in the repository directory
            )
            # Parse golangci-lint output
            if process.stdout.strip():
                try:
                    lint_results = json.loads(process.stdout)
                    # Extract issues from golangci-lint results; 'Issues' can
                    # be null in the JSON, hence the `or []`
                    issues = []
                    for issue in lint_results.get('Issues') or []:
                        issues.append({
                            'path': issue.get('Pos', {}).get('Filename', ''),
                            'line': issue.get('Pos', {}).get('Line', 0),
                            'column': issue.get('Pos', {}).get('Column', 0),
                            'message': issue.get('Text', ''),
                            'linter': issue.get('FromLinter', ''),
                            'severity': 'error' if issue.get('Severity', '') == 'error' else 'warning',
                        })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing golangci-lint output: {process.stdout}")
                    issues = []
            else:
                issues = []
            # Group issues by linter
            issues_by_linter = defaultdict(list)
            for issue in issues:
                linter = issue.get('linter', 'unknown')
                issues_by_linter[linter].append(issue)
            return {
                'status': 'success',
                'issues': issues,
                'issues_by_linter': dict(issues_by_linter),
                'issue_count': len(issues),
                'files_analyzed': len(go_files),
            }
        except Exception as e:
            logger.error(f"Error running golangci-lint: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }
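    # golangci-lint uses capitalised keys in its JSON report, roughly
    # (illustrative):
    #     {"Issues": [{"FromLinter": "govet", "Text": "...",
    #                  "Pos": {"Filename": "main.go", "Line": 10, "Column": 2}}]}
    # which is why the lookups above use 'Issues', 'Pos', and 'FromLinter'.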

    def _analyze_rust(self, repo_path):
        """
        Analyze Rust code using clippy.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Analysis results.
        """
        logger.info(f"Analyzing Rust code in {repo_path}")
        # Find Rust files
        rust_files = []
        for root, _, files in os.walk(repo_path):
            for file in files:
                if file.endswith('.rs'):
                    rust_files.append(os.path.join(root, file))
        if not rust_files:
            return {
                'status': 'no_files',
                'message': 'No Rust files found in the repository.',
                'issues': [],
            }
        try:
            # Run clippy with JSON output (one JSON object per line)
            cmd = [
                'cargo',
                'clippy',
                '--message-format=json',
            ]
            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
                cwd=repo_path,  # Run in the repository directory
            )
            # Parse clippy output line by line
            issues = []
            for line in process.stdout.splitlines():
                try:
                    message = json.loads(line)
                    if message.get('reason') == 'compiler-message':
                        msg = message.get('message', {})
                        spans = msg.get('spans', [])
                        if spans:
                            primary_span = next((s for s in spans if s.get('is_primary')), spans[0])
                            file_path = primary_span.get('file_name', '')
                            line_num = primary_span.get('line_start', 0)
                            column = primary_span.get('column_start', 0)
                            issues.append({
                                'path': file_path,
                                'line': line_num,
                                'column': column,
                                'message': msg.get('message', ''),
                                'level': msg.get('level', ''),
                                # cargo emits "code": null for many messages,
                                # so guard before the nested lookup
                                'code': (msg.get('code') or {}).get('code', ''),
                            })
                except json.JSONDecodeError:
                    continue
            # Group issues by level
            issues_by_level = defaultdict(list)
            for issue in issues:
                level = issue.get('level', 'unknown')
                issues_by_level[level].append(issue)
            return {
                'status': 'success',
                'issues': issues,
                'issues_by_level': dict(issues_by_level),
                'issue_count': len(issues),
                'files_analyzed': len(rust_files),
            }
        except Exception as e:
            logger.error(f"Error running clippy: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'issues': [],
            }
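

# Minimal usage sketch (assumes the linters above are installed on PATH and
# that a repository exists at the hypothetical path below):
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    analyzer = CodeAnalyzer()
    report = analyzer.analyze_repository('/path/to/repo', ['Python', 'Go'])
    for lang, result in report.items():
        # 'issues' is present in every status, so this is safe across outcomes
        print(f"{lang}: {result['status']} ({len(result['issues'])} issues)")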