# Hosting-page residue captured with the file (not part of the program):
#   suryanshp1's picture
#   Upload app.py
#   edea0e8 verified
#!/usr/bin/env python3
"""
CVE Details MCP Server with Gradio Interface
A Model Context Protocol server that provides CVE vulnerability details through a web interface.
"""
import asyncio
import logging
from typing import Any, Dict, Optional
from urllib.parse import quote
import aiohttp
import gradio as gr
import ssl
import certifi
# Configure the root logger at INFO level; this runs as an import-time side effect.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module. None of the code visible in
# this file emits through it yet.
logger = logging.getLogger(__name__)
class CVEDataFetcher:
    """Async client for the NVD CVE API 2.0.

    Must be used as an async context manager so that the underlying
    ``aiohttp`` session is created and closed deterministically::

        async with CVEDataFetcher() as fetcher:
            details = await fetcher.get_cve_details("CVE-2023-44487")
    """

    NVD_CVE_API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

    # Keys copied verbatim from an NVD ``cvssData`` object, per CVSS family.
    _CVSS_V3_FIELDS = (
        'baseScore',
        'baseSeverity',
        'vectorString',
        'attackVector',
        'attackComplexity',
        'privilegesRequired',
        'userInteraction',
        'scope',
        'confidentialityImpact',
        'integrityImpact',
        'availabilityImpact',
    )
    _CVSS_V2_FIELDS = (
        'baseScore',
        'vectorString',
        'accessVector',
        'accessComplexity',
        'authentication',
        'confidentialityImpact',
        'integrityImpact',
        'availabilityImpact',
    )
    # Keys copied verbatim from each ``cpeMatch`` entry.
    _CPE_MATCH_FIELDS = (
        'criteria',
        'vulnerable',
        'versionStartIncluding',
        'versionEndExcluding',
        'versionStartExcluding',
        'versionEndIncluding',
    )

    def __init__(self):
        # Created lazily in __aenter__; stays None until the context is entered.
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        """Open an HTTPS session that verifies certificates against certifi's CA bundle."""
        ssl_context = ssl.create_default_context(cafile=certifi.where())
        connector = aiohttp.TCPConnector(ssl=ssl_context)
        self.session = aiohttp.ClientSession(connector=connector)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session (if any) and forget it so a stale one is never reused."""
        if self.session is not None:
            await self.session.close()
            self.session = None

    def _require_session(self) -> "aiohttp.ClientSession":
        """Return the open session or fail with an actionable message."""
        if self.session is None:
            raise RuntimeError(
                "CVEDataFetcher has no open session; use it as "
                "'async with CVEDataFetcher() as fetcher:'"
            )
        return self.session

    @staticmethod
    def _normalize_cve_id(cve_id: str) -> str:
        """Return *cve_id* upper-cased and stripped, or raise ValueError.

        The identifier is interpolated into the request URL, so it is
        validated strictly (``CVE-`` + 4-digit year + ``-`` + 4 or more
        digits) instead of only checking the prefix.
        """
        candidate = cve_id.strip().upper() if isinstance(cve_id, str) else ''
        prefix, _, remainder = candidate.partition('-')
        year, _, sequence = remainder.partition('-')
        digits = year + sequence
        is_valid = (
            prefix == 'CVE'
            and len(year) == 4
            and len(sequence) >= 4
            and digits.isascii()
            and digits.isdigit()
        )
        if not is_valid:
            raise ValueError("Invalid CVE ID format. Expected format: CVE-YYYY-NNNN")
        return candidate

    @staticmethod
    def _first_cvss_data(metrics: Dict[str, Any], metric_key: str) -> Optional[Dict[str, Any]]:
        """Return ``cvssData`` of the first *metric_key* entry, or None if absent/empty."""
        entries = metrics.get(metric_key) or []
        if not entries:
            return None
        return entries[0].get('cvssData') or None

    @staticmethod
    def _parse_details(vuln: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one NVD ``cve`` object into the flat details dict."""
        details = {
            'id': vuln['id'],
            'sourceIdentifier': vuln.get('sourceIdentifier', 'N/A'),
            'published': vuln.get('published', 'N/A'),
            'lastModified': vuln.get('lastModified', 'N/A'),
            'vulnStatus': vuln.get('vulnStatus', 'N/A'),
            'descriptions': [],
            'cvss_scores': {},
            'weaknesses': [],
            'configurations': [],
            'references': [],
            'vendor_comments': []
        }

        # Descriptions: English only; entries lacking 'lang' or 'value' are skipped.
        details['descriptions'] = [
            desc['value']
            for desc in vuln.get('descriptions', [])
            if desc.get('lang') == 'en' and 'value' in desc
        ]

        # CVSS scores: v3.0 shares the v3.1 field layout.
        metrics = vuln.get('metrics', {})
        cvss_families = (
            ('v3.1', 'cvssMetricV31', CVEDataFetcher._CVSS_V3_FIELDS),
            ('v3.0', 'cvssMetricV30', CVEDataFetcher._CVSS_V3_FIELDS),
            ('v2.0', 'cvssMetricV2', CVEDataFetcher._CVSS_V2_FIELDS),
        )
        for label, metric_key, fields in cvss_families:
            cvss_data = CVEDataFetcher._first_cvss_data(metrics, metric_key)
            if cvss_data is not None:
                details['cvss_scores'][label] = {
                    field: cvss_data.get(field) for field in fields
                }

        # Weaknesses (CWE). The same 'value' is stored under both 'cwe_id'
        # and 'description', as callers may read either key.
        for weakness in vuln.get('weaknesses', []):
            for desc in weakness.get('description', []):
                if desc.get('lang') == 'en':
                    details['weaknesses'].append({
                        'type': weakness.get('type'),
                        'cwe_id': desc.get('value'),
                        'description': desc.get('value')
                    })

        # Configurations (affected systems), flattened across all nodes.
        for config in vuln.get('configurations', []):
            for node in config.get('nodes', []):
                for cpe_match in node.get('cpeMatch', []):
                    details['configurations'].append({
                        field: cpe_match.get(field)
                        for field in CVEDataFetcher._CPE_MATCH_FIELDS
                    })

        # References
        for ref in vuln.get('references', []):
            details['references'].append({
                'url': ref.get('url'),
                'source': ref.get('source'),
                'tags': ref.get('tags', [])
            })

        # Vendor comments
        for comment in vuln.get('vendorComments', []):
            details['vendor_comments'].append({
                'organization': comment.get('organization'),
                'comment': comment.get('comment'),
                'lastModified': comment.get('lastModified')
            })

        return details

    @staticmethod
    def _summarize(vuln: Dict[str, Any]) -> Dict[str, Any]:
        """Build the short per-CVE summary used in search results."""
        summary = {
            'id': vuln['id'],
            'published': vuln.get('published', 'N/A'),
            'lastModified': vuln.get('lastModified', 'N/A'),
            'vulnStatus': vuln.get('vulnStatus', 'N/A'),
            'description': '',
            'cvss_score': None,
            'severity': None
        }

        # First English description, truncated to 200 characters.
        for desc in vuln.get('descriptions', []):
            if desc.get('lang') == 'en':
                text = desc.get('value', '')
                summary['description'] = text[:200] + '...' if len(text) > 200 else text
                break

        # Newest available CVSS family wins; v2 data carries no baseSeverity
        # inside cvssData, so 'severity' stays None in that case.
        metrics = vuln.get('metrics', {})
        for metric_key in ('cvssMetricV31', 'cvssMetricV30', 'cvssMetricV2'):
            cvss_data = CVEDataFetcher._first_cvss_data(metrics, metric_key)
            if cvss_data is not None:
                summary['cvss_score'] = cvss_data.get('baseScore')
                summary['severity'] = cvss_data.get('baseSeverity')
                break

        return summary

    async def get_cve_details(self, cve_id: str) -> Dict[str, Any]:
        """Fetch detailed CVE information from NVD API.

        Args:
            cve_id: Identifier such as ``CVE-2023-44487`` (case-insensitive).

        Returns:
            Dict with keys ``id``, ``sourceIdentifier``, ``published``,
            ``lastModified``, ``vulnStatus``, ``descriptions``,
            ``cvss_scores``, ``weaknesses``, ``configurations``,
            ``references`` and ``vendor_comments``.

        Raises:
            ValueError: If *cve_id* is not a well-formed CVE identifier.
            RuntimeError: If called outside the async context manager.
            Exception: On a non-200 response or when the CVE is unknown.
        """
        cve_id = self._normalize_cve_id(cve_id)
        session = self._require_session()
        url = f"{self.NVD_CVE_API_URL}?cveId={cve_id}"
        async with session.get(url) as response:
            if response.status != 200:
                raise Exception(f"Failed to fetch CVE data: HTTP {response.status}")
            data = await response.json()

        vulnerabilities = data.get('vulnerabilities')
        if not vulnerabilities:
            raise Exception(f"CVE {cve_id} not found")
        return self._parse_details(vulnerabilities[0]['cve'])

    async def search_cves(self, keyword: str, limit: int = 10) -> Dict[str, Any]:
        """Search for CVEs by keyword.

        Args:
            keyword: Free-text search term; must be non-empty.
            limit: Maximum number of results to request. Coerced to ``int``
                because UI sliders may deliver a float.

        Returns:
            Dict with ``total_results``, ``results_per_page``,
            ``start_index`` and ``cves`` (a list of summaries).

        Raises:
            ValueError: If *keyword* is empty.
            RuntimeError: If called outside the async context manager.
            Exception: On a non-200 response.
        """
        if not keyword:
            raise ValueError("Keyword is required for search")
        session = self._require_session()

        # URL encode the keyword; the page size is forced to a plain integer.
        encoded_keyword = quote(keyword)
        results_per_page = int(limit)
        url = (
            f"{self.NVD_CVE_API_URL}?keywordSearch={encoded_keyword}"
            f"&resultsPerPage={results_per_page}"
        )
        async with session.get(url) as response:
            if response.status != 200:
                raise Exception(f"Failed to search CVEs: HTTP {response.status}")
            data = await response.json()

        return {
            'total_results': data.get('totalResults', 0),
            'results_per_page': data.get('resultsPerPage', 0),
            'start_index': data.get('startIndex', 0),
            'cves': [
                self._summarize(vuln_data['cve'])
                for vuln_data in data.get('vulnerabilities', [])
            ]
        }
