# File size: 10,848 Bytes
# 52b089b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"""Tool for searching known exploited vulnerabilities (KEV) using the KEVin API."""

import json
from urllib.parse import quote

import requests
from smolagents import Tool

class KevinTool(Tool):
    """
    Tool for searching known exploited vulnerabilities (KEV) using the KEVin API.

    Each lookup is an HTTP GET against ``base_url``. ``forward`` always returns
    a string: either the formatted result or a message starting with "Error",
    so the calling agent does not have to handle exceptions.
    """

    name = "kevin_search"
    description = """Tool for searching known exploited vulnerabilities (KEV) using the KEVin API.
    
    This tool allows searching for known exploited vulnerabilities in multiple ways:
    - By CVE ID: kevin_search(search_type="cve", identifier="CVE-2021-44228")
    - By keyword: kevin_search(search_type="keyword", identifier="log4j")
    - Check if CVE is in KEV: kevin_search(search_type="exists", identifier="CVE-2023-22527")
    - Filter by ransomware: kevin_search(search_type="ransomware")
    
    CRITICAL - Product name format for keyword search:
    - ALWAYS use ONLY the base product name, NEVER include versions
    - CORRECT examples: "log4j", "apache", "microsoft", "chrome", "tomcat", "nginx"
    - INCORRECT examples: "log4j 2.14.1", "apache http server", "microsoft windows", "chrome 120.0.6099.109"
    - When searching, strip version numbers and descriptive words
    - Example: "Apache Tomcat 9.0.65" → search for "tomcat"
    - Example: "Google Chrome 120.0.6099.109" → search for "chrome"
    - Example: "Log4j 2.14.1" → search for "log4j"
    
    IMPORTANT: The "exists" search type checks if a CVE is listed in the Known Exploited Vulnerabilities (KEV) database.
    A CVE can exist in NVD or other databases but not be in KEV if it hasn't been actively exploited.
    
    Usage examples:
    - kevin_search(search_type="cve", identifier="CVE-2021-44228")
    - kevin_search(search_type="keyword", identifier="log4j")
    - kevin_search(search_type="exists", identifier="CVE-2023-22527")
    - kevin_search(search_type="ransomware")"""

    inputs = {
        "search_type": {
            "description": "Type of search to perform. Must be 'cve' for specific CVE ID, 'keyword' for keyword search, 'exists' to check if CVE is in KEV database, or 'ransomware' to filter ransomware vulnerabilities.",
            "type": "string",
        },
        "identifier": {
            "description": "The CVE ID (e.g., 'CVE-2021-44228') or keyword (e.g., 'log4j'). For keywords, use ONLY base product names without versions (e.g., 'log4j' not 'log4j 2.14.1'). Not required for 'ransomware' search type.",
            "type": "string",
            "nullable": True,
        },
    }

    output_type = "string"

    # Seconds to wait on the KEVin server. `requests` applies no timeout by
    # default, so without this a stalled connection would block the caller
    # indefinitely.
    request_timeout = 30

    def __init__(self):
        super().__init__()
        self.base_url = "https://kevin.gtfkd.com"

    def forward(self, search_type: str, identifier: str = None) -> str:
        """Run one KEV lookup and return the outcome as text.

        Args:
            search_type: "cve", "keyword", "exists" or "ransomware". Letter
                case and surrounding whitespace are ignored.
            identifier: CVE ID or keyword. Required for every search type
                except "ransomware"; surrounding whitespace is stripped.

        Returns:
            The formatted result, or a message starting with "Error" when the
            arguments are invalid or the lookup fails.
        """
        try:
            # Callers are LLM agents; tolerate "CVE", " keyword " and the like.
            kind = search_type.strip().lower() if isinstance(search_type, str) else None
            if isinstance(identifier, str):
                identifier = identifier.strip()

            if kind == "ransomware":
                return self._search_ransomware()

            # search type -> (handler, wording used in the missing-identifier error)
            handlers = {
                "cve": (self._search_by_cve, "CVE search"),
                "keyword": (self._search_by_keyword, "keyword search"),
                "exists": (self._check_exists, "exists check"),
            }
            if kind not in handlers:
                return "Error: search_type must be 'cve', 'keyword', 'exists', or 'ransomware'"

            handler, label = handlers[kind]
            if not identifier:
                return f"Error: identifier is required for {label}"
            return handler(identifier)

        except Exception as e:
            # Last-resort boundary: the tool contract is "always return a string".
            return f"Error searching KEVin: {str(e)}"

    def _get_json(self, path: str, params: dict = None):
        """GET ``base_url + path`` and return the decoded JSON body.

        Raises:
            requests.exceptions.RequestException: on connection failure,
                timeout, or a 4xx/5xx status (via ``raise_for_status``).
            ValueError: if the response body is not valid JSON.
        """
        response = requests.get(
            f"{self.base_url}{path}",
            params=params,
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return response.json()

    def _as_vulnerability_list(self, data) -> list:
        """Normalize a decoded response into a list of vulnerability dicts.

        Accepts a bare list of records, an object carrying the records under a
        "vulnerabilities" key, or a single record (an object with a "cveID"
        key). Anything else yields an empty list.
        """
        # NOTE(review): the exact KEVin response envelope is not visible from
        # this file; the "vulnerabilities" key is an assumption to confirm
        # against the API documentation.
        if isinstance(data, dict):
            wrapped = data.get("vulnerabilities")
            if isinstance(wrapped, list):
                data = wrapped
            elif "cveID" in data:
                data = [data]
            else:
                data = []
        if not isinstance(data, list):
            return []
        return [item for item in data if isinstance(item, dict)]

    def _is_listed(self, data) -> bool:
        """Interpret the response of the "exists" endpoint as a boolean."""
        # A JSON object such as {"In_KEV": false} is a non-empty dict and thus
        # truthy, so its flag has to be read explicitly.
        # NOTE(review): key names are assumed, not confirmed from this file.
        if isinstance(data, dict):
            for key in ("In_KEV", "in_kev", "exists"):
                if key in data:
                    return bool(data[key])
        # Bare true/false (or any unrecognized shape): plain truthiness.
        return bool(data)

    def _check_exists(self, cve_id: str) -> str:
        """Check if a CVE is listed in the Known Exploited Vulnerabilities (KEV) database.

        Args:
            cve_id: CVE identifier, sent as the ``cve`` query parameter.

        Returns:
            A sentence stating whether the CVE is listed, or an error message
            if the request fails.
        """
        try:
            data = self._get_json("/kev/exists", params={"cve": cve_id})
        except (requests.exceptions.RequestException, ValueError) as e:
            return f"Error checking KEVin existence: {str(e)}"

        if self._is_listed(data):
            return f"✅ CVE {cve_id} IS listed in the Known Exploited Vulnerabilities (KEV) database - this means it has been actively exploited in the wild."
        return f"❌ CVE {cve_id} is NOT listed in the Known Exploited Vulnerabilities (KEV) database - this means it has not been reported as actively exploited (though it may exist in other vulnerability databases)."

    def _search_ransomware(self) -> str:
        """Search for vulnerabilities known to be used in ransomware.

        Returns:
            A multi-line report with one section per vulnerability, a
            "none found" message, or an error message if the request fails.
        """
        try:
            data = self._get_json("/kev", params={"filter": "ransomware"})
        except (requests.exceptions.RequestException, ValueError) as e:
            return f"Error accessing KEVin API: {str(e)}"

        vulns = self._as_vulnerability_list(data)
        if not vulns:
            return "No ransomware-related vulnerabilities found in KEV database."

        context = [
            f"Found {len(vulns)} ransomware-related vulnerabilities in KEV database.",
            "\nRansomware vulnerabilities:",
        ]
        for number, vuln in enumerate(vulns, start=1):
            context.append(f"\n--- Vulnerability {number} ---")
            context.append(f"CVE ID: {vuln.get('cveID', 'Not available')}")
            context.append(f"Vendor: {vuln.get('vendorProject', 'Not available')}")
            context.append(f"Product: {vuln.get('product', 'Not available')}")
            context.append(f"Vulnerability Name: {vuln.get('vulnerabilityName', 'Not available')}")
            context.append(f"Date Added: {vuln.get('dateAdded', 'Not available')}")
            # Due date and notes are optional; omit the line when absent/empty.
            if vuln.get('dueDate'):
                context.append(f"Due Date: {vuln['dueDate']}")
            context.append(f"Short Description: {vuln.get('shortDescription', 'Not available')}")
            context.append(f"Required Action: {vuln.get('requiredAction', 'Not available')}")
            if vuln.get('notes'):
                context.append(f"Notes: {vuln['notes']}")
            context.append("-" * 50)  # Separator between vulnerabilities

        return "\n".join(context)

    def _search_by_cve(self, cve_id: str) -> str:
        """Search for a specific CVE in KEVin.

        Args:
            cve_id: CVE identifier; it is percent-encoded before being placed
                in the URL path.

        Returns:
            A bulleted summary of the KEV record, a "none found" message, or
            an error message if the request fails.
        """
        try:
            # safe="" also escapes "/", so the identifier cannot change the
            # path being requested.
            data = self._get_json(f"/kev/{quote(cve_id, safe='')}")
        except (requests.exceptions.RequestException, ValueError) as e:
            return f"Error accessing KEVin API: {str(e)}"

        records = self._as_vulnerability_list(data)
        if not records:
            return f"No vulnerabilities found for CVE {cve_id}"
        record = records[0]

        fields = (
            ("CVE ID", "cveID"),
            ("Vendor", "vendorProject"),
            ("Product", "product"),
            ("Vulnerability Name", "vulnerabilityName"),
            ("Date Added", "dateAdded"),
            ("Short Description", "shortDescription"),
            ("Required Action", "requiredAction"),
            ("Due Date", "dueDate"),
            ("Notes", "notes"),
        )
        lines = [f"Known exploited vulnerability found for {cve_id}:\n"]
        lines.extend(
            f"- {label}: {record.get(key, 'Not available')}" for label, key in fields
        )
        return "\n".join(lines) + "\n"

    def _search_by_keyword(self, keyword: str) -> str:
        """Search for vulnerabilities by keyword.

        Args:
            keyword: Search term, sent as the ``search`` query parameter.

        Returns:
            A numbered "CVE - product" list, a "none found" message, or an
            error message if the request fails.
        """
        try:
            data = self._get_json("/kev", params={"search": keyword})
        except (requests.exceptions.RequestException, ValueError) as e:
            return f"Error accessing KEVin API: {str(e)}"

        vulns = self._as_vulnerability_list(data)
        if not vulns:
            return f"No vulnerabilities found for keyword '{keyword}'."

        context = [
            f"Found {len(vulns)} vulnerabilities for keyword '{keyword}'.",
            "\nVulnerabilities found:",
        ]
        context.extend(
            f"{number}. {vuln.get('cveID', 'Unknown CVE')} - {vuln.get('product', 'Unknown Product')}"
            for number, vuln in enumerate(vulns, start=1)
        )
        return "\n".join(context)

    def _format_context(self, data: dict, search_type: str, identifier: str) -> str:
        """Format already-fetched data for context.

        Not called by the search methods of this class; kept for any external
        caller.

        Args:
            data: For "cve", a single record dict; for "keyword", a sequence
                of record dicts.
            search_type: "cve" or "keyword". Any other value produces an
                empty string.
            identifier: Only used in the "cve" not-found message.

        Returns:
            Newline-joined text. The "keyword" form lists at most the first
            five records.
        """
        context = []

        if search_type == "cve":
            if not data:
                return f"No vulnerabilities found for CVE {identifier}"

            context.append(f"CVE ID: {data.get('cveID', 'Not available')}")
            context.append(f"Vendor: {data.get('vendorProject', 'Not available')}")
            context.append(f"Product: {data.get('product', 'Not available')}")
            context.append(f"Vulnerability Name: {data.get('vulnerabilityName', 'Not available')}")
            context.append(f"Date Added: {data.get('dateAdded', 'Not available')}")
            context.append(f"Short Description: {data.get('shortDescription', 'Not available')}")
            context.append(f"Required Action: {data.get('requiredAction', 'Not available')}")

            # Optional fields: emitted only when present and non-empty.
            if data.get('dueDate'):
                context.append(f"Due Date: {data['dueDate']}")
            if data.get('notes'):
                context.append(f"Notes: {data['notes']}")

        elif search_type == "keyword":
            if not data:
                return "No vulnerabilities found for this search."

            context.append(f"Found {len(data)} vulnerabilities.")
            context.append("\nMost relevant vulnerabilities:")

            # Only the first five entries are listed.
            for number, vuln in enumerate(data[:5], start=1):
                context.append(f"{number}. {vuln.get('cveID', 'Unknown CVE')} - {vuln.get('product', 'Unknown Product')}")

        return "\n".join(context)