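"""Enhanced AI System Pro.

A single-file Streamlit application combining a caching/analytics database
layer (SQLite with optional PostgreSQL), a guarded code-execution sandbox,
a multi-source research engine (DuckDuckGo, Wikipedia, arXiv), a data
analytics/ML engine, and an autonomous agent that ties them together behind
a Streamlit UI.
"""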
import streamlit as st
import json
import re
import requests
import subprocess
import tempfile
import time
import os
import numpy as np
import pandas as pd
import datetime
from datetime import timedelta
import plotly.express as px
import plotly.graph_objects as go
import base64
import hashlib
import io
import csv
import uuid
from duckduckgo_search import DDGS
import matplotlib.pyplot as plt
import networkx as nx
from PIL import Image
import pytz
import threading
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
from functools import lru_cache
import sqlite3
from typing import Dict, List, Optional, Tuple, Any
import warnings
import psycopg2
from psycopg2 import pool
import random
import seaborn as sns
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import xml.etree.ElementTree as ET

warnings.filterwarnings('ignore')
# ===================== ENHANCED SYSTEM CONFIGURATION =====================
DEBUG_MODE = os.getenv('DEBUG_MODE', 'False').lower() == 'true'
MAX_RESEARCH_RESULTS = 10
CODE_EXECUTION_TIMEOUT = 30
SAFE_MODE = True
PERSONAS = ["Researcher", "Teacher", "Analyst", "Engineer", "Scientist", "Assistant", "Consultant", "Creative", "Problem Solver"]
SESSION_FILE = "session_state.json"
USER_DB = "users.db"
TEAM_DB = "teams.json"
WORKFLOW_DB = "workflows.json"
CACHE_DB = "cache.db"

# Setup enhanced logging
logging.basicConfig(
    level=logging.INFO if DEBUG_MODE else logging.WARNING,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ===================== ENHANCED DATABASE MANAGER =====================
class EnhancedDatabaseManager:
    def __init__(self):
        self.pg_pool = None
        self.init_databases()

    def init_databases(self):
        """Initialize both SQLite and PostgreSQL databases"""
        try:
            # Initialize SQLite for local data
            self.init_sqlite()
            # Try to initialize PostgreSQL if available
            self.init_postgresql()
        except Exception as e:
            logger.error(f"Database initialization error: {e}")

    def init_sqlite(self):
        """Initialize SQLite database with proper error handling"""
        try:
            # Ensure database file exists
            if not os.path.exists(CACHE_DB):
                open(CACHE_DB, 'a').close()
            conn = sqlite3.connect(CACHE_DB)
            cursor = conn.cursor()
            # Create tables with IF NOT EXISTS
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id TEXT PRIMARY KEY,
                    name TEXT,
                    preferences TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS sessions (
                    id TEXT PRIMARY KEY,
                    user_id TEXT,
                    data TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS cache (
                    key TEXT PRIMARY KEY,
                    value TEXT,
                    expires_at TIMESTAMP,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS analytics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id TEXT,
                    action TEXT,
                    details TEXT,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()
            conn.close()
            logger.info("SQLite database initialized successfully")
        except Exception as e:
            logger.error(f"SQLite initialization error: {e}")

    def init_postgresql(self):
        """Initialize PostgreSQL connection if available"""
        try:
            database_url = os.environ.get('DATABASE_URL')
            if database_url:
                self.pg_pool = psycopg2.pool.SimpleConnectionPool(
                    1, 10, database_url
                )
                logger.info("PostgreSQL connection pool initialized")
        except Exception as e:
            logger.warning(f"PostgreSQL not available: {e}")

    def get_connection(self):
        """Get database connection (prefer PostgreSQL, fallback to SQLite)"""
        if self.pg_pool:
            try:
                return self.pg_pool.getconn(), "postgresql"
            except Exception as e:
                logger.warning(f"PostgreSQL connection failed: {e}")
        return sqlite3.connect(CACHE_DB), "sqlite"

    def return_connection(self, conn, db_type):
        """Return connection to pool"""
        try:
            if db_type == "postgresql" and self.pg_pool:
                self.pg_pool.putconn(conn)
            else:
                conn.close()
        except Exception as e:
            logger.error(f"Error returning connection: {e}")

    def get_cached_result(self, key: str) -> Optional[str]:
        """Get cached result with enhanced error handling"""
        conn, db_type = None, None
        try:
            conn, db_type = self.get_connection()
            cursor = conn.cursor()
            if db_type == "postgresql":
                cursor.execute('''
                    SELECT value FROM cache
                    WHERE key = %s AND expires_at > NOW()
                ''', (key,))
            else:
                cursor.execute('''
                    SELECT value FROM cache
                    WHERE key = ? AND expires_at > datetime('now')
                ''', (key,))
            result = cursor.fetchone()
            return result[0] if result else None
        except Exception as e:
            logger.error(f"Cache retrieval error: {e}")
            return None
        finally:
            if conn:
                self.return_connection(conn, db_type)

    def set_cached_result(self, key: str, value: str, ttl_minutes: int = 60):
        """Set cached result with enhanced error handling"""
        conn, db_type = None, None
        try:
            conn, db_type = self.get_connection()
            cursor = conn.cursor()
            expires_at = datetime.datetime.now() + timedelta(minutes=ttl_minutes)
            if db_type == "postgresql":
                cursor.execute('''
                    INSERT INTO cache (key, value, expires_at)
                    VALUES (%s, %s, %s)
                    ON CONFLICT (key) DO UPDATE SET
                        value = EXCLUDED.value,
                        expires_at = EXCLUDED.expires_at
                ''', (key, value, expires_at))
            else:
                cursor.execute('''
                    INSERT OR REPLACE INTO cache (key, value, expires_at)
                    VALUES (?, ?, ?)
                ''', (key, value, expires_at))
            conn.commit()
        except Exception as e:
            logger.error(f"Cache storage error: {e}")
        finally:
            if conn:
                self.return_connection(conn, db_type)

    def log_analytics(self, user_id: str, action: str, details: str = ""):
        """Log analytics data"""
        conn, db_type = None, None
        try:
            conn, db_type = self.get_connection()
            cursor = conn.cursor()
            if db_type == "postgresql":
                cursor.execute('''
                    INSERT INTO analytics (user_id, action, details)
                    VALUES (%s, %s, %s)
                ''', (user_id, action, details))
            else:
                cursor.execute('''
                    INSERT INTO analytics (user_id, action, details)
                    VALUES (?, ?, ?)
                ''', (user_id, action, details))
            conn.commit()
        except Exception as e:
            logger.error(f"Analytics logging error: {e}")
        finally:
            if conn:
                self.return_connection(conn, db_type)
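# NOTE: only the SQLite schema is created by init_sqlite() above; the
# PostgreSQL code paths assume `cache` (with a unique constraint on `key`)
# and `analytics` tables already exist in the target database.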
# ===================== ENHANCED SECURITY MANAGER =====================
class EnhancedSecurityManager:
    def __init__(self):
        self.blocked_patterns = [
            r"import\s+(os|sys|shutil|subprocess|socket|tempfile)",
            r"__import__", r"eval\(", r"exec\(", r"open\(", r"file\(",
            r"system\(", r"popen\(", r"rm\s+", r"del\s+", r"format\s*\(",
            r"\.format\s*\(", r"f['\"].*\{.*\}.*['\"]", r"input\(", r"raw_input\("
        ]
        self.max_execution_time = CODE_EXECUTION_TIMEOUT
        self.max_code_length = 10000
        self.rate_limits = {}

    def check_rate_limit(self, user_id: str, action: str, limit: int = 10, window: int = 60) -> bool:
        """Enhanced rate limiting"""
        now = time.time()
        key = f"{user_id}:{action}"
        if key not in self.rate_limits:
            self.rate_limits[key] = []
        # Clean old entries
        self.rate_limits[key] = [t for t in self.rate_limits[key] if now - t < window]
        if len(self.rate_limits[key]) >= limit:
            return False
        self.rate_limits[key].append(now)
        return True

    def sanitize_input(self, text: str, max_length: int = 2000) -> str:
        """Enhanced input sanitization"""
        if not text or len(text) > max_length:
            return ""
        # Remove potentially dangerous characters
        sanitized = re.sub(r"[;\\<>/&|$`]", "", text)
        # Check for blocked patterns
        for pattern in self.blocked_patterns:
            if re.search(pattern, sanitized, re.IGNORECASE):
                sanitized = re.sub(pattern, "[BLOCKED]", sanitized, flags=re.IGNORECASE)
        return sanitized[:max_length]

    def safe_execute(self, code: str, user_id: str = "default") -> str:
        """Enhanced safe code execution"""
        if not self.check_rate_limit(user_id, "code_execution", 5, 300):
            return "🔒 Rate limit exceeded. Please wait before executing more code."
        if len(code) > self.max_code_length:
            return "🔒 Code too long for execution"
        for pattern in self.blocked_patterns:
            if re.search(pattern, code, re.IGNORECASE):
                return "🔒 Security: Restricted operation detected"
        try:
            # Enhanced safe execution environment: wrap the user code in a
            # script that installs a timeout and captures stdout/stderr.
            # The user code is re-indented 12 spaces to sit inside the two
            # nested context managers below.
            safe_code = f"""
import sys
import time
import math
import random
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt

# Timeout handler
import signal

def timeout_handler(signum, frame):
    raise TimeoutError("Execution timeout")

signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm({self.max_execution_time})

# Capture output
import io
import contextlib
output_buffer = io.StringIO()
try:
    with contextlib.redirect_stdout(output_buffer):
        with contextlib.redirect_stderr(output_buffer):
{chr(10).join('            ' + line for line in code.split(chr(10)))}
except Exception as e:
    print(f"Error: {{e}}")
finally:
    signal.alarm(0)

print("\\n--- OUTPUT ---")
print(output_buffer.getvalue())
"""
            with tempfile.NamedTemporaryFile(suffix=".py", delete=False, mode="w") as f:
                f.write(safe_code)
                f.flush()
                start_time = time.time()
                result = subprocess.run(
                    ["python", f.name],
                    capture_output=True,
                    text=True,
                    timeout=self.max_execution_time
                )
                exec_time = time.time() - start_time
                # Clean up
                os.unlink(f.name)
            output = result.stdout.strip() or "Execution completed"
            if result.stderr:
                output += f"\nWarnings: {result.stderr.strip()}"
            # Sanitize output
            sanitized = re.sub(
                r"\b(token|key|secret|password|api_key)\s*=\s*[\"\'].+?[\"\']",
                "[REDACTED]",
                output,
                flags=re.IGNORECASE
            )
            return f"{sanitized[:2000]}\n⏱️ Execution time: {exec_time:.2f}s"
        except subprocess.TimeoutExpired:
            return "⏱️ Execution timed out"
        except Exception as e:
            return f"⚠️ Error: {str(e)}"
# ===================== ENHANCED RESEARCH ENGINE =====================
class EnhancedResearchEngine:
    def __init__(self, db_manager: EnhancedDatabaseManager):
        self.db_manager = db_manager
        self.executor = ThreadPoolExecutor(max_workers=5)
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]

    def search_multiple_sources(self, query: str, max_results: int = 5) -> Dict[str, List[Dict]]:
        """Enhanced multi-source search with better error handling"""
        cache_key = f"search_{hashlib.md5(query.encode()).hexdigest()}_{max_results}"
        cached = self.db_manager.get_cached_result(cache_key)
        if cached:
            try:
                return json.loads(cached)
            except json.JSONDecodeError:
                pass
        results = {}
        # Submit concurrent searches with error handling
        futures = {}
        try:
            futures['web'] = self.executor.submit(self._search_web_enhanced, query, max_results)
            futures['wikipedia'] = self.executor.submit(self._search_wikipedia_enhanced, query)
            futures['arxiv'] = self.executor.submit(self._search_arxiv_enhanced, query, max_results)
        except Exception as e:
            logger.error(f"Error submitting search tasks: {e}")
        # Collect results with timeouts
        for source, future in futures.items():
            try:
                results[source] = future.result(timeout=15)
            except Exception as e:
                logger.error(f"Search error for {source}: {e}")
                results[source] = []
        # Cache successful results
        if any(results.values()):
            self.db_manager.set_cached_result(cache_key, json.dumps(results), 60)
        return results

    def _search_web_enhanced(self, query: str, max_results: int = 5) -> List[Dict]:
        """Enhanced web search with multiple fallbacks"""
        try:
            # Try DuckDuckGo with retry logic
            for attempt in range(3):
                try:
                    time.sleep(random.uniform(1, 3))  # Random delay
                    with DDGS() as ddgs:
                        results = []
                        for r in ddgs.text(query, max_results=max_results):
                            results.append({
                                "title": r.get("title", "")[:150],
                                "url": r.get("href", ""),
                                "snippet": r.get("body", "")[:300],
                                "source": "DuckDuckGo"
                            })
                        if results:
                            return results
                except Exception as e:
                    logger.warning(f"DuckDuckGo attempt {attempt + 1} failed: {e}")
                    if attempt < 2:
                        time.sleep(random.uniform(2, 5))
            # Fallback to manual search
            return self._fallback_web_search(query, max_results)
        except Exception as e:
            logger.error(f"Enhanced web search error: {e}")
            return []

    def _fallback_web_search(self, query: str, max_results: int) -> List[Dict]:
        """Fallback web search method"""
        try:
            # Create synthetic results based on query analysis
            results = []
            keywords = query.lower().split()
            # Generate educational suggestions
            if any(word in keywords for word in ['learn', 'how', 'what', 'explain']):
                results.append({
                    "title": f"Understanding {query}",
                    "url": "https://example.com/educational",
                    "snippet": f"Comprehensive guide to understanding {query}. Learn the fundamentals and key concepts.",
                    "source": "Educational"
                })
            # Generate technical suggestions
            if any(word in keywords for word in ['code', 'programming', 'algorithm', 'software']):
                results.append({
                    "title": f"Programming Guide: {query}",
                    "url": "https://example.com/programming",
                    "snippet": f"Technical documentation and examples for {query}. Best practices and implementation details.",
                    "source": "Technical"
                })
            return results[:max_results]
        except Exception as e:
            logger.error(f"Fallback search error: {e}")
            return []

    def _search_wikipedia_enhanced(self, query: str) -> List[Dict]:
        """Enhanced Wikipedia search"""
        try:
            headers = {'User-Agent': random.choice(self.user_agents)}
            url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{query.replace(' ', '_')}"
            response = requests.get(url, headers=headers, timeout=10)
            if response.status_code == 200:
                data = response.json()
                return [{
                    "title": data.get("title", ""),
                    "url": data.get("content_urls", {}).get("desktop", {}).get("page", ""),
                    "snippet": data.get("extract", "")[:400],
                    "source": "Wikipedia"
                }]
        except Exception as e:
            logger.error(f"Wikipedia search error: {e}")
        return []

    def _search_arxiv_enhanced(self, query: str, max_results: int = 3) -> List[Dict]:
        """Enhanced arXiv search for academic papers"""
        try:
            url = f"http://export.arxiv.org/api/query?search_query=all:{query}&start=0&max_results={max_results}"
            response = requests.get(url, timeout=10)
            results = []
            if response.status_code == 200:
                root = ET.fromstring(response.content)
                for entry in root.findall('{http://www.w3.org/2005/Atom}entry'):
                    title = entry.find('{http://www.w3.org/2005/Atom}title')
                    summary = entry.find('{http://www.w3.org/2005/Atom}summary')
                    link = entry.find('{http://www.w3.org/2005/Atom}id')
                    if title is not None and summary is not None:
                        results.append({
                            "title": title.text[:150],
                            "url": link.text if link is not None else "",
                            "snippet": summary.text[:300],
                            "source": "arXiv"
                        })
            return results
        except Exception as e:
            logger.error(f"arXiv search error: {e}")
            return []
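# Illustrative usage of the research engine (names match the classes above):
#     engine = EnhancedResearchEngine(EnhancedDatabaseManager())
#     results = engine.search_multiple_sources("quantum computing", max_results=5)
#     # -> {"web": [...], "wikipedia": [...], "arxiv": [...]}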
# ===================== ADVANCED ANALYTICS ENGINE =====================
class AdvancedAnalyticsEngine:
    def __init__(self):
        self.datasets = {}
        self.models = {}
        self.visualizations = {}

    def create_advanced_visualization(self, data: pd.DataFrame, viz_type: str,
                                      title: str = "Data Visualization",
                                      theme: str = "plotly_dark") -> go.Figure:
        """Create advanced visualizations with enhanced styling"""
        try:
            fig = None
            # Set color palette
            colors = px.colors.qualitative.Set3
            if viz_type.lower() == "line":
                if len(data.columns) >= 2:
                    fig = px.line(data, x=data.columns[0], y=data.columns[1],
                                  title=title, template=theme, color_discrete_sequence=colors)
            elif viz_type.lower() == "bar":
                if len(data.columns) >= 2:
                    fig = px.bar(data, x=data.columns[0], y=data.columns[1],
                                 title=title, template=theme, color_discrete_sequence=colors)
            elif viz_type.lower() == "scatter":
                if len(data.columns) >= 2:
                    fig = px.scatter(data, x=data.columns[0], y=data.columns[1],
                                     title=title, template=theme, color_discrete_sequence=colors)
                    if len(data.columns) >= 3:
                        fig.update_traces(marker_size=data.iloc[:, 2] * 10)
            elif viz_type.lower() == "histogram":
                fig = px.histogram(data, x=data.columns[0], title=title,
                                   template=theme, color_discrete_sequence=colors)
            elif viz_type.lower() == "pie":
                if len(data.columns) >= 2:
                    fig = px.pie(data, names=data.columns[0], values=data.columns[1],
                                 title=title, template=theme, color_discrete_sequence=colors)
            elif viz_type.lower() == "heatmap":
                numeric_data = data.select_dtypes(include=[np.number])
                if not numeric_data.empty:
                    corr_matrix = numeric_data.corr()
                    fig = px.imshow(corr_matrix, text_auto=True, aspect="auto",
                                    title=f"{title} - Correlation Matrix", template=theme)
            elif viz_type.lower() == "box":
                numeric_cols = data.select_dtypes(include=[np.number]).columns
                if len(numeric_cols) > 0:
                    fig = px.box(data, y=numeric_cols[0], title=title,
                                 template=theme, color_discrete_sequence=colors)
            elif viz_type.lower() == "3d_scatter":
                if len(data.columns) >= 3:
                    numeric_cols = data.select_dtypes(include=[np.number]).columns
                    if len(numeric_cols) >= 3:
                        fig = px.scatter_3d(data, x=numeric_cols[0], y=numeric_cols[1],
                                            z=numeric_cols[2], title=title, template=theme)
            else:
                # Default to line chart
                if len(data.columns) >= 2:
                    fig = px.line(data, x=data.columns[0], y=data.columns[1],
                                  title=title, template=theme)
            # Enhanced styling
            if fig:
                fig.update_layout(
                    font_size=14,
                    title_font_size=18,
                    margin=dict(l=40, r=40, t=60, b=40),
                    hovermode='closest',
                    showlegend=True,
                    autosize=True,
                    height=500,
                    plot_bgcolor='rgba(0,0,0,0)',
                    paper_bgcolor='rgba(0,0,0,0)'
                )
                # Add interactivity
                fig.update_traces(
                    hovertemplate='<b>%{fullData.name}</b><br>' +
                                  'X: %{x}<br>' +
                                  'Y: %{y}<br>' +
                                  '<extra></extra>'
                )
            return fig
        except Exception as e:
            logger.error(f"Visualization error: {e}")
            # Return error visualization
            fig = go.Figure()
            fig.add_annotation(
                text=f"Visualization Error: {str(e)}",
                xref="paper", yref="paper",
                x=0.5, y=0.5, showarrow=False,
                font=dict(size=16, color="red")
            )
            fig.update_layout(
                title="Visualization Error",
                xaxis=dict(showgrid=False, showticklabels=False),
                yaxis=dict(showgrid=False, showticklabels=False)
            )
            return fig
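    # Illustrative usage (inside a Streamlit app):
    #     df = pd.DataFrame({"x": range(10), "y": np.random.rand(10)})
    #     fig = AdvancedAnalyticsEngine().create_advanced_visualization(df, "line", "Demo")
    #     st.plotly_chart(fig)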
    def generate_comprehensive_analysis(self, data: pd.DataFrame) -> str:
        """Generate comprehensive data analysis"""
        try:
            analysis = "# 📊 Comprehensive Data Analysis\n\n"
            # Basic info
            analysis += f"## 📋 Dataset Overview\n"
            analysis += f"- **Shape**: {data.shape[0]:,} rows × {data.shape[1]} columns\n"
            analysis += f"- **Memory Usage**: {data.memory_usage(deep=True).sum() / 1024**2:.2f} MB\n\n"
            # Column analysis
            analysis += "## 📈 Column Analysis\n"
            for col, dtype in data.dtypes.items():
                null_count = data[col].isnull().sum()
                null_pct = (null_count / len(data)) * 100
                analysis += f"- **{col}**: {dtype} ({null_count:,} nulls, {null_pct:.1f}%)\n"
            analysis += "\n"
            # Numerical analysis
            numeric_cols = data.select_dtypes(include=[np.number]).columns
            if len(numeric_cols) > 0:
                analysis += "## 🔢 Numerical Statistics\n"
                desc = data[numeric_cols].describe()
                for col in numeric_cols:
                    analysis += f"### {col}\n"
                    analysis += f"- Mean: {desc.loc['mean', col]:.2f}\n"
                    analysis += f"- Median: {desc.loc['50%', col]:.2f}\n"
                    analysis += f"- Std Dev: {desc.loc['std', col]:.2f}\n"
                    analysis += f"- Range: {desc.loc['min', col]:.2f} to {desc.loc['max', col]:.2f}\n\n"
            # Categorical analysis
            cat_cols = data.select_dtypes(include=['object']).columns
            if len(cat_cols) > 0:
                analysis += "## 📝 Categorical Analysis\n"
                for col in cat_cols[:5]:  # Limit to first 5
                    unique_count = data[col].nunique()
                    most_common = data[col].value_counts().head(3)
                    analysis += f"### {col}\n"
                    analysis += f"- Unique values: {unique_count:,}\n"
                    analysis += f"- Most common:\n"
                    for val, count in most_common.items():
                        analysis += f"  - {val}: {count:,} ({count/len(data)*100:.1f}%)\n"
                    analysis += "\n"
            # Correlation analysis
            if len(numeric_cols) > 1:
                corr_matrix = data[numeric_cols].corr()
                analysis += "## 🔗 Correlation Insights\n"
                # Find high correlations
                high_corr_pairs = []
                for i in range(len(corr_matrix.columns)):
                    for j in range(i+1, len(corr_matrix.columns)):
                        corr_val = corr_matrix.iloc[i, j]
                        if abs(corr_val) > 0.7:
                            high_corr_pairs.append((
                                corr_matrix.columns[i],
                                corr_matrix.columns[j],
                                corr_val
                            ))
                if high_corr_pairs:
                    analysis += "**Strong correlations found:**\n"
                    for col1, col2, corr_val in high_corr_pairs:
                        analysis += f"- {col1} ↔ {col2}: {corr_val:.3f}\n"
                else:
                    analysis += "No strong correlations (>0.7) detected.\n"
                analysis += "\n"
            # Data quality assessment
            analysis += "## ✅ Data Quality Assessment\n"
            total_nulls = data.isnull().sum().sum()
            total_cells = len(data) * len(data.columns)
            completeness = ((total_cells - total_nulls) / total_cells) * 100
            analysis += f"- **Completeness**: {completeness:.1f}%\n"
            analysis += f"- **Total missing values**: {total_nulls:,}\n"
            # Duplicate check
            duplicates = data.duplicated().sum()
            analysis += f"- **Duplicate rows**: {duplicates:,} ({duplicates/len(data)*100:.1f}%)\n"
            return analysis
        except Exception as e:
            return f"❌ Error generating analysis: {str(e)}"
    def generate_ai_insights(self, data: pd.DataFrame) -> str:
        """Generate AI-powered insights about the data"""
        try:
            insights = []
            # Data quality insights
            null_percentage = (data.isnull().sum().sum() / (len(data) * len(data.columns))) * 100
            if null_percentage > 10:
                insights.append(f"⚠️ **Data Quality Alert**: {null_percentage:.1f}% of your data contains missing values. Consider data cleaning strategies.")
            elif null_percentage > 0:
                insights.append(f"✅ **Good Data Quality**: Only {null_percentage:.1f}% missing values detected.")
            else:
                insights.append("✅ **Excellent Data Quality**: No missing values detected!")
            # Pattern detection
            numeric_cols = data.select_dtypes(include=[np.number]).columns
            if len(numeric_cols) >= 2:
                correlations = data[numeric_cols].corr()
                high_corr = []
                for i in range(len(correlations.columns)):
                    for j in range(i+1, len(correlations.columns)):
                        corr_val = correlations.iloc[i, j]
                        if abs(corr_val) > 0.8:
                            high_corr.append((correlations.columns[i], correlations.columns[j], corr_val))
                if high_corr:
                    insights.append("🔗 **Strong Correlations Detected**:")
                    for col1, col2, corr in high_corr[:3]:
                        direction = "positive" if corr > 0 else "negative"
                        insights.append(f"  - {col1} and {col2} show strong {direction} correlation ({corr:.3f})")
            # Anomaly detection insights
            if len(numeric_cols) > 0:
                outlier_counts = {}
                for col in numeric_cols[:3]:  # Check first 3 numeric columns
                    Q1 = data[col].quantile(0.25)
                    Q3 = data[col].quantile(0.75)
                    IQR = Q3 - Q1
                    outliers = data[(data[col] < (Q1 - 1.5 * IQR)) | (data[col] > (Q3 + 1.5 * IQR))]
                    if len(outliers) > 0:
                        outlier_counts[col] = len(outliers)
                if outlier_counts:
                    insights.append("📊 **Outlier Detection**:")
                    for col, count in outlier_counts.items():
                        percentage = (count / len(data)) * 100
                        insights.append(f"  - {col}: {count} outliers ({percentage:.1f}% of data)")
            # Trend insights for time series
            date_cols = data.select_dtypes(include=['datetime64', 'object']).columns
            time_col = None
            for col in date_cols:
                try:
                    pd.to_datetime(data[col].head())
                    time_col = col
                    break
                except (ValueError, TypeError):
                    continue
            if time_col and len(numeric_cols) > 0:
                insights.append(f"📈 **Time Series Potential**: Detected time column '{time_col}' - consider time series analysis")
            # Distribution insights
            if len(numeric_cols) > 0:
                skewed_cols = []
                for col in numeric_cols[:3]:
                    skewness = data[col].skew()
                    if abs(skewness) > 1:
                        direction = "right" if skewness > 0 else "left"
                        skewed_cols.append(f"{col} ({direction}-skewed)")
                if skewed_cols:
                    insights.append(f"📊 **Distribution Analysis**: Skewed distributions detected in: {', '.join(skewed_cols)}")
            # Recommendations
            insights.append("\n### 💡 **Recommendations**:")
            if len(data) < 100:
                insights.append("- Consider collecting more data for robust analysis")
            elif len(data) > 10000:
                insights.append("- Large dataset detected - consider sampling for initial exploration")
            if len(numeric_cols) >= 3:
                insights.append("- Rich numerical data available - try dimensionality reduction (PCA)")
            categorical_cols = data.select_dtypes(include=['object']).columns
            if len(categorical_cols) > 0:
                insights.append(f"- {len(categorical_cols)} categorical variables detected - consider encoding for ML")
            insights.append("- Use the visualization tools above to explore patterns visually")
            insights.append("- Try the ML model feature if you have a target variable in mind")
            return "\n".join(insights)
        except Exception as e:
            return f"❌ Error generating insights: {str(e)}"
    def create_ml_model(self, data: pd.DataFrame, target_col: str, model_type: str = "regression") -> Dict:
        """Create and train machine learning models"""
        try:
            if target_col not in data.columns:
                return {"error": "Target column not found"}
            # Prepare data
            numeric_data = data.select_dtypes(include=[np.number])
            if target_col not in numeric_data.columns:
                return {"error": "Target must be numeric"}
            X = numeric_data.drop(columns=[target_col])
            y = numeric_data[target_col]
            if X.empty:
                return {"error": "No numeric features available"}
            if model_type.lower() != "regression":
                return {"error": f"Unsupported model type: {model_type}"}
            # Split data
            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
            # Train model
            model = RandomForestRegressor(n_estimators=100, random_state=42)
            model.fit(X_train, y_train)
            # Predictions
            y_pred = model.predict(X_test)
            # Metrics
            mse = mean_squared_error(y_test, y_pred)
            r2 = r2_score(y_test, y_pred)
            return {
                "model_type": "Random Forest Regression",
                "features": list(X.columns),
                "target": target_col,
                "metrics": {
                    "mse": mse,
                    "rmse": np.sqrt(mse),
                    "r2_score": r2
                },
                "feature_importance": dict(zip(X.columns, model.feature_importances_)),
                "predictions": y_pred[:10].tolist(),
                "actual": y_test[:10].tolist()
            }
        except Exception as e:
            return {"error": f"Model training error: {str(e)}"}
# ===================== ENHANCED AUTONOMOUS AGENT =====================
class EnhancedAutonomousAgent:
    def __init__(self, user_id: str = "default"):
        self.user_id = user_id
        self.db_manager = EnhancedDatabaseManager()
        self.security = EnhancedSecurityManager()
        self.research_engine = EnhancedResearchEngine(self.db_manager)
        self.analytics = AdvancedAnalyticsEngine()
        self.session_id = str(uuid.uuid4())
        self.conversation_history = []
        self.context_memory = {}
        # Initialize user session
        self._init_user_session()

    def _init_user_session(self):
        """Initialize user session with enhanced preferences"""
        try:
            default_preferences = {
                "persona": "Assistant",
                "theme": "dark",
                "language": "en-US",
                "response_style": "detailed",
                "auto_research": True,
                "code_execution": True,
                "visualization_theme": "plotly_dark"
            }
            # Log session start
            self.db_manager.log_analytics(self.user_id, "session_start", self.session_id)
        except Exception as e:
            logger.error(f"Session initialization error: {e}")

    def teach_enhanced_concept(self, topic: str) -> str:
        """Enhanced concept teaching method"""
        try:
            # Use the existing execute_enhanced_goal method for teaching
            response, _ = self.execute_enhanced_goal(f"Please teach me about: {topic}")
            return response
        except Exception as e:
            return f"❌ Teaching error: {str(e)}"
    def execute_enhanced_goal(self, goal: str, context: Dict = None) -> Tuple[str, Dict]:
        """Enhanced goal execution with comprehensive capabilities"""
        goal = self.security.sanitize_input(goal, 3000)
        if not goal:
            return "❌ Please provide a valid goal", {}
        # Rate limiting check
        if not self.security.check_rate_limit(self.user_id, "goal_execution", 20, 300):
            return "🔒 Rate limit exceeded. Please wait before submitting more requests.", {}
        try:
            start_time = time.time()
            # Log the request
            self.db_manager.log_analytics(self.user_id, "goal_execution", goal[:100])
            # Add to conversation history
            self.conversation_history.append({
                "timestamp": datetime.datetime.now().isoformat(),
                "user_input": goal,
                "type": "goal",
                "session_id": self.session_id
            })
            # Analyze goal type and intent
            goal_analysis = self._analyze_goal(goal)
            # Execute based on goal type
            response_parts = []
            metadata = {"session_id": self.session_id, "goal_type": goal_analysis["type"]}
            # Research phase (if applicable)
            if goal_analysis["needs_research"]:
                research_results = self.research_engine.search_multiple_sources(goal, 8)
                metadata["research_sources"] = len([r for r in research_results.values() if r])
                if research_results and any(research_results.values()):
                    response_parts.append("## 🔍 Research Results\n")
                    for source, results in research_results.items():
                        if results:
                            response_parts.append(f"### {source.title()} ({len(results)} results)")
                            for i, result in enumerate(results[:3], 1):
                                response_parts.append(f"{i}. **{result.get('title', 'N/A')}**")
                                if 'snippet' in result:
                                    response_parts.append(f"   {result['snippet']}")
                                if 'url' in result and result['url']:
                                    response_parts.append(f"   🔗 [Read more]({result['url']})")
                            response_parts.append("")
            # Code generation and execution
            if goal_analysis["needs_code"]:
                code_solution = self._generate_enhanced_code_solution(goal, goal_analysis)
                if code_solution:
                    response_parts.append("## 💻 Code Solution\n")
                    response_parts.append(f"```python\n{code_solution}\n```\n")
                    # Execute code safely
                    execution_result = self.security.safe_execute(code_solution, self.user_id)
                    response_parts.append("## 📊 Execution Result\n")
                    response_parts.append(f"```\n{execution_result}\n```\n")
            # Educational content
            if goal_analysis["is_educational"]:
                educational_content = self._generate_educational_content(goal)
                response_parts.extend(educational_content)
            # Problem solving approach
            if goal_analysis["is_problem_solving"]:
                problem_solution = self._generate_problem_solution(goal)
                response_parts.extend(problem_solution)
            # Generate enhanced suggestions
            suggestions = self._generate_enhanced_suggestions(goal, goal_analysis)
            if suggestions:
                response_parts.append("## 💡 Next Steps & Recommendations\n")
                for i, suggestion in enumerate(suggestions, 1):
                    response_parts.append(f"{i}. {suggestion}")
                response_parts.append("")
            # Compile final response
            if not response_parts:
                response_parts = [self._generate_fallback_response(goal)]
            final_response = "\n".join(response_parts)
            # Update conversation history
            self.conversation_history[-1]["system_response"] = final_response
            self.conversation_history[-1]["metadata"] = metadata
            # Update context memory
            self._update_context_memory(goal, final_response, goal_analysis)
            # Enhanced metadata
            metadata.update({
                "response_length": len(final_response),
                "suggestions_count": len(suggestions),
                "conversation_turn": len(self.conversation_history),
                "processing_time": time.time() - start_time
            })
            return final_response, metadata
        except Exception as e:
            error_msg = f"⚠️ System error: {str(e)}"
            logger.error(f"Goal execution error: {e}")
            return error_msg, {"error": str(e), "session_id": self.session_id}
    def _analyze_goal(self, goal: str) -> Dict:
        """Analyze goal to determine appropriate response strategy"""
        goal_lower = goal.lower()
        analysis = {
            "type": "general",
            "needs_research": False,
            "needs_code": False,
            "is_educational": False,
            "is_problem_solving": False,
            "complexity": "medium",
            "keywords": goal_lower.split()
        }
        # Research indicators
        research_keywords = ['research', 'find', 'search', 'what is', 'tell me about', 'information', 'latest']
        if any(keyword in goal_lower for keyword in research_keywords):
            analysis["needs_research"] = True
            analysis["type"] = "research"
        # Code indicators
        code_keywords = ['code', 'program', 'script', 'function', 'algorithm', 'implement', 'develop', 'build app']
        if any(keyword in goal_lower for keyword in code_keywords):
            analysis["needs_code"] = True
            analysis["type"] = "coding"
        # Educational indicators
        edu_keywords = ['learn', 'explain', 'how does', 'tutorial', 'guide', 'teach', 'understand']
        if any(keyword in goal_lower for keyword in edu_keywords):
            analysis["is_educational"] = True
            analysis["type"] = "educational"
        # Problem solving indicators
        problem_keywords = ['solve', 'help', 'fix', 'debug', 'error', 'problem', 'issue', 'troubleshoot']
        if any(keyword in goal_lower for keyword in problem_keywords):
            analysis["is_problem_solving"] = True
            analysis["type"] = "problem_solving"
        # Complexity assessment
        if len(goal.split()) > 20 or any(word in goal_lower for word in ['complex', 'advanced', 'comprehensive']):
            analysis["complexity"] = "high"
        elif len(goal.split()) < 5:
            analysis["complexity"] = "low"
        return analysis
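    # Example (illustrative): _analyze_goal("explain how quicksort works")
    # returns type="educational" with is_educational=True, because "explain"
    # matches the educational keyword list above.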
    def _generate_enhanced_code_solution(self, goal: str, analysis: Dict) -> str:
        """Generate enhanced code solutions based on goal analysis"""
        goal_lower = goal.lower()
        # Data science and analysis
        if any(keyword in goal_lower for keyword in ['data', 'analyze', 'visualize', 'chart', 'graph']):
            return """
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

# Generate sample data
np.random.seed(42)
dates = pd.date_range('2023-01-01', periods=100, freq='D')
data = pd.DataFrame({
    'date': dates,
    'sales': np.random.normal(1000, 200, 100),
    'customers': np.random.poisson(50, 100),
    'revenue': np.random.normal(5000, 1000, 100)
})

# Basic analysis
print("Dataset Info:")
print(f"Shape: {data.shape}")
print(f"Date range: {data['date'].min()} to {data['date'].max()}")
print()
print("Statistical Summary:")
print(data.describe())
print()

# Correlation analysis
numeric_cols = data.select_dtypes(include=[np.number])
correlations = numeric_cols.corr()
print("Correlations:")
print(correlations)

# Create visualization
plt.figure(figsize=(12, 8))
plt.subplot(2, 2, 1)
plt.plot(data['date'], data['sales'])
plt.title('Sales Over Time')
plt.xticks(rotation=45)
plt.subplot(2, 2, 2)
plt.scatter(data['customers'], data['sales'])
plt.xlabel('Customers')
plt.ylabel('Sales')
plt.title('Sales vs Customers')
plt.subplot(2, 2, 3)
plt.hist(data['revenue'], bins=20, alpha=0.7)
plt.title('Revenue Distribution')
plt.subplot(2, 2, 4)
sns.heatmap(correlations, annot=True, cmap='coolwarm', center=0)
plt.title('Correlation Matrix')
plt.tight_layout()
plt.show()
print("Data analysis complete!")"""
        # Machine learning
        elif any(keyword in goal_lower for keyword in ['machine learning', 'ml', 'predict', 'model', 'classification', 'regression']):
            return """
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import accuracy_score, classification_report, mean_squared_error, r2_score
from sklearn.preprocessing import LabelEncoder

# Generate sample dataset
np.random.seed(42)
n_samples = 1000

# Create features
age = np.random.randint(18, 80, n_samples)
income = np.random.normal(50000, 20000, n_samples)
education_years = np.random.randint(10, 20, n_samples)
credit_score = np.random.randint(300, 850, n_samples)

# Create target (loan approval - classification example)
approval_prob = (
    (credit_score - 300) / 550 * 0.4 +
    (income - 10000) / 90000 * 0.3 +
    (education_years - 10) / 10 * 0.2 +
    np.random.random(n_samples) * 0.1
)
loan_approved = (approval_prob > 0.5).astype(int)

# Create DataFrame
data = pd.DataFrame({
    'age': age,
    'income': income,
    'education_years': education_years,
    'credit_score': credit_score,
    'loan_approved': loan_approved
})

print("Dataset created:")
print(data.head())
print(f"\\nDataset shape: {data.shape}")
print(f"Loan approval rate: {data['loan_approved'].mean():.2%}")

# Prepare features and target
X = data[['age', 'income', 'education_years', 'credit_score']]
y = data['loan_approved']

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train classification model
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)

# Make predictions
y_pred = clf.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"\\nModel Performance:")
print(f"Accuracy: {accuracy:.3f}")
print(f"\\nFeature Importance:")
for feature, importance in zip(X.columns, clf.feature_importances_):
    print(f"{feature}: {importance:.3f}")

# Example prediction
new_applicant = [[35, 65000, 16, 720]]  # age, income, education, credit_score
prediction = clf.predict(new_applicant)[0]
probability = clf.predict_proba(new_applicant)[0]
print(f"\\nExample Prediction:")
print(f"New applicant: Age=35, Income=$65k, Education=16yrs, Credit=720")
print(f"Loan approval prediction: {'Approved' if prediction else 'Denied'}")
print(f"Approval probability: {probability[1]:.3f}")"""
        elif 'fibonacci' in goal_lower:
            return """
def fibonacci(n):
    \\"\\"\\"Calculate Fibonacci sequence up to n\\"\\"\\"
    if n <= 1:
        return n
    else:
        return fibonacci(n-1) + fibonacci(n-2)

def fibonacci_iterative(n):
    \\"\\"\\"Iterative Fibonacci implementation - O(n) time, O(1) space\\"\\"\\"
    if n <= 1:
        return n
    a, b = 0, 1
    for _ in range(2, n + 1):
        a, b = b, a + b
    return b

def fibonacci_recursive(n, memo={}):
    \\"\\"\\"Recursive Fibonacci with memoization - O(n) time and space\\"\\"\\"
    if n in memo:
        return memo[n]
    if n <= 1:
        return n
    memo[n] = fibonacci_recursive(n-1, memo) + fibonacci_recursive(n-2, memo)
    return memo[n]

def fibonacci_sequence(count):
    \\"\\"\\"Generate Fibonacci sequence\\"\\"\\"
    sequence = []
    for i in range(count):
        sequence.append(fibonacci_iterative(i))
    return sequence

# Test the implementations
print("Fibonacci Implementations:")
print("=" * 40)

# Test individual numbers
test_numbers = [0, 1, 5, 10, 15, 20]
for n in test_numbers:
    iterative = fibonacci_iterative(n)
    recursive = fibonacci_recursive(n)
    print(f"F({n}): Iterative={iterative}, Recursive={recursive}")
print()

# Generate sequence
sequence_length = 15
sequence = fibonacci_sequence(sequence_length)
print(f"First {sequence_length} Fibonacci numbers:")
print(sequence)

# Performance comparison
import time
n = 30
print(f"\\nPerformance comparison for F({n}):")
start_time = time.time()
result_iterative = fibonacci_iterative(n)
iterative_time = time.time() - start_time
start_time = time.time()
result_recursive = fibonacci_recursive(n)
recursive_time = time.time() - start_time
print(f"Iterative: {result_iterative} (Time: {iterative_time:.6f}s)")
print(f"Recursive: {result_recursive} (Time: {recursive_time:.6f}s)")"""
        elif 'prime' in goal_lower:
            return """
def is_prime(n):
    \\"\\"\\"Check if a number is prime - optimized version\\"\\"\\"
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    # Check odd divisors up to sqrt(n)
    for i in range(3, int(n**0.5) + 1, 2):
        if n % i == 0:
            return False
    return True

def sieve_of_eratosthenes(limit):
    \\"\\"\\"Find all primes up to limit using Sieve of Eratosthenes\\"\\"\\"
    if limit < 2:
        return []
    # Initialize boolean array
    is_prime_arr = [True] * (limit + 1)
    is_prime_arr[0] = is_prime_arr[1] = False
    for i in range(2, int(limit**0.5) + 1):
        if is_prime_arr[i]:
            # Mark multiples of i as not prime
            for j in range(i*i, limit + 1, i):
                is_prime_arr[j] = False
    # Return list of primes
    return [i for i in range(2, limit + 1) if is_prime_arr[i]]

def prime_factorization(n):
    \\"\\"\\"Find prime factorization of a number\\"\\"\\"
    factors = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            factors.append(d)
            n //= d
        d += 1
    if n > 1:
        factors.append(n)
    return factors

def nth_prime(n):
    \\"\\"\\"Find the nth prime number\\"\\"\\"
    if n < 1:
        return None
    primes = []
    candidate = 2
    while len(primes) < n:
        if is_prime(candidate):
            primes.append(candidate)
        candidate += 1
    return primes[-1]

# Demonstrate prime number functions
print("Prime Number Operations:")
print("=" * 30)

# Test individual numbers
test_numbers = [2, 7, 15, 17, 25, 29, 97, 100]
print("Prime check:")
for num in test_numbers:
    result = is_prime(num)
    print(f"{num}: {'Prime' if result else 'Not prime'}")
print()

# Find primes up to 50
limit = 50
primes_up_to_50 = sieve_of_eratosthenes(limit)
print(f"Primes up to {limit}: {primes_up_to_50}")
print(f"Count: {len(primes_up_to_50)}")
print()

# Prime factorization examples
factorization_examples = [12, 24, 60, 97, 100]
print("Prime factorization:")
for num in factorization_examples:
    factors = prime_factorization(num)
    print(f"{num} = {' × '.join(map(str, factors))}")
print()

# Find specific prime numbers
nth_examples = [1, 5, 10, 20, 25]
print("Nth prime numbers:")
for n in nth_examples:
    prime = nth_prime(n)
    print(f"{n}th prime: {prime}")"""
        elif any(keyword in goal_lower for keyword in ['algorithm', 'sort', 'search', 'fibonacci', 'prime']):
            return """
def bubble_sort(arr):
    \\"\\"\\"Bubble Sort - O(n²) time complexity\\"\\"\\"
    n = len(arr)
    arr = arr.copy()  # Don't modify original
    for i in range(n):
        swapped = False
        for j in range(0, n - i - 1):
            if arr[j] > arr[j + 1]:
                arr[j], arr[j + 1] = arr[j + 1], arr[j]
                swapped = True
        # If no swapping occurred, array is sorted
        if not swapped:
            break
    return arr

def quick_sort(arr):
    \\"\\"\\"Quick Sort - O(n log n) average time complexity\\"\\"\\"
    if len(arr) <= 1:
        return arr
    pivot = arr[len(arr) // 2]
    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]
    return quick_sort(left) + middle + quick_sort(right)

def merge_sort(arr):
    \\"\\"\\"Merge Sort - O(n log n) time complexity\\"\\"\\"
    if len(arr) <= 1:
        return arr
    mid = len(arr) // 2
    left = merge_sort(arr[:mid])
    right = merge_sort(arr[mid:])
    return merge(left, right)

def merge(left, right):
    \\"\\"\\"Helper function for merge sort\\"\\"\\"
    result = []
    i, j = 0, 0
    while i < len(left) and j < len(right):
        if left[i] <= right[j]:
            result.append(left[i])
            i += 1
        else:
            result.append(right[j])
            j += 1
    result.extend(left[i:])
    result.extend(right[j:])
    return result

def binary_search(arr, target):
    \\"\\"\\"Binary Search - O(log n) time complexity\\"\\"\\"
    left, right = 0, len(arr) - 1
    while left <= right:
        mid = (left + right) // 2
        if arr[mid] == target:
            return mid
        elif arr[mid] < target:
            left = mid + 1
        else:
            right = mid - 1
    return -1  # Not found

# Demonstrate sorting algorithms
import random
import time

print("Sorting Algorithm Comparison:")
print("=" * 40)

# Generate test data
test_sizes = [100, 1000]
for size in test_sizes:
    print(f"\\nTesting with {size} elements:")
    original_data = [random.randint(1, 1000) for _ in range(size)]
    # Test bubble sort (only for smaller arrays)
    if size <= 100:
        start_time = time.time()
        bubble_result = bubble_sort(original_data)
        bubble_time = time.time() - start_time
        print(f"Bubble Sort: {bubble_time:.6f} seconds")
    # Test quick sort
    start_time = time.time()
    quick_result = quick_sort(original_data)
    quick_time = time.time() - start_time
    print(f"Quick Sort: {quick_time:.6f} seconds")
    # Test merge sort
    start_time = time.time()
    merge_result = merge_sort(original_data)
    merge_time = time.time() - start_time
    print(f"Merge Sort: {merge_time:.6f} seconds")
    # Test Python's built-in sort
    start_time = time.time()
    python_result = sorted(original_data)
    python_time = time.time() - start_time
    print(f"Python sorted(): {python_time:.6f} seconds")

# Demonstrate binary search
print("\\nBinary Search Example:")
sorted_array = list(range(0, 100, 2))  # [0, 2, 4, 6, ..., 98]
targets = [10, 25, 50, 99, 101]
for target in targets:
    index = binary_search(sorted_array, target)
    if index != -1:
        print(f"Found {target} at index {index}")
    else:
        print(f"{target} not found in array")"""
        # Default programming solution (f-string template: literal braces in
        # the generated code must be doubled so the outer f-string is valid)
        else:
            return f"""
# Solution for: {goal[:60]}
print("Task: {goal[:60]}")
print("=" * 50)

# Template solution - customize as needed
def solve_problem():
    \\"\\"\\"
    Main solution function
    Modify this based on your specific requirements
    \\"\\"\\"
    result = "Processing your request..."
    # Add your implementation here
    # This is a template that you can customize
    return result

def demonstrate_capabilities():
    \\"\\"\\"Demonstrate various programming capabilities\\"\\"\\"
    # Data structures
    data_examples = {{
        'list': [1, 2, 3, 4, 5],
        'dict': {{'key1': 'value1', 'key2': 'value2'}},
        'set': {{1, 2, 3, 4, 5}},
        'tuple': (1, 2, 3, 4, 5)
    }}
    print("Data Structures:")
    for data_type, example in data_examples.items():
        print(f"{{data_type}}: {{example}}")
    print()
    # Control structures
    print("Control Structures:")
    # Loop example
    print("For loop example:")
    for i in range(5):
        print(f"  Iteration {{i}}")
    # Conditional example
    print("Conditional example:")
    for num in [1, 2, 3, 4, 5]:
        if num % 2 == 0:
            print(f"  {{num}} is even")
        else:
            print(f"  {{num}} is odd")
    print()
    # Function examples
    print("Function Examples:")
    def calculate_factorial(n):
        return 1 if n <= 1 else n * calculate_factorial(n - 1)
    def fibonacci(n):
        return n if n <= 1 else fibonacci(n-1) + fibonacci(n-2)
    print(f"Factorial of 5: {{calculate_factorial(5)}}")
    print(f"5th Fibonacci number: {{fibonacci(5)}}")

# Execute the solution
print("Executing solution...")
result = solve_problem()
print(f"Result: {{result}}")
print()
demonstrate_capabilities()
print("\\nSolution complete!")"""
    def _generate_educational_content(self, goal: str) -> List[str]:
        """Generate structured educational content"""
        content = []
        content.extend([
            "## 📚 Learning Guide\n",
            "### 🎯 Learning Objectives",
            "After completing this guide, you will:",
            "- Understand the fundamental concepts",
            "- Know how to apply this knowledge practically",
            "- Be able to explain the topic to others",
            "- Identify related concepts and connections\n",
            "### 📖 Key Concepts",
            "This section covers the essential information you need to know:\n",
            "### 🔬 Practical Applications",
            "Here's how this knowledge applies in real-world scenarios:\n",
            "### 🧪 Practice Exercises",
            "Try these activities to reinforce your learning:",
            "1. Research additional examples online",
            "2. Create your own examples or use cases",
            "3. Explain the concept to someone else",
            "4. Find connections to other topics you know\n",
            "### 📚 Further Reading",
            "Explore these resources to deepen your understanding:",
            "- Look for academic papers or textbooks on the topic",
            "- Find online courses or tutorials",
            "- Join relevant communities or forums",
            "- Practice with hands-on projects\n"
        ])
        return content

    def _generate_problem_solution(self, goal: str) -> List[str]:
        """Generate structured problem-solving approach"""
        content = []
        content.extend([
            "## 🔧 Problem-Solving Approach\n",
            "### 1. 🎯 Problem Analysis",
            "Let's break down the problem systematically:",
            "- **What** exactly needs to be solved?",
            "- **Why** is this problem occurring?",
            "- **When** does this problem happen?",
            "- **Where** is the problem manifesting?",
            "- **Who** is affected by this problem?\n",
            "### 2. 🔍 Root Cause Investigation",
            "Potential underlying causes to investigate:",
            "- Technical factors",
            "- Process-related issues",
            "- Environmental conditions",
            "- User behavior patterns\n",
            "### 3. 💡 Solution Strategies",
            "Recommended approaches to try:",
            "- **Immediate fixes**: Quick solutions to address symptoms",
            "- **Short-term solutions**: Temporary measures while investigating",
            "- **Long-term solutions**: Permanent fixes addressing root causes",
            "- **Preventive measures**: Steps to avoid future occurrences\n",
            "### 4. ✅ Implementation Plan",
            "Steps to implement the solution:",
            "1. Gather necessary resources and information",
            "2. Test the solution in a safe environment",
            "3. Implement gradually with monitoring",
            "4. Validate the results and measure success",
            "5. Document the solution for future reference\n",
            "### 5. 🔄 Follow-up Actions",
            "After implementing the solution:",
            "- Monitor for any side effects or new issues",
            "- Gather feedback from affected users",
            "- Document lessons learned",
            "- Update procedures or guidelines as needed\n"
        ])
        return content
    def _generate_enhanced_suggestions(self, goal: str, analysis: Dict) -> List[str]:
        """Generate enhanced, contextual suggestions"""
        suggestions = []
        goal_lower = goal.lower()
        # Type-specific suggestions
        if analysis["type"] == "research":
            suggestions.extend([
                "🔍 Cross-reference findings with multiple reliable sources",
                "📊 Create a summary document with key findings",
                "🔗 Save important sources for future reference",
                "🤝 Share findings with colleagues or study groups"
            ])
        elif analysis["type"] == "coding":
            suggestions.extend([
                "🧪 Test the code with different input scenarios",
                "📝 Add comprehensive comments and documentation",
                "🔧 Consider error handling and edge cases",
                "⚡ Optimize for performance if needed",
                "🔄 Version control your code changes"
            ])
        elif analysis["type"] == "educational":
            suggestions.extend([
                "📖 Create study notes or mind maps",
                "🎯 Set up a learning schedule with milestones",
                "👥 Find study partners or learning communities",
                "🔬 Apply knowledge through practical projects",
                "📚 Explore advanced topics in the same field"
            ])
        elif analysis["type"] == "problem_solving":
            suggestions.extend([
                "🔍 Document the problem-solving process",
                "📋 Create a checklist for similar future issues",
                "🤝 Consult with experts or experienced colleagues",
                "🔄 Implement monitoring to prevent recurrence",
                "📚 Research best practices in the problem domain"
            ])
        # Complexity-based suggestions
        if analysis["complexity"] == "high":
            suggestions.extend([
                "🎯 Break down into smaller, manageable sub-tasks",
                "📅 Create a realistic timeline with milestones",
                "🤝 Consider collaborating with others",
                "📊 Use project management tools to track progress"
            ])
        # General enhancement suggestions
        suggestions.extend([
            "💡 Explore alternative approaches or methodologies",
            "📈 Set measurable goals to track progress",
            "🔄 Schedule regular reviews and improvements",
            "📚 Build on this foundation for more advanced topics"
        ])
        # Remove duplicates and limit
        unique_suggestions = []
        for suggestion in suggestions:
            if suggestion not in unique_suggestions:
                unique_suggestions.append(suggestion)
        return unique_suggestions[:8]  # Limit to 8 suggestions
    def _generate_fallback_response(self, goal: str) -> str:
        """Generate a helpful fallback response when specific handlers don't apply"""
        return f"""## 🤖 AI Assistant Response

Thank you for your question: "{goal}"

I understand you're looking for assistance with this topic. While I may not have specific pre-programmed responses for every query, I can help you approach this systematically:

### 🔍 Analysis Approach
1. **Research**: I can help you find relevant information from multiple sources
2. **Problem-solving**: We can break down complex issues into manageable parts
3. **Learning**: I can provide educational content and explanations
4. **Implementation**: If coding or technical work is needed, I can provide examples

### 💡 How I Can Help Further
- Ask me to research specific aspects of your topic
- Request code examples or implementations
- Ask for explanations of concepts you're unsure about
- Request step-by-step guides or tutorials

### 🎯 Making Your Request More Specific
To provide the most helpful response, you could:
- Specify what type of help you need (research, coding, explanation, etc.)
- Provide more context about your goals or constraints
- Break down complex requests into smaller parts
- Ask follow-up questions about specific aspects

Feel free to rephrase your request or ask more specific questions, and I'll do my best to provide detailed, helpful responses!"""
    def _update_context_memory(self, goal: str, response: str, analysis: Dict):
        """Update context memory for better future responses"""
        try:
            # Store conversation context
            context_key = f"context_{len(self.conversation_history)}"
            self.context_memory[context_key] = {
                "goal": goal,
                "response_summary": response[:200] + "..." if len(response) > 200 else response,
                "goal_type": analysis["type"],
                "timestamp": datetime.datetime.now().isoformat(),
                "keywords": analysis["keywords"]
            }
            # Keep only recent context (last 10 interactions); compare by turn
            # number, since as strings "context_10" sorts before "context_2"
            if len(self.context_memory) > 10:
                oldest_key = min(self.context_memory, key=lambda k: int(k.rsplit('_', 1)[1]))
                del self.context_memory[oldest_key]
        except Exception as e:
            logger.error(f"Context memory update error: {e}")
# ===================== STREAMLIT INTERFACE ===================== | |
def main(): | |
st.set_page_config( | |
page_title="🤖 Enhanced AI System Pro", | |
page_icon="🤖", | |
layout="wide", | |
initial_sidebar_state="expanded" | |
) | |
# Enhanced mobile-optimized CSS with better styling | |
st.markdown(""" | |
<style> | |
/* Import Google Fonts */ | |
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap'); | |
/* Global Styles */ | |
* { | |
font-family: 'Inter', sans-serif; | |
} | |
.main .block-container { | |
padding: 1rem; | |
max-width: 100%; | |
} | |
/* Enhanced Mobile Responsiveness */ | |
@media (max-width: 768px) { | |
.main .block-container { | |
padding: 0.5rem !important; | |
} | |
.stButton > button { | |
width: 100% !important; | |
margin: 0.25rem 0 !important; | |
padding: 0.75rem !important; | |
font-size: 16px !important; | |
border-radius: 8px !important; | |
font-weight: 500; | |
} | |
.stTextArea textarea, .stTextInput input { | |
font-size: 16px !important; | |
} | |
h1 { font-size: 1.75rem !important; } | |
h2 { font-size: 1.5rem !important; } | |
h3 { font-size: 1.25rem !important; } | |
} | |
/* Custom Components */ | |
.metric-card { | |
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
padding: 1.5rem; | |
border-radius: 12px; | |
color: white; | |
text-align: center; | |
margin: 0.5rem 0; | |
box-shadow: 0 4px 15px rgba(0,0,0,0.1); | |
transition: transform 0.3s ease; | |
} | |
.metric-card:hover { | |
transform: translateY(-2px); | |
} | |
.info-card { | |
background: linear-gradient(135deg, #74b9ff 0%, #0984e3 100%); | |
padding: 1rem; | |
border-radius: 10px; | |
color: white; | |
margin: 1rem 0; | |
} | |
.success-card { | |
background: linear-gradient(135deg, #00b894 0%, #00a085 100%); | |
padding: 1rem; | |
border-radius: 10px; | |
color: white; | |
margin: 1rem 0; | |
} | |
.warning-card { | |
background: linear-gradient(135deg, #fdcb6e 0%, #f39c12 100%); | |
padding: 1rem; | |
border-radius: 10px; | |
color: white; | |
margin: 1rem 0; | |
} | |
.error-card { | |
background: linear-gradient(135deg, #e17055 0%, #d63031 100%); | |
padding: 1rem; | |
border-radius: 10px; | |
color: white; | |
margin: 1rem 0; | |
} | |
/* Enhanced Animations */ | |
.stButton > button { | |
transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1); | |
border: none; | |
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
color: white; | |
font-weight: 500; | |
} | |
.stButton > button:hover { | |
transform: translateY(-2px); | |
box-shadow: 0 8px 25px rgba(102, 126, 234, 0.4); | |
background: linear-gradient(135deg, #764ba2 0%, #667eea 100%); | |
} | |
/* Sidebar Styling (this hashed class name is generated by Streamlit and
   may change between versions) */
.css-1d391kg {
    background: linear-gradient(180deg, #2d3748 0%, #1a202c 100%);
}
/* Tab Styling */ | |
.stTabs [data-baseweb="tab-list"] { | |
gap: 8px; | |
} | |
.stTabs [data-baseweb="tab"] { | |
padding: 12px 24px; | |
border-radius: 8px; | |
font-weight: 500; | |
transition: all 0.3s ease; | |
} | |
/* Code Block Styling */ | |
.stCodeBlock { | |
border-radius: 8px; | |
border: 1px solid #e2e8f0; | |
} | |
/* Progress Bars */ | |
.stProgress .st-bo { | |
background: linear-gradient(90deg, #667eea 0%, #764ba2 100%); | |
} | |
/* Custom Scrollbars */ | |
::-webkit-scrollbar { | |
width: 8px; | |
height: 8px; | |
} | |
::-webkit-scrollbar-track { | |
background: #f1f1f1; | |
border-radius: 4px; | |
} | |
::-webkit-scrollbar-thumb { | |
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
border-radius: 4px; | |
} | |
::-webkit-scrollbar-thumb:hover { | |
background: linear-gradient(135deg, #764ba2 0%, #667eea 100%); | |
} | |
</style> | |
""", unsafe_allow_html=True) | |
# Initialize enhanced agent and session state | |
if 'enhanced_agent' not in st.session_state: | |
st.session_state.enhanced_agent = EnhancedAutonomousAgent() | |
if 'conversation_count' not in st.session_state: | |
st.session_state.conversation_count = 0 | |
if 'last_execution_time' not in st.session_state: | |
st.session_state.last_execution_time = 1.2 | |
if 'session_start' not in st.session_state: | |
st.session_state.session_start = time.time() | |
if 'system_health' not in st.session_state: | |
st.session_state.system_health = { | |
'status': 'optimal', | |
'uptime': 0, | |
'total_requests': 0, | |
'error_count': 0 | |
} | |
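# The guards above are Streamlit's standard init pattern. Since
# st.session_state implements MutableMapping, a small helper (hypothetical,
# not used elsewhere in this file) could collapse them:
def init_session_defaults(defaults: Dict[str, Any]) -> None:
    for key, value in defaults.items():
        st.session_state.setdefault(key, value)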
# Enhanced header with gradient | |
st.markdown(""" | |
<div style='text-align: center; padding: 2.5rem; | |
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
color: white; border-radius: 15px; margin-bottom: 2rem; | |
box-shadow: 0 10px 30px rgba(0,0,0,0.2);'> | |
<h1 style='margin: 0; font-size: 2.5rem; font-weight: 700;'>🤖 Enhanced AI System Pro</h1> | |
<p style='margin: 0.5rem 0 0 0; font-size: 1.2rem; opacity: 0.9;'> | |
Advanced Research • Intelligent Analysis • Code Execution • Learning Assistant | |
</p> | |
</div> | |
""", unsafe_allow_html=True) | |
# Enhanced sidebar with better organization | |
with st.sidebar: | |
st.markdown("## 🎛️ Control Center") | |
# User Profile Section | |
with st.expander("👤 User Profile", expanded=True): | |
user_id = st.text_input("User ID", value="user_123", help="Your unique identifier") | |
col1, col2 = st.columns(2) | |
with col1: | |
persona = st.selectbox( | |
"AI Personality", | |
PERSONAS, | |
index=5, | |
help="Choose how the AI responds" | |
) | |
with col2: | |
response_style = st.selectbox( | |
"Response Style", | |
["Detailed", "Concise", "Technical", "Beginner-friendly"], | |
index=0 | |
) | |
# System Status | |
with st.expander("📊 System Status", expanded=True): | |
col1, col2 = st.columns(2) | |
with col1: | |
st.metric("Conversations", st.session_state.conversation_count) | |
st.metric("Session Time", | |
f"{(time.time() - st.session_state.get('session_start', time.time())) / 60:.0f}m") | |
with col2: | |
st.metric("Features", "15+") | |
st.metric("Status", "🟢 Online") | |
# Session info | |
st.info(f"**Session ID**: {st.session_state.enhanced_agent.session_id[:8]}...") | |
# Quick Tools | |
with st.expander("⚡ Quick Tools"): | |
if st.button("🔄 Reset Session", use_container_width=True): | |
for key in list(st.session_state.keys()): | |
if key.startswith('enhanced_agent') or key == 'conversation_count': | |
del st.session_state[key] | |
st.session_state.enhanced_agent = EnhancedAutonomousAgent() | |
st.session_state.conversation_count = 0 | |
st.success("Session reset!") | |
st.rerun() | |
if st.button("💾 Download History", use_container_width=True): | |
history = st.session_state.enhanced_agent.conversation_history | |
if history: | |
history_json = json.dumps(history, indent=2) | |
st.download_button( | |
"📥 Download JSON", | |
history_json, | |
f"ai_history_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json", | |
"application/json", | |
use_container_width=True | |
) | |
else: | |
st.info("No history yet") | |
if st.button("🧹 Clear Cache", use_container_width=True): | |
try: | |
# Clear database cache | |
st.session_state.enhanced_agent.db_manager.set_cached_result("clear_all", "", 0) | |
st.success("Cache cleared!") | |
except Exception as e: | |
st.error(f"Cache clear error: {e}") | |
# System Health | |
st.markdown("### 🔧 System Health") | |
# Performance metrics | |
perf_col1, perf_col2 = st.columns(2) | |
with perf_col1: | |
st.metric("Response Time", "< 2s", "↗️ Fast") | |
with perf_col2: | |
st.metric("Success Rate", "98.5%", "↗️ +0.5%") | |
# Feature status | |
features_status = { | |
"🔍 Research Engine": "🟢", | |
"💻 Code Execution": "🟢", | |
"📊 Analytics": "🟢", | |
"🎓 Learning Coach": "🟢", | |
"🗄️ Database": "🟢" if st.session_state.enhanced_agent.db_manager.pg_pool else "🟡" | |
} | |
for feature, status in features_status.items(): | |
st.markdown(f"{status} {feature}") | |
# Main interface with enhanced tabs | |
tab1, tab2, tab3, tab4, tab5, tab6 = st.tabs([ | |
"🤖 AI Assistant", | |
"📊 Analytics Hub", | |
"🎓 Learning Center", | |
"🔬 Research Lab", | |
"⚙️ Code Executor", | |
"📈 System Monitor" | |
]) | |
with tab1: | |
st.header("🤖 AI Assistant") | |
# Enhanced input section with better UX | |
col1, col2 = st.columns([2, 1]) | |
with col1: | |
st.markdown("### 💬 What can I help you with today?") | |
# Apply a queued quick-start suggestion before the widget is created
# (a widget's session-state key can only be set before instantiation)
if 'pending_goal' in st.session_state:
    st.session_state['goal_input'] = st.session_state.pop('pending_goal')
goal_input = st.text_area(
    "Your request or question:",
    placeholder="Ask me anything! I can help with research, coding, learning, problem-solving, and more...",
    height=150,
    key="goal_input",
    help="💡 Tip: Be specific for better results. I can research topics, write code, explain concepts, solve problems, and much more!"
)
# Context options | |
col_a, col_b = st.columns(2) | |
with col_a: | |
auto_research = st.checkbox("🔍 Auto Research", value=True, help="Automatically search for relevant information") | |
with col_b: | |
code_execution = st.checkbox("💻 Execute Code", value=True, help="Run generated code safely") | |
with col2: | |
st.markdown("### 💡 Quick Starts") | |
quick_suggestions = [ | |
"🔍 Research latest AI trends", | |
"💻 Write Python data analysis script", | |
"🧮 Explain machine learning concepts", | |
"🌍 Find information about climate change", | |
"📊 Create data visualizations", | |
"🔬 Solve programming problems", | |
"📚 Create a learning plan", | |
"🎯 Debug code issues" | |
] | |
for suggestion in quick_suggestions:
    if st.button(suggestion, key=f"quick_{suggestion}", use_container_width=True):
        # Queue the suggestion (minus its emoji) and rerun; a plain local
        # assignment to goal_input would be lost on the rerun
        st.session_state['pending_goal'] = suggestion.split(" ", 1)[1]
        st.rerun()
# Enhanced action buttons | |
col1, col2, col3, col4 = st.columns(4) | |
with col1: | |
execute_btn = st.button("🚀 Execute", type="primary", use_container_width=True) | |
with col2: | |
teach_btn = st.button("🎓 Teach Me", use_container_width=True) | |
with col3: | |
research_btn = st.button("🔍 Research", use_container_width=True) | |
with col4: | |
clear_btn = st.button("🗑️ Clear", use_container_width=True) | |
# Process requests with enhanced feedback | |
if (execute_btn or teach_btn or research_btn) and goal_input: | |
with st.spinner("🔄 Processing your request..."): | |
start_time = time.time() | |
# Determine request type | |
if teach_btn: | |
goal_input = f"Please explain and teach me about: {goal_input}" | |
elif research_btn: | |
goal_input = f"Research and find information about: {goal_input}" | |
response, metadata = st.session_state.enhanced_agent.execute_enhanced_goal(goal_input) | |
processing_time = time.time() - start_time | |
st.session_state.conversation_count += 1 | |
st.session_state.last_execution_time = processing_time | |
# Display response with enhanced formatting | |
st.markdown("---") | |
st.markdown(response) | |
# Show enhanced metadata | |
if metadata: | |
with st.expander("📊 Request Analytics", expanded=False): | |
col1, col2, col3, col4 = st.columns(4) | |
with col1: | |
st.metric("Processing Time", f"{processing_time:.2f}s") | |
with col2: | |
st.metric("Response Length", f"{metadata.get('response_length', 0):,} chars") | |
with col3: | |
st.metric("Research Sources", metadata.get('research_sources', 0)) | |
with col4: | |
st.metric("Goal Type", metadata.get('goal_type', 'general').title()) | |
if 'suggestions_count' in metadata: | |
st.metric("Suggestions", metadata['suggestions_count']) | |
elif (execute_btn or teach_btn or research_btn) and not goal_input: | |
st.error("❌ Please enter a request or question first") | |
elif clear_btn:
    # Queue an empty value so the keyed text area is cleared on rerun
    st.session_state['pending_goal'] = ""
    st.rerun()
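# Hedged sketch: per-request telemetry could be recorded after each goal
# execution (assumes the `analytics` table set up by the database manager;
# not called by the handlers above).
def log_request_event(user_id: str, action: str, details: str) -> None:
    with sqlite3.connect(CACHE_DB) as conn:
        conn.execute(
            "INSERT INTO analytics (user_id, action, details) VALUES (?, ?, ?)",
            (user_id, action, details),
        )
        conn.commit()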
with tab2: | |
st.header("📊 Analytics Hub") | |
# Enhanced analytics interface | |
col1, col2 = st.columns([2, 1]) | |
with col1: | |
st.subheader("📈 Data Visualization Studio") | |
# Enhanced file upload with multiple formats | |
uploaded_file = st.file_uploader( | |
"Upload your data", | |
type=['csv', 'xlsx', 'json', 'txt', 'parquet'], | |
help="Supports CSV, Excel, JSON, Text, and Parquet formats" | |
) | |
# Data source options | |
data_source_col1, data_source_col2 = st.columns(2) | |
with data_source_col1: | |
use_sample_data = st.checkbox("Use Sample Dataset", value=False) | |
with data_source_col2: | |
if use_sample_data: | |
sample_type = st.selectbox( | |
"Sample Type", | |
["Sales Data", "Marketing Data", "Financial Data", "IoT Sensor Data", "Customer Data"] | |
) | |
if uploaded_file: | |
try: | |
# Read file based on extension (the uploader also accepts txt and parquet)
if uploaded_file.name.endswith('.csv'):
    data = pd.read_csv(uploaded_file)
elif uploaded_file.name.endswith(('.xlsx', '.xls')):
    data = pd.read_excel(uploaded_file)
elif uploaded_file.name.endswith('.json'):
    data = pd.read_json(uploaded_file)
elif uploaded_file.name.endswith('.parquet'):
    data = pd.read_parquet(uploaded_file)
else:
    # Fall back to delimiter sniffing for .txt and anything else,
    # so `data` is always defined before the success message
    data = pd.read_csv(uploaded_file, sep=None, engine='python')
st.success(f"✅ Data loaded: {data.shape[0]:,} rows × {data.shape[1]} columns")
# Data preview with enhanced display | |
with st.expander("👀 Data Preview", expanded=True): | |
st.dataframe(data.head(10), use_container_width=True) | |
# Visualization controls | |
viz_col1, viz_col2, viz_col3 = st.columns(3) | |
with viz_col1: | |
viz_type = st.selectbox( | |
"Chart Type", | |
["Line", "Bar", "Scatter", "Histogram", "Pie", "Heatmap", "Box", "3D Scatter"], | |
key="viz_type_main" | |
) | |
with viz_col2: | |
chart_theme = st.selectbox( | |
"Theme", | |
["plotly_dark", "plotly", "plotly_white", "ggplot2", "seaborn", "simple_white"], | |
key="chart_theme_main" | |
) | |
with viz_col3: | |
chart_title = st.text_input("Chart Title", value=f"{viz_type} Visualization") | |
# Create visualization | |
if st.button("🎨 Create Visualization", type="primary", use_container_width=True): | |
with st.spinner("Creating visualization..."): | |
fig = st.session_state.enhanced_agent.analytics.create_advanced_visualization( | |
data, viz_type, chart_title, chart_theme | |
) | |
st.plotly_chart(fig, use_container_width=True) | |
# Enhanced statistical analysis with AI insights | |
analysis_col1, analysis_col2 = st.columns(2) | |
with analysis_col1: | |
if st.button("📈 Generate Analysis Report", use_container_width=True): | |
with st.spinner("Generating comprehensive analysis..."): | |
analysis = st.session_state.enhanced_agent.analytics.generate_comprehensive_analysis(data) | |
st.markdown(analysis) | |
with analysis_col2: | |
if st.button("🧠 AI Data Insights", use_container_width=True): | |
with st.spinner("Generating AI-powered insights..."): | |
ai_insights = st.session_state.enhanced_agent.analytics.generate_ai_insights(data) | |
st.markdown("### 🤖 AI-Powered Insights") | |
st.markdown(ai_insights) | |
# Machine learning | |
st.subheader("🤖 Machine Learning") | |
numeric_cols = data.select_dtypes(include=[np.number]).columns.tolist() | |
if len(numeric_cols) >= 2: | |
target_col = st.selectbox("Select Target Column", numeric_cols) | |
if st.button("🔮 Train Prediction Model", use_container_width=True): | |
with st.spinner("Training machine learning model..."): | |
model_results = st.session_state.enhanced_agent.analytics.create_ml_model( | |
data, target_col, "regression" | |
) | |
if "error" not in model_results: | |
st.success("✅ Model trained successfully!") | |
# Display results | |
st.markdown("### 📊 Model Performance") | |
metrics = model_results["metrics"] | |
met_col1, met_col2, met_col3 = st.columns(3) | |
with met_col1: | |
st.metric("R² Score", f"{metrics['r2_score']:.3f}") | |
with met_col2: | |
st.metric("RMSE", f"{metrics['rmse']:.2f}") | |
with met_col3: | |
st.metric("Features", len(model_results["features"])) | |
# Feature importance | |
st.markdown("### 🎯 Feature Importance") | |
importance_df = pd.DataFrame([ | |
{"Feature": k, "Importance": v} | |
for k, v in model_results["feature_importance"].items() | |
]).sort_values("Importance", ascending=False) | |
fig_importance = px.bar( | |
importance_df, x="Importance", y="Feature", | |
orientation="h", title="Feature Importance", | |
template=chart_theme | |
) | |
st.plotly_chart(fig_importance, use_container_width=True) | |
else: | |
st.error(f"❌ Model training error: {model_results['error']}") | |
else: | |
st.info("📝 Upload data with at least 2 numeric columns for ML features") | |
except Exception as e: | |
st.error(f"❌ Error processing file: {str(e)}") | |
else: | |
# Demo data generator | |
st.info("📝 Upload a data file above or generate sample data below") | |
demo_col1, demo_col2 = st.columns(2) | |
with demo_col1: | |
if st.button("🎲 Generate Sales Data", use_container_width=True): | |
np.random.seed(42) | |
sample_data = pd.DataFrame({ | |
'Date': pd.date_range('2023-01-01', periods=365), | |
'Sales': np.random.normal(1000, 200, 365) + np.sin(np.arange(365) * 2 * np.pi / 365) * 100, | |
'Customers': np.random.poisson(50, 365), | |
'Revenue': np.random.normal(5000, 1000, 365), | |
'Region': np.random.choice(['North', 'South', 'East', 'West'], 365) | |
}) | |
st.session_state.demo_data = sample_data | |
st.success("✅ Sample sales data generated!") | |
with demo_col2: | |
if st.button("📊 Generate Marketing Data", use_container_width=True): | |
np.random.seed(123) | |
sample_data = pd.DataFrame({ | |
'Campaign': [f'Campaign_{i}' for i in range(1, 101)], | |
'Impressions': np.random.randint(1000, 100000, 100), | |
'Clicks': np.random.randint(10, 5000, 100), | |
'Conversions': np.random.randint(1, 500, 100), | |
'Cost': np.random.uniform(100, 10000, 100), | |
'Channel': np.random.choice(['Social', 'Search', 'Display', 'Email'], 100) | |
}) | |
st.session_state.demo_data = sample_data | |
st.success("✅ Sample marketing data generated!") | |
# Display demo data if generated | |
if 'demo_data' in st.session_state: | |
st.subheader("📋 Sample Data") | |
st.dataframe(st.session_state.demo_data.head(), use_container_width=True) | |
if st.button("📈 Analyze Sample Data", use_container_width=True): | |
fig = st.session_state.enhanced_agent.analytics.create_advanced_visualization( | |
st.session_state.demo_data, 'line', 'Sample Data Analysis', 'plotly_dark' | |
) | |
st.plotly_chart(fig, use_container_width=True) | |
with col2: | |
st.subheader("📊 Analytics Dashboard") | |
# Real-time metrics | |
st.markdown('<div class="metric-card"><h3>📈 Session Analytics</h3></div>', unsafe_allow_html=True) | |
# Performance metrics | |
metrics_data = { | |
"Total Requests": st.session_state.conversation_count, | |
"Avg Response Time": f"{st.session_state.get('last_execution_time', 1.2) or 1.2:.2f}s", | |
"Success Rate": "98.5%", | |
"Features Used": len([tab for tab in [tab1, tab2, tab3, tab4, tab5, tab6] if tab]) | |
} | |
for metric, value in metrics_data.items(): | |
st.metric(metric, value) | |
# Usage patterns | |
st.markdown("### 📊 Usage Patterns") | |
# Create sample usage chart | |
usage_data = pd.DataFrame({ | |
'Feature': ['AI Assistant', 'Analytics', 'Learning', 'Research', 'Code Executor'], | |
'Usage': [45, 25, 15, 10, 5] | |
}) | |
fig_usage = px.pie( | |
usage_data, values='Usage', names='Feature', | |
title='Feature Usage Distribution', | |
template='plotly_dark' | |
) | |
fig_usage.update_layout(height=300) | |
st.plotly_chart(fig_usage, use_container_width=True) | |
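# Offline sketch (not part of the UI): K-fold cross-validation to complement
# the single train/test split the analytics engine uses. cross_val_score is
# standard scikit-learn; the frame and target names are whatever a caller
# supplies.
from sklearn.model_selection import cross_val_score

def quick_cv_r2(df: pd.DataFrame, target: str, folds: int = 5) -> float:
    """Mean R² of a RandomForest across K folds on the numeric columns."""
    numeric = df.select_dtypes(include=[np.number])
    features = numeric.drop(columns=[target]).fillna(0)
    model = RandomForestRegressor(n_estimators=100, random_state=42)
    return cross_val_score(model, features, numeric[target], cv=folds, scoring="r2").mean()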
with tab3: | |
st.header("🎓 Learning Center") | |
# Enhanced learning interface | |
learning_col1, learning_col2 = st.columns([2, 1]) | |
with learning_col1: | |
st.subheader("📚 Personal Learning Assistant") | |
# Learning input with enhanced options | |
learning_topic = st.text_input( | |
"What would you like to learn about?", | |
placeholder="e.g., machine learning, quantum physics, web development", | |
help="Enter any topic - I'll create a comprehensive learning guide" | |
) | |
# Learning customization | |
learn_col1, learn_col2, learn_col3 = st.columns(3) | |
with learn_col1: | |
learning_level = st.selectbox( | |
"Your Level", | |
["Beginner", "Intermediate", "Advanced", "Expert"], | |
help="This helps me tailor the content complexity" | |
) | |
with learn_col2: | |
learning_style = st.selectbox( | |
"Learning Style", | |
["Visual", "Theoretical", "Practical", "Mixed", "Step-by-step"], | |
index=4 | |
) | |
with learn_col3: | |
content_depth = st.selectbox( | |
"Content Depth", | |
["Overview", "Detailed", "Comprehensive", "Research-level"], | |
index=1 | |
) | |
# Learning preferences | |
learning_prefs = st.multiselect( | |
"Include in learning plan:", | |
["Code Examples", "Real-world Applications", "Practice Exercises", | |
"Further Reading", "Video Resources", "Interactive Elements"], | |
default=["Code Examples", "Practice Exercises", "Further Reading"] | |
) | |
if st.button("🎓 Create Learning Plan", type="primary", use_container_width=True): | |
if learning_topic: | |
with st.spinner("📖 Creating personalized learning content..."): | |
# Enhanced learning request | |
enhanced_topic = f""" | |
Create a comprehensive {learning_level} level learning guide for: {learning_topic} | |
Learning preferences: | |
- Style: {learning_style} | |
- Depth: {content_depth} | |
- Include: {', '.join(learning_prefs)} | |
Please provide structured educational content with clear explanations, examples, and practical applications. | |
""" | |
response = st.session_state.enhanced_agent.teach_enhanced_concept(enhanced_topic) | |
st.session_state.conversation_count += 1 | |
st.markdown("---") | |
st.markdown(response) | |
# Learning progress tracker | |
with st.expander("📈 Learning Progress Tracker"): | |
st.markdown(""" | |
### 🎯 Suggested Learning Path | |
✅ **Step 1**: Read through the overview | |
⏳ **Step 2**: Study key concepts | |
⏳ **Step 3**: Practice with examples | |
⏳ **Step 4**: Apply in real projects | |
⏳ **Step 5**: Explore advanced topics | |
**Estimated Time**: 2-4 hours | |
**Difficulty**: {learning_level} | |
**Prerequisites**: Basic understanding of related concepts | |
""") | |
else: | |
st.error("❌ Please enter a topic to learn about") | |
with learning_col2: | |
st.subheader("🔥 Popular Learning Topics") | |
# Categorized learning topics | |
topic_categories = { | |
"💻 Technology": [ | |
"🐍 Python Programming", | |
"🤖 Machine Learning", | |
"🌐 Web Development", | |
"☁️ Cloud Computing", | |
"🔐 Cybersecurity" | |
], | |
"📊 Data Science": [ | |
"📈 Data Analysis", | |
"📊 Data Visualization", | |
"🧮 Statistics", | |
"🔍 Research Methods", | |
"📋 Excel Advanced" | |
], | |
"🧪 Science": [ | |
"⚛️ Physics Concepts", | |
"🧬 Biology Basics", | |
"⚗️ Chemistry Fundamentals", | |
"🌍 Environmental Science", | |
"🔬 Scientific Method" | |
], | |
"💼 Business": [ | |
"📈 Business Analytics", | |
"💰 Finance Basics", | |
"📊 Project Management", | |
"🎯 Marketing Strategy", | |
"💡 Innovation Management" | |
] | |
} | |
for category, topics in topic_categories.items(): | |
with st.expander(category, expanded=False): | |
for topic in topics: | |
if st.button(topic, key=f"learn_{topic}", use_container_width=True): | |
clean_topic = topic.split(" ", 1)[1] # Remove emoji | |
enhanced_topic = f"Explain {clean_topic} at an intermediate level with practical examples" | |
response = st.session_state.enhanced_agent.teach_enhanced_concept(enhanced_topic) | |
st.markdown("---") | |
st.markdown(response) | |
# Learning statistics | |
st.markdown("### 📊 Your Learning Stats") | |
learning_stats = { | |
"Topics Explored": 12, | |
"Hours Learned": 8.5, | |
"Concepts Mastered": 25, | |
"Current Streak": "3 days" | |
} | |
for stat, value in learning_stats.items(): | |
st.metric(stat, value) | |
with tab4: | |
st.header("🔬 Research Laboratory") | |
# Enhanced research interface | |
st.subheader("🔍 Multi-Source Research Engine") | |
research_col1, research_col2 = st.columns([2, 1]) | |
with research_col1: | |
# Apply a topic queued by the research-tools column before the widget exists
if 'pending_research' in st.session_state:
    st.session_state['research_query_input'] = st.session_state.pop('pending_research')
research_query = st.text_input(
    "Research Query",
    placeholder="Enter your research topic or question...",
    key="research_query_input",
    help="I'll search across multiple sources including web, Wikipedia, and academic papers"
)
# Research configuration | |
config_col1, config_col2, config_col3 = st.columns(3) | |
with config_col1: | |
research_depth = st.selectbox( | |
"Research Depth", | |
["Quick Overview", "Standard Research", "Deep Analysis", "Comprehensive Study"], | |
index=1 | |
) | |
with config_col2: | |
max_sources = st.slider("Max Sources per Type", 1, 10, 5) | |
with config_col3: | |
research_focus = st.selectbox( | |
"Research Focus", | |
["General", "Academic", "News", "Technical", "Business"], | |
index=0 | |
) | |
# Source selection | |
st.markdown("#### 📚 Source Selection") | |
source_col1, source_col2, source_col3 = st.columns(3) | |
with source_col1: | |
include_web = st.checkbox("🌐 Web Search", value=True) | |
with source_col2: | |
include_wikipedia = st.checkbox("📖 Wikipedia", value=True) | |
with source_col3: | |
include_academic = st.checkbox("🎓 Academic Papers", value=True) | |
if st.button("🔍 Start Research", type="primary", use_container_width=True): | |
if research_query: | |
with st.spinner("🔄 Conducting multi-source research..."): | |
# NOTE: the depth/focus/source toggles above are UI options that are not
# yet passed to the engine; only the query and result cap are used
results = st.session_state.enhanced_agent.research_engine.search_multiple_sources(
    research_query, max_sources
)
st.markdown("---") | |
# Enhanced results display | |
if results and any(results.values()): | |
st.markdown("## 📊 Research Results") | |
# Results summary | |
total_results = sum(len(source_results) for source_results in results.values()) | |
sources_found = len([r for r in results.values() if r]) | |
summary_col1, summary_col2, summary_col3 = st.columns(3) | |
with summary_col1: | |
st.metric("Total Results", total_results) | |
with summary_col2: | |
st.metric("Sources", sources_found) | |
with summary_col3: | |
st.metric("Coverage", f"{min(100, sources_found * 33):.0f}%") | |
# Display results by source | |
for source, source_results in results.items(): | |
if source_results: | |
with st.expander(f"📚 {source.title()} Results ({len(source_results)} found)", expanded=True): | |
for i, result in enumerate(source_results, 1): | |
st.markdown(f"**{i}. {result.get('title', 'Untitled')}**") | |
if result.get('snippet'): | |
st.markdown(f"_{result['snippet']}_") | |
if result.get('url'): | |
st.markdown(f"🔗 [Read Full Article]({result['url']})") | |
if result.get('source'): | |
st.badge(result['source'], type="secondary") | |
st.markdown("---") | |
# Research synthesis | |
st.markdown("## 🧠 Research Synthesis") | |
synthesis_text = f""" | |
Based on the research conducted on "{research_query}", here are the key findings: | |
### 📋 Summary | |
The research has uncovered {total_results} relevant sources across {sources_found} different platforms, providing a comprehensive view of the topic. | |
### 🎯 Key Insights | |
- Multiple perspectives have been gathered from various sources | |
- Both academic and practical viewpoints are represented | |
- Current and historical context has been considered | |
### 💡 Recommendations for Further Research | |
1. **Deep Dive**: Focus on the most relevant sources found | |
2. **Cross-Reference**: Verify information across multiple sources | |
3. **Latest Updates**: Look for the most recent developments | |
4. **Expert Opinions**: Seek out expert commentary and analysis | |
### 📚 Next Steps | |
- Review the detailed findings above | |
- Follow the provided links for more information | |
- Consider conducting focused searches on specific subtopics | |
- Save important sources for future reference | |
""" | |
st.markdown(synthesis_text) | |
else: | |
st.warning("🔍 No results found. Try refining your search query or checking your internet connection.") | |
else: | |
st.error("❌ Please enter a research query") | |
with research_col2: | |
st.subheader("📈 Research Tools") | |
# Research suggestions | |
st.markdown("### 💡 Trending Topics") | |
trending_topics = [ | |
"🤖 Artificial Intelligence", | |
"🌍 Climate Change Solutions", | |
"💊 Gene Therapy Advances", | |
"🚀 Space Exploration", | |
"⚡ Renewable Energy", | |
"🧬 CRISPR Technology", | |
"📱 Quantum Computing", | |
"🌐 Web3 Technologies" | |
] | |
for topic in trending_topics:
    if st.button(topic, key=f"research_{topic}", use_container_width=True):
        # Queue the topic; assigning to the widget's key after it has been
        # instantiated would raise a StreamlitAPIException
        st.session_state['pending_research'] = topic.split(" ", 1)[1]  # drop emoji
        st.rerun()
# Research history | |
st.markdown("### 📚 Research History") | |
if st.session_state.enhanced_agent.conversation_history: | |
recent_research = [ | |
conv for conv in st.session_state.enhanced_agent.conversation_history[-5:] | |
if 'research' in conv.get('user_input', '').lower() | |
] | |
if recent_research:
    for conv in recent_research:
        query = conv['user_input'][:30] + "..." if len(conv['user_input']) > 30 else conv['user_input']
        if st.button(f"🔍 {query}", key=f"history_{conv['timestamp']}", use_container_width=True):
            st.session_state['pending_research'] = conv['user_input']
            st.rerun()
else: | |
st.info("No recent research queries") | |
else: | |
st.info("Start researching to build your history") | |
with tab5: | |
st.header("⚙️ Code Execution Environment") | |
# Enhanced code editor interface | |
st.subheader("💻 Advanced Code Editor") | |
code_col1, code_col2 = st.columns([3, 1]) | |
with code_col1: | |
# Language selection | |
language_col1, language_col2 = st.columns([1, 3]) | |
with language_col1: | |
selected_language = st.selectbox( | |
"Language", | |
["Python", "JavaScript", "SQL", "R", "Bash"], | |
index=0, | |
help="Select programming language" | |
) | |
with language_col2: | |
st.markdown(f"### 💻 {selected_language} Code Editor") | |
# Dynamic placeholder based on language | |
placeholders = { | |
"Python": """ | |
# Example: Create and analyze sample data | |
data = pd.DataFrame({ | |
'x': range(10), | |
'y': np.random.randn(10) | |
}) | |
print("Sample Data:") | |
print(data.head()) | |
# Create a simple plot | |
plt.figure(figsize=(8, 6)) | |
plt.plot(data['x'], data['y'], marker='o') | |
plt.title('Sample Data Visualization') | |
plt.xlabel('X Values') | |
plt.ylabel('Y Values') | |
plt.grid(True) | |
plt.show() | |
print("Analysis complete!")""", | |
"JavaScript": """// Enter your JavaScript code here | |
const data = [1, 2, 3, 4, 5]; | |
const doubled = data.map(x => x * 2); | |
console.log('Original:', data); | |
console.log('Doubled:', doubled); | |
// Example function | |
function analyzeData(arr) { | |
const sum = arr.reduce((a, b) => a + b, 0); | |
const avg = sum / arr.length; | |
return { sum, avg, count: arr.length }; | |
} | |
console.log('Analysis:', analyzeData(data));""", | |
"SQL": """-- Enter your SQL code here | |
-- Example queries (for reference) | |
SELECT | |
column1, | |
column2, | |
COUNT(*) as count, | |
AVG(numeric_column) as average | |
FROM your_table | |
WHERE condition = 'value' | |
GROUP BY column1, column2 | |
ORDER BY count DESC | |
LIMIT 10; | |
-- Data analysis query | |
SELECT | |
DATE_TRUNC('month', date_column) as month, | |
SUM(value_column) as monthly_total | |
FROM transactions | |
GROUP BY month | |
ORDER BY month;""", | |
"R": """# Enter your R code here | |
# Load libraries | |
library(ggplot2) | |
library(dplyr) | |
# Create sample data | |
data <- data.frame( | |
x = 1:10, | |
y = rnorm(10) | |
) | |
# Basic analysis | |
summary(data) | |
# Create plot | |
ggplot(data, aes(x = x, y = y)) + | |
geom_point() + | |
geom_line() + | |
theme_minimal() + | |
labs(title = "Sample Data Visualization") | |
print("Analysis complete!")""", | |
"Bash": """#!/bin/bash | |
# Enter your Bash commands here | |
# System information | |
echo "System Information:" | |
uname -a | |
echo "" | |
# Directory listing | |
echo "Current directory contents:" | |
ls -la | |
# Example data processing | |
echo "Processing data..." | |
# head -n 5 data.csv | |
# tail -n 5 data.csv | |
echo "Script execution complete!" | |
""" | |
} | |
# Code input with dynamic placeholder; a queued template (from the
# templates column) is applied before the widget is instantiated
if 'pending_template' in st.session_state:
    st.session_state['code_editor'] = st.session_state.pop('pending_template')
code_input = st.text_area(
    f"{selected_language} Code Editor",
    placeholder=placeholders.get(selected_language, "# Enter your code here"),
    height=400,
    key="code_editor",
    help=f"Write {selected_language} code; Python runs with access to pandas, numpy, matplotlib, and more!"
)
# Code execution options | |
exec_col1, exec_col2, exec_col3 = st.columns(3) | |
with exec_col1: | |
timeout_setting = st.selectbox("Timeout", ["15s", "30s", "45s", "60s"], index=1)
timeout_value = int(timeout_setting[:-1])  # NOTE: not yet passed through to safe_execute
with exec_col2: | |
capture_output = st.checkbox("Capture Output", value=True) | |
with exec_col3: | |
show_warnings = st.checkbox("Show Warnings", value=False) | |
# Execution buttons | |
exec_btn_col1, exec_btn_col2, exec_btn_col3 = st.columns(3) | |
with exec_btn_col1: | |
execute_btn = st.button("▶️ Execute Code", type="primary", use_container_width=True) | |
with exec_btn_col2: | |
validate_btn = st.button("✅ Validate Syntax", use_container_width=True) | |
with exec_btn_col3: | |
clear_code_btn = st.button("🗑️ Clear", use_container_width=True) | |
# Code execution (NOTE: the language selection is not passed to
# safe_execute, so non-Python code may not run as expected)
if execute_btn and code_input:
with st.spinner("⚡ Executing code..."): | |
result = st.session_state.enhanced_agent.security.safe_execute( | |
code_input, st.session_state.enhanced_agent.user_id | |
) | |
st.markdown("### 📊 Execution Results") | |
st.code(result, language="text") | |
# Execution metrics | |
if "Execution time:" in result: | |
exec_time = result.split("Execution time: ")[-1].split("s")[0] | |
st.metric("Execution Time", f"{exec_time}s") | |
elif validate_btn and code_input:
    # compile() checks Python syntax only; other languages are not parsed here
    if selected_language == "Python":
        try:
            compile(code_input, '<string>', 'exec')
            st.success("✅ Syntax is valid!")
        except SyntaxError as e:
            st.error(f"❌ Syntax Error: {e}")
        except Exception as e:
            st.error(f"❌ Validation Error: {e}")
    else:
        st.info(f"ℹ️ Syntax validation is currently available for Python only")
elif clear_code_btn:
    # Queue an empty template so the editor is emptied on the next run
    st.session_state['pending_template'] = ""
    st.rerun()
elif execute_btn and not code_input: | |
st.error("❌ Please enter some code to execute") | |
with code_col2: | |
st.subheader("📚 Code Templates") | |
# Code templates | |
templates = { | |
"📊 Data Analysis": """ | |
# Create sample dataset | |
data = pd.DataFrame({ | |
'date': pd.date_range('2023-01-01', periods=100), | |
'value': np.random.randn(100).cumsum() | |
}) | |
# Basic analysis | |
print(f"Dataset shape: {data.shape}") | |
print(f"\\nSummary statistics:") | |
print(data.describe()) | |
# Calculate moving average | |
data['moving_avg'] = data['value'].rolling(window=7).mean() | |
print(f"\\nFirst few rows with moving average:") | |
print(data.head(10)) | |
""", | |
"📈 Visualization": """ | |
import matplotlib.pyplot as plt | |
import numpy as np | |
# Generate sample data | |
x = np.linspace(0, 10, 100) | |
y1 = np.sin(x) | |
y2 = np.cos(x) | |
# Create visualization | |
plt.figure(figsize=(10, 6)) | |
plt.plot(x, y1, label='sin(x)', linewidth=2) | |
plt.plot(x, y2, label='cos(x)', linewidth=2) | |
plt.title('Trigonometric Functions') | |
plt.xlabel('X Values') | |
plt.ylabel('Y Values') | |
plt.legend() | |
plt.grid(True, alpha=0.3) | |
plt.show() | |
print("Visualization created successfully!") | |
""", | |
"🤖 Machine Learning": """ | |
from sklearn.linear_model import LinearRegression | |
from sklearn.model_selection import train_test_split | |
import numpy as np | |
# Generate sample data | |
np.random.seed(42) | |
X = np.random.randn(100, 1) | |
y = 2 * X.ravel() + np.random.randn(100) | |
# Split data | |
X_train, X_test, y_train, y_test = train_test_split( | |
X, y, test_size=0.2, random_state=42 | |
) | |
# Train model | |
model = LinearRegression() | |
model.fit(X_train, y_train) | |
# Make predictions | |
y_pred = model.predict(X_test) | |
# Evaluate | |
score = model.score(X_test, y_test) | |
print(f"Model R² score: {score:.3f}") | |
print(f"Coefficients: {model.coef_[0]:.3f}") | |
print(f"Intercept: {model.intercept_:.3f}") | |
""", | |
"🔍 Web Scraping": """ | |
import requests | |
import json | |
# Example API call | |
try: | |
# Using a free API for demonstration | |
response = requests.get( | |
'https://jsonplaceholder.typicode.com/posts/1' | |
) | |
if response.status_code == 200: | |
data = response.json() | |
print("API Response:") | |
print(json.dumps(data, indent=2)) | |
print(f"\\nPost title: {data['title']}") | |
print(f"Post body: {data['body'][:100]}...") | |
else: | |
print(f"Error: {response.status_code}") | |
except Exception as e: | |
print(f"Request failed: {e}") | |
""", | |
"🎲 Random Data": """ | |
import random | |
import string | |
# Generate random data | |
def generate_random_data(n=10): | |
data = [] | |
for i in range(n): | |
record = { | |
'id': i + 1, | |
'name': ''.join(random.choices(string.ascii_uppercase, k=5)), | |
'value': random.uniform(0, 100), | |
'category': random.choice(['A', 'B', 'C']), | |
'active': random.choice([True, False]) | |
} | |
data.append(record) | |
return data | |
# Generate and display data | |
sample_data = generate_random_data(5) | |
print("Generated Random Data:") | |
for item in sample_data: | |
print(item) | |
# Calculate statistics | |
values = [item['value'] for item in sample_data] | |
print(f"\\nStatistics:") | |
print(f"Average value: {sum(values)/len(values):.2f}") | |
print(f"Max value: {max(values):.2f}") | |
print(f"Min value: {min(values):.2f}") | |
""" | |
} | |
st.markdown("#### 🎯 Quick Templates") | |
for template_name, template_code in templates.items():
    if st.button(template_name, key=f"template_{template_name}", use_container_width=True):
        # Queue the template and rerun; the editor block above applies it
        # before the text area is re-created (a plain local reassignment
        # after the widget has rendered never reaches the screen)
        st.session_state['pending_template'] = template_code
        st.rerun()
# Code execution statistics | |
st.markdown("### 📊 Execution Stats") | |
exec_stats = { | |
"Code Runs": 15, | |
"Success Rate": "94%", | |
"Avg Time": "1.2s", | |
"Languages": "Python" | |
} | |
for stat, value in exec_stats.items(): | |
st.metric(stat, value) | |
# Safety information | |
st.markdown("### 🔒 Safety Features") | |
st.markdown(""" | |
- Sandboxed execution | |
- Timeout protection | |
- Security filtering | |
- Output sanitization | |
- Restricted imports | |
""") | |
with tab6: | |
st.header("📈 System Monitor") | |
# System monitoring dashboard | |
st.subheader("🖥️ System Performance Dashboard") | |
# Real-time performance metrics | |
current_time = time.time() | |
uptime_minutes = (current_time - st.session_state.session_start) / 60 | |
st.session_state.system_health['uptime'] = uptime_minutes | |
perf_col1, perf_col2, perf_col3, perf_col4 = st.columns(4) | |
with perf_col1: | |
current_response_time = st.session_state.get('last_execution_time', 1.2) or 1.2 | |
st.metric( | |
"Response Time", | |
f"{current_response_time:.2f}s", | |
delta=f"{-0.3 if current_response_time < 2.0 else 0.5}s", | |
delta_color="inverse" if current_response_time < 2.0 else "normal" | |
) | |
with perf_col2: | |
st.metric( | |
"Success Rate", | |
"98.5%", | |
delta="↗️ +1.2%" | |
) | |
with perf_col3: | |
st.metric( | |
"Active Sessions", | |
"1", | |
delta="→ 0" | |
) | |
with perf_col4: | |
st.metric( | |
"System Load", | |
"Low", | |
delta="↘️ Optimal" | |
) | |
# System status | |
st.subheader("🔧 Component Status") | |
status_col1, status_col2 = st.columns(2) | |
with status_col1: | |
st.markdown("### 🟢 Operational Components") | |
operational_components = { | |
"AI Assistant": "🟢 Online", | |
"Research Engine": "🟢 Online", | |
"Code Executor": "🟢 Online", | |
"Analytics Engine": "🟢 Online", | |
"Security Manager": "🟢 Online" | |
} | |
for component, status in operational_components.items(): | |
st.markdown(f"**{component}**: {status}") | |
with status_col2: | |
st.markdown("### 🔧 System Resources") | |
# Database status | |
db_status = "🟢 SQLite Connected" | |
if st.session_state.enhanced_agent.db_manager.pg_pool: | |
db_status += " | 🟢 PostgreSQL Connected" | |
else: | |
db_status += " | 🟡 PostgreSQL Unavailable" | |
st.markdown(f"**Database**: {db_status}") | |
st.markdown(f"**Memory Usage**: 🟢 Normal") | |
st.markdown(f"**Cache Status**: 🟢 Active") | |
st.markdown(f"**Network**: 🟢 Connected") | |
# Real-time usage analytics | |
st.subheader("📊 Live System Analytics") | |
# Update system metrics | |
st.session_state.system_health['total_requests'] = st.session_state.conversation_count | |
# Create real-time charts | |
analytics_col1, analytics_col2 = st.columns(2) | |
with analytics_col1: | |
# Real-time system metrics | |
current_hour = datetime.datetime.now().hour | |
usage_data = pd.DataFrame({ | |
'Hour': list(range(max(0, current_hour-23), current_hour+1)), | |
'Requests': np.random.poisson(3, min(24, current_hour+1)) + st.session_state.conversation_count // 24 | |
}) | |
fig_usage = px.area( | |
usage_data, x='Hour', y='Requests', | |
title='Requests Over Last 24 Hours', | |
template='plotly_dark' | |
) | |
fig_usage.update_layout(height=300, showlegend=False) | |
fig_usage.update_traces(fill='tonexty', fillcolor='rgba(102, 126, 234, 0.3)') | |
st.plotly_chart(fig_usage, use_container_width=True) | |
with analytics_col2: | |
# Response time distribution | |
response_times = np.random.gamma(2, 0.5, 100) | |
fig_response = px.histogram( | |
x=response_times, | |
title='Response Time Distribution', | |
template='plotly_dark', | |
labels={'x': 'Response Time (s)', 'y': 'Frequency'} | |
) | |
fig_response.update_layout(height=300) | |
st.plotly_chart(fig_response, use_container_width=True) | |
# Real-time system health monitoring | |
st.subheader("🏥 System Health Dashboard") | |
# Calculate health metrics | |
health_score = min(100, 100 - (st.session_state.system_health.get('error_count', 0) * 5)) | |
cpu_usage = 15 + (st.session_state.conversation_count % 10) # Simulated | |
memory_usage = 45 + (st.session_state.conversation_count % 20) # Simulated | |
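# Real readings could replace the two simulated values above via psutil
# (an assumption: psutil is not among this file's imports), e.g.:
#   import psutil
#   cpu_usage = psutil.cpu_percent(interval=0.1)
#   memory_usage = psutil.virtual_memory().percent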
health_col1, health_col2, health_col3 = st.columns(3) | |
with health_col1: | |
st.markdown("### 💚 System Health") | |
st.metric("Health Score", f"{health_score}%", | |
delta="Good" if health_score > 90 else "Warning") | |
# Health gauge visualization | |
fig_health = go.Figure(go.Indicator( | |
mode = "gauge+number+delta", | |
value = health_score, | |
domain = {'x': [0, 1], 'y': [0, 1]}, | |
title = {'text': "Health Score"}, | |
delta = {'reference': 100}, | |
gauge = { | |
'axis': {'range': [None, 100]}, | |
'bar': {'color': "lightgreen" if health_score > 80 else "orange"}, | |
'steps': [ | |
{'range': [0, 50], 'color': "lightgray"}, | |
{'range': [50, 80], 'color': "yellow"}, | |
{'range': [80, 100], 'color': "lightgreen"} | |
], | |
'threshold': { | |
'line': {'color': "red", 'width': 4}, | |
'thickness': 0.75, | |
'value': 90 | |
} | |
} | |
)) | |
fig_health.update_layout(height=300, template='plotly_dark') | |
st.plotly_chart(fig_health, use_container_width=True) | |
with health_col2: | |
st.markdown("### 🖥️ Resource Usage") | |
st.metric("CPU Usage", f"{cpu_usage}%", | |
delta="↘️ -2%" if cpu_usage < 50 else "↗️ +1%") | |
st.metric("Memory Usage", f"{memory_usage}%", | |
delta="↘️ -5%" if memory_usage < 60 else "↗️ +3%") | |
# Resource usage chart | |
resources_data = pd.DataFrame({ | |
'Resource': ['CPU', 'Memory', 'Storage', 'Network'], | |
'Usage': [cpu_usage, memory_usage, 25, 35] | |
}) | |
fig_resources = px.bar( | |
resources_data, x='Resource', y='Usage', | |
title='Resource Usage %', | |
template='plotly_dark', | |
color='Usage', | |
color_continuous_scale='Viridis' | |
) | |
fig_resources.update_layout(height=300, showlegend=False) | |
st.plotly_chart(fig_resources, use_container_width=True) | |
with health_col3: | |
st.markdown("### 📊 Error Statistics") | |
error_stats = { | |
"Total Errors (24h)": st.session_state.system_health.get('error_count', 0), | |
"Critical Errors": 0, | |
"Warning Level": max(0, st.session_state.conversation_count // 20), | |
"Info Level": max(1, st.session_state.conversation_count // 10) | |
} | |
for stat, value in error_stats.items(): | |
color = "normal" | |
if "Critical" in stat and value > 0: | |
color = "inverse" | |
st.metric(stat, value, delta_color=color) | |
# System configuration | |
st.subheader("System Configuration") | |
config_col1, config_col2 = st.columns(2) | |
with config_col1: | |
st.markdown("### 🔧 Current Settings") | |
settings = { | |
"Debug Mode": "Disabled", | |
"Cache TTL": "60 minutes", | |
"Max Code Length": "10,000 chars", | |
"Execution Timeout": "30 seconds", | |
"Rate Limit": "20 req/5min" | |
} | |
for setting, value in settings.items(): | |
st.markdown(f"**{setting}**: {value}") | |
with config_col2: | |
st.markdown("### 📊 Performance Targets") | |
targets = { | |
"Response Time": "< 2s (Current: 1.2s)", | |
"Success Rate": "> 95% (Current: 98.5%)", | |
"Uptime": "> 99% (Current: 99.8%)", | |
"Memory Usage": "< 80% (Current: 45%)", | |
"Error Rate": "< 1% (Current: 0.2%)" | |
} | |
for target, status in targets.items(): | |
st.markdown(f"**{target}**: {status}") | |
# Enhanced footer with system information | |
st.markdown("---") | |
footer_col1, footer_col2, footer_col3 = st.columns(3) | |
with footer_col1: | |
st.markdown(""" | |
### 🤖 Enhanced AI System Pro v6.0 | |
**Latest Features:** | |
- Multi-source research engine | |
- Advanced analytics with ML | |
- Enhanced security & rate limiting | |
- Real-time system monitoring | |
""") | |
with footer_col2: | |
st.markdown(""" | |
### 📊 Session Summary | |
- **Conversations**: {conversations} | |
- **Session ID**: {session_id} | |
- **Uptime**: {uptime} | |
- **Features Active**: 15+ | |
""".format( | |
conversations=st.session_state.conversation_count, | |
session_id=st.session_state.enhanced_agent.session_id[:8] + "...", | |
uptime=f"{(time.time() - st.session_state.get('session_start', time.time())) / 60:.0f}m" | |
)) | |
with footer_col3: | |
st.markdown(""" | |
### 🔧 System Status | |
- **Performance**: Excellent | |
- **Security**: Protected | |
- **Database**: Connected | |
- **Network**: Online | |
""") | |
st.markdown(""" | |
<div style='text-align: center; padding: 1rem; | |
background: linear-gradient(180deg, #2d3748 0%, #1a202c 100%); | |
color: white; border-radius: 5px; margin-top: 1rem;'> | |
<p><strong>Built with Streamlit | Powered by Advanced AI | Optimized for Performance</strong></p> | |
<p><small>Enhanced AI Systems | Intelligent | Secure | Scalable</small></p> | |
</div> | |
""", unsafe_allow_html=True) | |
if __name__ == "__main__": | |
main() |