|
|
|
|
|
""" |
|
|
CVE Details MCP Server with Gradio Interface

A Model Context Protocol server that provides CVE vulnerability details through a web interface.
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import logging |
|
|
from typing import Any, Dict, Optional |
|
|
from urllib.parse import quote |
|
|
import aiohttp |
|
|
import gradio as gr |
|
|
import ssl |
|
|
import certifi |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Configure the root logger once at import time so INFO-and-above records
# from this module are emitted (basicConfig is a no-op if the root logger
# already has handlers).
logging.basicConfig(level=logging.INFO)

# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
class CVEDataFetcher:
    """Asynchronous client for the NVD CVE API 2.0.

    The HTTP session is created in ``__aenter__`` and closed in
    ``__aexit__``, so the fetcher must be used as an async context manager::

        async with CVEDataFetcher() as fetcher:
            details = await fetcher.get_cve_details("CVE-2023-44487")

    Args:
        timeout: Total time budget, in seconds, for each HTTP request.
    """

    NVD_CVE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"
    # Upper bound applied to ``resultsPerPage`` (the documented NVD maximum).
    MAX_RESULTS_PER_PAGE = 2000
    # Search summaries truncate descriptions to this many characters.
    DESCRIPTION_PREVIEW_LENGTH = 200

    _CVSS_V4_FIELDS = ('baseScore', 'baseSeverity', 'vectorString')
    _CVSS_V3_FIELDS = (
        'baseScore',
        'baseSeverity',
        'vectorString',
        'attackVector',
        'attackComplexity',
        'privilegesRequired',
        'userInteraction',
        'scope',
        'confidentialityImpact',
        'integrityImpact',
        'availabilityImpact',
    )
    _CVSS_V2_FIELDS = (
        'baseScore',
        'vectorString',
        'accessVector',
        'accessComplexity',
        'authentication',
        'confidentialityImpact',
        'integrityImpact',
        'availabilityImpact',
    )
    # (NVD metrics key, label used in the result, cvssData fields to copy).
    # v3.1 precedes v2.0 exactly as before; v4.0 and v3.0 are additions.
    _CVSS_DETAIL_VERSIONS = (
        ('cvssMetricV40', 'v4.0', _CVSS_V4_FIELDS),
        ('cvssMetricV31', 'v3.1', _CVSS_V3_FIELDS),
        ('cvssMetricV30', 'v3.0', _CVSS_V3_FIELDS),
        ('cvssMetricV2', 'v2.0', _CVSS_V2_FIELDS),
    )
    # Search summaries show one score. v3.1 stays first so results for CVEs
    # that carry a v3.1 metric do not change; v2 remains the last resort.
    _CVSS_SUMMARY_PREFERENCE = (
        'cvssMetricV31',
        'cvssMetricV30',
        'cvssMetricV40',
        'cvssMetricV2',
    )

    def __init__(self, timeout: float = 30.0):
        self.session: Optional[aiohttp.ClientSession] = None
        self.timeout = timeout

    async def __aenter__(self):
        """Open an HTTPS session that verifies against certifi's CA bundle."""
        ssl_context = ssl.create_default_context(cafile=certifi.where())
        connector = aiohttp.TCPConnector(ssl=ssl_context)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=self.timeout),
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session and drop the reference so reuse fails loudly."""
        if self.session:
            await self.session.close()
            self.session = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _require_session(self):
        """Return the open session or raise if the fetcher was not entered."""
        if self.session is None:
            raise RuntimeError(
                "CVEDataFetcher has no open session; use it as "
                "'async with CVEDataFetcher() as fetcher:'"
            )
        return self.session

    async def _fetch_json(self, url: str, failure_message: str) -> Dict[str, Any]:
        """GET *url* and return the decoded JSON body; raise on non-200."""
        session = self._require_session()
        async with session.get(url) as response:
            if response.status != 200:
                raise Exception(f"{failure_message}: HTTP {response.status}")
            return await response.json()

    @staticmethod
    def _normalize_cve_id(cve_id: str) -> str:
        """Return *cve_id* upper-cased and stripped, or raise ``ValueError``.

        Only ``CVE-<4 digits>-<4 or more digits>`` is accepted. Because the
        result is restricted to ``[A-Z0-9-]`` it is safe to place in the
        query string without further escaping.
        """
        candidate = cve_id.strip().upper() if isinstance(cve_id, str) else ""
        prefix, _, remainder = candidate.partition("-")
        year, _, sequence = remainder.partition("-")
        digits = year + sequence
        well_formed = (
            prefix == "CVE"
            and len(year) == 4
            and len(sequence) >= 4
            and digits.isascii()
            and digits.isdigit()
        )
        if not well_formed:
            raise ValueError("Invalid CVE ID format. Expected format: CVE-YYYY-NNNN")
        return candidate

    @staticmethod
    def _english_values(entries) -> list:
        """Return the ``value`` of every entry whose ``lang`` is ``'en'``."""
        return [
            entry['value']
            for entry in entries or []
            if entry.get('lang') == 'en' and entry.get('value') is not None
        ]

    @staticmethod
    def _first_metric(metrics: Dict[str, Any], key: str) -> Optional[Dict[str, Any]]:
        """Return the first metric entry under *key*, or None if absent/empty."""
        entries = metrics.get(key) or []
        return entries[0] if entries else None

    @classmethod
    def _extract_cvss(cls, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Build the ``cvss_scores`` mapping from a CVE's ``metrics`` object."""
        scores = {}
        for metric_key, label, fields in cls._CVSS_DETAIL_VERSIONS:
            metric = cls._first_metric(metrics, metric_key)
            if metric is None:
                continue
            cvss_data = metric.get('cvssData') or {}
            entry = {field: cvss_data.get(field) for field in fields}
            # For CVSS v2 the severity sits on the metric entry itself rather
            # than inside cvssData; surface it when it is provided.
            if 'baseSeverity' not in entry and metric.get('baseSeverity') is not None:
                entry['baseSeverity'] = metric['baseSeverity']
            scores[label] = entry
        return scores

    @classmethod
    def _parse_details(cls, vuln: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten one NVD ``cve`` object into the details dictionary."""
        weaknesses = []
        for weakness in vuln.get('weaknesses') or []:
            for cwe in cls._english_values(weakness.get('description')):
                # NVD supplies only the CWE identifier here, so both keys
                # carry the same text (kept for backward compatibility).
                weaknesses.append({
                    'type': weakness.get('type'),
                    'cwe_id': cwe,
                    'description': cwe,
                })

        configurations = []
        for config in vuln.get('configurations') or []:
            for node in config.get('nodes') or []:
                for cpe_match in node.get('cpeMatch') or []:
                    configurations.append({
                        'criteria': cpe_match.get('criteria'),
                        'vulnerable': cpe_match.get('vulnerable'),
                        'versionStartIncluding': cpe_match.get('versionStartIncluding'),
                        'versionStartExcluding': cpe_match.get('versionStartExcluding'),
                        'versionEndIncluding': cpe_match.get('versionEndIncluding'),
                        'versionEndExcluding': cpe_match.get('versionEndExcluding'),
                    })

        references = [
            {
                'url': ref.get('url'),
                'source': ref.get('source'),
                'tags': ref.get('tags', []),
            }
            for ref in vuln.get('references') or []
        ]

        vendor_comments = [
            {
                'organization': comment.get('organization'),
                'comment': comment.get('comment'),
                'lastModified': comment.get('lastModified'),
            }
            for comment in vuln.get('vendorComments') or []
        ]

        return {
            'id': vuln['id'],
            'sourceIdentifier': vuln.get('sourceIdentifier', 'N/A'),
            'published': vuln.get('published', 'N/A'),
            'lastModified': vuln.get('lastModified', 'N/A'),
            'vulnStatus': vuln.get('vulnStatus', 'N/A'),
            'descriptions': cls._english_values(vuln.get('descriptions')),
            'cvss_scores': cls._extract_cvss(vuln.get('metrics') or {}),
            'weaknesses': weaknesses,
            'configurations': configurations,
            'references': references,
            'vendor_comments': vendor_comments,
        }

    @classmethod
    def _summarize(cls, vuln: Dict[str, Any]) -> Dict[str, Any]:
        """Build the short per-CVE record used in search results."""
        descriptions = cls._english_values(vuln.get('descriptions'))
        description = descriptions[0] if descriptions else ''
        if len(description) > cls.DESCRIPTION_PREVIEW_LENGTH:
            description = description[:cls.DESCRIPTION_PREVIEW_LENGTH] + '...'

        summary = {
            'id': vuln['id'],
            'published': vuln.get('published', 'N/A'),
            'lastModified': vuln.get('lastModified', 'N/A'),
            'vulnStatus': vuln.get('vulnStatus', 'N/A'),
            'description': description,
            'cvss_score': None,
            'severity': None,
        }

        metrics = vuln.get('metrics') or {}
        for metric_key in cls._CVSS_SUMMARY_PREFERENCE:
            metric = cls._first_metric(metrics, metric_key)
            if metric is None:
                continue
            cvss_data = metric.get('cvssData') or {}
            summary['cvss_score'] = cvss_data.get('baseScore')
            # v2 metrics keep the severity next to cvssData, not inside it.
            summary['severity'] = (
                cvss_data.get('baseSeverity') or metric.get('baseSeverity')
            )
            break
        return summary

    @classmethod
    def _build_search_results(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a raw NVD search response into the results dictionary."""
        return {
            'total_results': data.get('totalResults', 0),
            'results_per_page': data.get('resultsPerPage', 0),
            'start_index': data.get('startIndex', 0),
            'cves': [
                cls._summarize(item['cve'])
                for item in data.get('vulnerabilities') or []
            ],
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def get_cve_details(self, cve_id: str) -> Dict[str, Any]:
        """Fetch detailed CVE information from NVD API.

        Args:
            cve_id: Identifier such as ``CVE-2023-44487`` (case-insensitive,
                surrounding whitespace ignored).

        Returns:
            A dict with the keys ``id``, ``sourceIdentifier``, ``published``,
            ``lastModified``, ``vulnStatus``, ``descriptions``,
            ``cvss_scores``, ``weaknesses``, ``configurations``,
            ``references`` and ``vendor_comments``.

        Raises:
            ValueError: If *cve_id* is not a well-formed CVE identifier.
            RuntimeError: If the fetcher is used outside ``async with``.
            Exception: On a non-200 response or when the CVE is not found.
        """
        cve_id = self._normalize_cve_id(cve_id)
        url = f"{self.NVD_CVE_URL}?cveId={cve_id}"
        data = await self._fetch_json(url, "Failed to fetch CVE data")

        vulnerabilities = data.get('vulnerabilities')
        if not vulnerabilities:
            raise Exception(f"CVE {cve_id} not found")
        return self._parse_details(vulnerabilities[0]['cve'])

    async def search_cves(self, keyword: str, limit: int = 10) -> Dict[str, Any]:
        """Search for CVEs by keyword.

        Args:
            keyword: Free-text search phrase; must not be blank.
            limit: Maximum number of results to request. Coerced to ``int``
                and clamped to ``1..MAX_RESULTS_PER_PAGE``.

        Returns:
            A dict with ``total_results``, ``results_per_page``,
            ``start_index`` and ``cves`` (a list of per-CVE summaries).

        Raises:
            ValueError: If *keyword* is empty or blank.
            RuntimeError: If the fetcher is used outside ``async with``.
            Exception: On a non-200 response.
        """
        keyword = keyword.strip() if isinstance(keyword, str) else ""
        if not keyword:
            raise ValueError("Keyword is required for search")

        # UI sliders can deliver floats; the API expects an integer page size.
        page_size = max(1, min(int(limit), self.MAX_RESULTS_PER_PAGE))
        encoded_keyword = quote(keyword)
        url = (
            f"{self.NVD_CVE_URL}?keywordSearch={encoded_keyword}"
            f"&resultsPerPage={page_size}"
        )
        data = await self._fetch_json(url, "Failed to search CVEs")
        return self._build_search_results(data)
|
|
|
|
|
|
|
|
# Module-level fetcher instance.
# NOTE(review): the Gradio handlers in this file open their own
# CVEDataFetcher via "async with", so nothing visible here uses this object;
# it is kept in case other modules import it - confirm before removing.
cve_fetcher = CVEDataFetcher()
|
|
|
|
|
def get_cve_details_sync(cve_id: str) -> str:
    """Synchronous wrapper for getting CVE details.

    Runs ``get_cve_details_gradio`` to completion on a fresh event loop via
    ``asyncio.run`` and returns its Markdown string. ``asyncio.run`` cannot
    be used from a thread that already has a running event loop.

    Args:
        cve_id: CVE identifier entered by the user.

    Returns:
        Markdown text describing the CVE, a prompt, or an error message.
    """
    return asyncio.run(get_cve_details_gradio(cve_id))
|
|
|
|
|
def search_cves_sync(keyword: str, limit: int) -> str:
    """Synchronous wrapper for searching CVEs.

    Runs ``search_cves_gradio`` to completion on a fresh event loop via
    ``asyncio.run`` and returns its Markdown string. ``asyncio.run`` cannot
    be used from a thread that already has a running event loop.

    Args:
        keyword: Search phrase entered by the user.
        limit: Maximum number of results to show.

    Returns:
        Markdown text listing the matches, a prompt, or an error message.
    """
    return asyncio.run(search_cves_gradio(keyword, limit))
|
|
|
|
|
def _format_cve_details(details: Dict[str, Any]) -> str:
    """Render a parsed CVE details dictionary as Markdown."""
    parts = [
        f"# {details['id']} Details\n\n",
        f"**Status:** {details['vulnStatus']}\n",
        f"**Published:** {details['published']}\n",
        f"**Last Modified:** {details['lastModified']}\n",
        f"**Source:** {details['sourceIdentifier']}\n\n",
    ]

    if details['descriptions']:
        parts.append("## Description\n")
        parts.extend(f"{text}\n\n" for text in details['descriptions'])

    if details['cvss_scores']:
        parts.append("## CVSS Scores\n")
        score_lines = (
            ("Base Score", 'baseScore'),
            ("Severity", 'baseSeverity'),
            ("Vector", 'vectorString'),
        )
        for version, scores in details['cvss_scores'].items():
            parts.append(f"### CVSS {version}\n")
            for label, key in score_lines:
                # The keys can be present with a None value; test the value
                # so the page never shows a literal "None".
                value = scores.get(key)
                if value is not None:
                    parts.append(f"**{label}:** {value}\n")
            parts.append("\n")

    if details['weaknesses']:
        parts.append("## Weaknesses (CWE)\n")
        for weakness in details['weaknesses']:
            parts.append(f"- **{weakness['cwe_id']}:** {weakness['description']}\n")
        parts.append("\n")

    if details['configurations']:
        parts.append("## Affected Configurations\n")
        version_bounds = (
            ("From version", 'versionStartIncluding'),
            ("After version", 'versionStartExcluding'),
            ("Up to and including version", 'versionEndIncluding'),
            ("Before version", 'versionEndExcluding'),
        )
        # Only the first ten entries are listed to keep the page readable.
        for config in details['configurations'][:10]:
            parts.append(f"- {config['criteria']}\n")
            for label, key in version_bounds:
                bound = config.get(key)
                if bound:
                    parts.append(f" - {label}: {bound}\n")
        parts.append("\n")

    if details['references']:
        parts.append("## References\n")
        for ref in details['references'][:10]:
            url = ref.get('url')
            if not url:
                # A link without a target cannot be rendered meaningfully.
                continue
            parts.append(f"- [{ref.get('source') or url}]({url})\n")
            tags = ref.get('tags')
            if tags:
                parts.append(f" - Tags: {', '.join(tags)}\n")
        parts.append("\n")

    vendor_comments = details.get('vendor_comments')
    if vendor_comments:
        parts.append("## Vendor Comments\n")
        for comment in vendor_comments:
            parts.append(f"- **{comment.get('organization')}:** {comment.get('comment')}\n")
        parts.append("\n")

    return "".join(parts)


async def get_cve_details_gradio(cve_id: str) -> str:
    """Gradio interface function for getting CVE details.

    Args:
        cve_id: CVE identifier typed by the user; ``None`` or blank input
            yields a prompt instead of a lookup.

    Returns:
        Markdown describing the CVE, the prompt text for blank input, or
        ``"Error: ..."`` when the lookup fails. Exceptions are never
        propagated to the UI.
    """
    cve_id = (cve_id or "").strip()
    if not cve_id:
        return "Please enter a CVE ID (e.g., CVE-2023-1234)"

    try:
        async with CVEDataFetcher() as fetcher:
            details = await fetcher.get_cve_details(cve_id)
        return _format_cve_details(details)
    except ValueError as e:
        # Malformed user input: report it, but it is not worth a traceback.
        return f"Error: {str(e)}"
    except Exception as e:
        logger.exception("CVE lookup failed for %r", cve_id)
        return f"Error: {str(e)}"
|
|
|
|
|
def _format_search_results(keyword: str, results: Dict[str, Any]) -> str:
    """Render a search results dictionary as Markdown."""
    parts = [
        f"# CVE Search Results for '{keyword}'\n\n",
        f"**Total Results:** {results['total_results']}\n",
        f"**Showing:** {len(results['cves'])} results\n\n",
    ]

    for cve in results['cves']:
        parts.append(f"## {cve['id']}\n")
        parts.append(f"**Published:** {cve['published']}\n")
        parts.append(f"**Status:** {cve['vulnStatus']}\n")
        # Compare against None: a base score of 0.0 is valid and must show.
        if cve['cvss_score'] is not None:
            score_line = f"**CVSS Score:** {cve['cvss_score']}"
            if cve['severity']:
                score_line += f" ({cve['severity']})"
            parts.append(score_line + "\n")
        parts.append(f"**Description:** {cve['description']}\n\n")
        parts.append("---\n\n")

    return "".join(parts)


async def search_cves_gradio(keyword: str, limit: int) -> str:
    """Gradio interface function for searching CVEs.

    Args:
        keyword: Search phrase typed by the user; ``None`` or blank input
            yields a prompt instead of a search.
        limit: Maximum number of results; converted with ``int()`` because
            UI sliders may deliver floats.

    Returns:
        Markdown listing the matches, the prompt text for blank input, or
        ``"Error: ..."`` when the search fails. Exceptions are never
        propagated to the UI.
    """
    keyword = (keyword or "").strip()
    if not keyword:
        return "Please enter a search keyword"

    try:
        async with CVEDataFetcher() as fetcher:
            results = await fetcher.search_cves(keyword, int(limit))
        return _format_search_results(keyword, results)
    except ValueError as e:
        # Bad user input (e.g. a non-numeric limit): no traceback needed.
        return f"Error: {str(e)}"
    except Exception as e:
        logger.exception("CVE search failed for %r", keyword)
        return f"Error: {str(e)}"
|
|
|
|
|
def create_gradio_interface():
    """Create the Gradio web interface.

    Builds a ``gr.Blocks`` app with three tabs: a CVE lookup wired to
    ``get_cve_details_sync``, a keyword search wired to
    ``search_cves_sync``, and a static information page.

    Returns:
        The assembled ``gr.Blocks`` application (not launched).
    """
    # Static text for the information tab, kept free of leading indentation
    # so Markdown does not interpret any line as a code block.
    server_info = "\n".join([
        "",
        "## Model Context Protocol (MCP) Server",
        "",
        "This server implements the MCP specification and provides two main tools:",
        "",
        "### Available Tools:",
        "1. **get_cve_details**: Get comprehensive details for a specific CVE",
        "2. **search_cves**: Search for CVEs by keyword",
        "",
        "### Usage as MCP Server:",
        "This interface automatically runs as an MCP server when launched.",
        "",
        "### Features:",
        "- Real-time CVE data from NIST NVD",
        "- CVSS v2.0 and v3.1 scoring",
        "- CWE weakness classification",
        "- Affected product configurations",
        "- Reference links and vendor comments",
        "- Comprehensive search functionality",
        "",
        "### Data Source:",
        "[NIST National Vulnerability Database (NVD)](https://nvd.nist.gov/)",
        "",
    ])

    with gr.Blocks(title="CVE Details MCP Server", theme=gr.themes.Soft()) as app:
        # NOTE(review): the leading "π" looks like a mis-decoded emoji;
        # confirm the intended character before changing this heading.
        gr.Markdown("# π CVE Details MCP Server")
        gr.Markdown("Get comprehensive vulnerability details from the National Vulnerability Database")

        with gr.Tabs():

            with gr.Tab("CVE Details"):
                gr.Markdown("Enter a CVE ID to get detailed vulnerability information")

                with gr.Row():
                    cve_input = gr.Textbox(
                        label="CVE ID",
                        placeholder="e.g., CVE-2023-1234",
                        value="CVE-2023-44487"
                    )
                    get_details_btn = gr.Button("Get Details", variant="primary")

                cve_output = gr.Markdown(label="CVE Details")

                get_details_btn.click(
                    fn=get_cve_details_sync,
                    inputs=[cve_input],
                    outputs=[cve_output]
                )

            with gr.Tab("CVE Search"):
                gr.Markdown("Search for CVEs by keyword, vendor, or product name")

                with gr.Row():
                    search_input = gr.Textbox(
                        label="Search Keyword",
                        placeholder="e.g., apache, wordpress, buffer overflow"
                    )
                    limit_input = gr.Slider(
                        label="Max Results",
                        minimum=1,
                        maximum=50,
                        value=10,
                        step=1
                    )

                search_btn = gr.Button("Search CVEs", variant="primary")
                search_output = gr.Markdown(label="Search Results")

                search_btn.click(
                    fn=search_cves_sync,
                    inputs=[search_input, limit_input],
                    outputs=[search_output]
                )

            with gr.Tab("MCP Server Info"):
                gr.Markdown(server_info)

    return app
|
|
|
|
|
|
|
|
# Single-purpose interface for looking up one CVE; becomes the first tab.
cve_details_demo = gr.Interface(
    fn=get_cve_details_sync,
    inputs=gr.Textbox(label="CVE ID", placeholder="e.g., CVE-2023-1234"),
    outputs=gr.Markdown(label="CVE Details"),
    title="CVE Details Lookup",
    description="Enter a CVE ID to get comprehensive vulnerability information from the National Vulnerability Database."
)

# Single-purpose interface for keyword search; becomes the second tab.
cve_search_demo = gr.Interface(
    fn=search_cves_sync,
    inputs=[
        gr.Textbox(label="Search Keyword", placeholder="e.g., apache, wordpress, buffer overflow"),
        gr.Slider(label="Max Results", minimum=1, maximum=50, value=10, step=1)
    ],
    outputs=gr.Markdown(label="Search Results"),
    title="CVE Search",
    description="Search for CVEs by keyword, vendor, or product name."
)

# Combine both interfaces into the application that is actually served.
# NOTE(review): the leading "π" in the title looks like a mis-decoded emoji;
# confirm the intended character.
demo = gr.TabbedInterface(
    [cve_details_demo, cve_search_demo],
    ["CVE Details", "CVE Search"],
    title="π CVE Details MCP Server"
)

# Start the server as soon as this module is executed or imported.
# NOTE(review): there is no `if __name__ == "__main__":` guard, so importing
# this module also launches the server, listening on all interfaces
# (0.0.0.0) at port 7860 - confirm that this is intended for the deployment.
demo.launch(
    server_name="0.0.0.0",
    server_port=7860,
    share=False,
    debug=False,
    mcp_server=True
)
|
|
|