# Module-level fetcher instance. In the code visible here it is never entered
# as an async context manager (so its `session` stays None) and the Gradio
# handlers open their own CVEDataFetcher per request instead of using it.
cve_fetcher = CVEDataFetcher()
def get_cve_details_sync(cve_id: str) -> str:
    """Blocking entry point: run the async CVE-details handler to completion.

    Starts a fresh event loop via ``asyncio.run`` for each call and returns
    the Markdown string produced by ``get_cve_details_gradio``.
    """
    pending = get_cve_details_gradio(cve_id)
    markdown = asyncio.run(pending)
    return markdown
def search_cves_sync(keyword: str, limit: int) -> str:
    """Blocking entry point: run the async CVE-search handler to completion.

    Starts a fresh event loop via ``asyncio.run`` for each call and returns
    the Markdown string produced by ``search_cves_gradio``.
    """
    pending = search_cves_gradio(keyword, limit)
    markdown = asyncio.run(pending)
    return markdown
def _format_cve_details(details) -> str:
    """Render the details dict returned by ``get_cve_details`` as Markdown."""
    parts = [
        f"# {details['id']} Details\n\n",
        f"**Status:** {details['vulnStatus']}\n",
        f"**Published:** {details['published']}\n",
        f"**Last Modified:** {details['lastModified']}\n",
        f"**Source:** {details['sourceIdentifier']}\n\n",
    ]

    # Description
    if details['descriptions']:
        parts.append("## Description\n")
        parts.extend(f"{desc}\n\n" for desc in details['descriptions'])

    # CVSS Scores. The fetcher always sets these keys, so test the value
    # (not key membership) to avoid printing a literal "None".
    if details['cvss_scores']:
        parts.append("## CVSS Scores\n")
        score_rows = (
            ("Base Score", 'baseScore'),
            ("Severity", 'baseSeverity'),
            ("Vector", 'vectorString'),
        )
        for version, scores in details['cvss_scores'].items():
            parts.append(f"### CVSS {version}\n")
            for label, key in score_rows:
                value = scores.get(key)
                if value is not None:
                    parts.append(f"**{label}:** {value}\n")
            parts.append("\n")

    # Weaknesses. The description is shown only when it adds something
    # beyond the CWE identifier itself.
    if details['weaknesses']:
        parts.append("## Weaknesses (CWE)\n")
        for weakness in details['weaknesses']:
            cwe_id = weakness['cwe_id']
            description = weakness['description']
            if description and description != cwe_id:
                parts.append(f"- **{cwe_id}:** {description}\n")
            else:
                parts.append(f"- **{cwe_id}**\n")
        parts.append("\n")

    # Affected Configurations. Sub-bullets need a two-space indent to nest
    # under their parent item in Markdown.
    if details['configurations']:
        parts.append("## Affected Configurations\n")
        version_rows = (
            ('versionStartIncluding', "From version"),
            ('versionStartExcluding', "After version"),
            ('versionEndIncluding', "Up to and including version"),
            ('versionEndExcluding', "Before version"),
        )
        for config in details['configurations'][:10]:  # Limit to first 10
            parts.append(f"- {config['criteria']}\n")
            for key, label in version_rows:
                if config.get(key):
                    parts.append(f"  - {label}: {config[key]}\n")
        parts.append("\n")

    # References
    if details['references']:
        parts.append("## References\n")
        for ref in details['references'][:10]:  # Limit to first 10
            # Fall back to the URL as link text when the source is missing.
            link_text = ref.get('source') or ref.get('url')
            parts.append(f"- [{link_text}]({ref['url']})\n")
            if ref.get('tags'):
                parts.append(f"  - Tags: {', '.join(ref['tags'])}\n")
        parts.append("\n")

    return "".join(parts)


