import json
import os
import base64
from typing import Dict, List, Any

import requests
import gradio as gr
from dotenv import load_dotenv
from datasets import load_dataset

try:
    from langchain_core.documents import Document
except ImportError:
    try:
        from langchain.docstore.document import Document
    except ImportError:
        try:
            from langchain.schema import Document
        except ImportError:
            # Minimal stand-in so the rest of the module works without LangChain installed
            class Document:
                def __init__(self, page_content: str, metadata: dict = None):
                    self.page_content = page_content
                    self.metadata = metadata or {}


RecursiveCharacterTextSplitter = None

# Prefer LangChain's RecursiveCharacterTextSplitter; fall back to a simple splitter if unavailable
try:
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    print("✅ Using langchain.text_splitter.RecursiveCharacterTextSplitter")
except ImportError:
    try:
        from langchain_text_splitters import RecursiveCharacterTextSplitter
        print("✅ Using langchain_text_splitters.RecursiveCharacterTextSplitter")
    except ImportError:
        print("⚠️ Using fallback RecursiveCharacterTextSplitter")

        class RecursiveCharacterTextSplitter:
            def __init__(self, chunk_size=500, chunk_overlap=50, **kwargs):
                self.chunk_size = chunk_size
                self.chunk_overlap = chunk_overlap
                print(f"📄 Initialized fallback text splitter with chunk_size={chunk_size}")

            def split_documents(self, documents):
                """Simple document splitting fallback"""
                print(f"📄 Splitting {len(documents)} documents using fallback method...")
                result = []
                for doc in documents:
                    text = doc.page_content
                    # Slide a fixed-size window across the text with the configured overlap
                    for i in range(0, len(text), self.chunk_size - self.chunk_overlap):
                        chunk = text[i:i + self.chunk_size]
                        if chunk.strip():
                            result.append(Document(page_content=chunk, metadata=doc.metadata))
                print(f"✅ Split into {len(result)} chunks")
                return result
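
# A quick illustration of the fallback windowing above, using hypothetical numbers:
# with chunk_size=10 and chunk_overlap=3 the window starts advance by 10 - 3 = 7 characters,
# so a 24-character text yields chunks text[0:10], text[7:17], text[14:24], text[21:24].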

from langchain_community.retrievers import BM25Retriever

load_dotenv()

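# Configuration note: GITHUB_TOKEN is read from the environment (load_dotenv also picks it
# up from a local .env file). A minimal .env sketch -- the value below is a placeholder only:
#
#   GITHUB_TOKEN=ghp_your_personal_access_token_here
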
class GitHubMCPServer:
    """GitHub MCP Server for repository scanning, file access, and CVE retrieval"""

    def __init__(self):
        self.github_token = os.getenv("GITHUB_TOKEN")
        if not self.github_token:
            raise ValueError("GITHUB_TOKEN environment variable is required")

        self.headers = {
            "Authorization": f"token {self.github_token}",
            "Accept": "application/vnd.github.v3+json"
        }

        self.cve_retriever = None
        self._initialize_cve_retriever()

    def _initialize_cve_retriever(self):
        """Initialize the CVE retriever with Hugging Face dataset"""
        try:
            print("🔍 Loading CVE dataset from Hugging Face...")

            # Load the CIRCL vulnerability dataset (requires access to the Hugging Face dataset)
            knowledge_base = load_dataset("CIRCL/vulnerability", split="train")

            print(f"📊 Loaded {len(knowledge_base)} vulnerability records from Hugging Face")

            print("📊 Dataset structure analysis:")
            print(f"Dataset columns: {knowledge_base.column_names}")
            for i in range(min(2, len(knowledge_base))):
                print(f"Record {i}: {dict(knowledge_base[i])}")

            print("🔍 Filtering for CVE entries only...")
            cve_dataset = knowledge_base.filter(lambda row: str(row["id"]).startswith("CVE-"))

            print(f"📊 Filtered to {len(cve_dataset)} CVE records (excluded GHSA entries)")

            # Build Document objects from the CVE records
            source_docs = []
            for record in cve_dataset:
                cve_id = record.get('id', '')
                description = record.get('description', '')

                if not cve_id or not description:
                    continue

                content = f"CVE ID: {cve_id}\nDescription: {description}"

                metadata = {
                    'cve_id': str(cve_id),
                    'description': str(description)
                }

                source_docs.append(Document(page_content=content, metadata=metadata))

            print(f"📄 Created {len(source_docs)} CVE document objects")

            if not source_docs:
                print("❌ No valid CVE documents found in dataset")
                self.cve_retriever = None
                return

            print("🔧 Initializing text splitter...")
            try:
                text_splitter = RecursiveCharacterTextSplitter(
                    chunk_size=500,
                    chunk_overlap=50,
                    add_start_index=True,
                    strip_whitespace=True,
                    separators=["\n\n", "\n", ".", " ", ""],
                )
                print("✅ Text splitter initialized successfully")
            except Exception as splitter_error:
                print(f"❌ Text splitter initialization failed: {splitter_error}")
                # Fall back to a minimal splitter configuration
                text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
                print("✅ Using simple fallback text splitter")

            print("🔄 Processing documents with text splitter...")
            try:
                docs_processed = text_splitter.split_documents(source_docs)
                print(f"📊 Knowledge base prepared with {len(docs_processed)} document chunks")
            except Exception as processing_error:
                print(f"❌ Document processing failed: {processing_error}")
                docs_processed = source_docs
                print(f"✅ Using original documents without splitting: {len(docs_processed)} documents")

            print("🔧 Initializing BM25 retriever...")
            try:
                self.cve_retriever = BM25Retriever.from_documents(
                    docs_processed,
                    k=3
                )
                print(f"✅ CVE Retriever initialized with {len(docs_processed)} document chunks")
            except Exception as retriever_error:
                print(f"❌ BM25 retriever initialization failed: {retriever_error}")
                self.cve_retriever = None

        except Exception as e:
            print(f"❌ Error initializing CVE retriever: {str(e)}")
            print("💡 Make sure you have access to the Hugging Face dataset 'CIRCL/vulnerability'")
            print("💡 You may need to login with: huggingface-cli login")
            print("💡 Dataset columns should be: id, title, description, cpes")
            self.cve_retriever = None

    def get_repository_info(self, owner: str, repo: str) -> dict:
        """Get basic repository information"""
        try:
            url = f"https://api.github.com/repos/{owner}/{repo}"
            response = requests.get(url, headers=self.headers)

            if response.status_code == 200:
                data = response.json()
                return {
                    "success": True,
                    "repository_name": data["name"],
                    "full_name": data["full_name"],
                    "description": data.get("description", "No description available"),
                    "primary_language": data.get("language", "Unknown"),
                    "size_kb": data["size"],
                    "stars": data["stargazers_count"],
                    "forks": data["forks_count"],
                    "default_branch": data["default_branch"],
                    "created_date": data["created_at"][:10],
                    "last_updated": data["updated_at"][:10],
                    "is_private": data["private"],
                    "clone_url": data["clone_url"]
                }
            else:
                return {
                    "success": False,
                    "error": f"Repository not found or inaccessible (HTTP {response.status_code})",
                    "status_code": response.status_code
                }

        except Exception as e:
            return {
                "success": False,
                "error": f"Failed to fetch repository information: {str(e)}"
            }

    def get_file_content(self, owner: str, repo: str, path: str) -> str:
        """Get content of a specific file - returns just the file content as string"""
        try:
            url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}"
            response = requests.get(url, headers=self.headers)

            if response.status_code == 200:
                data = response.json()
                if data["type"] == "file" and "content" in data:
                    # The GitHub contents API returns file data base64-encoded
                    try:
                        content = base64.b64decode(data["content"]).decode('utf-8')
                        return content
                    except UnicodeDecodeError:
                        return f"ERROR: File '{path}' contains binary data that cannot be decoded as text"
                else:
                    return f"ERROR: Path '{path}' is not a file or content is not available"
            else:
                return f"ERROR: File '{path}' not found or inaccessible (HTTP {response.status_code})"

        except Exception as e:
            return f"ERROR: Failed to fetch file content for '{path}': {str(e)}"

    def scan_repository(self, owner: str, repo: str, extensions: str = ".py,.js,.ts,.php,.java") -> list:
        """Scan repository for code files - returns simple list of file paths"""
        try:
            ext_list = [ext.strip() for ext in extensions.split(",") if ext.strip()]
            all_files = []
            self._scan_directory_sync(owner, repo, "", ext_list, all_files)

            # Return at most the first 50 matching file paths
            file_paths = [file_info.get('path', '') for file_info in all_files[:50]]
            return file_paths

        except Exception as e:
            return [f"ERROR: Failed to scan repository: {str(e)}"]

    def _scan_directory_sync(self, owner: str, repo: str, path: str, extensions: List[str], all_files: List[Dict]):
        """Recursively scan directory for files"""
        try:
            url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}"
            response = requests.get(url, headers=self.headers)

            if response.status_code == 200:
                data = response.json()
                for item in data:
                    if item["type"] == "file":
                        if any(item["name"].endswith(ext) for ext in extensions):
                            all_files.append({
                                "name": item["name"],
                                "path": item["path"],
                                "type": item["type"],
                                "size": item.get("size", 0),
                                "sha": item["sha"]
                            })
                    elif item["type"] == "dir" and len(all_files) < 100:
                        self._scan_directory_sync(owner, repo, item["path"], extensions, all_files)
        except Exception:
            # Skip directories that cannot be listed (e.g. permission or network errors)
            pass

    def search_cve_database(self, query: str) -> str:
        """Search CVE database for relevant vulnerability information"""
        if not self.cve_retriever:
            return "❌ CVE retriever not properly initialized. Please check Hugging Face dataset access."

        try:
            docs = self.cve_retriever.invoke(query)

            if not docs:
                return f"No relevant CVE information found for query: '{query}'"

            result = f"🔍 **CVE Knowledge Base Results for: '{query}'**\n\n"

            for i, doc in enumerate(docs, 1):
                metadata = doc.metadata
                result += f"**Result {i}:**\n"
                result += f"- **CVE ID**: {metadata.get('cve_id', 'Unknown')}\n"

                # Fall back to parsing the description out of the page content if it is missing from metadata
                description = metadata.get('description', '')
                if not description:
                    content_lines = doc.page_content.split('\n')
                    desc_line = next((line for line in content_lines if line.startswith('Description:')), '')
                    description = desc_line.replace('Description: ', '').strip() if desc_line else 'No description available'

                result += f"- **Description**: {description[:200]}{'...' if len(description) > 200 else ''}\n"
                result += "---\n"

            cve_ids = [doc.metadata.get('cve_id') for doc in docs if doc.metadata.get('cve_id')]

            result += f"\n**📊 Analysis Summary:**\n"
            result += f"- **CVE Examples**: {', '.join(cve_ids[:3])}{'...' if len(cve_ids) > 3 else ''}\n"
            result += f"- **Total Matches**: {len(docs)}\n"

            return result

        except Exception as e:
            return f"❌ Error retrieving CVE information: {str(e)}"

    def simple_cve_search(self, query: str, k: int = 3) -> str:
        """Simple CVE search that returns only CVE IDs and descriptions for multi-agent workflow"""
        if not self.cve_retriever:
            return "❌ CVE retriever not properly initialized. Please check Hugging Face dataset access."

        try:
            # Coerce k to int in case it arrives as a float (e.g. from the UI slider),
            # then temporarily override the retriever's k to honour the requested result count
            k = int(k)
            original_k = self.cve_retriever.k
            self.cve_retriever.k = k

            docs = self.cve_retriever.invoke(query)

            # Restore the original k
            self.cve_retriever.k = original_k

            if not docs:
                return f"No relevant CVE information found for query: '{query}'"

            result = f"Top {len(docs)} CVE matches for '{query}':\n\n"

            for i, doc in enumerate(docs, 1):
                metadata = doc.metadata
                cve_id = metadata.get('cve_id', 'Unknown')

                # Fall back to parsing the description out of the page content if it is missing from metadata
                description = metadata.get('description', '')
                if not description:
                    content_lines = doc.page_content.split('\n')
                    desc_line = next((line for line in content_lines if line.startswith('Description:')), '')
                    description = desc_line.replace('Description: ', '').strip() if desc_line else 'No description available'

                result += f"{i}. {cve_id}\n"
                result += f" {description[:150]}{'...' if len(description) > 150 else ''}\n\n"

            return result.strip()

        except Exception as e:
            return f"❌ Error retrieving CVE information: {str(e)}"

    def get_nvd_cve_details(self, cve_id: str) -> str:
        """
        Fetches detailed CVE information from NVD (National Vulnerability Database).

        Args:
            cve_id: The CVE identifier (e.g., 'CVE-2019-16515')

        Returns:
            Formatted string containing detailed CVE information from NVD
        """
        # Normalize the CVE ID and build the web URL up front so it is available in every error path
        cve_id = str(cve_id).strip().upper()
        nvd_web_url = f"https://nvd.nist.gov/vuln/detail/{cve_id}"

        try:
            if not cve_id.startswith('CVE-'):
                return f"❌ Invalid CVE ID format: '{cve_id}'\nCVE ID must start with 'CVE-' (e.g., CVE-2019-16515)"

            nvd_api_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"

            params = {"cveId": cve_id}
            headers = {
                "User-Agent": "VulnerabilityScanner/1.0 (GitHub Security Analysis Tool)"
            }

            print(f"🔍 Fetching NVD details for {cve_id}...")
            response = requests.get(nvd_api_url, params=params, headers=headers, timeout=15)

            if response.status_code == 200:
                data = response.json()

                if data.get('resultsPerPage', 0) == 0:
                    return f"⚠️ CVE not found in NVD database: {cve_id}\n\n🔗 **NVD URL**: {nvd_web_url}\n\nNote: The CVE may not yet be published in NVD or the ID might be incorrect."

                vuln = data['vulnerabilities'][0]['cve']

                result = f"📋 **NVD CVE Details: {cve_id}**\n\n"
                result += f"🔗 **NVD URL**: {nvd_web_url}\n\n"

                result += f"**Status**: {vuln.get('vulnStatus', 'N/A')}\n"
                result += f"**Published**: {vuln.get('published', 'N/A')[:10]}\n"
                result += f"**Last Modified**: {vuln.get('lastModified', 'N/A')[:10]}\n\n"

                # English description
                descriptions = vuln.get('descriptions', [])
                for desc in descriptions:
                    if desc.get('lang') == 'en':
                        result += f"**📝 Description**:\n{desc.get('value', 'N/A')}\n\n"
                        break

                metrics = vuln.get('metrics', {})

                # CVSS v3.x metrics (prefer v3.1 over v3.0)
                if 'cvssMetricV31' in metrics or 'cvssMetricV30' in metrics:
                    cvss_key = 'cvssMetricV31' if 'cvssMetricV31' in metrics else 'cvssMetricV30'
                    cvss_v3 = metrics[cvss_key][0]['cvssData']

                    result += f"**🎯 CVSS v3 Score**:\n"
                    result += f"- **Base Score**: {cvss_v3.get('baseScore', 'N/A')} ({cvss_v3.get('baseSeverity', 'N/A')})\n"
                    result += f"- **Vector String**: {cvss_v3.get('vectorString', 'N/A')}\n"
                    result += f"- **Attack Vector**: {cvss_v3.get('attackVector', 'N/A')}\n"
                    result += f"- **Attack Complexity**: {cvss_v3.get('attackComplexity', 'N/A')}\n"
                    result += f"- **Privileges Required**: {cvss_v3.get('privilegesRequired', 'N/A')}\n"
                    result += f"- **User Interaction**: {cvss_v3.get('userInteraction', 'N/A')}\n"
                    result += f"- **Scope**: {cvss_v3.get('scope', 'N/A')}\n"
                    result += f"- **Confidentiality Impact**: {cvss_v3.get('confidentialityImpact', 'N/A')}\n"
                    result += f"- **Integrity Impact**: {cvss_v3.get('integrityImpact', 'N/A')}\n"
                    result += f"- **Availability Impact**: {cvss_v3.get('availabilityImpact', 'N/A')}\n\n"

                # CVSS v2 metrics
                if 'cvssMetricV2' in metrics:
                    cvss_v2 = metrics['cvssMetricV2'][0]['cvssData']
                    result += f"**CVSS v2 Score**:\n"
                    result += f"- **Base Score**: {cvss_v2.get('baseScore', 'N/A')} ({metrics['cvssMetricV2'][0].get('baseSeverity', 'N/A')})\n"
                    result += f"- **Vector String**: {cvss_v2.get('vectorString', 'N/A')}\n\n"

                # CWE classifications
                weaknesses = vuln.get('weaknesses', [])
                if weaknesses:
                    result += f"**🔗 CWE (Common Weakness Enumeration)**:\n"
                    cwe_list = []
                    for weakness in weaknesses:
                        for desc in weakness.get('description', []):
                            if desc.get('lang') == 'en':
                                cwe_list.append(desc.get('value', 'N/A'))
                    result += f"- {', '.join(set(cwe_list))}\n\n"

                # References (first five only)
                references = vuln.get('references', [])
                if references:
                    result += f"**🔗 References** (showing first 5):\n"
                    for i, ref in enumerate(references[:5], 1):
                        result += f"{i}. [{ref.get('source', 'Source')}]({ref.get('url', '#')})\n"
                    if len(references) > 5:
                        result += f"\n... and {len(references) - 5} more references\n"
                    result += "\n"

                result += f"---\n"
                result += f"💡 **Tip**: Use this CVE information to cross-reference vulnerabilities found in code analysis.\n"

                return result

            elif response.status_code == 404:
                return f"⚠️ CVE not found: {cve_id}\n\n🔗 **NVD URL**: {nvd_web_url}\n\nThe CVE may not exist or may not yet be published in NVD."

            elif response.status_code == 403:
                return f"❌ Access denied to NVD API (HTTP 403)\n\nThis might be due to rate limiting. Please try again in a few moments.\n\n🔗 **NVD URL**: {nvd_web_url}"

            else:
                return f"❌ NVD API request failed with status {response.status_code}\n\n🔗 **NVD URL**: {nvd_web_url}\n\nYou can view the CVE details directly on the NVD website."

        except requests.exceptions.Timeout:
            return f"⏱️ Request to NVD API timed out for {cve_id}\n\nPlease try again or visit: {nvd_web_url}"

        except requests.exceptions.RequestException as e:
            return f"❌ Network error while fetching CVE details: {str(e)}\n\n🔗 **NVD URL**: {nvd_web_url}"

        except Exception as e:
            return f"❌ Unexpected error fetching NVD details for {cve_id}: {str(e)}\n\n🔗 **NVD URL**: {nvd_web_url}"

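    # For reference, an abridged sketch of the NVD 2.0 response shape the parser above reads
    # (key names taken from the accesses in get_nvd_cve_details; values are placeholders):
    #
    #   {
    #     "resultsPerPage": 1,
    #     "vulnerabilities": [{
    #       "cve": {
    #         "id": "CVE-...", "vulnStatus": "...", "published": "...", "lastModified": "...",
    #         "descriptions": [{"lang": "en", "value": "..."}],
    #         "metrics": {"cvssMetricV31": [{"cvssData": {"baseScore": 9.8, "vectorString": "..."}}]},
    #         "weaknesses": [{"description": [{"lang": "en", "value": "CWE-..."}]}],
    #         "references": [{"url": "...", "source": "..."}]
    #       }
    #     }]
    #   }
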
    def search_and_fetch_cve_details(self, query: str, max_nvd_fetches: int = 5) -> str:
        """
        Smart combined function: Searches CVE database and automatically fetches NVD details.

        This function:
        1. Searches the CVE knowledge base (RAG) for relevant vulnerabilities
        2. Automatically parses CVE IDs from the results
        3. Fetches detailed NVD information for top CVEs
        4. Returns combined results with both RAG data and NVD details

        Args:
            query: Vulnerability search query (e.g., "SQL injection", "XSS")
            max_nvd_fetches: Maximum number of CVEs to fetch NVD details for (default: 5)

        Returns:
            Formatted string with RAG results + detailed NVD information
        """
        import re
        import time

        try:
            # Coerce to int in case the value arrives as a float (e.g. from the UI slider)
            max_nvd_fetches = int(max_nvd_fetches)

            print(f"🔍 Step 1: Searching CVE knowledge base for '{query}'...")
            rag_results = self.search_cve_database(query)

            if "❌" in rag_results or "No relevant CVE information found" in rag_results:
                return rag_results

            print(f"🔍 Step 2: Parsing CVE IDs from results...")
            cve_pattern = r'CVE-\d{4}-\d{4,7}'
            cve_ids = re.findall(cve_pattern, rag_results)

            # Deduplicate while preserving order, then cap at max_nvd_fetches
            unique_cve_ids = list(dict.fromkeys(cve_ids))[:max_nvd_fetches]

            if not unique_cve_ids:
                return rag_results + "\n\n⚠️ No CVE IDs found in results to fetch NVD details."

            print(f"✅ Found {len(unique_cve_ids)} unique CVE IDs: {', '.join(unique_cve_ids)}")

            combined_result = "🔬 **COMPREHENSIVE CVE ANALYSIS**\n"
            combined_result += "=" * 80 + "\n\n"

            combined_result += "## 🔍 PART 1: CVE Knowledge Base Search Results\n\n"
            combined_result += rag_results
            combined_result += "\n\n" + "=" * 80 + "\n\n"

            combined_result += f"## 🔍 PART 2: Detailed NVD Information (Top {len(unique_cve_ids)} CVEs)\n\n"
            combined_result += f"Fetching official NVD details for: {', '.join(unique_cve_ids)}\n\n"
            combined_result += "-" * 80 + "\n\n"

            for idx, cve_id in enumerate(unique_cve_ids, 1):
                print(f"🔍 Step 3.{idx}: Fetching NVD details for {cve_id}...")

                nvd_result = self.get_nvd_cve_details(cve_id)

                combined_result += nvd_result
                combined_result += "\n" + "=" * 80 + "\n\n"

                # Pause between NVD requests to stay under the public (unauthenticated) rate limit
                if idx < len(unique_cve_ids):
                    time.sleep(6)

            combined_result += "## 📋 SUMMARY\n\n"
            combined_result += f"✅ **Total CVEs Analyzed**: {len(unique_cve_ids)}\n"
            combined_result += f"✅ **Search Query**: {query}\n"
            combined_result += f"✅ **RAG Results**: {len(cve_ids)} CVE references found\n"
            combined_result += f"✅ **NVD Details Fetched**: {len(unique_cve_ids)} CVEs\n\n"
            combined_result += "💡 **Next Steps**: Use this information to:\n"
            combined_result += "- Cross-reference vulnerabilities in your code\n"
            combined_result += "- Understand CVSS severity scores\n"
            combined_result += "- Review CWE classifications\n"
            combined_result += "- Check official NVD references for remediation guidance\n"

            print(f"✅ Combined analysis complete!")
            return combined_result

        except Exception as e:
            return f"❌ Error in combined CVE analysis: {str(e)}\n\nPlease try using search_cve_database and get_nvd_cve_details separately."

github_server = GitHubMCPServer()

# Each gr.Interface below is exposed in the Gradio UI and, via its api_name,
# as a callable tool when the app is launched with mcp_server=True
demo = gr.TabbedInterface(
    [
        gr.Interface(
            fn=github_server.get_repository_info,
            inputs=[
                gr.Textbox(label="Repository Owner", placeholder="octocat"),
                gr.Textbox(label="Repository Name", placeholder="Hello-World")
            ],
            outputs=gr.Textbox(label="Repository Information", lines=15),
            title="Get Repository Information",
            description="Get basic information about a GitHub repository",
            api_name="get_repository_info"
        ),
        gr.Interface(
            fn=github_server.get_file_content,
            inputs=[
                gr.Textbox(label="Repository Owner", placeholder="octocat"),
                gr.Textbox(label="Repository Name", placeholder="Hello-World"),
                gr.Textbox(label="File Path", placeholder="README.md")
            ],
            outputs=gr.Textbox(label="File Content", lines=20),
            title="Get File Content",
            description="Get the content of a specific file from a GitHub repository",
            api_name="get_file_content"
        ),
        gr.Interface(
            fn=github_server.scan_repository,
            inputs=[
                gr.Textbox(label="Repository Owner", placeholder="octocat"),
                gr.Textbox(label="Repository Name", placeholder="Hello-World"),
                gr.Textbox(label="File Extensions", value=".py,.js,.ts,.php,.java", placeholder=".py,.js,.ts,.php,.java")
            ],
            outputs=gr.Textbox(label="Scan Results", lines=20),
            title="Scan Repository for Code Files",
            description="Scan a GitHub repository for code files with specified extensions",
            api_name="scan_repository"
        ),
        gr.Interface(
            fn=github_server.search_cve_database,
            inputs=[
                gr.Textbox(label="Vulnerability Query", placeholder="SQL injection, XSS, command injection, etc.")
            ],
            outputs=gr.Textbox(label="CVE Search Results", lines=25),
            title="Search CVE Database",
            description="Search the CVE knowledge base for vulnerability patterns and CWE information",
            api_name="search_cve_database"
        ),
        gr.Interface(
            fn=github_server.get_nvd_cve_details,
            inputs=[
                gr.Textbox(label="CVE ID", placeholder="CVE-2019-16515", value="CVE-2019-16515")
            ],
            outputs=gr.Textbox(label="NVD CVE Details", lines=30),
            title="Get NVD CVE Details",
            description="Fetch detailed CVE information from National Vulnerability Database (NVD)",
            api_name="get_nvd_cve_details"
        ),
        gr.Interface(
            fn=github_server.search_and_fetch_cve_details,
            inputs=[
                gr.Textbox(label="Vulnerability Query", placeholder="SQL injection, XSS, command injection, etc.", value="SQL injection"),
                gr.Slider(minimum=1, maximum=10, value=5, step=1, label="Max NVD Fetches", info="Number of CVEs to fetch NVD details for")
            ],
            outputs=gr.Textbox(label="Comprehensive CVE Analysis", lines=40),
            title="🔬 Smart CVE Analysis (RAG + NVD)",
            description="Automatically searches CVE database AND fetches detailed NVD information for top CVEs",
            api_name="search_and_fetch_cve_details"
        ),
        gr.Interface(
            fn=github_server.simple_cve_search,
            inputs=[
                gr.Textbox(label="Vulnerability Query", placeholder="SQL injection, XSS, command injection, etc."),
                gr.Slider(minimum=1, maximum=10, value=3, step=1, label="Number of Results", info="Number of CVE matches to return")
            ],
            outputs=gr.Textbox(label="Simple CVE Search Results", lines=15),
            title="🔍 Simple CVE Search",
            description="Simple CVE search returning only CVE IDs and descriptions (for multi-agent workflow)",
            api_name="simple_cve_search"
        )
    ],
    [
        "Repository Info",
        "File Content",
        "Repository Scanner",
        "CVE Database",
        "NVD CVE Details",
        "🔬 Smart CVE Analysis",
        "🔍 Simple CVE Search"
    ],
    title="🔒 GitHub MCP Server with CVE Knowledge Base & NVD Integration"
)


if __name__ == "__main__":
    print("🚀 Starting GitHub MCP Server with CVE Knowledge Base & NVD Integration...")
    print("📡 Server will provide GitHub repository access, CVE search, and NVD details via MCP")
    print("🛠️ Available tools:")
    print(" - get_repository_info: Get repository metadata")
    print(" - get_file_content: Retrieve file contents")
    print(" - scan_repository: Scan for code files")
    print(" - search_cve_database: Search CVE knowledge base")
    print(" - get_nvd_cve_details: Fetch detailed CVE info from NVD")
    print(" - 🔬 search_and_fetch_cve_details: Smart combined RAG + NVD analysis")

    demo.launch(mcp_server=True)
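
# A minimal client-side sketch (not executed here) of calling the tools above with
# gradio_client once the server is running; the URL assumes Gradio's default local
# port and may differ in your deployment:
#
#   from gradio_client import Client
#
#   client = Client("http://127.0.0.1:7860/")
#   info = client.predict("octocat", "Hello-World", api_name="/get_repository_info")
#   cves = client.predict("SQL injection", 3, api_name="/simple_cve_search")
#   print(info)
#   print(cves)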