sonic-cyber-assistant / tools /cve_avd_tool.py
Muhammed Machrouh
Initial files
4531c67
raw
history blame
5.96 kB
import requests
from langchain.tools import tool
import json
from datetime import datetime, timedelta
def get_current_formatted_date():
    """Return the current local time as ``YYYY-MM-DDTHH:MM:SS.mmm``.

    The value is a naive (timezone-less) timestamp with the fractional
    seconds truncated, not rounded, to milliseconds.
    """
    # timespec='milliseconds' truncates the microsecond field to three digits,
    # and a naive datetime gets no UTC-offset suffix.
    return datetime.now().isoformat(timespec='milliseconds')
def get_yesterday_formatted_date(days=2):
    """Return the local time ``days`` days ago as ``YYYY-MM-DDTHH:MM:SS.mmm``.

    Parameters:
    - days: How many days to look back from now. Defaults to 2.

    Returns:
    - A naive (timezone-less) timestamp string with the fractional seconds
      truncated to milliseconds.

    NOTE(review): despite the function name, the look-back has always been two
    days, not one. The default keeps that behavior so existing callers query
    the same window; pass ``days=1`` for a literal "yesterday".
    """
    window_start = datetime.now() - timedelta(days=days)
    # %f yields six microsecond digits; dropping the last three leaves milliseconds.
    return window_start.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]
# Metric keys to probe, newest CVSS version first, so that a CVE scored only
# under v3.x/v4.0 still reports a score instead of "Not available".
_CVSS_METRIC_KEYS = ('cvssMetricV40', 'cvssMetricV31', 'cvssMetricV30', 'cvssMetricV2')


def _cvss_summary(cve):
    """Returns "score (severity)" from the newest CVSS metric present, else "Not available"."""
    metrics = cve.get('metrics') or {}
    for key in _CVSS_METRIC_KEYS:
        entries = metrics.get(key)
        if not entries:
            # Key absent or an empty list: try the next (older) CVSS version.
            continue
        metric = entries[0]
        cvss_data = metric.get('cvssData', {})
        score = cvss_data.get('baseScore', 'Not available')
        # Severity is looked up inside cvssData first and then on the metric
        # entry itself, which is where the original V2-only code read it.
        severity = cvss_data.get('baseSeverity') or metric.get('baseSeverity', 'Unknown')
        return f"{score} ({severity})"
    return "Not available"


def _affected_criteria(cve):
    """Collects the criteria of every vulnerable cpeMatch across all configurations."""
    affected = []
    for conf in cve.get('configurations') or []:
        for node in conf.get('nodes', []):
            for cpe_match in node.get('cpeMatch', []):
                if cpe_match.get('vulnerable', False):
                    affected.append(cpe_match.get('criteria', 'Not specified'))
    return affected


def format_cve(cve_response):
    """
    Takes a dictionary representing the API response and formats all CVE entries into one text block for an LLM.
    Parameters:
    - cve_response: A dictionary representing the API response. Its 'vulnerabilities' list is read,
      and each item's 'cve' dictionary supplies the id, status, descriptions, metrics,
      configurations and references.
    Returns:
    - A single string: a header line followed by one bullet-list section per CVE, or the message
      "No CVEs found matching the search criteria." when the response has no vulnerabilities
      (including when the 'vulnerabilities' key is missing).
    """
    vulnerabilities = cve_response.get('vulnerabilities') or []
    if not vulnerabilities:
        return "No CVEs found matching the search criteria."

    sections = ["Use these CVE Search Results to answer user question:\n\n"]
    for vulnerability in vulnerabilities:
        cve = vulnerability.get('cve', {})

        descriptions = cve.get('descriptions', [])
        if descriptions:
            description_text = descriptions[0].get('value', 'No description available.')
        else:
            description_text = "No description available."

        # Exactly one line per CVE, aggregated over every configuration, so the
        # line is present even when the CVE lists no configurations at all.
        affected_configs = _affected_criteria(cve)
        affected_text = ', '.join(affected_configs) if affected_configs else 'Not specified'

        references = cve.get('references', [])
        if references:
            references_text = ', '.join(ref.get('url', 'No URL') for ref in references)
        else:
            references_text = 'No references available.'

        lines = [
            f"- CVE ID: {cve.get('id', 'N/A')}",
            f"- Status: {cve.get('vulnStatus', 'Unknown')}",
            f"- Description: {description_text}",
            f"- CVSS Score: {_cvss_summary(cve)}",
            f"- Affected Configurations: {affected_text}",
            f"- References: {references_text}",
        ]
        # Every bullet ends with a newline and each CVE section is followed by
        # two extra newlines, matching the original layout.
        sections.append("\n".join(lines) + "\n" + "\n\n")

    formatted_prompts = "".join(sections)
    # Echo the assembled prompt to stdout, as the original implementation did.
    print(formatted_prompts)
    return formatted_prompts
