# File size: 10,848 Bytes
# 52b089b
"""Tool for searching known exploited vulnerabilities (KEV) using the KEVin API."""
import json
from urllib.parse import quote

import requests
from smolagents import Tool
class KevinTool(Tool):
    """Agent tool that queries the KEVin API for Known Exploited Vulnerabilities (KEV).

    ``forward`` dispatches on ``search_type``:

    * ``"cve"``        -- fetch the KEV record for a single CVE ID.
    * ``"keyword"``    -- list KEV records matching a keyword.
    * ``"exists"``     -- report whether a CVE ID is listed in KEV.
    * ``"ransomware"`` -- list KEV records returned by the ransomware filter.

    Every lookup returns human-readable text. Failures are reported as
    ``"Error ..."`` strings instead of being raised, so the calling agent
    always receives a string.
    """

    name = "kevin_search"
    description = """Tool for searching known exploited vulnerabilities (KEV) using the KEVin API.
This tool allows searching for known exploited vulnerabilities in multiple ways:
- By CVE ID: kevin_search(search_type="cve", identifier="CVE-2021-44228")
- By keyword: kevin_search(search_type="keyword", identifier="log4j")
- Check if CVE is in KEV: kevin_search(search_type="exists", identifier="CVE-2023-22527")
- Filter by ransomware: kevin_search(search_type="ransomware")
CRITICAL - Product name format for keyword search:
- ALWAYS use ONLY the base product name, NEVER include versions
- CORRECT examples: "log4j", "apache", "microsoft", "chrome", "tomcat", "nginx"
- INCORRECT examples: "log4j 2.14.1", "apache http server", "microsoft windows", "chrome 120.0.6099.109"
- When searching, strip version numbers and descriptive words
- Example: "Apache Tomcat 9.0.65" → search for "tomcat"
- Example: "Google Chrome 120.0.6099.109" → search for "chrome"
- Example: "Log4j 2.14.1" → search for "log4j"
IMPORTANT: The "exists" search type checks if a CVE is listed in the Known Exploited Vulnerabilities (KEV) database.
A CVE can exist in NVD or other databases but not be in KEV if it hasn't been actively exploited.
Usage examples:
- kevin_search(search_type="cve", identifier="CVE-2021-44228")
- kevin_search(search_type="keyword", identifier="log4j")
- kevin_search(search_type="exists", identifier="CVE-2023-22527")
- kevin_search(search_type="ransomware")"""
    inputs = {
        "search_type": {
            "description": "Type of search to perform. Must be 'cve' for specific CVE ID, 'keyword' for keyword search, 'exists' to check if CVE is in KEV database, or 'ransomware' to filter ransomware vulnerabilities.",
            "type": "string",
        },
        "identifier": {
            "description": "The CVE ID (e.g., 'CVE-2021-44228') or keyword (e.g., 'log4j'). For keywords, use ONLY base product names without versions (e.g., 'log4j' not 'log4j 2.14.1'). Not required for 'ransomware' search type.",
            "type": "string",
            "nullable": True,
        },
    }
    output_type = "string"

    def __init__(self):
        """Set the API base URL and the per-request timeout."""
        super().__init__()
        self.base_url = "https://kevin.gtfkd.com"
        # requests never times out unless told to; without this a stalled
        # connection would block the calling agent indefinitely.
        self.request_timeout = 10  # seconds

    def forward(self, search_type: str, identifier: str = None) -> str:
        """Run one KEV lookup and return its result as text.

        Args:
            search_type: One of ``"cve"``, ``"keyword"``, ``"exists"`` or
                ``"ransomware"``. Surrounding whitespace and letter case
                are ignored.
            identifier: CVE ID or keyword. Required for every search type
                except ``"ransomware"``; a blank string counts as missing.

        Returns:
            The formatted lookup result, or a string starting with
            ``"Error"`` when the arguments are invalid or the lookup fails.
        """
        try:
            mode = str(search_type or "").strip().lower()
            target = identifier.strip() if isinstance(identifier, str) else identifier

            if mode == "ransomware":
                return self._search_ransomware()

            # search type -> (handler, wording used in the "required" error)
            lookups = {
                "cve": (self._search_by_cve, "CVE search"),
                "keyword": (self._search_by_keyword, "keyword search"),
                "exists": (self._check_exists, "exists check"),
            }
            if mode not in lookups:
                return "Error: search_type must be 'cve', 'keyword', 'exists', or 'ransomware'"

            handler, label = lookups[mode]
            if not target:
                return f"Error: identifier is required for {label}"
            return handler(target)
        except Exception as e:
            # Boundary of the tool: the agent must get a string back, so any
            # unexpected failure is reported rather than propagated.
            return f"Error searching KEVin: {str(e)}"

    def _get_json(self, path: str, params: dict = None):
        """GET ``base_url + path`` and return the decoded JSON body.

        Args:
            path: URL path starting with ``/``; the caller is responsible
                for escaping any user-supplied segment.
            params: Optional query-string parameters.

        Raises:
            requests.exceptions.RequestException: If the request fails,
                times out, or the response has an error status
                (via ``raise_for_status``).
        """
        response = requests.get(
            f"{self.base_url}{path}",
            params=params,
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _is_listed(payload) -> bool:
        """Interpret the body of the ``/kev/exists`` response as a boolean.

        NOTE(review): the endpoint is believed to answer with an object such
        as ``{"In_KEV": true}`` -- confirm against the KEVin API docs. A
        non-empty dict is always truthy, so that flag has to be read
        explicitly; any other payload falls back to plain truthiness.
        """
        if isinstance(payload, dict) and "In_KEV" in payload:
            return bool(payload["In_KEV"])
        return bool(payload)

    @staticmethod
    def _extract_entries(payload) -> list:
        """Return the vulnerability records contained in a listing response.

        A JSON array is used directly. NOTE(review): listing responses may
        instead be an object holding the records under ``"vulnerabilities"``
        -- confirm against the KEVin API docs; that shape is unwrapped here
        so iteration never walks over dict keys.

        Raises:
            ValueError: If a non-empty payload has neither shape.
        """
        if not payload:
            return []
        if isinstance(payload, dict):
            payload = payload.get("vulnerabilities")
        if not isinstance(payload, list):
            raise ValueError("unexpected response format")
        # Only dict records can be formatted with .get() below.
        return [entry for entry in payload if isinstance(entry, dict)]

    @staticmethod
    def _labelled(record: dict, fields) -> list:
        """Render ``(label, key)`` pairs as ``"label: value"`` lines.

        Keys missing from ``record`` are shown as ``Not available``.
        """
        return [f"{label}: {record.get(key, 'Not available')}" for label, key in fields]

    def _check_exists(self, cve_id: str) -> str:
        """Report whether ``cve_id`` is listed in the KEV database.

        Returns:
            A sentence stating that the CVE is or is not listed, or an
            ``"Error checking KEVin existence: ..."`` string when the
            request fails.
        """
        try:
            payload = self._get_json("/kev/exists", params={"cve": cve_id})
        except requests.exceptions.RequestException as e:
            return f"Error checking KEVin existence: {str(e)}"

        if self._is_listed(payload):
            return f"✅ CVE {cve_id} IS listed in the Known Exploited Vulnerabilities (KEV) database - this means it has been actively exploited in the wild."
        return f"❌ CVE {cve_id} is NOT listed in the Known Exploited Vulnerabilities (KEV) database - this means it has not been reported as actively exploited (though it may exist in other vulnerability databases)."

    def _search_ransomware(self) -> str:
        """List the KEV records returned by the ``filter=ransomware`` query.

        Returns:
            A multi-line report with one section per record, a "none found"
            sentence, or an ``"Error accessing KEVin API: ..."`` string.
        """
        try:
            entries = self._extract_entries(
                self._get_json("/kev", params={"filter": "ransomware"})
            )
        except (requests.exceptions.RequestException, ValueError) as e:
            return f"Error accessing KEVin API: {str(e)}"

        if not entries:
            return "No ransomware-related vulnerabilities found in KEV database."

        lines = [
            f"Found {len(entries)} ransomware-related vulnerabilities in KEV database.",
            "\nRansomware vulnerabilities:",
        ]
        for position, vuln in enumerate(entries, start=1):
            lines.append(f"\n--- Vulnerability {position} ---")
            lines.extend(
                self._labelled(
                    vuln,
                    (
                        ("CVE ID", "cveID"),
                        ("Vendor", "vendorProject"),
                        ("Product", "product"),
                        ("Vulnerability Name", "vulnerabilityName"),
                        ("Date Added", "dateAdded"),
                    ),
                )
            )
            # Due date and notes are optional: shown only when non-empty.
            if vuln.get("dueDate"):
                lines.append(f"Due Date: {vuln['dueDate']}")
            lines.extend(
                self._labelled(
                    vuln,
                    (
                        ("Short Description", "shortDescription"),
                        ("Required Action", "requiredAction"),
                    ),
                )
            )
            if vuln.get("notes"):
                lines.append(f"Notes: {vuln['notes']}")
            lines.append("-" * 50)  # separator between vulnerabilities
        return "\n".join(lines)

    def _search_by_cve(self, cve_id: str) -> str:
        """Fetch and format the KEV record for one CVE ID.

        Returns:
            A bulleted description of the record, a "No vulnerabilities
            found" sentence, or an ``"Error accessing KEVin API: ..."``
            string.
        """
        try:
            # The identifier comes from the caller; escape it so characters
            # such as "/" or "?" cannot change which endpoint is requested.
            record = self._get_json(f"/kev/{quote(cve_id, safe='')}")
        except requests.exceptions.HTTPError as e:
            # NOTE(review): assumes the service answers 404 for a CVE that is
            # not in KEV -- confirm. Reported as "not found", not as an outage.
            if getattr(e.response, "status_code", None) == 404:
                return f"No vulnerabilities found for CVE {cve_id}"
            return f"Error accessing KEVin API: {str(e)}"
        except requests.exceptions.RequestException as e:
            return f"Error accessing KEVin API: {str(e)}"

        if not record:
            return f"No vulnerabilities found for CVE {cve_id}"

        fields = (
            ("CVE ID", "cveID"),
            ("Vendor", "vendorProject"),
            ("Product", "product"),
            ("Vulnerability Name", "vulnerabilityName"),
            ("Date Added", "dateAdded"),
            ("Short Description", "shortDescription"),
            ("Required Action", "requiredAction"),
            ("Due Date", "dueDate"),
            ("Notes", "notes"),
        )
        bullets = "".join(f"- {line}\n" for line in self._labelled(record, fields))
        return f"Known exploited vulnerability found for {cve_id}:\n\n{bullets}"

    def _search_by_keyword(self, keyword: str) -> str:
        """List the KEV records matching ``keyword``.

        Returns:
            A numbered ``CVE - product`` list, a "none found" sentence, or
            an ``"Error accessing KEVin API: ..."`` string.
        """
        try:
            entries = self._extract_entries(
                self._get_json("/kev", params={"search": keyword})
            )
        except (requests.exceptions.RequestException, ValueError) as e:
            return f"Error accessing KEVin API: {str(e)}"

        if not entries:
            return f"No vulnerabilities found for keyword '{keyword}'."

        lines = [
            f"Found {len(entries)} vulnerabilities for keyword '{keyword}'.",
            "\nVulnerabilities found:",
        ]
        lines.extend(
            f"{position}. {vuln.get('cveID', 'Unknown CVE')} - {vuln.get('product', 'Unknown Product')}"
            for position, vuln in enumerate(entries, start=1)
        )
        return "\n".join(lines)

    def _format_context(self, data, search_type: str, identifier: str) -> str:
        """Format already-fetched KEV data as plain text.

        Not called by the lookups in this class; kept for callers that
        already hold a decoded response.

        Args:
            data: A single record (dict) when ``search_type`` is ``"cve"``;
                a listing response when it is ``"keyword"``.
            search_type: ``"cve"`` or ``"keyword"``. Any other value yields
                an empty string.
            identifier: CVE ID used in the "not found" message of the
                ``"cve"`` branch.

        Returns:
            The formatted text. The ``"keyword"`` branch shows at most the
            first five records.
        """
        lines = []
        if search_type == "cve":
            if not data:
                return f"No vulnerabilities found for CVE {identifier}"
            lines.extend(
                self._labelled(
                    data,
                    (
                        ("CVE ID", "cveID"),
                        ("Vendor", "vendorProject"),
                        ("Product", "product"),
                        ("Vulnerability Name", "vulnerabilityName"),
                        ("Date Added", "dateAdded"),
                        ("Short Description", "shortDescription"),
                        ("Required Action", "requiredAction"),
                    ),
                )
            )
            # Optional fields are appended only when non-empty.
            if data.get("dueDate"):
                lines.append(f"Due Date: {data['dueDate']}")
            if data.get("notes"):
                lines.append(f"Notes: {data['notes']}")
        elif search_type == "keyword":
            entries = self._extract_entries(data)
            if not entries:
                return "No vulnerabilities found for this search."
            lines.append(f"Found {len(entries)} vulnerabilities.")
            lines.append("\nMost relevant vulnerabilities:")
            # Only the first five records are listed.
            lines.extend(
                f"{position}. {vuln.get('cveID', 'Unknown CVE')} - {vuln.get('product', 'Unknown Product')}"
                for position, vuln in enumerate(entries[:5], start=1)
            )
        return "\n".join(lines)