"""CVE Dashboard - Real-time vulnerability monitoring with NVD API and LLM-powered audience customization.""" import os import json import time import logging from datetime import datetime, timedelta from typing import List, Dict, Optional, Tuple import gradio as gr import pandas as pd import plotly.express as px import plotly.graph_objects as go import requests # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Audience profiles for tailored CVE descriptions AUDIENCE_PROFILES = { "Cybersecurity Professional": { "focus": "threat assessment, attack vectors, mitigation strategies, and security controls", "tone": "technical and precise", "priorities": ["exploitation methods", "defensive measures", "risk assessment", "compliance implications"] }, "Data Scientist": { "focus": "data exposure risks, model vulnerabilities, and statistical analysis implications", "tone": "analytical and research-oriented", "priorities": ["data integrity", "model security", "pipeline vulnerabilities", "privacy concerns"] }, "Data Engineer": { "focus": "infrastructure vulnerabilities, data pipeline security, and system architecture impacts", "tone": "technical with infrastructure emphasis", "priorities": ["database security", "ETL vulnerabilities", "infrastructure risks", "data flow security"] }, "Full-Stack Developer": { "focus": "code vulnerabilities, dependency risks, and implementation fixes", "tone": "practical and code-oriented", "priorities": ["code examples", "library updates", "patch implementation", "secure coding practices"] }, "Product Owner": { "focus": "business impact, user experience, and prioritization for backlog", "tone": "business-oriented with technical context", "priorities": ["user impact", "feature implications", "timeline considerations", "resource requirements"] }, "Manager": { "focus": "business risk, resource allocation, and strategic implications", "tone": "executive summary style", "priorities": ["business impact", "cost implications", "team requirements", "timeline urgency"] } } class CVEDashboard: """Main CVE Dashboard application class.""" def __init__(self): """Initialize the CVE Dashboard.""" self.api_key = os.getenv('NVD_API_KEY') self.base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0" self.headers = {'apiKey': self.api_key} if self.api_key else {} self.cache = {} self.last_request_time = 0 self.rate_limit_delay = 0.7 if self.api_key else 6 # seconds between requests # HuggingFace token - try environment first self.hf_token = os.getenv('HF_TOKEN') or os.getenv('HUGGINGFACE_TOKEN') def _rate_limit(self): """Implement rate limiting for NVD API.""" current_time = time.time() time_since_last = current_time - self.last_request_time if time_since_last < self.rate_limit_delay: time.sleep(self.rate_limit_delay - time_since_last) self.last_request_time = time.time() def fetch_cves(self, year: int, keyword: Optional[str] = None, severity: Optional[str] = None, results_per_page: int = 2000) -> Tuple[List[Dict], str]: """ Fetch CVEs from NVD API for a specific year, handling the 120-day range limit and ensuring the date range does not extend into the future. Args: year: The year to fetch CVEs for. 
            keyword: Optional keyword to search.
            severity: Optional severity filter (LOW, MEDIUM, HIGH, CRITICAL).
            results_per_page: Number of results per page (max 2000).

        Returns:
            Tuple of (list of CVEs, status message).
        """
        try:
            all_vulnerabilities = []
            now = datetime.now()
            year_start = datetime(year, 1, 1)

            # If the selected year is the current year, end the search today.
            # Otherwise, use the end of the selected year.
            if year == now.year:
                year_end = now
            else:
                year_end = datetime(year, 12, 31, 23, 59, 59)

            current_start = year_start
            while current_start < year_end:
                self._rate_limit()

                # Calculate the end of the chunk, respecting the 120-day limit
                chunk_end = min(current_start + timedelta(days=119), year_end)
                logger.info(f"Fetching CVEs from {current_start.date()} to {chunk_end.date()}")

                # Format dates with timezone information (Z for UTC)
                start_date_str = current_start.strftime('%Y-%m-%dT%H:%M:%S.000Z')
                end_date_str = chunk_end.strftime('%Y-%m-%dT%H:%M:%S.999Z')

                params = {
                    'pubStartDate': start_date_str,
                    'pubEndDate': end_date_str,
                    'resultsPerPage': min(results_per_page, 2000)
                }
                if keyword:
                    params['keywordSearch'] = keyword

                response = requests.get(
                    self.base_url,
                    headers=self.headers,
                    params=params,
                    timeout=30
                )

                # Handle different error scenarios
                if response.status_code == 404:
                    logger.warning(f"No data found for date range {current_start.date()} to {chunk_end.date()}")
                    # Move to the next chunk and continue
                    current_start = chunk_end + timedelta(days=1)
                    continue
                elif response.status_code != 200:
                    response.raise_for_status()

                data = response.json()
                vulnerabilities = data.get('vulnerabilities', [])
                all_vulnerabilities.extend(vulnerabilities)

                # Move to the next chunk
                current_start = chunk_end + timedelta(days=1)

            # Process and filter all aggregated CVEs
            processed_cves = []
            for vuln in all_vulnerabilities:
                cve = self._process_cve(vuln.get('cve', {}))
                if severity and cve['severity'] != severity:
                    continue
                processed_cves.append(cve)

            if not processed_cves:
                return [], (
                    f"No CVEs found for year {year}"
                    + (f" matching '{keyword}'" if keyword else "")
                    + (f" with {severity} severity" if severity else "")
                )

            status = f"✓ Fetched {len(processed_cves)} CVEs from the year {year}"
            if keyword:
                status += f" matching '{keyword}'"
            if severity:
                status += f" with {severity} severity"
            return processed_cves, status

        except requests.exceptions.RequestException as e:
            error_details = ""
            if e.response is not None:
                try:
                    error_data = e.response.json()
                    error_details = f" - {error_data.get('message', e.response.text)}"
                except json.JSONDecodeError:
                    error_details = f" - Status: {e.response.status_code}, Response: {e.response.text[:200]}"
            return [], f"✗ API Error: {str(e)}{error_details}"
        except Exception as e:
            return [], f"✗ Error: {str(e)}"

    def _process_cve(self, cve_data: Dict) -> Dict:
        """Process raw CVE data into a structured format."""
        cve_id = cve_data.get('id', 'Unknown')

        # Extract description
        descriptions = cve_data.get('descriptions', [])
        description = next(
            (d['value'] for d in descriptions if d.get('lang') == 'en'),
            'No description available'
        )

        # Extract CVSS metrics and severity
        metrics = cve_data.get('metrics', {})
        cvss_data = {}
        severity = 'UNKNOWN'
        score = 0.0

        # Try CVSS 3.1 first, then 3.0, then 2.0
        for cvss_version in ['cvssMetricV31', 'cvssMetricV30', 'cvssMetricV2']:
            if cvss_version in metrics and metrics[cvss_version]:
                metric = metrics[cvss_version][0]
                cvss_data = metric.get('cvssData', {})
                score = cvss_data.get('baseScore', 0.0)
                # NVD reports the CVSS v2 baseSeverity on the metric itself rather than
                # inside cvssData, so fall back to the metric level if needed.
                severity = cvss_data.get('baseSeverity', metric.get('baseSeverity', 'UNKNOWN'))
                break
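        # For orientation, an abridged sketch (an assumption, following the NVD 2.0 schema
        # documented at https://nvd.nist.gov/developers/vulnerabilities) of the record
        # fields this method reads:
        #
        #   cve_data = {
        #       "id": "CVE-2024-12345",
        #       "descriptions": [{"lang": "en", "value": "..."}],
        #       "metrics": {
        #           "cvssMetricV31": [{"cvssData": {"baseScore": 9.8,
        #                                           "baseSeverity": "CRITICAL",
        #                                           "vectorString": "CVSS:3.1/...",
        #                                           "version": "3.1"}}]
        #       },
        #       "references": [{"url": "https://..."}],
        #       "published": "2024-01-01T00:00:00.000",
        #       "lastModified": "2024-01-02T00:00:00.000",
        #   }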
        # Extract references
        references = cve_data.get('references', [])
        ref_urls = [ref.get('url', '') for ref in references[:5]]  # Limit to 5 refs

        # Extract dates
        published = cve_data.get('published', '')
        modified = cve_data.get('lastModified', '')

        return {
            'id': cve_id,
            'description': description,  # Keep full description for LLM processing
            'display_description': description[:500] + '...' if len(description) > 500 else description,
            'severity': severity,
            'score': score,
            'published': published[:10] if published else 'Unknown',
            'modified': modified[:10] if modified else 'Unknown',
            'references': ref_urls,
            'cvss_version': cvss_data.get('version', 'Unknown'),
            'vector_string': cvss_data.get('vectorString', 'N/A')
        }

    def create_severity_chart(self, cves: List[Dict]) -> go.Figure:
        """Create a pie chart of CVE severities."""
        if not cves:
            fig = go.Figure()
            fig.add_annotation(text="No data available", xref="paper", yref="paper",
                               x=0.5, y=0.5, showarrow=False)
            return fig

        severity_counts = pd.DataFrame(cves)['severity'].value_counts()
        colors = {
            'CRITICAL': '#d32f2f',
            'HIGH': '#f57c00',
            'MEDIUM': '#fbc02d',
            'LOW': '#388e3c',
            'UNKNOWN': '#9e9e9e'
        }

        fig = px.pie(
            values=severity_counts.values,
            names=severity_counts.index,
            title="CVE Distribution by Severity",
            color=severity_counts.index,
            color_discrete_map=colors
        )
        fig.update_traces(textposition='inside', textinfo='percent+label')
        fig.update_layout(height=400)
        return fig

    def create_timeline_chart(self, cves: List[Dict]) -> go.Figure:
        """Create a timeline chart of CVE publications."""
        if not cves:
            fig = go.Figure()
            fig.add_annotation(text="No data available", xref="paper", yref="paper",
                               x=0.5, y=0.5, showarrow=False)
            return fig

        df = pd.DataFrame(cves)
        df['published'] = pd.to_datetime(df['published'])

        # Group by date and severity
        timeline_data = df.groupby([df['published'].dt.date, 'severity']).size().reset_index(name='count')

        fig = px.bar(
            timeline_data,
            x='published',
            y='count',
            color='severity',
            title="CVE Publications Timeline",
            color_discrete_map={
                'CRITICAL': '#d32f2f',
                'HIGH': '#f57c00',
                'MEDIUM': '#fbc02d',
                'LOW': '#388e3c',
                'UNKNOWN': '#9e9e9e'
            }
        )
        fig.update_layout(
            xaxis_title="Publication Date",
            yaxis_title="Number of CVEs",
            height=400,
            hovermode='x unified'
        )
        return fig

    def create_score_distribution(self, cves: List[Dict]) -> go.Figure:
        """Create a histogram of CVSS scores."""
        if not cves:
            fig = go.Figure()
            fig.add_annotation(text="No data available", xref="paper", yref="paper",
                               x=0.5, y=0.5, showarrow=False)
            return fig

        scores = [cve['score'] for cve in cves if cve['score'] > 0]

        fig = go.Figure(data=[go.Histogram(
            x=scores,
            nbinsx=20,
            marker_color='#1976d2'
        )])
        fig.update_layout(
            title="CVSS Score Distribution",
            xaxis_title="CVSS Score",
            yaxis_title="Count",
            height=400,
            showlegend=False
        )

        # Add severity range annotations
        fig.add_vrect(x0=0, x1=3.9, fillcolor="green", opacity=0.1, annotation_text="Low")
        fig.add_vrect(x0=4, x1=6.9, fillcolor="yellow", opacity=0.1, annotation_text="Medium")
        fig.add_vrect(x0=7, x1=8.9, fillcolor="orange", opacity=0.1, annotation_text="High")
        fig.add_vrect(x0=9, x1=10, fillcolor="red", opacity=0.1, annotation_text="Critical")
        return fig

    def format_cve_table(self, cves: List[Dict]) -> pd.DataFrame:
        """Format CVEs for display in a table."""
        if not cves:
            return pd.DataFrame()

        df = pd.DataFrame(cves)

        # Select and reorder columns
        columns = ['id', 'severity', 'score', 'published', 'display_description']
        df = df[columns]

        # Format the dataframe
        df = df.rename(columns={
            'id': 'CVE ID',
            'severity': 'Severity',
            'score': 'CVSS Score',
            'published': 'Published',
            'display_description': 'Description'
        })
        return df


def generate_tailored_summary(cve_description: str, audience: str,
                              hf_token: Optional[str] = None,
                              max_retries: int = 2) -> str:
    """
    Generate a tailored CVE summary using google/gemma-2-2b-it via the HuggingFace Inference API.

    Args:
        cve_description: The original CVE description.
        audience: Target audience from AUDIENCE_PROFILES.
        hf_token: HuggingFace API token (optional if set as an env var).
        max_retries: Maximum number of retry attempts.

    Returns:
        Tailored summary or error message.
    """
    # Use provided token or fall back to environment variable
    token = hf_token or os.getenv('HF_TOKEN') or os.getenv('HUGGINGFACE_TOKEN')
    if not token:
        return "❌ HuggingFace API token is required. Please set HF_TOKEN environment variable or enter your token."
    if not cve_description or not audience:
        return "❌ Please select a CVE and an audience first."
    if audience not in AUDIENCE_PROFILES:
        return f"❌ Unknown audience: {audience}"

    # Define the model(s) to use
    models = [
        "google/gemma-2-2b-it",
    ]

    headers = {"Authorization": f"Bearer {token}"}
    profile = AUDIENCE_PROFILES[audience]

    # Gemma uses a specific chat template format.
    # Combine the system and user prompts into a single user turn.
    full_prompt = f"""You are an expert cybersecurity analyst. Rewrite this CVE description for a {audience}.

**Target Audience:** {audience}
**Focus:** {profile['focus']}
**Tone:** {profile['tone']}
**Key Priorities:** {', '.join(profile['priorities'])}

**CVE Description:**
{cve_description[:1200]}

Provide a concise, actionable summary (2-3 sentences) highlighting what matters most to this audience. Focus on practical implications and next steps."""

    # Use the OpenAI-compatible messages format
    messages = [
        {"role": "user", "content": full_prompt}
    ]

    # Use the new, standardized router endpoint
    api_url = "https://router.huggingface.co/v1/chat/completions"

    for model in models:
        payload = {
            "model": model,
            "messages": messages,
            "max_tokens": 250,
            "temperature": 0.7,
            "top_p": 0.95,
            # Stop sequences for Gemma. The original token literals were lost in
            # transit; "<end_of_turn>" and "<eos>" are assumed here.
            "stop": ["<end_of_turn>", "<eos>"]
        }

        for attempt in range(max_retries):
            try:
                logger.info(f"Generating summary with {model} (attempt {attempt + 1})")
                response = requests.post(api_url, headers=headers, json=payload, timeout=45)

                if response.status_code == 200:
                    try:
                        result = response.json()
                        # OpenAI-compatible response parsing
                        summary = ""
                        if "choices" in result and len(result["choices"]) > 0:
                            message = result["choices"][0].get("message", {})
                            summary = message.get("content", "").strip()

                        if summary and len(summary) > 20:
                            logger.info(f"Successfully generated summary with {model}")
                            return f"**{audience} Summary (via {model.split('/')[-1]}):**\n{summary}"
                        else:
                            # Handle cases where the model returns an empty summary
                            logger.warning(f"Model {model} returned an empty or short summary.")
                            continue  # Retry if possible
                    except json.JSONDecodeError as e:
                        logger.warning(f"JSON decode error with {model}: {e}")
                        continue
                elif response.status_code == 503:
                    logger.warning(f"Model {model} is loading, trying next model...")
                    break  # Try next model
                elif response.status_code == 429:
                    if attempt < max_retries - 1:
                        time.sleep(5)
                        continue
                    else:
                        break
                else:
                    error_message = response.json().get("error", response.text)
                    logger.warning(f"HTTP {response.status_code} with {model}: {error_message}")
                    # If the model is not found or there's a validation error, don't retry.
                    if response.status_code in [404, 422]:
                        return f"❌ Model '{model}' not found or request is invalid. Please check the model name."
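                    # Assumed intent: for any other HTTP error, further retries of this
                    # model are unlikely to help, so stop the retry loop and fall through
                    # to the next entry in `models` (if any).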
                    break
            except requests.exceptions.Timeout:
                logger.warning(f"Timeout with {model} on attempt {attempt + 1}")
                if attempt >= max_retries - 1:
                    break  # Stop retrying this model after the final attempt
            except requests.exceptions.RequestException as e:
                logger.error(f"Request failed with {model}: {e}")
                break

    return "⏳ AI models are currently busy. This can happen during peak usage. Please try again in a few minutes."


def create_interface():
    """Create the Gradio interface."""
    dashboard = CVEDashboard()

    with gr.Blocks(title="CVE Dashboard", theme=gr.themes.Soft()) as interface:
        # State to store fetched CVEs
        cve_state = gr.State([])

        gr.Markdown(
            """
            # 🛡️ CVE Dashboard with AI-Powered Audience Customization
            Real-time vulnerability monitoring using the NIST National Vulnerability Database (NVD)
            with LLM-powered audience-specific summaries
            """
        )

        with gr.Row():
            with gr.Column(scale=1):
                hf_token = gr.State(dashboard.hf_token)

                gr.Markdown("### 🔍 Search Parameters")

                current_year = datetime.now().year
                # Default to previous year to ensure we have data
                default_year = current_year - 1 if current_year == 2025 else current_year

                year_filter = gr.Dropdown(
                    choices=list(range(current_year, current_year - 10, -1)),
                    value=default_year,
                    label="Year"
                )
                keyword = gr.Textbox(
                    label="Keyword Search (Optional)",
                    placeholder="e.g., Apache, Linux, Microsoft"
                )
                severity_filter = gr.Dropdown(
                    choices=[None, "CRITICAL", "HIGH", "MEDIUM", "LOW"],
                    label="Severity Filter",
                    value=None
                )
                fetch_btn = gr.Button("🔍 Fetch CVEs", variant="primary")

            with gr.Column(scale=3):
                status_text = gr.Textbox(label="Status", interactive=False)

                with gr.Tabs():
                    with gr.Tab("📊 Overview"):
                        with gr.Row():
                            severity_chart = gr.Plot(label="Severity Distribution")
                            timeline_chart = gr.Plot(label="Timeline")
                        score_chart = gr.Plot(label="CVSS Score Distribution")

                    with gr.Tab("📋 CVE List"):
                        cve_table = gr.DataFrame(
                            label="CVE Details",
                            wrap=True,
                            row_count=15
                        )

                    with gr.Tab("🤖 AI-Powered Summaries"):
                        gr.Markdown("### Generate Audience-Specific CVE Summaries")

                        with gr.Row():
                            with gr.Column():
                                cve_selector = gr.Dropdown(
                                    label="Select CVE",
                                    choices=[],
                                    info="Choose a CVE from the fetched results"
                                )
                                audience_selector = gr.Dropdown(
                                    label="Target Audience",
                                    choices=list(AUDIENCE_PROFILES.keys()),
                                    value="Cybersecurity Professional",
                                    info="Select the professional perspective"
                                )
                                generate_btn = gr.Button("🧠 Generate AI Summary", variant="primary")

                                # Add status for generation
                                generation_status = gr.Textbox(
                                    label="Generation Status",
                                    value="Ready to generate summaries",
                                    interactive=False
                                )

                            with gr.Column():
                                audience_info = gr.Markdown(
                                    value="**Focus:** threat assessment, attack vectors, mitigation strategies, and security controls\n\n**Priorities:** exploitation methods, defensive measures, risk assessment, compliance implications"
                                )
                                original_description = gr.Textbox(
                                    label="Original CVE Description",
                                    lines=4,
                                    interactive=False
                                )
                                tailored_summary = gr.Textbox(
                                    label="AI-Generated Summary",
                                    lines=6,
                                    interactive=False,
                                    placeholder="Select a CVE and audience, then click 'Generate AI Summary'"
                                )

                    with gr.Tab("ℹ️ About"):
                        gr.Markdown(
                            """
                            ### About this Dashboard

                            This dashboard provides real-time monitoring of
                            [Common Vulnerabilities and Exposures (CVEs)](https://en.wikipedia.org/wiki/Common_Vulnerabilities_and_Exposures)
                            from the NIST National Vulnerability Database with AI-powered audience customization.
                            **Features:**
                            - Search CVEs by date range and keywords
                            - Filter by severity levels
                            - Visualize CVE distributions and trends
                            - AI-powered audience-specific summaries using the google/gemma-2-2b-it model

                            **Severity Levels:**
                            - **CRITICAL** (9.0-10.0): Complete system compromise possible
                            - **HIGH** (7.0-8.9): Significant impact, immediate patching recommended
                            - **MEDIUM** (4.0-6.9): Moderate impact, plan for updates
                            - **LOW** (0.1-3.9): Minor impact, update in regular cycle

                            **Supported Audiences:**
                            - **Cybersecurity Professional:** Focus on threats, attack vectors, and mitigation
                            - **Data Scientist:** Emphasis on data risks and model vulnerabilities
                            - **Data Engineer:** Infrastructure security and pipeline risks
                            - **Full-Stack Developer:** Code vulnerabilities and implementation fixes
                            - **Product Owner:** Business impact and prioritization guidance
                            - **Manager:** Executive summary with business implications

                            **Data Source:** [NIST NVD API](https://nvd.nist.gov/developers/vulnerabilities)

                            **AI Model:** [google/gemma-2-2b-it](https://huggingface.co/google/gemma-2-2b-it)

                            **Disclaimer:** Generated content may be inaccurate or misleading.

                            This app's AI features run on the free community tier of the Hugging Face Inference API.
                            Because compute is shared, expect some delay on your first request while the model loads;
                            later requests are usually faster.

                            **Note:** If you encounter rate limiting or timeouts, please try again after a short wait.

                            **Developed by** [M. Murat Ardag](https://mmuratardag.github.io/).
                            """
                        )

        # Event handlers
        def fetch_and_display(year, keyword_search, severity):
            """Fetch CVEs and update all displays."""
            cves, status = dashboard.fetch_cves(
                year=year,
                keyword=keyword_search if keyword_search else None,
                severity=severity if severity else None
            )

            if cves:
                df = dashboard.format_cve_table(cves)
                severity_fig = dashboard.create_severity_chart(cves)
                timeline_fig = dashboard.create_timeline_chart(cves)
                score_fig = dashboard.create_score_distribution(cves)

                # Update CVE selector choices
                cve_choices = [f"{cve['id']} ({cve['severity']}, {cve['score']})" for cve in cves]

                return (
                    cves,  # Update state
                    status,
                    df,
                    severity_fig,
                    timeline_fig,
                    score_fig,
                    gr.Dropdown(choices=cve_choices, value=cve_choices[0] if cve_choices else None)  # Update CVE selector
                )
            else:
                empty_fig = go.Figure()
                empty_fig.add_annotation(
                    text="No data available",
                    xref="paper", yref="paper",
                    x=0.5, y=0.5, showarrow=False
                )
                return (
                    [],  # Update state
                    status,
                    pd.DataFrame(),
                    empty_fig,
                    empty_fig,
                    empty_fig,
                    gr.Dropdown(choices=[], value=None)  # Clear CVE selector
                )

        def update_audience_info(audience):
            """Update audience information display."""
            if audience in AUDIENCE_PROFILES:
                profile = AUDIENCE_PROFILES[audience]
                info = f"**Focus:** {profile['focus']}\n\n**Priorities:** {', '.join(profile['priorities'])}"
                return info
            return "Select an audience to see details"

        def update_cve_description(selected_cve, cves):
            """Update the original CVE description when a CVE is selected."""
            if not selected_cve or not cves:
                return ""

            # Extract CVE ID from the selection (format: "CVE-2024-1234 (HIGH, 7.5)")
            cve_id = selected_cve.split(" (")[0]

            # Find the matching CVE
            for cve in cves:
                if cve['id'] == cve_id:
                    return cve['description']
            return "CVE description not found"

        def generate_summary_with_status(selected_cve, audience, token, cves):
            """Generate audience-specific summary with status updates."""
            if not selected_cve or not audience or not cves:
                # This function is a generator, so yield (rather than return) the
                # message to make sure Gradio displays it.
                yield "Please select a CVE and audience first.", "❌ Missing selection"
                return
            # Extract CVE ID from the selection
            cve_id = selected_cve.split(" (")[0]

            # Find the matching CVE
            for cve in cves:
                if cve['id'] == cve_id:
                    # Update status to show generation in progress
                    yield "Generating AI summary... This may take 30-60 seconds.", "🔄 Generating..."

                    summary = generate_tailored_summary(cve['description'], audience, token)

                    if summary.startswith("❌"):
                        yield summary, "❌ Generation failed"
                    elif summary.startswith("⏳"):
                        yield summary, "⏳ Models busy"
                    else:
                        yield summary, "✅ Summary generated"
                    return

            yield "CVE not found", "❌ CVE not found"

        # Wire up the event handlers
        fetch_btn.click(
            fn=fetch_and_display,
            inputs=[year_filter, keyword, severity_filter],
            outputs=[cve_state, status_text, cve_table, severity_chart,
                     timeline_chart, score_chart, cve_selector]
        )

        audience_selector.change(
            fn=update_audience_info,
            inputs=[audience_selector],
            outputs=[audience_info]
        )

        cve_selector.change(
            fn=update_cve_description,
            inputs=[cve_selector, cve_state],
            outputs=[original_description]
        )

        generate_btn.click(
            fn=generate_summary_with_status,
            inputs=[cve_selector, audience_selector, hf_token, cve_state],
            outputs=[tailored_summary, generation_status]
        )

        # Load initial data
        interface.load(
            fn=fetch_and_display,
            inputs=[year_filter, keyword, severity_filter],
            outputs=[cve_state, status_text, cve_table, severity_chart,
                     timeline_chart, score_chart, cve_selector]
        )

    return interface


if __name__ == "__main__":
    # Check for API keys
    if os.getenv('NVD_API_KEY'):
        print("✓ NVD API key loaded - Higher rate limits enabled")
    else:
        print("⚠ No NVD API key found - Using lower rate limits")
        print("  Get a free API key at: https://nvd.nist.gov/developers/request-an-api-key")

    if os.getenv('HF_TOKEN') or os.getenv('HUGGINGFACE_TOKEN'):
        print("✓ HuggingFace token loaded - AI summaries enabled")
    else:
        print("⚠ No HuggingFace token found - Users will need to enter their own")
        print("  Get a free token at: https://huggingface.co/settings/tokens")

    # Create and launch the interface
    app = create_interface()
    app.launch()
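# Illustrative usage outside the Gradio UI (a minimal sketch; the import assumes this
# module is saved as app.py, and the search values below are made up for demonstration):
#
#   from app import CVEDashboard, generate_tailored_summary
#
#   dashboard = CVEDashboard()
#   cves, status = dashboard.fetch_cves(year=2024, keyword="Apache", severity="HIGH")
#   print(status)
#   if cves:
#       print(generate_tailored_summary(cves[0]['description'], "Manager"))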