# Seconds to wait on the NVD API before giving up. requests applies no timeout
# by default, so without this a stalled connection would block the tool forever.
_NVD_TIMEOUT_SECONDS = 30


def _encode_keyword(keyword):
    """Percent-encodes a search phrase so it is safe to embed as a URL query value."""
    from urllib.parse import quote  # local import keeps this block self-contained

    # safe="" escapes every reserved character ('&', '#', '+', '/', ...), not
    # just spaces, so the phrase cannot break or extend the query string.
    return quote(keyword, safe="")


def _query_nvd(url):
    """Sends a GET request to the NVD API and returns formatted CVE text or an error dict."""
    try:
        response = requests.get(url, timeout=_NVD_TIMEOUT_SECONDS)
        if response.status_code == 200:
            return format_cve(response.json())
        return {"error": "Failed to fetch data from the NVD API.", "status_code": response.status_code}
    except Exception as e:
        # Tool boundary: network, decoding and formatting failures are all
        # reported back to the caller as data instead of being raised.
        return {"error": str(e)}


class CVESearchTool():
    # NOTE: the @tool docstrings below are runtime text (LangChain uses them as
    # the tool descriptions shown to the LLM), so they describe exactly what
    # each tool accepts and returns.

    @tool("CVE search Tool")
    def cvesearch(keyword: str, date: str = None):
        """
        Searches for CVEs based on a keyword or phrase and returns the results as formatted text.
        Parameters:
        - keyword (str): A word or phrase to search for in the CVE descriptions.
        - date (str): An optional date to include in the search query.
        Returns:
        - A text summary of up to 5 CVEs matching the keyword search, or an error object if the request fails.
        """
        if date:
            # The date is not a separate filter; it is appended to the search phrase.
            keyword = f"{keyword} {date}"
        url = (
            "https://services.nvd.nist.gov/rest/json/cves/2.0"
            f"?keywordSearch={_encode_keyword(keyword)}&resultsPerPage=5"
        )
        return _query_nvd(url)

    @tool("Fetch last 5 CVEs")
    def fetchcve(keyword: str):
        """
        Fetches the most recently published CVEs based on a keyword or phrase and returns the results as formatted text.
        Use this exclusively to get the latest CVEs, or for todays CVEs.
        Parameters:
        - keyword (str): A word or phrase to search for in the CVE descriptions.
        Returns:
        - A text summary of the CVEs matching the keyword search, or an error object if the request fails.
        """
        # NOTE(review): the tool is named "Fetch last 5 CVEs" but the request
        # asks for resultsPerPage=3; the page size is left unchanged - confirm
        # which value is intended.
        url = (
            "https://services.nvd.nist.gov/rest/json/cves/2.0/"
            f"?pubStartDate={get_yesterday_formatted_date()}"
            f"&pubEndDate={get_current_formatted_date()}"
            f"&keywordSearch={_encode_keyword(keyword)}&resultsPerPage=3"
        )
        print(url)
        return _query_nvd(url)