|
"""Tool for querying the National Vulnerability Database (NVD) of NIST.
|
|
This tool allows obtaining detailed information about specific vulnerabilities.
|
|
"""
|
|
|
|
from smolagents import Tool
|
|
import requests
|
|
import json
|
|
from urllib.parse import quote
|
|
|
|
class NvdTool(Tool):
|
|
"""
|
|
Tool for querying the National Vulnerability Database (NVD) of NIST.
|
|
"""
|
|
|
|
name = "nvd_search"
|
|
description = """Tool for querying the National Vulnerability Database (NVD) of NIST.
|
|
This tool allows searching for vulnerabilities in three ways:
|
|
1. By CVE ID: nvd_search(search_type="cve", identifier="CVE-2021-44228")
|
|
2. By keyword: nvd_search(search_type="keyword", identifier="log4j")
|
|
3. By keyword with exact match: nvd_search(search_type="keyword", identifier="log4j", exact_match=True)
|
|
|
|
CRITICAL - Product name format for keyword search:
|
|
- ALWAYS use ONLY the base product name, NEVER include versions
|
|
- CORRECT examples: "log4j", "apache", "microsoft", "chrome", "tomcat", "nginx"
|
|
- INCORRECT examples: "log4j 2.14.1", "apache http server", "microsoft windows", "chrome 120.0.6099.109"
|
|
- When searching, strip version numbers and descriptive words
|
|
- Example: "Apache Tomcat 9.0.65" → search for "tomcat"
|
|
- Example: "Google Chrome 120.0.6099.109" → search for "chrome"
|
|
- Example: "Log4j 2.14.1" → search for "log4j"
|
|
|
|
IMPORTANT: search_type must be EXACTLY 'cve' or 'keyword', not 'product' or any other value.
|
|
CORRECT: nvd_search(search_type="keyword", identifier="mobaxterm")
|
|
WRONG: nvd_search(search_type="product", identifier="mobaxterm")
|
|
|
|
The exact_match parameter:
|
|
- When True: Searches for CVEs that contain the EXACT phrase in the description
|
|
- When False: Searches for CVEs that contain ANY of the words in the description
|
|
"""
|
|
|
|
inputs = {
|
|
"search_type": {
|
|
"description": "Type of search to perform. Must be 'cve' for specific CVE ID or 'keyword' for keyword search.",
|
|
"type": "string",
|
|
},
|
|
"identifier": {
|
|
"description": "The CVE ID (e.g., 'CVE-2021-44228') or keyword (e.g., 'log4j'). For keywords, use ONLY base product names without versions (e.g., 'log4j' not 'log4j 2.14.1').",
|
|
"type": "string",
|
|
},
|
|
"exact_match": {
|
|
"description": "For keyword searches, whether to match the exact phrase (True) or any word (False).",
|
|
"type": "boolean",
|
|
"default": False,
|
|
"nullable": True,
|
|
},
|
|
}
|
|
|
|
output_type = "string"
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
|
|
|
|
def forward(self, search_type: str, identifier: str, exact_match: bool = False) -> str:
|
|
"""Search for vulnerabilities in NVD."""
|
|
try:
|
|
if search_type == "cve":
|
|
return self._search_by_cve(identifier)
|
|
elif search_type == "keyword":
|
|
return self._search_by_keyword(identifier, exact_match)
|
|
else:
|
|
return "Error: search_type must be 'cve' or 'keyword'"
|
|
|
|
except Exception as e:
|
|
return f"Error searching NVD: {str(e)}"
|
|
|
|
def _search_by_cve(self, cve_id: str) -> str:
|
|
"""Search for a specific CVE."""
|
|
url = f"{self.base_url}?cveId={cve_id}"
|
|
|
|
try:
|
|
response = requests.get(url)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
if not data.get('vulnerabilities'):
|
|
return f"No vulnerabilities found for {cve_id}"
|
|
|
|
vuln_data = data['vulnerabilities'][0]['cve']
|
|
return self._format_cve_data(vuln_data)
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
return f"Error accessing NVD API: {str(e)}"
|
|
|
|
def _search_by_keyword(self, keyword: str, exact_match: bool = False) -> str:
|
|
"""Search for vulnerabilities by keyword."""
|
|
|
|
keyword = keyword.lower()
|
|
encoded_keyword = quote(keyword)
|
|
|
|
|
|
if exact_match:
|
|
|
|
url = f"{self.base_url}?keywordSearch={encoded_keyword}&keywordExactMatch"
|
|
else:
|
|
|
|
url = f"{self.base_url}?keywordSearch={encoded_keyword}"
|
|
|
|
try:
|
|
response = requests.get(url)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
if not data.get('vulnerabilities'):
|
|
return f"No vulnerabilities found for '{keyword}'"
|
|
|
|
vulnerabilities = data['vulnerabilities']
|
|
context = []
|
|
context.append(f"Found {len(vulnerabilities)} vulnerabilities for search '{keyword}'")
|
|
context.append("\nVulnerabilities found:")
|
|
|
|
|
|
for i, vuln in enumerate(vulnerabilities):
|
|
vuln_data = vuln['cve']
|
|
context.append(f"\n--- Vulnerability {i+1} ---")
|
|
context.append(f"CVE ID: {vuln_data.get('id', 'Not available')}")
|
|
context.append(f"Source Identifier: {vuln_data.get('sourceIdentifier', 'Not available')}")
|
|
context.append(f"Vulnerability Status: {vuln_data.get('vulnStatus', 'Not available')}")
|
|
|
|
|
|
cve_tags = vuln_data.get('cveTags', [])
|
|
if cve_tags:
|
|
context.append(f"CVE Tags: {', '.join(cve_tags)}")
|
|
else:
|
|
context.append("CVE Tags: None")
|
|
|
|
|
|
descriptions = vuln_data.get('descriptions', [])
|
|
if descriptions:
|
|
for desc in descriptions:
|
|
lang = desc.get('lang', 'unknown')
|
|
value = desc.get('value', 'Not available')
|
|
context.append(f"Description ({lang}): {value}")
|
|
else:
|
|
context.append("Description: Not available")
|
|
|
|
|
|
metrics = vuln_data.get('metrics', {})
|
|
if metrics:
|
|
context.append("\nCVSS Metrics:")
|
|
|
|
|
|
if 'cvssMetricV31' in metrics:
|
|
cvss_data = metrics['cvssMetricV31'][0]
|
|
cvss_info = cvss_data['cvssData']
|
|
context.append(f"CVSS V3.1:")
|
|
context.append(f" Base Score: {cvss_info['baseScore']}")
|
|
context.append(f" Base Severity: {cvss_info['baseSeverity']}")
|
|
context.append(f" Vector String: {cvss_info['vectorString']}")
|
|
context.append(f" Attack Vector: {cvss_info['attackVector']}")
|
|
context.append(f" Attack Complexity: {cvss_info['attackComplexity']}")
|
|
context.append(f" Privileges Required: {cvss_info['privilegesRequired']}")
|
|
context.append(f" User Interaction: {cvss_info['userInteraction']}")
|
|
context.append(f" Scope: {cvss_info['scope']}")
|
|
context.append(f" Confidentiality Impact: {cvss_info['confidentialityImpact']}")
|
|
context.append(f" Integrity Impact: {cvss_info['integrityImpact']}")
|
|
context.append(f" Availability Impact: {cvss_info['availabilityImpact']}")
|
|
context.append(f" Exploitability Score: {cvss_data['exploitabilityScore']}")
|
|
context.append(f" Impact Score: {cvss_data['impactScore']}")
|
|
|
|
|
|
if 'cvssMetricV30' in metrics:
|
|
cvss_data = metrics['cvssMetricV30'][0]
|
|
cvss_info = cvss_data['cvssData']
|
|
context.append(f"CVSS V3.0:")
|
|
context.append(f" Base Score: {cvss_info['baseScore']}")
|
|
context.append(f" Base Severity: {cvss_info['baseSeverity']}")
|
|
context.append(f" Vector String: {cvss_info['vectorString']}")
|
|
context.append(f" Attack Vector: {cvss_info['attackVector']}")
|
|
context.append(f" Attack Complexity: {cvss_info['attackComplexity']}")
|
|
context.append(f" Privileges Required: {cvss_info['privilegesRequired']}")
|
|
context.append(f" User Interaction: {cvss_info['userInteraction']}")
|
|
context.append(f" Scope: {cvss_info['scope']}")
|
|
context.append(f" Confidentiality Impact: {cvss_info['confidentialityImpact']}")
|
|
context.append(f" Integrity Impact: {cvss_info['integrityImpact']}")
|
|
context.append(f" Availability Impact: {cvss_info['availabilityImpact']}")
|
|
context.append(f" Exploitability Score: {cvss_data['exploitabilityScore']}")
|
|
context.append(f" Impact Score: {cvss_data['impactScore']}")
|
|
|
|
|
|
if 'cvssMetricV2' in metrics:
|
|
cvss_data = metrics['cvssMetricV2'][0]
|
|
cvss_info = cvss_data['cvssData']
|
|
context.append(f"CVSS V2:")
|
|
context.append(f" Base Score: {cvss_info['baseScore']}")
|
|
context.append(f" Base Severity: {cvss_data['baseSeverity']}")
|
|
context.append(f" Vector String: {cvss_info['vectorString']}")
|
|
context.append(f" Access Vector: {cvss_info['accessVector']}")
|
|
context.append(f" Access Complexity: {cvss_info['accessComplexity']}")
|
|
context.append(f" Authentication: {cvss_info['authentication']}")
|
|
context.append(f" Confidentiality Impact: {cvss_info['confidentialityImpact']}")
|
|
context.append(f" Integrity Impact: {cvss_info['integrityImpact']}")
|
|
context.append(f" Availability Impact: {cvss_info['availabilityImpact']}")
|
|
context.append(f" Exploitability Score: {cvss_data['exploitabilityScore']}")
|
|
context.append(f" Impact Score: {cvss_data['impactScore']}")
|
|
else:
|
|
context.append("CVSS Metrics: Not available")
|
|
|
|
|
|
weaknesses = vuln_data.get('weaknesses', [])
|
|
if weaknesses:
|
|
context.append("\nWeaknesses:")
|
|
for weakness in weaknesses:
|
|
source = weakness.get('source', 'Unknown')
|
|
w_type = weakness.get('type', 'Unknown')
|
|
context.append(f" Source: {source}, Type: {w_type}")
|
|
descriptions = weakness.get('description', [])
|
|
for desc in descriptions:
|
|
lang = desc.get('lang', 'unknown')
|
|
value = desc.get('value', 'Not available')
|
|
context.append(f" CWE ({lang}): {value}")
|
|
else:
|
|
context.append("\nWeaknesses: Not available")
|
|
|
|
|
|
configurations = vuln_data.get('configurations', [])
|
|
if configurations:
|
|
context.append("\nAffected Configurations:")
|
|
for config in configurations:
|
|
nodes = config.get('nodes', [])
|
|
for node in nodes:
|
|
operator = node.get('operator', 'Unknown')
|
|
negate = node.get('negate', False)
|
|
context.append(f" Operator: {operator}, Negate: {negate}")
|
|
cpe_matches = node.get('cpeMatch', [])
|
|
for match in cpe_matches:
|
|
vulnerable = match.get('vulnerable', False)
|
|
criteria = match.get('criteria', 'Not available')
|
|
version_end = match.get('versionEndIncluding', 'Not specified')
|
|
context.append(f" CPE: {criteria}")
|
|
context.append(f" Vulnerable: {vulnerable}")
|
|
if version_end != 'Not specified':
|
|
context.append(f" Version End Including: {version_end}")
|
|
else:
|
|
context.append("\nAffected Configurations: Not available")
|
|
|
|
|
|
context.append(f"\nPublished: {vuln_data.get('published', 'Not available')}")
|
|
context.append(f"Last Modified: {vuln_data.get('lastModified', 'Not available')}")
|
|
|
|
|
|
references = vuln_data.get('references', [])
|
|
if references:
|
|
context.append("\nReferences:")
|
|
for ref in references:
|
|
url = ref.get('url', 'Not available')
|
|
source = ref.get('source', 'Unknown')
|
|
tags = ref.get('tags', [])
|
|
context.append(f" URL: {url}")
|
|
context.append(f" Source: {source}")
|
|
if tags:
|
|
context.append(f" Tags: {', '.join(tags)}")
|
|
context.append("")
|
|
else:
|
|
context.append("\nReferences: Not available")
|
|
|
|
context.append("-" * 50)
|
|
|
|
return "\n".join(context)
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
return f"Error accessing NVD API: {str(e)}"
|
|
|
|
def _format_cve_data(self, vuln_data: dict) -> str:
|
|
"""Format CVE data for display."""
|
|
context = []
|
|
|
|
context.append(f"CVE ID: {vuln_data.get('id', 'Not available')}")
|
|
context.append(f"Source Identifier: {vuln_data.get('sourceIdentifier', 'Not available')}")
|
|
context.append(f"Vulnerability Status: {vuln_data.get('vulnStatus', 'Not available')}")
|
|
|
|
|
|
cve_tags = vuln_data.get('cveTags', [])
|
|
if cve_tags:
|
|
context.append(f"CVE Tags: {', '.join(cve_tags)}")
|
|
else:
|
|
context.append("CVE Tags: None")
|
|
|
|
|
|
descriptions = vuln_data.get('descriptions', [])
|
|
if descriptions:
|
|
for desc in descriptions:
|
|
lang = desc.get('lang', 'unknown')
|
|
value = desc.get('value', 'Not available')
|
|
context.append(f"Description ({lang}): {value}")
|
|
else:
|
|
context.append("Description: Not available")
|
|
|
|
|
|
metrics = vuln_data.get('metrics', {})
|
|
if metrics:
|
|
context.append("\nCVSS Metrics:")
|
|
|
|
|
|
if 'cvssMetricV31' in metrics:
|
|
cvss_data = metrics['cvssMetricV31'][0]
|
|
cvss_info = cvss_data['cvssData']
|
|
context.append(f"CVSS V3.1:")
|
|
context.append(f" Base Score: {cvss_info['baseScore']}")
|
|
context.append(f" Base Severity: {cvss_info['baseSeverity']}")
|
|
context.append(f" Vector String: {cvss_info['vectorString']}")
|
|
context.append(f" Attack Vector: {cvss_info['attackVector']}")
|
|
context.append(f" Attack Complexity: {cvss_info['attackComplexity']}")
|
|
context.append(f" Privileges Required: {cvss_info['privilegesRequired']}")
|
|
context.append(f" User Interaction: {cvss_info['userInteraction']}")
|
|
context.append(f" Scope: {cvss_info['scope']}")
|
|
context.append(f" Confidentiality Impact: {cvss_info['confidentialityImpact']}")
|
|
context.append(f" Integrity Impact: {cvss_info['integrityImpact']}")
|
|
context.append(f" Availability Impact: {cvss_info['availabilityImpact']}")
|
|
context.append(f" Exploitability Score: {cvss_data['exploitabilityScore']}")
|
|
context.append(f" Impact Score: {cvss_data['impactScore']}")
|
|
|
|
|
|
if 'cvssMetricV30' in metrics:
|
|
cvss_data = metrics['cvssMetricV30'][0]
|
|
cvss_info = cvss_data['cvssData']
|
|
context.append(f"CVSS V3.0:")
|
|
context.append(f" Base Score: {cvss_info['baseScore']}")
|
|
context.append(f" Base Severity: {cvss_info['baseSeverity']}")
|
|
context.append(f" Vector String: {cvss_info['vectorString']}")
|
|
context.append(f" Attack Vector: {cvss_info['attackVector']}")
|
|
context.append(f" Attack Complexity: {cvss_info['attackComplexity']}")
|
|
context.append(f" Privileges Required: {cvss_info['privilegesRequired']}")
|
|
context.append(f" User Interaction: {cvss_info['userInteraction']}")
|
|
context.append(f" Scope: {cvss_info['scope']}")
|
|
context.append(f" Confidentiality Impact: {cvss_info['confidentialityImpact']}")
|
|
context.append(f" Integrity Impact: {cvss_info['integrityImpact']}")
|
|
context.append(f" Availability Impact: {cvss_info['availabilityImpact']}")
|
|
context.append(f" Exploitability Score: {cvss_data['exploitabilityScore']}")
|
|
context.append(f" Impact Score: {cvss_data['impactScore']}")
|
|
|
|
|
|
if 'cvssMetricV2' in metrics:
|
|
cvss_data = metrics['cvssMetricV2'][0]
|
|
cvss_info = cvss_data['cvssData']
|
|
context.append(f"CVSS V2:")
|
|
context.append(f" Base Score: {cvss_info['baseScore']}")
|
|
context.append(f" Base Severity: {cvss_data['baseSeverity']}")
|
|
context.append(f" Vector String: {cvss_info['vectorString']}")
|
|
context.append(f" Access Vector: {cvss_info['accessVector']}")
|
|
context.append(f" Access Complexity: {cvss_info['accessComplexity']}")
|
|
context.append(f" Authentication: {cvss_info['authentication']}")
|
|
context.append(f" Confidentiality Impact: {cvss_info['confidentialityImpact']}")
|
|
context.append(f" Integrity Impact: {cvss_info['integrityImpact']}")
|
|
context.append(f" Availability Impact: {cvss_info['availabilityImpact']}")
|
|
context.append(f" Exploitability Score: {cvss_data['exploitabilityScore']}")
|
|
context.append(f" Impact Score: {cvss_data['impactScore']}")
|
|
else:
|
|
context.append("CVSS Metrics: Not available")
|
|
|
|
|
|
weaknesses = vuln_data.get('weaknesses', [])
|
|
if weaknesses:
|
|
context.append("\nWeaknesses:")
|
|
for weakness in weaknesses:
|
|
source = weakness.get('source', 'Unknown')
|
|
w_type = weakness.get('type', 'Unknown')
|
|
context.append(f" Source: {source}, Type: {w_type}")
|
|
descriptions = weakness.get('description', [])
|
|
for desc in descriptions:
|
|
lang = desc.get('lang', 'unknown')
|
|
value = desc.get('value', 'Not available')
|
|
context.append(f" CWE ({lang}): {value}")
|
|
else:
|
|
context.append("\nWeaknesses: Not available")
|
|
|
|
|
|
configurations = vuln_data.get('configurations', [])
|
|
if configurations:
|
|
context.append("\nAffected Configurations:")
|
|
for config in configurations:
|
|
nodes = config.get('nodes', [])
|
|
for node in nodes:
|
|
operator = node.get('operator', 'Unknown')
|
|
negate = node.get('negate', False)
|
|
context.append(f" Operator: {operator}, Negate: {negate}")
|
|
cpe_matches = node.get('cpeMatch', [])
|
|
for match in cpe_matches:
|
|
vulnerable = match.get('vulnerable', False)
|
|
criteria = match.get('criteria', 'Not available')
|
|
version_end = match.get('versionEndIncluding', 'Not specified')
|
|
context.append(f" CPE: {criteria}")
|
|
context.append(f" Vulnerable: {vulnerable}")
|
|
if version_end != 'Not specified':
|
|
context.append(f" Version End Including: {version_end}")
|
|
else:
|
|
context.append("\nAffected Configurations: Not available")
|
|
|
|
|
|
context.append(f"\nPublished: {vuln_data.get('published', 'Not available')}")
|
|
context.append(f"Last Modified: {vuln_data.get('lastModified', 'Not available')}")
|
|
|
|
|
|
references = vuln_data.get('references', [])
|
|
if references:
|
|
context.append("\nReferences:")
|
|
for ref in references:
|
|
url = ref.get('url', 'Not available')
|
|
source = ref.get('source', 'Unknown')
|
|
tags = ref.get('tags', [])
|
|
context.append(f" URL: {url}")
|
|
context.append(f" Source: {source}")
|
|
if tags:
|
|
context.append(f" Tags: {', '.join(tags)}")
|
|
context.append("")
|
|
else:
|
|
context.append("\nReferences: Not available")
|
|
|
|
return "\n".join(context) |