import pandas as pd
import sqlite3
import io
from typing import Optional
class Processor:
def __init__(self):
self.conn: Optional[sqlite3.Connection] = None
self.dataset_name = None
self.load_timestamp = None
def load_csv(self, file) -> str:
"""Load CSV file with enhanced error handling and validation"""
if not file:
return "❌ No file provided. Please select a CSV file."
try:
# Try different methods to read the file
df = None
file_size = 0
# Check if it's a file path or file object
if hasattr(file, 'name') and hasattr(file, 'read'):
# It's a file object
file.seek(0) # Reset file pointer
content = file.read()
file_size = len(content)
# Convert bytes to string if needed
if isinstance(content, bytes):
content = content.decode('utf-8')
# Create StringIO object
csv_buffer = io.StringIO(content)
df = pd.read_csv(csv_buffer)
self.dataset_name = getattr(file, 'name', 'uploaded_data.csv')
else:
# It's likely a file path
df = pd.read_csv(file)
self.dataset_name = file
# Get file size
try:
import os
file_size = os.path.getsize(file)
except:
file_size = 0
# Validate the DataFrame
if df is None or df.empty:
return "❌ The CSV file appears to be empty. Please check your file."
# Check for reasonable size limits
if len(df) > 100000: # More than 100k rows
return f"⚠️ Large dataset detected ({len(df):,} rows). Consider using a smaller sample for better performance."
if len(df.columns) > 50: # More than 50 columns
return f"⚠️ Wide dataset detected ({len(df.columns)} columns). Some features may be limited."
# Clean column names (remove special characters, spaces)
original_columns = df.columns.tolist()
df.columns = [self._clean_column_name(col) for col in df.columns]
# Check for duplicate column names
if len(df.columns) != len(set(df.columns)):
return "❌ Duplicate column names detected. Please ensure all columns have unique names."
# Create in-memory SQLite database
self.conn = sqlite3.connect(":memory:", check_same_thread=False)
# Load data into SQLite
df.to_sql("data", self.conn, index=False, if_exists="replace")
# Store timestamp
from datetime import datetime
self.load_timestamp = datetime.now()
# Create success message with detailed info
file_size_mb = file_size / (1024 * 1024) if file_size > 0 else 0
success_msg = f"""
✅ CSV loaded successfully!
📊 {len(df):,} rows × {len(df.columns)} columns
📁 {file_size_mb:.2f} MB • Loaded at {self.load_timestamp.strftime('%H:%M:%S')}
🚀 Ready for queries!
"""
# Check for any column name changes
renamed_columns = [(orig, clean) for orig, clean in zip(original_columns, df.columns) if orig != clean]
if renamed_columns:
success_msg += f"
ℹ️ {len(renamed_columns)} column(s) renamed for compatibility"
return success_msg
except pd.errors.EmptyDataError:
return "❌ The CSV file is empty or contains no data."
except pd.errors.ParserError as e:
return f"❌ CSV parsing error: {str(e)}. Please check your file format."
except UnicodeDecodeError:
return "❌ File encoding error. Please ensure your CSV is in UTF-8 format."
except MemoryError:
return "❌ File too large to process. Please try with a smaller dataset."
except Exception as e:
return f"❌ Unexpected error loading CSV: {str(e)}"
def _clean_column_name(self, col_name: str) -> str:
"""Clean column names for SQL compatibility"""
if pd.isna(col_name):
return "unnamed_column"
# Convert to string and strip whitespace
clean_name = str(col_name).strip()
# Replace problematic characters
clean_name = clean_name.replace(' ', '_')
clean_name = clean_name.replace('-', '_')
clean_name = clean_name.replace('.', '_')
clean_name = clean_name.replace('(', '_')
clean_name = clean_name.replace(')', '_')
clean_name = clean_name.replace('[', '_')
clean_name = clean_name.replace(']', '_')
clean_name = clean_name.replace('/', '_')
clean_name = clean_name.replace('\\', '_')
clean_name = clean_name.replace('?', '_')
clean_name = clean_name.replace('!', '_')
clean_name = clean_name.replace('@', '_')
clean_name = clean_name.replace('#', '_')
clean_name = clean_name.replace(',', '_')
clean_name = clean_name.replace('%', '_')
clean_name = clean_name.replace('^', '_')
clean_name = clean_name.replace('&', '_')
clean_name = clean_name.replace('*', '_')
clean_name = clean_name.replace('+', '_')
clean_name = clean_name.replace('=', '_')
clean_name = clean_name.replace('|', '_')
clean_name = clean_name.replace(';', '_')
clean_name = clean_name.replace(':', '_')
clean_name = clean_name.replace('"', '_')
clean_name = clean_name.replace("'", '_')
clean_name = clean_name.replace('<', '_')
clean_name = clean_name.replace('>', '_')
clean_name = clean_name.replace(',', '_')
# Remove multiple underscores
while '__' in clean_name:
clean_name = clean_name.replace('__', '_')
# Remove leading/trailing underscores
clean_name = clean_name.strip('_')
# Ensure it's not empty
if not clean_name:
clean_name = "unnamed_column"
# Ensure it doesn't start with a number
if clean_name[0].isdigit():
clean_name = f"col_{clean_name}"
return clean_name
def get_connection(self):
"""Get the database connection"""
return self.conn
def get_dataset_info(self) -> dict:
"""Get information about the loaded dataset"""
if not self.conn:
return {}
try:
# Get table info
cursor = self.conn.cursor()
cursor.execute("PRAGMA table_info(data)")
columns_info = cursor.fetchall()
# Get row count
cursor.execute("SELECT COUNT(*) FROM data")
row_count = cursor.fetchone()[0]
return {
'name': self.dataset_name,
'rows': row_count,
'columns': [col[1] for col in columns_info],
'column_types': {col[1]: col[2] for col in columns_info},
'loaded_at': self.load_timestamp
}
except Exception:
return {}
@staticmethod
def card_html(content: str, kind: str = "info") -> str:
"""Generate enhanced HTML cards with modern styling"""
colors = {
"info": {
"border": "rgba(52, 152, 219, 0.4)",
"bg": "rgba(52, 152, 219, 0.05)",
"accent": "#3498db"
},
"success": {
"border": "rgba(46, 204, 113, 0.4)",
"bg": "rgba(46, 204, 113, 0.05)",
"accent": "#2ecc71"
},
"warning": {
"border": "rgba(241, 196, 15, 0.4)",
"bg": "rgba(241, 196, 15, 0.05)",
"accent": "#f1c40f"
},
"error": {
"border": "rgba(231, 76, 60, 0.4)",
"bg": "rgba(231, 76, 60, 0.05)",
"accent": "#e74c3c"
},
}
c = colors.get(kind, colors["info"])
# Add icon based on kind
icons = {
"info": "ℹ️",
"success": "✅",
"warning": "⚠️",
"error": "❌"
}
icon = icons.get(kind, "ℹ️")
return f"""