Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Security Scanner Service

This module provides functionality for scanning code for security vulnerabilities.
"""
import os
import subprocess
import logging
import json
import tempfile
from collections import defaultdict

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
class SecurityScanner:
    """
    Service for scanning code for security vulnerabilities.

    Source code and dependency manifests are scanned by shelling out to
    external command-line tools (safety, npm, govulncheck, cargo, bandit,
    eslint via npx, gosec) and normalising their JSON output into plain
    dictionaries. Every scanner method returns a dict that carries at least
    a 'status' string and a 'vulnerabilities' list.
    """

    # File names that mark a directory as holding Python dependencies.
    _PYTHON_DEPENDENCY_FILES = ('requirements.txt', 'Pipfile', 'Pipfile.lock', 'setup.py')
    # File names that mark a directory as holding Java dependencies.
    _JAVA_DEPENDENCY_FILES = ('pom.xml', 'build.gradle')
    # Source-file suffixes handled by the JavaScript/TypeScript scanner.
    _JS_EXTENSIONS = ('.js', '.jsx', '.ts', '.tsx')
    # Directory names never descended into while looking for JS/TS inputs.
    _JS_SKIP_DIRS = ('node_modules',)
    # ESLint configuration written to a temporary file for each JS/TS scan.
    _ESLINT_CONFIG = {
        "env": {
            "browser": True,
            "es2021": True,
            "node": True
        },
        "extends": [
            "eslint:recommended",
            "plugin:security/recommended"
        ],
        "plugins": [
            "security"
        ],
        "parserOptions": {
            "ecmaVersion": 12,
            "sourceType": "module",
            "ecmaFeatures": {
                "jsx": True
            }
        },
        "rules": {}
    }

    def __init__(self):
        """
        Initialize the SecurityScanner.

        Builds the language-name -> scanner-method dispatch table used by
        scan_repository().
        """
        logger.info("Initialized SecurityScanner")
        self.scanners = {
            'Python': self._scan_python,
            'JavaScript': self._scan_javascript,
            'TypeScript': self._scan_javascript,  # TypeScript uses the same scanner as JavaScript
            'Java': self._scan_java,
            'Go': self._scan_go,
            'Rust': self._scan_rust,
        }

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _find_files(repo_path, predicate, skip_dirs=()):
        """
        Collect files below a directory whose base name satisfies a predicate.

        Args:
            repo_path (str): Directory to walk.
            predicate (callable): Called with each file's base name; the file
                is kept when it returns a truthy value.
            skip_dirs (tuple): Directory base names that must not be entered.

        Returns:
            list: Matching paths (os.path.join(root, name)) in os.walk order.
        """
        matches = []
        for root, dirs, files in os.walk(repo_path):
            if skip_dirs:
                # Pruning in place stops os.walk from descending into them.
                dirs[:] = [d for d in dirs if d not in skip_dirs]
            matches.extend(os.path.join(root, name) for name in files if predicate(name))
        return matches

    @staticmethod
    def _run_tool(cmd, cwd=None):
        """
        Run an external tool and capture its output as text.

        Args:
            cmd (list): Argument vector; executed without a shell.
            cwd (str): Working directory for the child process, or None to
                inherit the current one.

        Returns:
            subprocess.CompletedProcess: stdout and stderr are captured as
            str. A non-zero exit status does not raise (check=False).
        """
        return subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=False,
            cwd=cwd,
        )

    @staticmethod
    def _group_by_severity(vulnerabilities):
        """
        Bucket vulnerability dicts by their 'severity' value.

        Args:
            vulnerabilities (list): Vulnerability dicts.

        Returns:
            dict: severity -> list of vulnerabilities; entries lacking a
            'severity' key are filed under 'unknown'.
        """
        grouped = defaultdict(list)
        for vuln in vulnerabilities:
            grouped[vuln.get('severity', 'unknown')].append(vuln)
        return dict(grouped)

    @staticmethod
    def _iter_json_objects(text):
        """
        Yield every top-level JSON object found in a stream of JSON values.

        Handles one-object-per-line output as well as objects pretty-printed
        across several lines. Text that cannot be decoded is skipped up to
        the next newline, and decoded values that are not objects (numbers,
        strings, lists, ...) are dropped.

        Args:
            text (str): Raw tool output.

        Yields:
            dict: Each decoded JSON object, in order of appearance.
        """
        decoder = json.JSONDecoder()
        pos, end = 0, len(text)
        while pos < end:
            # raw_decode() does not accept leading whitespace.
            while pos < end and text[pos].isspace():
                pos += 1
            if pos >= end:
                break
            try:
                value, pos = decoder.raw_decode(text, pos)
            except json.JSONDecodeError:
                newline = text.find('\n', pos)
                if newline == -1:
                    break
                pos = newline + 1
                continue
            if isinstance(value, dict):
                yield value

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------

    def scan_repository(self, repo_path, languages):
        """
        Scan a repository for security vulnerabilities in the specified languages.

        Args:
            repo_path (str): The path to the repository.
            languages (list): A list of programming languages to scan.

        Returns:
            dict: A dictionary containing scan results for each language, plus
            a 'dependencies' entry. Languages without a registered scanner get
            status 'not_supported'; a scanner that raises gets status 'error'.
        """
        logger.info(f"Scanning repository at {repo_path} for security vulnerabilities in languages: {languages}")
        # Scan dependencies first (language-agnostic)
        results = {'dependencies': self._scan_dependencies(repo_path)}

        for language in languages:
            scanner = self.scanners.get(language)
            if scanner is None:
                logger.warning(f"No security scanner available for {language}")
                results[language] = {
                    'status': 'not_supported',
                    'message': f"Security scanning for {language} is not supported yet.",
                    'vulnerabilities': [],
                }
                continue
            try:
                logger.info(f"Scanning {language} code in {repo_path} for security vulnerabilities")
                results[language] = scanner(repo_path)
            except Exception as e:
                # One failing scanner must not abort the remaining languages.
                logger.error(f"Error scanning {language} code for security vulnerabilities: {e}")
                results[language] = {
                    'status': 'error',
                    'error': str(e),
                    'vulnerabilities': [],
                }
        return results

    # ------------------------------------------------------------------
    # Dependency scanners
    # ------------------------------------------------------------------

    def _scan_dependencies(self, repo_path):
        """
        Scan dependencies for known vulnerabilities.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Dependency scan results: the merged 'vulnerabilities' list,
            its length as 'vulnerability_count', and the per-ecosystem results
            under 'language_results'. 'status' is always 'success'.
        """
        logger.info(f"Scanning dependencies in {repo_path}")
        results = {
            'python': self._scan_python_dependencies(repo_path),
            'javascript': self._scan_javascript_dependencies(repo_path),
            'java': self._scan_java_dependencies(repo_path),
            'go': self._scan_go_dependencies(repo_path),
            'rust': self._scan_rust_dependencies(repo_path),
        }
        # Aggregate vulnerabilities
        all_vulnerabilities = []
        for lang_result in results.values():
            all_vulnerabilities.extend(lang_result.get('vulnerabilities', []))
        return {
            'status': 'success',
            'vulnerabilities': all_vulnerabilities,
            'vulnerability_count': len(all_vulnerabilities),
            'language_results': results,
        }

    def _scan_python_dependencies(self, repo_path):
        """
        Scan Python dependencies for known vulnerabilities using safety.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Python dependencies. 'files_scanned' is the
            list of dependency files found. A failure on one file is logged
            and does not stop the remaining files.
        """
        logger.info(f"Scanning Python dependencies in {repo_path}")
        # Find requirements files
        requirements_files = self._find_files(
            repo_path, lambda name: name in self._PYTHON_DEPENDENCY_FILES
        )
        if not requirements_files:
            return {
                'status': 'no_dependencies',
                'message': 'No Python dependency files found.',
                'vulnerabilities': [],
            }

        vulnerabilities = []
        for req_file in requirements_files:
            try:
                # Run safety check
                process = self._run_tool(['safety', 'check', '--file', req_file, '--json'])
                if not process.stdout.strip():
                    continue
                try:
                    safety_results = json.loads(process.stdout)
                except json.JSONDecodeError:
                    logger.error(f"Error parsing safety output: {process.stdout}")
                    continue
                for vuln in safety_results.get('vulnerabilities', []):
                    vulnerabilities.append({
                        'package': vuln.get('package_name', ''),
                        'installed_version': vuln.get('installed_version', ''),
                        'affected_versions': vuln.get('vulnerable_spec', ''),
                        'description': vuln.get('advisory', ''),
                        'severity': vuln.get('severity', ''),
                        'file': req_file,
                        'language': 'Python',
                    })
            except Exception as e:
                logger.error(f"Error running safety on {req_file}: {e}")
        return {
            'status': 'success',
            'vulnerabilities': vulnerabilities,
            'vulnerability_count': len(vulnerabilities),
            'files_scanned': requirements_files,
        }

    def _scan_javascript_dependencies(self, repo_path):
        """
        Scan JavaScript/TypeScript dependencies for known vulnerabilities using npm audit.

        'npm audit' is run once per package.json, inside that file's
        directory. node_modules directories are not searched, so installed
        third-party packages are not audited as if they were projects.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for JavaScript dependencies. 'files_scanned' is
            the list of package.json paths found.
        """
        logger.info(f"Scanning JavaScript dependencies in {repo_path}")
        # Find package.json files
        package_files = self._find_files(
            repo_path, lambda name: name == 'package.json', skip_dirs=self._JS_SKIP_DIRS
        )
        if not package_files:
            return {
                'status': 'no_dependencies',
                'message': 'No JavaScript dependency files found.',
                'vulnerabilities': [],
            }

        vulnerabilities = []
        for pkg_file in package_files:
            pkg_dir = os.path.dirname(pkg_file)
            try:
                # Run npm audit in the directory containing package.json
                process = self._run_tool(['npm', 'audit', '--json'], cwd=pkg_dir)
                if not process.stdout.strip():
                    continue
                try:
                    audit_results = json.loads(process.stdout)
                except json.JSONDecodeError:
                    logger.error(f"Error parsing npm audit output: {process.stdout}")
                    continue
                # Extract vulnerabilities from npm audit results
                for vuln_info in audit_results.get('vulnerabilities', {}).values():
                    vulnerabilities.append({
                        'package': vuln_info.get('name', ''),
                        'installed_version': vuln_info.get('version', ''),
                        'affected_versions': vuln_info.get('range', ''),
                        'description': vuln_info.get('overview', ''),
                        'severity': vuln_info.get('severity', ''),
                        'file': pkg_file,
                        'language': 'JavaScript',
                        'cwe': vuln_info.get('cwe', ''),
                        'recommendation': vuln_info.get('recommendation', ''),
                    })
            except Exception as e:
                logger.error(f"Error running npm audit on {pkg_file}: {e}")
        return {
            'status': 'success',
            'vulnerabilities': vulnerabilities,
            'vulnerability_count': len(vulnerabilities),
            'files_scanned': package_files,
        }

    def _scan_java_dependencies(self, repo_path):
        """
        Scan Java dependencies for known vulnerabilities.

        Placeholder: only locates pom.xml / build.gradle files; no tool is run.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Java dependencies, with status
            'no_dependencies' or 'not_implemented'.
        """
        logger.info(f"Scanning Java dependencies in {repo_path}")
        # Find pom.xml or build.gradle files
        dependency_files = self._find_files(
            repo_path, lambda name: name in self._JAVA_DEPENDENCY_FILES
        )
        if not dependency_files:
            return {
                'status': 'no_dependencies',
                'message': 'No Java dependency files found.',
                'vulnerabilities': [],
            }
        # For now, we'll just return a placeholder since we don't have a direct tool
        # In a real implementation, you might use OWASP Dependency Check or similar
        return {
            'status': 'not_implemented',
            'message': 'Java dependency scanning is not fully implemented yet.',
            'vulnerabilities': [],
            'files_scanned': dependency_files,
        }

    def _scan_go_dependencies(self, repo_path):
        """
        Scan Go dependencies for known vulnerabilities using govulncheck.

        Only a go.mod directly inside repo_path is considered.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Go dependencies.
        """
        logger.info(f"Scanning Go dependencies in {repo_path}")
        # Check if go.mod exists
        go_mod_path = os.path.join(repo_path, 'go.mod')
        if not os.path.exists(go_mod_path):
            return {
                'status': 'no_dependencies',
                'message': 'No Go dependency files found.',
                'vulnerabilities': [],
            }
        try:
            # Run govulncheck in the repository directory
            process = self._run_tool(['govulncheck', '-json', './...'], cwd=repo_path)
            vulnerabilities = []
            # The output is read as a stream of JSON objects so that both
            # single-line and multi-line (indented) messages are picked up.
            for result in self._iter_json_objects(process.stdout):
                if 'vulnerability' not in result:
                    continue
                vuln = result['vulnerability']
                vulnerabilities.append({
                    'package': vuln.get('package', ''),
                    'description': vuln.get('details', ''),
                    'severity': 'high',  # govulncheck doesn't provide severity
                    'file': go_mod_path,
                    'language': 'Go',
                    'cve': vuln.get('osv', {}).get('id', ''),
                    'affected_versions': vuln.get('osv', {}).get('affected', ''),
                })
            return {
                'status': 'success',
                'vulnerabilities': vulnerabilities,
                'vulnerability_count': len(vulnerabilities),
                'files_scanned': [go_mod_path],
            }
        except Exception as e:
            logger.error(f"Error running govulncheck: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'vulnerabilities': [],
            }

    def _scan_rust_dependencies(self, repo_path):
        """
        Scan Rust dependencies for known vulnerabilities using cargo-audit.

        Only a Cargo.toml directly inside repo_path is considered.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Rust dependencies.
        """
        logger.info(f"Scanning Rust dependencies in {repo_path}")
        # Check if Cargo.toml exists
        cargo_toml_path = os.path.join(repo_path, 'Cargo.toml')
        if not os.path.exists(cargo_toml_path):
            return {
                'status': 'no_dependencies',
                'message': 'No Rust dependency files found.',
                'vulnerabilities': [],
            }
        try:
            # Run cargo-audit in the repository directory
            process = self._run_tool(['cargo', 'audit', '--json'], cwd=repo_path)
            vulnerabilities = []
            if process.stdout.strip():
                try:
                    audit_results = json.loads(process.stdout)
                    for vuln in audit_results.get('vulnerabilities', {}).get('list', []):
                        package = vuln.get('package', {})
                        advisory = vuln.get('advisory', {})
                        vulnerabilities.append({
                            'package': package.get('name', ''),
                            'installed_version': package.get('version', ''),
                            'description': advisory.get('description', ''),
                            'severity': advisory.get('severity', ''),
                            'file': cargo_toml_path,
                            'language': 'Rust',
                            'cve': advisory.get('id', ''),
                        })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing cargo-audit output: {process.stdout}")
            return {
                'status': 'success',
                'vulnerabilities': vulnerabilities,
                'vulnerability_count': len(vulnerabilities),
                'files_scanned': [cargo_toml_path],
            }
        except Exception as e:
            logger.error(f"Error running cargo-audit: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'vulnerabilities': [],
            }

    # ------------------------------------------------------------------
    # Source-code scanners
    # ------------------------------------------------------------------

    def _scan_python(self, repo_path):
        """
        Scan Python code for security vulnerabilities using bandit.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Python code. 'files_scanned' is the number
            of .py files found (a count, not a list).
        """
        logger.info(f"Scanning Python code in {repo_path} for security vulnerabilities")
        # Find Python files
        python_files = self._find_files(repo_path, lambda name: name.endswith('.py'))
        if not python_files:
            return {
                'status': 'no_files',
                'message': 'No Python files found in the repository.',
                'vulnerabilities': [],
            }
        try:
            # Run bandit recursively over the whole repository
            process = self._run_tool(['bandit', '-r', '-f', 'json', repo_path])
            vulnerabilities = []
            if process.stdout.strip():
                try:
                    bandit_results = json.loads(process.stdout)
                    for result in bandit_results.get('results', []):
                        vulnerabilities.append({
                            'file': result.get('filename', ''),
                            'line': result.get('line_number', 0),
                            'code': result.get('code', ''),
                            'issue': result.get('issue_text', ''),
                            'severity': result.get('issue_severity', ''),
                            'confidence': result.get('issue_confidence', ''),
                            # NOTE(review): bandit releases appear to report the
                            # CWE under 'issue_cwe'; accepted as a fallback —
                            # TODO confirm against the bandit version in use.
                            'cwe': result.get('cwe', result.get('issue_cwe', '')),
                            'test_id': result.get('test_id', ''),
                            'test_name': result.get('test_name', ''),
                            'language': 'Python',
                        })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing bandit output: {process.stdout}")
            return {
                'status': 'success',
                'vulnerabilities': vulnerabilities,
                'vulnerabilities_by_severity': self._group_by_severity(vulnerabilities),
                'vulnerability_count': len(vulnerabilities),
                'files_scanned': len(python_files),
            }
        except Exception as e:
            logger.error(f"Error running bandit: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'vulnerabilities': [],
            }

    def _scan_javascript(self, repo_path):
        """
        Scan JavaScript/TypeScript code for security vulnerabilities.

        Runs ESLint (through npx) with the security plugin, using a temporary
        configuration file that is removed again before returning. Files
        inside node_modules directories are not scanned.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for JavaScript/TypeScript code. 'files_scanned'
            is the number of source files found (a count, not a list).
        """
        logger.info(f"Scanning JavaScript/TypeScript code in {repo_path} for security vulnerabilities")
        # Find JavaScript/TypeScript files
        js_files = self._find_files(
            repo_path,
            lambda name: name.endswith(self._JS_EXTENSIONS),
            skip_dirs=self._JS_SKIP_DIRS,
        )
        if not js_files:
            return {
                'status': 'no_files',
                'message': 'No JavaScript/TypeScript files found in the repository.',
                'vulnerabilities': [],
            }

        # For now, we'll use a simplified approach since NodeJSScan might not be available
        # In a real implementation, you might use NodeJSScan or similar
        temp_config_path = None
        try:
            # Text mode is required: json.dump() writes str, and
            # NamedTemporaryFile opens in binary mode by default.
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_config:
                temp_config_path = temp_config.name
                json.dump(self._ESLINT_CONFIG, temp_config)

            # Run ESLint with security plugin
            cmd = [
                'npx',
                'eslint',
                '--config', temp_config_path,
                '--format', 'json',
                '--plugin', 'security',
            ] + js_files
            process = self._run_tool(cmd)

            # Parse ESLint output
            vulnerabilities = []
            if process.stdout.strip():
                try:
                    eslint_results = json.loads(process.stdout)
                    for result in eslint_results:
                        file_path = result.get('filePath', '')
                        for message in result.get('messages', []):
                            # Only include security-related issues
                            rule_id = message.get('ruleId', '')
                            if rule_id and ('security' in rule_id or 'no-eval' in rule_id or 'no-implied-eval' in rule_id):
                                vulnerabilities.append({
                                    'file': file_path,
                                    'line': message.get('line', 0),
                                    'column': message.get('column', 0),
                                    'issue': message.get('message', ''),
                                    'severity': 'high' if message.get('severity', 0) == 2 else 'medium',
                                    'rule': rule_id,
                                    'language': 'JavaScript',
                                })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing ESLint output: {process.stdout}")
            return {
                'status': 'success',
                'vulnerabilities': vulnerabilities,
                'vulnerabilities_by_severity': self._group_by_severity(vulnerabilities),
                'vulnerability_count': len(vulnerabilities),
                'files_scanned': len(js_files),
            }
        except Exception as e:
            logger.error(f"Error scanning JavaScript/TypeScript code: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'vulnerabilities': [],
            }
        finally:
            # Clean up the temporary configuration file
            if temp_config_path and os.path.exists(temp_config_path):
                os.unlink(temp_config_path)

    def _scan_java(self, repo_path):
        """
        Scan Java code for security vulnerabilities.

        Placeholder: only locates .java files; no tool is run.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Java code, with status 'no_files' or
            'not_implemented'. 'files_scanned' is the list of files found.
        """
        logger.info(f"Scanning Java code in {repo_path} for security vulnerabilities")
        # Find Java files
        java_files = self._find_files(repo_path, lambda name: name.endswith('.java'))
        if not java_files:
            return {
                'status': 'no_files',
                'message': 'No Java files found in the repository.',
                'vulnerabilities': [],
            }
        # For now, we'll just return a placeholder since we don't have a direct tool
        # In a real implementation, you might use FindSecBugs or similar
        return {
            'status': 'not_implemented',
            'message': 'Java security scanning is not fully implemented yet.',
            'vulnerabilities': [],
            'files_scanned': java_files,
        }

    def _scan_go(self, repo_path):
        """
        Scan Go code for security vulnerabilities using gosec.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Go code. 'files_scanned' is the number of
            .go files found (a count, not a list).
        """
        logger.info(f"Scanning Go code in {repo_path} for security vulnerabilities")
        # Find Go files
        go_files = self._find_files(repo_path, lambda name: name.endswith('.go'))
        if not go_files:
            return {
                'status': 'no_files',
                'message': 'No Go files found in the repository.',
                'vulnerabilities': [],
            }
        try:
            # Run gosec in the repository directory
            process = self._run_tool(['gosec', '-fmt', 'json', '-quiet', './...'], cwd=repo_path)
            vulnerabilities = []
            if process.stdout.strip():
                try:
                    gosec_results = json.loads(process.stdout)
                    # 'Issues' and 'cwe' are guarded against JSON null so a
                    # clean report cannot crash the scan.
                    for issue in gosec_results.get('Issues') or []:
                        cwe = issue.get('cwe') or {}
                        vulnerabilities.append({
                            'file': issue.get('file', ''),
                            'line': issue.get('line', ''),
                            'code': issue.get('code', ''),
                            'issue': issue.get('details', ''),
                            'severity': issue.get('severity', ''),
                            'confidence': issue.get('confidence', ''),
                            # NOTE(review): gosec appears to emit the key as
                            # lower-case 'id'; both spellings are accepted —
                            # TODO confirm against the gosec version in use.
                            'cwe': cwe.get('ID', cwe.get('id', '')),
                            'rule_id': issue.get('rule_id', ''),
                            'language': 'Go',
                        })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing gosec output: {process.stdout}")
            return {
                'status': 'success',
                'vulnerabilities': vulnerabilities,
                'vulnerabilities_by_severity': self._group_by_severity(vulnerabilities),
                'vulnerability_count': len(vulnerabilities),
                'files_scanned': len(go_files),
            }
        except Exception as e:
            logger.error(f"Error running gosec: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'vulnerabilities': [],
            }

    def _scan_rust(self, repo_path):
        """
        Scan Rust code for security vulnerabilities.

        Placeholder: only locates .rs files; no tool is run.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Rust code, with status 'no_files' or
            'not_implemented'. 'files_scanned' is the list of files found.
        """
        logger.info(f"Scanning Rust code in {repo_path} for security vulnerabilities")
        # Find Rust files
        rust_files = self._find_files(repo_path, lambda name: name.endswith('.rs'))
        if not rust_files:
            return {
                'status': 'no_files',
                'message': 'No Rust files found in the repository.',
                'vulnerabilities': [],
            }
        # For now, we'll just return a placeholder since we don't have a direct tool
        # In a real implementation, you might use cargo-audit or similar for code scanning
        return {
            'status': 'not_implemented',
            'message': 'Rust security scanning is not fully implemented yet.',
            'vulnerabilities': [],
            'files_scanned': rust_files,
        }