"""Tool for searching known exploited vulnerabilities (KEV) using the KEVin API."""

import json
from typing import Optional, Union
from urllib.parse import quote

import requests
from smolagents import Tool


class KevinTool(Tool):
    """
    Tool for searching known exploited vulnerabilities (KEV) using the KEVin API.

    Four search modes are exposed through ``forward``: lookup by CVE ID,
    keyword search, a KEV membership check, and a ransomware filter. Every
    mode returns a human-readable string; failures are reported as strings
    starting with ``Error`` instead of being raised.
    """

    name = "kevin_search"
    description = """Tool for searching known exploited vulnerabilities (KEV) using the KEVin API.

This tool allows searching for known exploited vulnerabilities in multiple ways:
- By CVE ID: kevin_search(search_type="cve", identifier="CVE-2021-44228")
- By keyword: kevin_search(search_type="keyword", identifier="log4j")
- Check if CVE is in KEV: kevin_search(search_type="exists", identifier="CVE-2023-22527")
- Filter by ransomware: kevin_search(search_type="ransomware")

CRITICAL - Product name format for keyword search:
- ALWAYS use ONLY the base product name, NEVER include versions
- CORRECT examples: "log4j", "apache", "microsoft", "chrome", "tomcat", "nginx"
- INCORRECT examples: "log4j 2.14.1", "apache http server", "microsoft windows", "chrome 120.0.6099.109"
- When searching, strip version numbers and descriptive words
- Example: "Apache Tomcat 9.0.65" → search for "tomcat"
- Example: "Google Chrome 120.0.6099.109" → search for "chrome"
- Example: "Log4j 2.14.1" → search for "log4j"

IMPORTANT: The "exists" search type checks if a CVE is listed in the Known Exploited Vulnerabilities (KEV) database. A CVE can exist in NVD or other databases but not be in KEV if it hasn't been actively exploited.

Usage examples:
- kevin_search(search_type="cve", identifier="CVE-2021-44228")
- kevin_search(search_type="keyword", identifier="log4j")
- kevin_search(search_type="exists", identifier="CVE-2023-22527")
- kevin_search(search_type="ransomware")"""
    inputs = {
        "search_type": {
            "description": "Type of search to perform. Must be 'cve' for specific CVE ID, 'keyword' for keyword search, 'exists' to check if CVE is in KEV database, or 'ransomware' to filter ransomware vulnerabilities.",
            "type": "string",
        },
        "identifier": {
            "description": "The CVE ID (e.g., 'CVE-2021-44228') or keyword (e.g., 'log4j'). For keywords, use ONLY base product names without versions (e.g., 'log4j' not 'log4j 2.14.1'). Not required for 'ransomware' search type.",
            "type": "string",
            "nullable": True,
        },
    }
    output_type = "string"

    def __init__(self, base_url: str = "https://kevin.gtfkd.com", timeout: float = 30.0):
        """Create the tool.

        Args:
            base_url: Root URL of the KEVin service. A trailing slash is
                removed so paths can be appended uniformly.
            timeout: Seconds to wait for each HTTP request. ``requests``
                applies no timeout by default, so without this a stalled
                server would block the caller forever.
        """
        super().__init__()
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    def forward(self, search_type: str, identifier: Optional[str] = None) -> str:
        """Search for known exploited vulnerabilities.

        Args:
            search_type: One of ``"cve"``, ``"keyword"``, ``"exists"`` or
                ``"ransomware"``.
            identifier: CVE ID or keyword. Required for every search type
                except ``"ransomware"``; surrounding whitespace is ignored.

        Returns:
            The formatted search result, or a message starting with
            ``Error`` when the arguments are invalid or the lookup fails.
        """
        try:
            if search_type == "ransomware":
                return self._search_ransomware()

            # Search types that need an identifier, with the wording used in
            # their "identifier is required" message.
            handlers = {
                "cve": ("CVE search", self._search_by_cve),
                "keyword": ("keyword search", self._search_by_keyword),
                "exists": ("exists check", self._check_exists),
            }
            if search_type not in handlers:
                return "Error: search_type must be 'cve', 'keyword', 'exists', or 'ransomware'"

            label, handler = handlers[search_type]
            if isinstance(identifier, str):
                # A whitespace-only identifier is as useless as a missing one.
                identifier = identifier.strip()
            if not identifier:
                return f"Error: identifier is required for {label}"
            return handler(identifier)
        except Exception as e:
            # Tool boundary: the caller expects a string, never an exception.
            return f"Error searching KEVin: {e}"

    def _get_json(self, path: str, params: Optional[dict] = None):
        """GET ``base_url + path`` and return the decoded JSON body.

        Raises:
            requests.exceptions.RequestException: on connection problems,
                timeouts, or a non-2xx status (via ``raise_for_status``).
        """
        response = requests.get(f"{self.base_url}{path}", params=params, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _is_listed(payload) -> bool:
        """Interpret the body of the ``/kev/exists`` endpoint as a boolean.

        NOTE(review): the service is believed to answer with a JSON object
        holding a boolean flag (e.g. ``{"In_KEV": false}``) - confirm against
        the KEVin API docs. A plain truthiness test would treat such an
        object as "listed" even when the flag is false, so boolean flags
        inside a dict are inspected explicitly. Any other payload shape falls
        back to plain truthiness.
        """
        if isinstance(payload, dict):
            for key in ("In_KEV", "in_kev", "exists"):
                if key in payload:
                    return bool(payload[key])
            flags = [value for value in payload.values() if isinstance(value, bool)]
            if flags:
                return any(flags)
        return bool(payload)

    @staticmethod
    def _extract_records(payload) -> list:
        """Normalise a list-style response into a list of vulnerability dicts.

        Accepts a bare JSON array, a single record (a dict containing
        ``cveID``), or a wrapper object that holds the array under some key.
        NOTE(review): the wrapper shape (e.g. a paginated object with a
        ``vulnerabilities`` key) is an assumption - confirm against the API.
        If the response is paginated, only the returned page is counted.

        Raises:
            ValueError: if a non-empty payload matches none of these shapes.
        """
        if not payload:
            return []
        if isinstance(payload, list):
            return [item for item in payload if isinstance(item, dict)]
        if isinstance(payload, dict):
            if "cveID" in payload:
                return [payload]
            candidates = [payload.get("vulnerabilities"), *payload.values()]
            for value in candidates:
                if isinstance(value, list) and all(isinstance(item, dict) for item in value):
                    return value
        raise ValueError("Unexpected response format from KEVin API")

    def _check_exists(self, cve_id: str) -> str:
        """Check if a CVE is listed in the Known Exploited Vulnerabilities (KEV) database."""
        try:
            payload = self._get_json("/kev/exists", params={"cve": cve_id})
        except requests.exceptions.RequestException as e:
            return f"Error checking KEVin existence: {e}"

        if self._is_listed(payload):
            return f"✅ CVE {cve_id} IS listed in the Known Exploited Vulnerabilities (KEV) database - this means it has been actively exploited in the wild."
        return f"❌ CVE {cve_id} is NOT listed in the Known Exploited Vulnerabilities (KEV) database - this means it has not been reported as actively exploited (though it may exist in other vulnerability databases)."

    def _search_ransomware(self) -> str:
        """Search for vulnerabilities known to be used in ransomware."""
        try:
            payload = self._get_json("/kev", params={"filter": "ransomware"})
        except requests.exceptions.RequestException as e:
            return f"Error accessing KEVin API: {e}"

        records = self._extract_records(payload)
        if not records:
            return "No ransomware-related vulnerabilities found in KEV database."

        lines = [
            f"Found {len(records)} ransomware-related vulnerabilities in KEV database.",
            "\nRansomware vulnerabilities:",
        ]
        # Every returned vulnerability is listed; nothing is truncated.
        for index, vuln in enumerate(records, start=1):
            lines.append(f"\n--- Vulnerability {index} ---")
            lines.append(f"CVE ID: {vuln.get('cveID', 'Not available')}")
            lines.append(f"Vendor: {vuln.get('vendorProject', 'Not available')}")
            lines.append(f"Product: {vuln.get('product', 'Not available')}")
            lines.append(f"Vulnerability Name: {vuln.get('vulnerabilityName', 'Not available')}")
            lines.append(f"Date Added: {vuln.get('dateAdded', 'Not available')}")
            if vuln.get("dueDate"):
                lines.append(f"Due Date: {vuln['dueDate']}")
            lines.append(f"Short Description: {vuln.get('shortDescription', 'Not available')}")
            lines.append(f"Required Action: {vuln.get('requiredAction', 'Not available')}")
            if vuln.get("notes"):
                lines.append(f"Notes: {vuln['notes']}")
            lines.append("-" * 50)  # Separator between vulnerabilities
        return "\n".join(lines)

    def _search_by_cve(self, cve_id: str) -> str:
        """Search for a specific CVE in KEVin."""
        # The identifier becomes a URL path segment; percent-encode it so a
        # value containing "/", "?" or "#" cannot redirect the request.
        path = f"/kev/{quote(str(cve_id), safe='')}"
        try:
            record = self._get_json(path)
        except requests.exceptions.RequestException as e:
            return f"Error accessing KEVin API: {e}"

        if isinstance(record, list):
            # Tolerate a one-element array wrapping the record.
            record = next((item for item in record if isinstance(item, dict)), None)
        if not record or not isinstance(record, dict):
            return f"No vulnerabilities found for CVE {cve_id}"

        fields = (
            ("CVE ID", "cveID"),
            ("Vendor", "vendorProject"),
            ("Product", "product"),
            ("Vulnerability Name", "vulnerabilityName"),
            ("Date Added", "dateAdded"),
            ("Short Description", "shortDescription"),
            ("Required Action", "requiredAction"),
            ("Due Date", "dueDate"),
            ("Notes", "notes"),
        )
        header = f"Known exploited vulnerability found for {cve_id}:\n\n"
        body = "".join(
            f"- {label}: {record.get(key, 'Not available')}\n" for label, key in fields
        )
        return header + body

    def _search_by_keyword(self, keyword: str) -> str:
        """Search for vulnerabilities by keyword."""
        try:
            payload = self._get_json("/kev", params={"search": keyword})
        except requests.exceptions.RequestException as e:
            return f"Error accessing KEVin API: {e}"

        records = self._extract_records(payload)
        if not records:
            return f"No vulnerabilities found for keyword '{keyword}'."

        lines = [
            f"Found {len(records)} vulnerabilities for keyword '{keyword}'.",
            "\nVulnerabilities found:",
        ]
        # Every match is listed, one per line.
        lines.extend(
            f"{index}. {vuln.get('cveID', 'Unknown CVE')} - {vuln.get('product', 'Unknown Product')}"
            for index, vuln in enumerate(records, start=1)
        )
        return "\n".join(lines)

    def _format_context(self, data: Union[dict, list], search_type: str, identifier: str) -> str:
        """Format the data for context.

        Not called by the other methods of this class; kept for any callers
        elsewhere.

        Args:
            data: A single vulnerability record (dict) for ``"cve"``, or a
                list of records for ``"keyword"``.
            search_type: ``"cve"`` or ``"keyword"``; any other value yields
                an empty string.
            identifier: The CVE ID, used only in the "not found" message.
        """
        context = []
        if search_type == "cve":
            if not data:
                return f"No vulnerabilities found for CVE {identifier}"
            context.append(f"CVE ID: {data.get('cveID', 'Not available')}")
            context.append(f"Vendor: {data.get('vendorProject', 'Not available')}")
            context.append(f"Product: {data.get('product', 'Not available')}")
            context.append(f"Vulnerability Name: {data.get('vulnerabilityName', 'Not available')}")
            context.append(f"Date Added: {data.get('dateAdded', 'Not available')}")
            context.append(f"Short Description: {data.get('shortDescription', 'Not available')}")
            context.append(f"Required Action: {data.get('requiredAction', 'Not available')}")
            if data.get("dueDate"):
                context.append(f"Due Date: {data['dueDate']}")
            if data.get("notes"):
                context.append(f"Notes: {data['notes']}")
        elif search_type == "keyword":
            if not data:
                return "No vulnerabilities found for this search."
            context.append(f"Found {len(data)} vulnerabilities.")
            context.append("\nMost relevant vulnerabilities:")
            # Only the first five entries are shown.
            for index, vuln in enumerate(data[:5], start=1):
                context.append(
                    f"{index}. {vuln.get('cveID', 'Unknown CVE')} - {vuln.get('product', 'Unknown Product')}"
                )
        return "\n".join(context)