async def get_cve_details_gradio(cve_id: str) -> str:
    """Gradio interface function for getting CVE details.

    Args:
        cve_id: CVE identifier typed by the user; surrounding whitespace is
            ignored, and ``None`` or blank input yields a prompt.

    Returns:
        A Markdown report for the CVE, a prompt when the input is blank, or
        ``"Error: <message>"`` if fetching or formatting fails. This
        function never raises, so the UI always has text to display.
    """
    if not cve_id or not cve_id.strip():
        return "Please enter a CVE ID (e.g., CVE-2023-1234)"
    try:
        async with CVEDataFetcher() as fetcher:
            details = await fetcher.get_cve_details(cve_id.strip())
            # Format the output for better readability
            return _format_cve_details(details)
    except Exception as e:
        # UI boundary: surface the failure as text instead of a traceback.
        return f"Error: {str(e)}"
def _format_search_results(keyword, results) -> str:
    """Render the dict returned by ``search_cves`` as Markdown."""
    parts = [
        f"# CVE Search Results for '{keyword}'\n\n",
        f"**Total Results:** {results['total_results']}\n",
        f"**Showing:** {len(results['cves'])} results\n\n",
    ]
    for cve in results['cves']:
        parts.append(f"## {cve['id']}\n")
        parts.append(f"**Published:** {cve['published']}\n")
        parts.append(f"**Status:** {cve['vulnStatus']}\n")
        # Compare against None so a legitimate score of 0.0 is still shown.
        if cve['cvss_score'] is not None:
            score_line = f"**CVSS Score:** {cve['cvss_score']}"
            if cve['severity']:
                score_line += f" ({cve['severity']})"
            parts.append(score_line + "\n")
        parts.append(f"**Description:** {cve['description']}\n\n")
        parts.append("---\n\n")
    return "".join(parts)


