# daqc's picture
# Upload 36 files
# 52b089b verified
import requests
from typing import Optional
from smolagents import Tool
class CVEDBTool(Tool):
    """
    Tool for searching vulnerabilities using Shodan's CVEDB API (free, no API key required).

    Two lookups are offered through ``forward``:

    * ``search_type="cve"`` fetches a single record from
      ``https://cvedb.shodan.io/cve/<id>``.
    * ``search_type="product"`` lists records from
      ``https://cvedb.shodan.io/cves`` filtered by the ``product`` query
      parameter.

    ``forward`` always returns a human-readable string; failures are reported
    as ``"Error ..."`` strings rather than raised, so the calling agent can
    read them.
    """

    name = "cvedb_search"
    description = """Tool for searching vulnerabilities using Shodan's CVEDB API (free, no API key required).
REQUIRED PARAMETERS:
- search_type: Must be 'cve' for specific CVE ID or 'product' for product name search
- identifier: The CVE ID or product name to search for
This tool allows searching for vulnerabilities in two ways:
1. For a specific CVE: cvedb_search(search_type="cve", identifier="CVE-2021-44228")
2. For a product: cvedb_search(search_type="product", identifier="microsoft")
CRITICAL - Product name format for product search:
- ALWAYS use ONLY the base product name, NEVER include versions
- CORRECT examples: "log4j", "apache", "microsoft", "chrome", "tomcat", "nginx"
- INCORRECT examples: "log4j 2.14.1", "apache http server", "microsoft windows", "chrome 120.0.6099.109"
- When searching, strip version numbers and descriptive words
- Example: "Apache Tomcat 9.0.65" → search for "tomcat"
- Example: "Google Chrome 120.0.6099.109" → search for "chrome"
- Example: "Log4j 2.14.1" → search for "log4j"
CORRECT Usage examples:
- cvedb_search(search_type="cve", identifier="CVE-2021-44228")
- cvedb_search(search_type="product", identifier="microsoft")
- cvedb_search(search_type="product", identifier="mobaxterm")
INCORRECT Usage (will cause errors):
- cvedb_search(identifier="microsoft") # Missing search_type
- cvedb_search(search_type="product") # Missing identifier"""
    inputs = {
        "search_type": {
            "description": "Type of search to perform. Must be 'cve' for specific CVE ID or 'product' for product name search.",
            "type": "string",
        },
        "identifier": {
            "description": "The CVE ID (e.g., 'CVE-2021-44228') or product name (e.g., 'microsoft'). For products, use ONLY base product names without versions (e.g., 'microsoft' not 'microsoft windows').",
            "type": "string",
        },
    }
    output_type = "string"

    # Seconds to wait for the CVEDB API before giving up; without a timeout
    # `requests` would block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 15
    # How many reference URLs a single-CVE report spells out in full.
    MAX_LISTED_REFERENCES = 10
    # How many vulnerabilities `_format_context` lists before summarising.
    MAX_CONTEXT_VULNS = 5

    def __init__(self):
        super().__init__()

    def forward(self, search_type: str, identifier: str) -> str:
        """Search for vulnerabilities in CVEDB.

        Args:
            search_type: ``"cve"`` or ``"product"``; surrounding whitespace
                and letter case are ignored.
            identifier: CVE ID or base product name; surrounding whitespace
                is ignored.

        Returns:
            The formatted search result, or an ``"Error ..."`` string when
            the arguments are invalid or the lookup fails.
        """
        try:
            mode = search_type.strip().lower() if isinstance(search_type, str) else ""
            query = identifier.strip() if isinstance(identifier, str) else ""
            if mode not in ("cve", "product"):
                return "Error: search_type must be 'cve' or 'product'"
            if not query:
                return "Error: identifier must be a non-empty CVE ID or product name"
            if mode == "cve":
                # CVE IDs are canonically written in upper case.
                return self._search_by_cve(query.upper())
            return self._search_by_product(query)
        except Exception as e:
            # Last-resort boundary: the agent expects a string, not a traceback.
            return f"Error searching CVEDB: {str(e)}"

    def _describe_vulnerability(self, record: dict, list_references: bool = False) -> list:
        """Render one CVEDB record as a list of ``"- Label: value"`` lines.

        Args:
            record: One vulnerability object as decoded from the API response.
            list_references: When true, spell out the first
                ``MAX_LISTED_REFERENCES`` reference entries; otherwise only
                report how many there are.

        Returns:
            The report lines, without trailing newlines.
        """
        na = "Not available"
        # Accept either key for the identifier, as the product listing did.
        cve_id = record.get("cve_id", record.get("id", na))
        lines = [
            f"- CVE ID: {cve_id}",
            f"- Summary: {record.get('summary', na)}",
            f"- CVSS Score: {record.get('cvss', na)}",
            f"- CVSS Version: {record.get('cvss_version', na)}",
            f"- CVSS V2: {record.get('cvss_v2', na)}",
            f"- CVSS V3: {record.get('cvss_v3', na)}",
            f"- EPSS Score: {record.get('epss', na)}",
            f"- EPSS Ranking: {record.get('ranking_epss', na)}",
            f"- Known as exploitable (KEV): {'Yes' if record.get('kev') else 'No'}",
            f"- Propose Action: {record.get('propose_action', na)}",
            f"- Ransomware Campaign: {record.get('ransomware_campaign', na)}",
            f"- Publication Date: {record.get('published_time', na)}",
        ]

        # `or []` also covers an explicit null in the payload.
        references = record.get("references") or []
        if not references:
            lines.append("- References: None")
        elif list_references:
            lines.append(f"- References ({len(references)} total):")
            shown = references[: self.MAX_LISTED_REFERENCES]
            lines.extend(f" {i}. {ref}" for i, ref in enumerate(shown, 1))
            hidden = len(references) - len(shown)
            if hidden > 0:
                lines.append(f" ... and {hidden} more references")
        else:
            lines.append(f"- References: {len(references)} references available")

        # Only the CPE count is reported; the full list is omitted for length.
        cpes = record.get("cpes") or []
        if cpes:
            lines.append(f"- Affected CPEs: {len(cpes)} CPEs found (list omitted due to length)")
        else:
            lines.append("- Affected CPEs: None")
        return lines

    def _search_by_cve(self, cve_id: str) -> str:
        """Search for a specific CVE.

        Args:
            cve_id: The CVE identifier to look up.

        Returns:
            A formatted report for the CVE, a "No vulnerabilities found"
            message, or an "Error accessing CVEDB API" message.
        """
        # Imported inside the method, following this file's existing pattern
        # of method-local imports (presumably so each tool method stays
        # self-contained for smolagents -- confirm before hoisting).
        import requests
        from urllib.parse import quote

        # Escape the ID so characters such as '/', '?' or '#' in the input
        # cannot alter the request path or query.
        safe_id = quote(cve_id, safe="")
        url = f"https://cvedb.shodan.io/cve/{safe_id}"
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
            # NOTE(review): assumes the API answers 404 for an unknown CVE;
            # that is reported as "not found" rather than as an API failure.
            if response.status_code == 404:
                return f"No vulnerabilities found for CVE {cve_id}"
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on older `requests` releases.
            return f"Error accessing CVEDB API: {str(e)}"

        if not data or not isinstance(data, dict):
            return f"No vulnerabilities found for CVE {cve_id}"

        lines = [f"Vulnerability found for {cve_id}:", ""]
        lines.extend(self._describe_vulnerability(data, list_references=True))
        return "\n".join(lines) + "\n"

    def _search_by_product(self, product: str) -> str:
        """Search for vulnerabilities by product.

        Args:
            product: Base product name; it is lower-cased before the query.

        Returns:
            A formatted report with one section per vulnerability, a
            "No vulnerabilities found" message, or an
            "Error accessing CVEDB API" message.
        """
        # Method-local import, matching the rest of this class.
        import requests

        # Convert product name to lowercase for consistent search
        product = product.lower()
        url = "https://cvedb.shodan.io/cves"
        try:
            response = requests.get(
                url, params={"product": product}, timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on older `requests` releases.
            return f"Error accessing CVEDB API: {str(e)}"

        if not isinstance(data, dict) or not data.get("cves"):
            return f"No vulnerabilities found for the product {product}"

        # Collect lines and join once instead of growing a string with +=.
        lines = [f"Vulnerabilities found for {product}:", ""]
        for position, vuln in enumerate(data["cves"], 1):
            lines.append(f"**Vulnerability {position}:**")
            lines.extend(self._describe_vulnerability(vuln))
            lines.append("")  # blank line between entries
        return "\n".join(lines) + "\n"

    def _format_context(self, data: dict, search_type: str, identifier: str) -> str:
        """Format the data for context.

        Args:
            data: Decoded API payload: a single CVE record for ``"cve"``, or
                an object with a ``"cves"`` list for ``"product"``.
            search_type: ``"cve"`` or ``"product"``.
            identifier: The CVE ID, used only in the "not found" message.

        Returns:
            A short multi-line summary, or an empty string for any other
            ``search_type``.
        """
        context = []
        if search_type == "cve":
            if not data:
                return f"No vulnerabilities found for CVE {identifier}"
            # .get() so a record missing a score does not raise KeyError.
            context.append(f"CVSS Score: {data.get('cvss', 'Not available')}")
            context.append(f"EPSS Score: {data.get('epss', 'Not available')}")
            context.append(f"Known as exploitable (KEV): {'Yes' if data.get('kev') else 'No'}")
            # Important dates
            if data.get('published_time'):
                context.append(f"Publication Date: {data['published_time']}")
            context.append(f"Summary: {data.get('summary', 'Not available')}")
            # Important references
            if data.get('references'):
                context.append("\nImportant references:")
                for ref in data['references'][:3]:  # Show first 3 references
                    context.append(f"- {ref}")
        elif search_type == "product":
            cves = data.get('cves') if data else None
            if not cves:
                return "No vulnerabilities found for this product."
            context.append(f"Found {len(cves)} vulnerabilities for this product.")
            context.append("\nMost relevant vulnerabilities:")
            # List only the first few so the "... and N more" line below is
            # accurate; previously every entry was listed and the remainder
            # was still announced.
            shown = cves[: self.MAX_CONTEXT_VULNS]
            for position, vuln in enumerate(shown, 1):
                cve = vuln.get('cve_id', vuln.get('id', 'Unknown CVE'))
                context.append(f"{position}. {cve} - CVSS: {vuln.get('cvss', 'N/A')}")
            remaining = len(cves) - len(shown)
            if remaining > 0:
                context.append(f"\n... and {remaining} more vulnerabilities")
        return "\n".join(context)