daqc's picture
Upload 36 files
52b089b verified
"""Tool for querying the National Vulnerability Database (NVD) of NIST.
This tool allows obtaining detailed information about specific vulnerabilities.
"""
from smolagents import Tool
import requests
import json
from urllib.parse import quote
class NvdTool(Tool):
"""
Tool for querying the National Vulnerability Database (NVD) of NIST.
"""
name = "nvd_search"
description = """Tool for querying the National Vulnerability Database (NVD) of NIST.
This tool allows searching for vulnerabilities in three ways:
1. By CVE ID: nvd_search(search_type="cve", identifier="CVE-2021-44228")
2. By keyword: nvd_search(search_type="keyword", identifier="log4j")
3. By keyword with exact match: nvd_search(search_type="keyword", identifier="log4j", exact_match=True)
CRITICAL - Product name format for keyword search:
- ALWAYS use ONLY the base product name, NEVER include versions
- CORRECT examples: "log4j", "apache", "microsoft", "chrome", "tomcat", "nginx"
- INCORRECT examples: "log4j 2.14.1", "apache http server", "microsoft windows", "chrome 120.0.6099.109"
- When searching, strip version numbers and descriptive words
- Example: "Apache Tomcat 9.0.65" → search for "tomcat"
- Example: "Google Chrome 120.0.6099.109" → search for "chrome"
- Example: "Log4j 2.14.1" → search for "log4j"
IMPORTANT: search_type must be EXACTLY 'cve' or 'keyword', not 'product' or any other value.
CORRECT: nvd_search(search_type="keyword", identifier="mobaxterm")
WRONG: nvd_search(search_type="product", identifier="mobaxterm")
The exact_match parameter:
- When True: Searches for CVEs that contain the EXACT phrase in the description
- When False: Searches for CVEs that contain ANY of the words in the description
"""
inputs = {
"search_type": {
"description": "Type of search to perform. Must be 'cve' for specific CVE ID or 'keyword' for keyword search.",
"type": "string",
},
"identifier": {
"description": "The CVE ID (e.g., 'CVE-2021-44228') or keyword (e.g., 'log4j'). For keywords, use ONLY base product names without versions (e.g., 'log4j' not 'log4j 2.14.1').",
"type": "string",
},
"exact_match": {
"description": "For keyword searches, whether to match the exact phrase (True) or any word (False).",
"type": "boolean",
"default": False,
"nullable": True,
},
}
output_type = "string"
def __init__(self):
super().__init__()
self.base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
def forward(self, search_type: str, identifier: str, exact_match: bool = False) -> str:
"""Search for vulnerabilities in NVD."""
try:
if search_type == "cve":
return self._search_by_cve(identifier)
elif search_type == "keyword":
return self._search_by_keyword(identifier, exact_match)
else:
return "Error: search_type must be 'cve' or 'keyword'"
except Exception as e:
return f"Error searching NVD: {str(e)}"
def _search_by_cve(self, cve_id: str) -> str:
"""Search for a specific CVE."""
url = f"{self.base_url}?cveId={cve_id}"
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
if not data.get('vulnerabilities'):
return f"No vulnerabilities found for {cve_id}"
vuln_data = data['vulnerabilities'][0]['cve']
return self._format_cve_data(vuln_data)
except requests.exceptions.RequestException as e:
return f"Error accessing NVD API: {str(e)}"
def _search_by_keyword(self, keyword: str, exact_match: bool = False) -> str:
"""Search for vulnerabilities by keyword."""
# Convert keyword to lowercase and encode spaces properly
keyword = keyword.lower()
encoded_keyword = quote(keyword)
# Build the URL based on exact_match parameter
if exact_match:
# For exact match, use keywordExactMatch parameter
url = f"{self.base_url}?keywordSearch={encoded_keyword}&keywordExactMatch"
else:
# For partial match, just use keywordSearch
url = f"{self.base_url}?keywordSearch={encoded_keyword}"
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
if not data.get('vulnerabilities'):
return f"No vulnerabilities found for '{keyword}'"
vulnerabilities = data['vulnerabilities']
context = []
context.append(f"Found {len(vulnerabilities)} vulnerabilities for search '{keyword}'")
context.append("\nVulnerabilities found:")
# Show all vulnerabilities
for i, vuln in enumerate(vulnerabilities):
vuln_data = vuln['cve']
context.append(f"\n--- Vulnerability {i+1} ---")
context.append(f"CVE ID: {vuln_data.get('id', 'Not available')}")
context.append(f"Source Identifier: {vuln_data.get('sourceIdentifier', 'Not available')}")
context.append(f"Vulnerability Status: {vuln_data.get('vulnStatus', 'Not available')}")
# CVE Tags
cve_tags = vuln_data.get('cveTags', [])
if cve_tags:
context.append(f"CVE Tags: {', '.join(cve_tags)}")
else:
context.append("CVE Tags: None")
# Description - show all languages
descriptions = vuln_data.get('descriptions', [])
if descriptions:
for desc in descriptions:
lang = desc.get('lang', 'unknown')
value = desc.get('value', 'Not available')
context.append(f"Description ({lang}): {value}")
else:
context.append("Description: Not available")
# CVSS Metrics - show all available versions
metrics = vuln_data.get('metrics', {})
if metrics:
context.append("\nCVSS Metrics:")
# CVSS V3.1
if 'cvssMetricV31' in metrics:
cvss_data = metrics['cvssMetricV31'][0]
cvss_info = cvss_data['cvssData']
context.append(f"CVSS V3.1:")
context.append(f" Base Score: {cvss_info['baseScore']}")
context.append(f" Base Severity: {cvss_info['baseSeverity']}")
context.append(f" Vector String: {cvss_info['vectorString']}")
context.append(f" Attack Vector: {cvss_info['attackVector']}")
context.append(f" Attack Complexity: {cvss_info['attackComplexity']}")
context.append(f" Privileges Required: {cvss_info['privilegesRequired']}")
context.append(f" User Interaction: {cvss_info['userInteraction']}")
context.append(f" Scope: {cvss_info['scope']}")
context.append(f" Confidentiality Impact: {cvss_info['confidentialityImpact']}")
context.append(f" Integrity Impact: {cvss_info['integrityImpact']}")
context.append(f" Availability Impact: {cvss_info['availabilityImpact']}")
context.append(f" Exploitability Score: {cvss_data['exploitabilityScore']}")
context.append(f" Impact Score: {cvss_data['impactScore']}")
# CVSS V3.0
if 'cvssMetricV30' in metrics:
cvss_data = metrics['cvssMetricV30'][0]
cvss_info = cvss_data['cvssData']
context.append(f"CVSS V3.0:")
context.append(f" Base Score: {cvss_info['baseScore']}")
context.append(f" Base Severity: {cvss_info['baseSeverity']}")
context.append(f" Vector String: {cvss_info['vectorString']}")
context.append(f" Attack Vector: {cvss_info['attackVector']}")
context.append(f" Attack Complexity: {cvss_info['attackComplexity']}")
context.append(f" Privileges Required: {cvss_info['privilegesRequired']}")
context.append(f" User Interaction: {cvss_info['userInteraction']}")
context.append(f" Scope: {cvss_info['scope']}")
context.append(f" Confidentiality Impact: {cvss_info['confidentialityImpact']}")
context.append(f" Integrity Impact: {cvss_info['integrityImpact']}")
context.append(f" Availability Impact: {cvss_info['availabilityImpact']}")
context.append(f" Exploitability Score: {cvss_data['exploitabilityScore']}")
context.append(f" Impact Score: {cvss_data['impactScore']}")
# CVSS V2
if 'cvssMetricV2' in metrics:
cvss_data = metrics['cvssMetricV2'][0]
cvss_info = cvss_data['cvssData']
context.append(f"CVSS V2:")
context.append(f" Base Score: {cvss_info['baseScore']}")
context.append(f" Base Severity: {cvss_data['baseSeverity']}")
context.append(f" Vector String: {cvss_info['vectorString']}")
context.append(f" Access Vector: {cvss_info['accessVector']}")
context.append(f" Access Complexity: {cvss_info['accessComplexity']}")
context.append(f" Authentication: {cvss_info['authentication']}")
context.append(f" Confidentiality Impact: {cvss_info['confidentialityImpact']}")
context.append(f" Integrity Impact: {cvss_info['integrityImpact']}")
context.append(f" Availability Impact: {cvss_info['availabilityImpact']}")
context.append(f" Exploitability Score: {cvss_data['exploitabilityScore']}")
context.append(f" Impact Score: {cvss_data['impactScore']}")
else:
context.append("CVSS Metrics: Not available")
# Weaknesses - show all CWEs
weaknesses = vuln_data.get('weaknesses', [])
if weaknesses:
context.append("\nWeaknesses:")
for weakness in weaknesses:
source = weakness.get('source', 'Unknown')
w_type = weakness.get('type', 'Unknown')
context.append(f" Source: {source}, Type: {w_type}")
descriptions = weakness.get('description', [])
for desc in descriptions:
lang = desc.get('lang', 'unknown')
value = desc.get('value', 'Not available')
context.append(f" CWE ({lang}): {value}")
else:
context.append("\nWeaknesses: Not available")
# Configurations
configurations = vuln_data.get('configurations', [])
if configurations:
context.append("\nAffected Configurations:")
for config in configurations:
nodes = config.get('nodes', [])
for node in nodes:
operator = node.get('operator', 'Unknown')
negate = node.get('negate', False)
context.append(f" Operator: {operator}, Negate: {negate}")
cpe_matches = node.get('cpeMatch', [])
for match in cpe_matches:
vulnerable = match.get('vulnerable', False)
criteria = match.get('criteria', 'Not available')
version_end = match.get('versionEndIncluding', 'Not specified')
context.append(f" CPE: {criteria}")
context.append(f" Vulnerable: {vulnerable}")
if version_end != 'Not specified':
context.append(f" Version End Including: {version_end}")
else:
context.append("\nAffected Configurations: Not available")
# Dates
context.append(f"\nPublished: {vuln_data.get('published', 'Not available')}")
context.append(f"Last Modified: {vuln_data.get('lastModified', 'Not available')}")
# References - show all with tags
references = vuln_data.get('references', [])
if references:
context.append("\nReferences:")
for ref in references:
url = ref.get('url', 'Not available')
source = ref.get('source', 'Unknown')
tags = ref.get('tags', [])
context.append(f" URL: {url}")
context.append(f" Source: {source}")
if tags:
context.append(f" Tags: {', '.join(tags)}")
context.append("")
else:
context.append("\nReferences: Not available")
context.append("-" * 50) # Separator between vulnerabilities
return "\n".join(context)
except requests.exceptions.RequestException as e:
return f"Error accessing NVD API: {str(e)}"
def _format_cve_data(self, vuln_data: dict) -> str:
"""Format CVE data for display."""
context = []
context.append(f"CVE ID: {vuln_data.get('id', 'Not available')}")
context.append(f"Source Identifier: {vuln_data.get('sourceIdentifier', 'Not available')}")
context.append(f"Vulnerability Status: {vuln_data.get('vulnStatus', 'Not available')}")
# CVE Tags
cve_tags = vuln_data.get('cveTags', [])
if cve_tags:
context.append(f"CVE Tags: {', '.join(cve_tags)}")
else:
context.append("CVE Tags: None")
# Description - show all languages
descriptions = vuln_data.get('descriptions', [])
if descriptions:
for desc in descriptions:
lang = desc.get('lang', 'unknown')
value = desc.get('value', 'Not available')
context.append(f"Description ({lang}): {value}")
else:
context.append("Description: Not available")
# CVSS Metrics - show all available versions
metrics = vuln_data.get('metrics', {})
if metrics:
context.append("\nCVSS Metrics:")
# CVSS V3.1
if 'cvssMetricV31' in metrics:
cvss_data = metrics['cvssMetricV31'][0]
cvss_info = cvss_data['cvssData']
context.append(f"CVSS V3.1:")
context.append(f" Base Score: {cvss_info['baseScore']}")
context.append(f" Base Severity: {cvss_info['baseSeverity']}")
context.append(f" Vector String: {cvss_info['vectorString']}")
context.append(f" Attack Vector: {cvss_info['attackVector']}")
context.append(f" Attack Complexity: {cvss_info['attackComplexity']}")
context.append(f" Privileges Required: {cvss_info['privilegesRequired']}")
context.append(f" User Interaction: {cvss_info['userInteraction']}")
context.append(f" Scope: {cvss_info['scope']}")
context.append(f" Confidentiality Impact: {cvss_info['confidentialityImpact']}")
context.append(f" Integrity Impact: {cvss_info['integrityImpact']}")
context.append(f" Availability Impact: {cvss_info['availabilityImpact']}")
context.append(f" Exploitability Score: {cvss_data['exploitabilityScore']}")
context.append(f" Impact Score: {cvss_data['impactScore']}")
# CVSS V3.0
if 'cvssMetricV30' in metrics:
cvss_data = metrics['cvssMetricV30'][0]
cvss_info = cvss_data['cvssData']
context.append(f"CVSS V3.0:")
context.append(f" Base Score: {cvss_info['baseScore']}")
context.append(f" Base Severity: {cvss_info['baseSeverity']}")
context.append(f" Vector String: {cvss_info['vectorString']}")
context.append(f" Attack Vector: {cvss_info['attackVector']}")
context.append(f" Attack Complexity: {cvss_info['attackComplexity']}")
context.append(f" Privileges Required: {cvss_info['privilegesRequired']}")
context.append(f" User Interaction: {cvss_info['userInteraction']}")
context.append(f" Scope: {cvss_info['scope']}")
context.append(f" Confidentiality Impact: {cvss_info['confidentialityImpact']}")
context.append(f" Integrity Impact: {cvss_info['integrityImpact']}")
context.append(f" Availability Impact: {cvss_info['availabilityImpact']}")
context.append(f" Exploitability Score: {cvss_data['exploitabilityScore']}")
context.append(f" Impact Score: {cvss_data['impactScore']}")
# CVSS V2
if 'cvssMetricV2' in metrics:
cvss_data = metrics['cvssMetricV2'][0]
cvss_info = cvss_data['cvssData']
context.append(f"CVSS V2:")
context.append(f" Base Score: {cvss_info['baseScore']}")
context.append(f" Base Severity: {cvss_data['baseSeverity']}")
context.append(f" Vector String: {cvss_info['vectorString']}")
context.append(f" Access Vector: {cvss_info['accessVector']}")
context.append(f" Access Complexity: {cvss_info['accessComplexity']}")
context.append(f" Authentication: {cvss_info['authentication']}")
context.append(f" Confidentiality Impact: {cvss_info['confidentialityImpact']}")
context.append(f" Integrity Impact: {cvss_info['integrityImpact']}")
context.append(f" Availability Impact: {cvss_info['availabilityImpact']}")
context.append(f" Exploitability Score: {cvss_data['exploitabilityScore']}")
context.append(f" Impact Score: {cvss_data['impactScore']}")
else:
context.append("CVSS Metrics: Not available")
# Weaknesses - show all CWEs
weaknesses = vuln_data.get('weaknesses', [])
if weaknesses:
context.append("\nWeaknesses:")
for weakness in weaknesses:
source = weakness.get('source', 'Unknown')
w_type = weakness.get('type', 'Unknown')
context.append(f" Source: {source}, Type: {w_type}")
descriptions = weakness.get('description', [])
for desc in descriptions:
lang = desc.get('lang', 'unknown')
value = desc.get('value', 'Not available')
context.append(f" CWE ({lang}): {value}")
else:
context.append("\nWeaknesses: Not available")
# Configurations
configurations = vuln_data.get('configurations', [])
if configurations:
context.append("\nAffected Configurations:")
for config in configurations:
nodes = config.get('nodes', [])
for node in nodes:
operator = node.get('operator', 'Unknown')
negate = node.get('negate', False)
context.append(f" Operator: {operator}, Negate: {negate}")
cpe_matches = node.get('cpeMatch', [])
for match in cpe_matches:
vulnerable = match.get('vulnerable', False)
criteria = match.get('criteria', 'Not available')
version_end = match.get('versionEndIncluding', 'Not specified')
context.append(f" CPE: {criteria}")
context.append(f" Vulnerable: {vulnerable}")
if version_end != 'Not specified':
context.append(f" Version End Including: {version_end}")
else:
context.append("\nAffected Configurations: Not available")
# Dates
context.append(f"\nPublished: {vuln_data.get('published', 'Not available')}")
context.append(f"Last Modified: {vuln_data.get('lastModified', 'Not available')}")
# References - show all with tags
references = vuln_data.get('references', [])
if references:
context.append("\nReferences:")
for ref in references:
url = ref.get('url', 'Not available')
source = ref.get('source', 'Unknown')
tags = ref.get('tags', [])
context.append(f" URL: {url}")
context.append(f" Source: {source}")
if tags:
context.append(f" Tags: {', '.join(tags)}")
context.append("")
else:
context.append("\nReferences: Not available")
return "\n".join(context)