async def search_cves_gradio(keyword: str, limit: int) -> str:
    """Gradio interface function for searching CVEs.

    Args:
        keyword: Search term typed by the user; surrounding whitespace is
            ignored for the query, and ``None`` or blank input yields a prompt.
        limit: Maximum number of results to request, passed through to the
            fetcher unchanged.

    Returns:
        A Markdown list of matching CVEs, a prompt when the keyword is
        blank, or ``"Error: <message>"`` if the search or formatting fails.
        This function never raises.
    """
    if not keyword or not keyword.strip():
        return "Please enter a search keyword"
    try:
        async with CVEDataFetcher() as fetcher:
            results = await fetcher.search_cves(keyword.strip(), limit)
            return _format_search_results(keyword, results)
    except Exception as e:
        # UI boundary: surface the failure as text instead of a traceback.
        return f"Error: {str(e)}"
def create_gradio_interface():
    """Create the Gradio web interface.

    Builds a ``gr.Blocks`` app with three tabs: CVE lookup, keyword search,
    and a static description of the MCP tools. The two interactive tabs
    call ``get_cve_details_sync`` and ``search_cves_sync`` on button click.

    Returns:
        The assembled ``gr.Blocks`` app. It is not launched here.

    NOTE(review): the module-level code visible in this file builds its UI
    with ``gr.TabbedInterface`` and does not call this function.
    """
    with gr.Blocks(title="CVE Details MCP Server", theme=gr.themes.Soft()) as app:
        # Heading emoji restored from mojibake (UTF-8 bytes of the padlock
        # emoji had been decoded with the wrong code page).
        gr.Markdown("# 🔒 CVE Details MCP Server")
        gr.Markdown("Get comprehensive vulnerability details from the National Vulnerability Database")

        with gr.Tabs():
            # CVE Details Tab
            with gr.Tab("CVE Details"):
                gr.Markdown("Enter a CVE ID to get detailed vulnerability information")
                with gr.Row():
                    cve_input = gr.Textbox(
                        label="CVE ID",
                        placeholder="e.g., CVE-2023-1234",
                        value="CVE-2023-44487"
                    )
                    get_details_btn = gr.Button("Get Details", variant="primary")
                cve_output = gr.Markdown(label="CVE Details")
                get_details_btn.click(
                    fn=get_cve_details_sync,
                    inputs=[cve_input],
                    outputs=[cve_output]
                )

            # CVE Search Tab
            with gr.Tab("CVE Search"):
                gr.Markdown("Search for CVEs by keyword, vendor, or product name")
                with gr.Row():
                    search_input = gr.Textbox(
                        label="Search Keyword",
                        placeholder="e.g., apache, wordpress, buffer overflow"
                    )
                    limit_input = gr.Slider(
                        label="Max Results",
                        minimum=1,
                        maximum=50,
                        value=10,
                        step=1
                    )
                search_btn = gr.Button("Search CVEs", variant="primary")
                search_output = gr.Markdown(label="Search Results")
                search_btn.click(
                    fn=search_cves_sync,
                    inputs=[search_input, limit_input],
                    outputs=[search_output]
                )

            # MCP Info Tab. The Markdown text is kept flush-left because
            # leading indentation would change how Markdown renders it.
            with gr.Tab("MCP Server Info"):
                gr.Markdown("""
## Model Context Protocol (MCP) Server
This server implements the MCP specification and provides two main tools:
### Available Tools:
1. **get_cve_details**: Get comprehensive details for a specific CVE
2. **search_cves**: Search for CVEs by keyword
### Usage as MCP Server:
This interface automatically runs as an MCP server when launched.
### Features:
- Real-time CVE data from NIST NVD
- CVSS v2.0 and v3.1 scoring
- CWE weakness classification
- Affected product configurations
- Reference links and vendor comments
- Comprehensive search functionality
### Data Source:
[NIST National Vulnerability Database (NVD)](https://nvd.nist.gov/)
""")
    return app
