#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Security Scanner Service
This module provides functionality for scanning code for security vulnerabilities.
"""
import os
import subprocess
import logging
import json
import tempfile
from collections import defaultdict
logger = logging.getLogger(__name__)
class SecurityScanner:
"""
Service for scanning code for security vulnerabilities.
"""
def __init__(self):
"""
Initialize the SecurityScanner.
"""
logger.info("Initialized SecurityScanner")
self.scanners = {
'Python': self._scan_python,
'JavaScript': self._scan_javascript,
'TypeScript': self._scan_javascript, # TypeScript uses the same scanner as JavaScript
'Java': self._scan_java,
'Go': self._scan_go,
'Rust': self._scan_rust,
}
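        # Note: the keys above are assumed to match the language names produced by the
        # review pipeline's language-detection step (e.g. 'Python', 'Go'); this mapping is
        # the single place to register additional scanners, for example (hypothetical):
        #   self.scanners['Ruby'] = self._scan_ruby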
def scan_repository(self, repo_path, languages):
"""
Scan a repository for security vulnerabilities in the specified languages.
Args:
repo_path (str): The path to the repository.
languages (list): A list of programming languages to scan.
Returns:
dict: A dictionary containing scan results for each language.
"""
logger.info(f"Scanning repository at {repo_path} for security vulnerabilities in languages: {languages}")
results = {}
# Scan dependencies first (language-agnostic)
results['dependencies'] = self._scan_dependencies(repo_path)
# Scan each language
for language in languages:
if language in self.scanners:
try:
logger.info(f"Scanning {language} code in {repo_path} for security vulnerabilities")
results[language] = self.scanners[language](repo_path)
except Exception as e:
logger.error(f"Error scanning {language} code for security vulnerabilities: {e}")
results[language] = {
'status': 'error',
'error': str(e),
'vulnerabilities': [],
}
else:
logger.warning(f"No security scanner available for {language}")
results[language] = {
'status': 'not_supported',
'message': f"Security scanning for {language} is not supported yet.",
'vulnerabilities': [],
}
return results
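    # Illustrative shape of the dict returned by scan_repository (derived from the code
    # above; the exact keys inside each entry vary by scanner):
    #   {
    #       'dependencies': {'status': 'success', 'vulnerabilities': [...], ...},
    #       'Python':       {'status': 'success', 'vulnerabilities': [...], ...},
    #       'Java':         {'status': 'not_implemented', 'vulnerabilities': [], ...},
    #   }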
def _scan_dependencies(self, repo_path):
"""
Scan dependencies for known vulnerabilities.
Args:
repo_path (str): The path to the repository.
Returns:
dict: Dependency scan results.
"""
logger.info(f"Scanning dependencies in {repo_path}")
results = {
'python': self._scan_python_dependencies(repo_path),
'javascript': self._scan_javascript_dependencies(repo_path),
'java': self._scan_java_dependencies(repo_path),
'go': self._scan_go_dependencies(repo_path),
'rust': self._scan_rust_dependencies(repo_path),
}
# Aggregate vulnerabilities
all_vulnerabilities = []
for lang_result in results.values():
all_vulnerabilities.extend(lang_result.get('vulnerabilities', []))
return {
'status': 'success',
'vulnerabilities': all_vulnerabilities,
'vulnerability_count': len(all_vulnerabilities),
'language_results': results,
}
def _scan_python_dependencies(self, repo_path):
"""
Scan Python dependencies for known vulnerabilities using safety.
Args:
repo_path (str): The path to the repository.
Returns:
dict: Scan results for Python dependencies.
"""
logger.info(f"Scanning Python dependencies in {repo_path}")
# Find requirements files
requirements_files = []
for root, _, files in os.walk(repo_path):
for file in files:
                if file in ('requirements.txt', 'Pipfile', 'Pipfile.lock', 'setup.py'):
requirements_files.append(os.path.join(root, file))
if not requirements_files:
return {
'status': 'no_dependencies',
'message': 'No Python dependency files found.',
'vulnerabilities': [],
}
vulnerabilities = []
for req_file in requirements_files:
try:
# Run safety check
cmd = [
'safety',
'check',
'--file', req_file,
'--json',
]
process = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
)
# Parse safety output
if process.stdout.strip():
try:
safety_results = json.loads(process.stdout)
for vuln in safety_results.get('vulnerabilities', []):
vulnerabilities.append({
'package': vuln.get('package_name', ''),
'installed_version': vuln.get('installed_version', ''),
'affected_versions': vuln.get('vulnerable_spec', ''),
'description': vuln.get('advisory', ''),
'severity': vuln.get('severity', ''),
'file': req_file,
'language': 'Python',
})
except json.JSONDecodeError:
logger.error(f"Error parsing safety output: {process.stdout}")
except Exception as e:
logger.error(f"Error running safety on {req_file}: {e}")
return {
'status': 'success',
'vulnerabilities': vulnerabilities,
'vulnerability_count': len(vulnerabilities),
'files_scanned': requirements_files,
}
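    # Note: the parser above assumes the JSON layout emitted by safety 2.x (a top-level
    # 'vulnerabilities' key); older 1.x releases emitted a bare list of findings, which
    # would simply produce an empty result here rather than an error.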
def _scan_javascript_dependencies(self, repo_path):
"""
Scan JavaScript/TypeScript dependencies for known vulnerabilities using npm audit.
Args:
repo_path (str): The path to the repository.
Returns:
dict: Scan results for JavaScript dependencies.
"""
logger.info(f"Scanning JavaScript dependencies in {repo_path}")
# Find package.json files
package_files = []
for root, _, files in os.walk(repo_path):
if 'package.json' in files:
package_files.append(os.path.join(root, 'package.json'))
if not package_files:
return {
'status': 'no_dependencies',
'message': 'No JavaScript dependency files found.',
'vulnerabilities': [],
}
vulnerabilities = []
for pkg_file in package_files:
pkg_dir = os.path.dirname(pkg_file)
try:
# Run npm audit
cmd = [
'npm',
'audit',
'--json',
]
process = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
cwd=pkg_dir, # Run in the directory containing package.json
)
# Parse npm audit output
if process.stdout.strip():
try:
audit_results = json.loads(process.stdout)
# Extract vulnerabilities from npm audit results
for vuln_id, vuln_info in audit_results.get('vulnerabilities', {}).items():
vulnerabilities.append({
'package': vuln_info.get('name', ''),
'installed_version': vuln_info.get('version', ''),
'affected_versions': vuln_info.get('range', ''),
'description': vuln_info.get('overview', ''),
'severity': vuln_info.get('severity', ''),
'file': pkg_file,
'language': 'JavaScript',
'cwe': vuln_info.get('cwe', ''),
'recommendation': vuln_info.get('recommendation', ''),
})
except json.JSONDecodeError:
logger.error(f"Error parsing npm audit output: {process.stdout}")
except Exception as e:
logger.error(f"Error running npm audit on {pkg_file}: {e}")
return {
'status': 'success',
'vulnerabilities': vulnerabilities,
'vulnerability_count': len(vulnerabilities),
'files_scanned': package_files,
}
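    # Note: the fields read above follow the npm 7+ audit format, where results are keyed
    # by package name under 'vulnerabilities'; npm 6 reported findings under 'advisories'
    # with a different shape, so runs against npm 6 may yield no parsed vulnerabilities.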
def _scan_java_dependencies(self, repo_path):
"""
Scan Java dependencies for known vulnerabilities.
Args:
repo_path (str): The path to the repository.
Returns:
dict: Scan results for Java dependencies.
"""
logger.info(f"Scanning Java dependencies in {repo_path}")
# Find pom.xml or build.gradle files
dependency_files = []
for root, _, files in os.walk(repo_path):
for file in files:
                if file in ('pom.xml', 'build.gradle'):
dependency_files.append(os.path.join(root, file))
if not dependency_files:
return {
'status': 'no_dependencies',
'message': 'No Java dependency files found.',
'vulnerabilities': [],
}
# For now, we'll just return a placeholder since we don't have a direct tool
# In a real implementation, you might use OWASP Dependency Check or similar
return {
'status': 'not_implemented',
'message': 'Java dependency scanning is not fully implemented yet.',
'vulnerabilities': [],
'files_scanned': dependency_files,
}
def _scan_go_dependencies(self, repo_path):
"""
Scan Go dependencies for known vulnerabilities using govulncheck.
Args:
repo_path (str): The path to the repository.
Returns:
dict: Scan results for Go dependencies.
"""
logger.info(f"Scanning Go dependencies in {repo_path}")
# Check if go.mod exists
go_mod_path = os.path.join(repo_path, 'go.mod')
if not os.path.exists(go_mod_path):
return {
'status': 'no_dependencies',
'message': 'No Go dependency files found.',
'vulnerabilities': [],
}
try:
# Run govulncheck
cmd = [
'govulncheck',
'-json',
'./...',
]
process = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
cwd=repo_path, # Run in the repository directory
)
# Parse govulncheck output
vulnerabilities = []
if process.stdout.strip():
for line in process.stdout.splitlines():
try:
result = json.loads(line)
if 'vulnerability' in result:
vuln = result['vulnerability']
vulnerabilities.append({
'package': vuln.get('package', ''),
'description': vuln.get('details', ''),
'severity': 'high', # govulncheck doesn't provide severity
'file': go_mod_path,
'language': 'Go',
'cve': vuln.get('osv', {}).get('id', ''),
'affected_versions': vuln.get('osv', {}).get('affected', ''),
})
except json.JSONDecodeError:
continue
return {
'status': 'success',
'vulnerabilities': vulnerabilities,
'vulnerability_count': len(vulnerabilities),
'files_scanned': [go_mod_path],
}
except Exception as e:
logger.error(f"Error running govulncheck: {e}")
return {
'status': 'error',
'error': str(e),
'vulnerabilities': [],
}
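    # Note: the loop above assumes govulncheck emits one JSON object per line; some
    # versions pretty-print a stream of multi-line JSON messages instead, in which case
    # the per-line json.loads calls fail silently and findings are skipped.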
def _scan_rust_dependencies(self, repo_path):
"""
Scan Rust dependencies for known vulnerabilities using cargo-audit.
Args:
repo_path (str): The path to the repository.
Returns:
dict: Scan results for Rust dependencies.
"""
logger.info(f"Scanning Rust dependencies in {repo_path}")
# Check if Cargo.toml exists
cargo_toml_path = os.path.join(repo_path, 'Cargo.toml')
if not os.path.exists(cargo_toml_path):
return {
'status': 'no_dependencies',
'message': 'No Rust dependency files found.',
'vulnerabilities': [],
}
try:
# Run cargo-audit
cmd = [
'cargo',
'audit',
'--json',
]
process = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
cwd=repo_path, # Run in the repository directory
)
# Parse cargo-audit output
vulnerabilities = []
if process.stdout.strip():
try:
audit_results = json.loads(process.stdout)
for vuln in audit_results.get('vulnerabilities', {}).get('list', []):
vulnerabilities.append({
'package': vuln.get('package', {}).get('name', ''),
'installed_version': vuln.get('package', {}).get('version', ''),
'description': vuln.get('advisory', {}).get('description', ''),
'severity': vuln.get('advisory', {}).get('severity', ''),
'file': cargo_toml_path,
'language': 'Rust',
'cve': vuln.get('advisory', {}).get('id', ''),
})
except json.JSONDecodeError:
logger.error(f"Error parsing cargo-audit output: {process.stdout}")
return {
'status': 'success',
'vulnerabilities': vulnerabilities,
'vulnerability_count': len(vulnerabilities),
'files_scanned': [cargo_toml_path],
}
except Exception as e:
logger.error(f"Error running cargo-audit: {e}")
return {
'status': 'error',
'error': str(e),
'vulnerabilities': [],
}
def _scan_python(self, repo_path):
"""
Scan Python code for security vulnerabilities using bandit.
Args:
repo_path (str): The path to the repository.
Returns:
dict: Scan results for Python code.
"""
logger.info(f"Scanning Python code in {repo_path} for security vulnerabilities")
# Find Python files
python_files = []
for root, _, files in os.walk(repo_path):
for file in files:
if file.endswith('.py'):
python_files.append(os.path.join(root, file))
if not python_files:
return {
'status': 'no_files',
'message': 'No Python files found in the repository.',
'vulnerabilities': [],
}
try:
# Run bandit
cmd = [
'bandit',
'-r',
'-f', 'json',
repo_path,
]
process = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
)
# Parse bandit output
vulnerabilities = []
if process.stdout.strip():
try:
bandit_results = json.loads(process.stdout)
for result in bandit_results.get('results', []):
vulnerabilities.append({
'file': result.get('filename', ''),
'line': result.get('line_number', 0),
'code': result.get('code', ''),
'issue': result.get('issue_text', ''),
'severity': result.get('issue_severity', ''),
'confidence': result.get('issue_confidence', ''),
'cwe': result.get('cwe', ''),
'test_id': result.get('test_id', ''),
'test_name': result.get('test_name', ''),
'language': 'Python',
})
except json.JSONDecodeError:
logger.error(f"Error parsing bandit output: {process.stdout}")
# Group vulnerabilities by severity
vulns_by_severity = defaultdict(list)
for vuln in vulnerabilities:
severity = vuln.get('severity', 'unknown')
vulns_by_severity[severity].append(vuln)
return {
'status': 'success',
'vulnerabilities': vulnerabilities,
'vulnerabilities_by_severity': dict(vulns_by_severity),
'vulnerability_count': len(vulnerabilities),
'files_scanned': len(python_files),
}
except Exception as e:
logger.error(f"Error running bandit: {e}")
return {
'status': 'error',
'error': str(e),
'vulnerabilities': [],
}
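    # Note: bandit exits non-zero when it finds issues, which is why the subprocess call
    # above uses check=False and relies on stdout rather than the return code; newer bandit
    # releases may also report the CWE under 'issue_cwe' rather than a top-level 'cwe'
    # field, so that value can come back empty depending on the installed version.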
def _scan_javascript(self, repo_path):
"""
        Scan JavaScript/TypeScript code for security vulnerabilities using ESLint with eslint-plugin-security.
Args:
repo_path (str): The path to the repository.
Returns:
dict: Scan results for JavaScript/TypeScript code.
"""
logger.info(f"Scanning JavaScript/TypeScript code in {repo_path} for security vulnerabilities")
# Find JavaScript/TypeScript files
js_files = []
for root, _, files in os.walk(repo_path):
if 'node_modules' in root:
continue
for file in files:
if file.endswith(('.js', '.jsx', '.ts', '.tsx')):
js_files.append(os.path.join(root, file))
if not js_files:
return {
'status': 'no_files',
'message': 'No JavaScript/TypeScript files found in the repository.',
'vulnerabilities': [],
}
        # Fall back to ESLint with eslint-plugin-security, since NodeJSScan may not be
        # available in the environment; a fuller implementation could use NodeJSScan or similar.
        # Create a temporary ESLint configuration file with security rules
eslint_config = {
"env": {
"browser": True,
"es2021": True,
"node": True
},
"extends": [
"eslint:recommended",
"plugin:security/recommended"
],
"plugins": [
"security"
],
"parserOptions": {
"ecmaVersion": 12,
"sourceType": "module",
"ecmaFeatures": {
"jsx": True
}
},
"rules": {}
}
        # NamedTemporaryFile defaults to binary mode; open it in text mode so json.dump can write to it
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False, encoding='utf-8') as temp_config:
            json.dump(eslint_config, temp_config)
            temp_config_path = temp_config.name
try:
# Run ESLint with security plugin
cmd = [
'npx',
'eslint',
'--config', temp_config_path,
'--format', 'json',
'--plugin', 'security',
] + js_files
process = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
)
# Parse ESLint output
vulnerabilities = []
if process.stdout.strip():
try:
eslint_results = json.loads(process.stdout)
for result in eslint_results:
file_path = result.get('filePath', '')
for message in result.get('messages', []):
# Only include security-related issues
rule_id = message.get('ruleId', '')
if rule_id and ('security' in rule_id or 'no-eval' in rule_id or 'no-implied-eval' in rule_id):
vulnerabilities.append({
'file': file_path,
'line': message.get('line', 0),
'column': message.get('column', 0),
'issue': message.get('message', ''),
'severity': 'high' if message.get('severity', 0) == 2 else 'medium',
'rule': rule_id,
'language': 'JavaScript',
})
except json.JSONDecodeError:
logger.error(f"Error parsing ESLint output: {process.stdout}")
# Group vulnerabilities by severity
vulns_by_severity = defaultdict(list)
for vuln in vulnerabilities:
severity = vuln.get('severity', 'unknown')
vulns_by_severity[severity].append(vuln)
return {
'status': 'success',
'vulnerabilities': vulnerabilities,
'vulnerabilities_by_severity': dict(vulns_by_severity),
'vulnerability_count': len(vulnerabilities),
'files_scanned': len(js_files),
}
except Exception as e:
logger.error(f"Error scanning JavaScript/TypeScript code: {e}")
return {
'status': 'error',
'error': str(e),
'vulnerabilities': [],
}
finally:
# Clean up the temporary configuration file
if os.path.exists(temp_config_path):
os.unlink(temp_config_path)
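    # Note: this path assumes Node.js, ESLint, and eslint-plugin-security are resolvable
    # via npx in the scan environment; ESLint also exits non-zero when it reports problems,
    # which is why the call above uses check=False and reads the report from stdout.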
def _scan_java(self, repo_path):
"""
Scan Java code for security vulnerabilities.
Args:
repo_path (str): The path to the repository.
Returns:
dict: Scan results for Java code.
"""
logger.info(f"Scanning Java code in {repo_path} for security vulnerabilities")
# Find Java files
java_files = []
for root, _, files in os.walk(repo_path):
for file in files:
if file.endswith('.java'):
java_files.append(os.path.join(root, file))
if not java_files:
return {
'status': 'no_files',
'message': 'No Java files found in the repository.',
'vulnerabilities': [],
}
# For now, we'll just return a placeholder since we don't have a direct tool
# In a real implementation, you might use FindSecBugs or similar
return {
'status': 'not_implemented',
'message': 'Java security scanning is not fully implemented yet.',
'vulnerabilities': [],
'files_scanned': java_files,
}
def _scan_go(self, repo_path):
"""
Scan Go code for security vulnerabilities using gosec.
Args:
repo_path (str): The path to the repository.
Returns:
dict: Scan results for Go code.
"""
logger.info(f"Scanning Go code in {repo_path} for security vulnerabilities")
# Find Go files
go_files = []
for root, _, files in os.walk(repo_path):
for file in files:
if file.endswith('.go'):
go_files.append(os.path.join(root, file))
if not go_files:
return {
'status': 'no_files',
'message': 'No Go files found in the repository.',
'vulnerabilities': [],
}
try:
# Run gosec
cmd = [
'gosec',
'-fmt', 'json',
'-quiet',
'./...',
]
process = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
cwd=repo_path, # Run in the repository directory
)
# Parse gosec output
vulnerabilities = []
if process.stdout.strip():
try:
gosec_results = json.loads(process.stdout)
for issue in gosec_results.get('Issues', []):
vulnerabilities.append({
'file': issue.get('file', ''),
'line': issue.get('line', ''),
'code': issue.get('code', ''),
'issue': issue.get('details', ''),
'severity': issue.get('severity', ''),
'confidence': issue.get('confidence', ''),
'cwe': issue.get('cwe', {}).get('ID', ''),
'rule_id': issue.get('rule_id', ''),
'language': 'Go',
})
except json.JSONDecodeError:
logger.error(f"Error parsing gosec output: {process.stdout}")
# Group vulnerabilities by severity
vulns_by_severity = defaultdict(list)
for vuln in vulnerabilities:
severity = vuln.get('severity', 'unknown')
vulns_by_severity[severity].append(vuln)
return {
'status': 'success',
'vulnerabilities': vulnerabilities,
'vulnerabilities_by_severity': dict(vulns_by_severity),
'vulnerability_count': len(vulnerabilities),
'files_scanned': len(go_files),
}
except Exception as e:
logger.error(f"Error running gosec: {e}")
return {
'status': 'error',
'error': str(e),
'vulnerabilities': [],
}
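    # Note: gosec also exits non-zero when issues are found (hence check=False); depending
    # on the gosec version, the CWE object's key may be lowercase 'id' rather than 'ID',
    # in which case the 'cwe' value above comes back empty.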
def _scan_rust(self, repo_path):
"""
Scan Rust code for security vulnerabilities.
Args:
repo_path (str): The path to the repository.
Returns:
dict: Scan results for Rust code.
"""
logger.info(f"Scanning Rust code in {repo_path} for security vulnerabilities")
# Find Rust files
rust_files = []
for root, _, files in os.walk(repo_path):
for file in files:
if file.endswith('.rs'):
rust_files.append(os.path.join(root, file))
if not rust_files:
return {
'status': 'no_files',
'message': 'No Rust files found in the repository.',
'vulnerabilities': [],
}
# For now, we'll just return a placeholder since we don't have a direct tool
# In a real implementation, you might use cargo-audit or similar for code scanning
return {
'status': 'not_implemented',
'message': 'Rust security scanning is not fully implemented yet.',
'vulnerabilities': [],
'files_scanned': rust_files,
}
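# Minimal usage sketch. The repository path and language list below are illustrative
# placeholders, not real inputs; in practice they would come from the review pipeline's
# repo-clone and language-detection steps, and the underlying CLI tools (safety, npm,
# bandit, gosec, etc.) must be installed for the scans to return findings.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    scanner = SecurityScanner()
    results = scanner.scan_repository('/tmp/example-repo', ['Python', 'JavaScript'])
    for section, result in results.items():
        print(f"{section}: {result.get('status')} "
              f"({len(result.get('vulnerabilities', []))} vulnerabilities)")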