#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Code Analyzer Service This module provides functionality for analyzing code quality across different languages. """ import os import subprocess import logging import json import tempfile from collections import defaultdict logger = logging.getLogger(__name__) class CodeAnalyzer: """ Service for analyzing code quality across different languages. """ def __init__(self): """ Initialize the CodeAnalyzer. """ logger.info("Initialized CodeAnalyzer") self.analyzers = { 'Python': self._analyze_python, 'JavaScript': self._analyze_javascript, 'TypeScript': self._analyze_typescript, 'Java': self._analyze_java, 'Go': self._analyze_go, 'Rust': self._analyze_rust, } def analyze_repository(self, repo_path, languages): """ Analyze code quality in a repository for the specified languages. Args: repo_path (str): The path to the repository. languages (list): A list of programming languages to analyze. Returns: dict: A dictionary containing analysis results for each language. """ logger.info(f"Analyzing repository at {repo_path} for languages: {languages}") results = {} for language in languages: if language in self.analyzers: try: logger.info(f"Analyzing {language} code in {repo_path}") results[language] = self.analyzers[language](repo_path) except Exception as e: logger.error(f"Error analyzing {language} code: {e}") results[language] = { 'status': 'error', 'error': str(e), 'issues': [], } else: logger.warning(f"No analyzer available for {language}") results[language] = { 'status': 'not_supported', 'message': f"Analysis for {language} is not supported yet.", 'issues': [], } return results def _analyze_python(self, repo_path): """ Analyze Python code using pylint. Args: repo_path (str): The path to the repository. Returns: dict: Analysis results. """ logger.info(f"Analyzing Python code in {repo_path}") # Find Python files python_files = [] for root, _, files in os.walk(repo_path): for file in files: if file.endswith('.py'): python_files.append(os.path.join(root, file)) if not python_files: return { 'status': 'no_files', 'message': 'No Python files found in the repository.', 'issues': [], } # Create a temporary file to store pylint output with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as temp_file: temp_path = temp_file.name try: # Run pylint with JSON reporter cmd = [ 'pylint', '--output-format=json', '--reports=n', ] + python_files process = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, ) # Parse pylint output if process.stdout.strip(): try: issues = json.loads(process.stdout) except json.JSONDecodeError: logger.error(f"Error parsing pylint output: {process.stdout}") issues = [] else: issues = [] # Group issues by type issues_by_type = defaultdict(list) for issue in issues: issue_type = issue.get('type', 'unknown') issues_by_type[issue_type].append(issue) return { 'status': 'success', 'issues': issues, 'issues_by_type': dict(issues_by_type), 'issue_count': len(issues), 'files_analyzed': len(python_files), } except Exception as e: logger.error(f"Error running pylint: {e}") return { 'status': 'error', 'error': str(e), 'issues': [], } finally: # Clean up the temporary file if os.path.exists(temp_path): os.unlink(temp_path) def _analyze_javascript(self, repo_path): """ Analyze JavaScript code using ESLint. Args: repo_path (str): The path to the repository. Returns: dict: Analysis results. """ logger.info(f"Analyzing JavaScript code in {repo_path}") # Find JavaScript files js_files = [] for root, _, files in os.walk(repo_path): for file in files: if file.endswith(('.js', '.jsx')) and not 'node_modules' in root: js_files.append(os.path.join(root, file)) if not js_files: return { 'status': 'no_files', 'message': 'No JavaScript files found in the repository.', 'issues': [], } # Create a temporary ESLint configuration file eslint_config = { "env": { "browser": True, "es2021": True, "node": True }, "extends": "eslint:recommended", "parserOptions": { "ecmaVersion": 12, "sourceType": "module", "ecmaFeatures": { "jsx": True } }, "rules": {} } with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as temp_config: json.dump(eslint_config, temp_config) temp_config_path = temp_config.name try: # Run ESLint with JSON formatter cmd = [ 'npx', 'eslint', '--config', temp_config_path, '--format', 'json', ] + js_files process = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, ) # Parse ESLint output if process.stdout.strip(): try: eslint_results = json.loads(process.stdout) # Extract issues from ESLint results issues = [] for result in eslint_results: file_path = result.get('filePath', '') for message in result.get('messages', []): issues.append({ 'path': file_path, 'line': message.get('line', 0), 'column': message.get('column', 0), 'message': message.get('message', ''), 'severity': message.get('severity', 0), 'ruleId': message.get('ruleId', ''), }) except json.JSONDecodeError: logger.error(f"Error parsing ESLint output: {process.stdout}") issues = [] else: issues = [] # Group issues by severity issues_by_severity = defaultdict(list) for issue in issues: severity = issue.get('severity', 0) severity_name = {0: 'off', 1: 'warning', 2: 'error'}.get(severity, 'unknown') issues_by_severity[severity_name].append(issue) return { 'status': 'success', 'issues': issues, 'issues_by_severity': dict(issues_by_severity), 'issue_count': len(issues), 'files_analyzed': len(js_files), } except Exception as e: logger.error(f"Error running ESLint: {e}") return { 'status': 'error', 'error': str(e), 'issues': [], } finally: # Clean up the temporary configuration file if os.path.exists(temp_config_path): os.unlink(temp_config_path) def _analyze_typescript(self, repo_path): """ Analyze TypeScript code using ESLint and TSC. Args: repo_path (str): The path to the repository. Returns: dict: Analysis results. """ logger.info(f"Analyzing TypeScript code in {repo_path}") # Find TypeScript files ts_files = [] for root, _, files in os.walk(repo_path): for file in files: if file.endswith(('.ts', '.tsx')) and not 'node_modules' in root: ts_files.append(os.path.join(root, file)) if not ts_files: return { 'status': 'no_files', 'message': 'No TypeScript files found in the repository.', 'issues': [], } # Create a temporary ESLint configuration file for TypeScript eslint_config = { "env": { "browser": True, "es2021": True, "node": True }, "extends": [ "eslint:recommended", "plugin:@typescript-eslint/recommended" ], "parser": "@typescript-eslint/parser", "parserOptions": { "ecmaVersion": 12, "sourceType": "module", "ecmaFeatures": { "jsx": True } }, "plugins": [ "@typescript-eslint" ], "rules": {} } with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as temp_config: json.dump(eslint_config, temp_config) temp_config_path = temp_config.name # Create a temporary tsconfig.json file tsconfig = { "compilerOptions": { "target": "es2020", "module": "commonjs", "strict": True, "esModuleInterop": True, "skipLibCheck": True, "forceConsistentCasingInFileNames": True, "noEmit": True }, "include": ts_files } with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as temp_tsconfig: json.dump(tsconfig, temp_tsconfig) temp_tsconfig_path = temp_tsconfig.name try: # Run ESLint with TypeScript support eslint_cmd = [ 'npx', 'eslint', '--config', temp_config_path, '--format', 'json', '--ext', '.ts,.tsx', ] + ts_files eslint_process = subprocess.run( eslint_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, ) # Parse ESLint output eslint_issues = [] if eslint_process.stdout.strip(): try: eslint_results = json.loads(eslint_process.stdout) # Extract issues from ESLint results for result in eslint_results: file_path = result.get('filePath', '') for message in result.get('messages', []): eslint_issues.append({ 'path': file_path, 'line': message.get('line', 0), 'column': message.get('column', 0), 'message': message.get('message', ''), 'severity': message.get('severity', 0), 'ruleId': message.get('ruleId', ''), 'source': 'eslint', }) except json.JSONDecodeError: logger.error(f"Error parsing ESLint output: {eslint_process.stdout}") # Run TypeScript compiler for type checking tsc_cmd = [ 'npx', 'tsc', '--project', temp_tsconfig_path, '--noEmit', ] tsc_process = subprocess.run( tsc_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, ) # Parse TSC output tsc_issues = [] if tsc_process.stderr.strip(): # TSC error format: file.ts(line,col): error TS2551: message for line in tsc_process.stderr.splitlines(): if ': error ' in line or ': warning ' in line: try: file_info, error_info = line.split(':', 1) file_path, line_col = file_info.rsplit('(', 1) line_num, col_num = line_col.rstrip(')').split(',') error_type, error_message = error_info.split(':', 1) error_type = error_type.strip() error_message = error_message.strip() tsc_issues.append({ 'path': file_path, 'line': int(line_num), 'column': int(col_num), 'message': error_message, 'severity': 2 if 'error' in error_type else 1, 'ruleId': error_type, 'source': 'tsc', }) except Exception as e: logger.warning(f"Error parsing TSC output line: {line}, error: {e}") # Combine issues from both tools all_issues = eslint_issues + tsc_issues # Group issues by source and severity issues_by_source = defaultdict(list) issues_by_severity = defaultdict(list) for issue in all_issues: source = issue.get('source', 'unknown') issues_by_source[source].append(issue) severity = issue.get('severity', 0) severity_name = {0: 'off', 1: 'warning', 2: 'error'}.get(severity, 'unknown') issues_by_severity[severity_name].append(issue) return { 'status': 'success', 'issues': all_issues, 'issues_by_source': dict(issues_by_source), 'issues_by_severity': dict(issues_by_severity), 'issue_count': len(all_issues), 'files_analyzed': len(ts_files), } except Exception as e: logger.error(f"Error analyzing TypeScript code: {e}") return { 'status': 'error', 'error': str(e), 'issues': [], } finally: # Clean up temporary files for temp_file in [temp_config_path, temp_tsconfig_path]: if os.path.exists(temp_file): os.unlink(temp_file) def _analyze_java(self, repo_path): """ Analyze Java code using PMD. Args: repo_path (str): The path to the repository. Returns: dict: Analysis results. """ logger.info(f"Analyzing Java code in {repo_path}") # Find Java files java_files = [] for root, _, files in os.walk(repo_path): for file in files: if file.endswith('.java'): java_files.append(os.path.join(root, file)) if not java_files: return { 'status': 'no_files', 'message': 'No Java files found in the repository.', 'issues': [], } # Create a temporary file to store PMD output with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as temp_file: temp_path = temp_file.name try: # Run PMD with JSON reporter cmd = [ 'pmd', 'check', '--dir', repo_path, '--format', 'json', '--rulesets', 'category/java/bestpractices.xml,category/java/codestyle.xml,category/java/design.xml,category/java/errorprone.xml,category/java/multithreading.xml,category/java/performance.xml,category/java/security.xml', ] process = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, ) # Parse PMD output if process.stdout.strip(): try: pmd_results = json.loads(process.stdout) # Extract issues from PMD results issues = [] for file_result in pmd_results.get('files', []): file_path = file_result.get('filename', '') for violation in file_result.get('violations', []): issues.append({ 'path': file_path, 'line': violation.get('beginline', 0), 'endLine': violation.get('endline', 0), 'column': violation.get('begincolumn', 0), 'endColumn': violation.get('endcolumn', 0), 'message': violation.get('description', ''), 'rule': violation.get('rule', ''), 'ruleset': violation.get('ruleset', ''), 'priority': violation.get('priority', 0), }) except json.JSONDecodeError: logger.error(f"Error parsing PMD output: {process.stdout}") issues = [] else: issues = [] # Group issues by ruleset issues_by_ruleset = defaultdict(list) for issue in issues: ruleset = issue.get('ruleset', 'unknown') issues_by_ruleset[ruleset].append(issue) return { 'status': 'success', 'issues': issues, 'issues_by_ruleset': dict(issues_by_ruleset), 'issue_count': len(issues), 'files_analyzed': len(java_files), } except Exception as e: logger.error(f"Error running PMD: {e}") return { 'status': 'error', 'error': str(e), 'issues': [], } finally: # Clean up the temporary file if os.path.exists(temp_path): os.unlink(temp_path) def _analyze_go(self, repo_path): """ Analyze Go code using golangci-lint. Args: repo_path (str): The path to the repository. Returns: dict: Analysis results. """ logger.info(f"Analyzing Go code in {repo_path}") # Find Go files go_files = [] for root, _, files in os.walk(repo_path): for file in files: if file.endswith('.go'): go_files.append(os.path.join(root, file)) if not go_files: return { 'status': 'no_files', 'message': 'No Go files found in the repository.', 'issues': [], } try: # Run golangci-lint with JSON output cmd = [ 'golangci-lint', 'run', '--out-format=json', repo_path, ] process = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, cwd=repo_path, # Run in the repository directory ) # Parse golangci-lint output if process.stdout.strip(): try: lint_results = json.loads(process.stdout) # Extract issues from golangci-lint results issues = [] for issue in lint_results.get('Issues', []): issues.append({ 'path': issue.get('Pos', {}).get('Filename', ''), 'line': issue.get('Pos', {}).get('Line', 0), 'column': issue.get('Pos', {}).get('Column', 0), 'message': issue.get('Text', ''), 'linter': issue.get('FromLinter', ''), 'severity': 'error' if issue.get('Severity', '') == 'error' else 'warning', }) except json.JSONDecodeError: logger.error(f"Error parsing golangci-lint output: {process.stdout}") issues = [] else: issues = [] # Group issues by linter issues_by_linter = defaultdict(list) for issue in issues: linter = issue.get('linter', 'unknown') issues_by_linter[linter].append(issue) return { 'status': 'success', 'issues': issues, 'issues_by_linter': dict(issues_by_linter), 'issue_count': len(issues), 'files_analyzed': len(go_files), } except Exception as e: logger.error(f"Error running golangci-lint: {e}") return { 'status': 'error', 'error': str(e), 'issues': [], } def _analyze_rust(self, repo_path): """ Analyze Rust code using clippy. Args: repo_path (str): The path to the repository. Returns: dict: Analysis results. """ logger.info(f"Analyzing Rust code in {repo_path}") # Find Rust files rust_files = [] for root, _, files in os.walk(repo_path): for file in files: if file.endswith('.rs'): rust_files.append(os.path.join(root, file)) if not rust_files: return { 'status': 'no_files', 'message': 'No Rust files found in the repository.', 'issues': [], } try: # Run clippy with JSON output cmd = [ 'cargo', 'clippy', '--message-format=json', ] process = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, cwd=repo_path, # Run in the repository directory ) # Parse clippy output issues = [] if process.stdout.strip(): for line in process.stdout.splitlines(): try: message = json.loads(line) if message.get('reason') == 'compiler-message': msg = message.get('message', {}) spans = msg.get('spans', []) if spans: primary_span = next((s for s in spans if s.get('is_primary')), spans[0]) file_path = primary_span.get('file_name', '') line_num = primary_span.get('line_start', 0) column = primary_span.get('column_start', 0) issues.append({ 'path': file_path, 'line': line_num, 'column': column, 'message': msg.get('message', ''), 'level': msg.get('level', ''), 'code': msg.get('code', {}).get('code', ''), }) except json.JSONDecodeError: continue # Group issues by level issues_by_level = defaultdict(list) for issue in issues: level = issue.get('level', 'unknown') issues_by_level[level].append(issue) return { 'status': 'success', 'issues': issues, 'issues_by_level': dict(issues_by_level), 'issue_count': len(issues), 'files_analyzed': len(rust_files), } except Exception as e: logger.error(f"Error running clippy: {e}") return { 'status': 'error', 'error': str(e), 'issues': [], }