# Single-CVE lookup form: one text box in, one Markdown report out.
cve_details_demo = gr.Interface(
    title="CVE Details Lookup",
    description="Enter a CVE ID to get comprehensive vulnerability information from the National Vulnerability Database.",
    fn=get_cve_details_sync,
    inputs=gr.Textbox(placeholder="e.g., CVE-2023-1234", label="CVE ID"),
    outputs=gr.Markdown(label="CVE Details"),
)
# Keyword search form: a search term plus a result-count slider (1-50,
# default 10) in, one Markdown result list out.
cve_search_demo = gr.Interface(
    title="CVE Search",
    description="Search for CVEs by keyword, vendor, or product name.",
    fn=search_cves_sync,
    inputs=[
        gr.Textbox(
            placeholder="e.g., apache, wordpress, buffer overflow",
            label="Search Keyword",
        ),
        gr.Slider(minimum=1, maximum=50, step=1, value=10, label="Max Results"),
    ],
    outputs=gr.Markdown(label="Search Results"),
)
# Create tabbed interface combining both functions. The title's padlock
# emoji is restored from mojibake (its UTF-8 bytes had been decoded with the
# wrong code page).
demo = gr.TabbedInterface(
    [cve_details_demo, cve_search_demo],
    ["CVE Details", "CVE Search"],
    title="🔒 CVE Details MCP Server"
)
# Start the web app at import/run time: listen on every interface at port
# 7860, no public share link, debug off, and the MCP server enabled.
demo.launch(
    mcp_server=True,
    server_name="0.0.0.0",
    server_port=7860,
    debug=False,
    share=False,
)