import io
import os
import re
import sqlite3
from datetime import datetime
from typing import Optional

import pandas as pd

# Characters that are replaced by "_" when a CSV header is turned into a
# SQL-friendly column name.
_UNSAFE_COLUMN_CHARS = " -.()[]/\\?!@#,%^&*+=|;:\"'<>"
_UNSAFE_TO_UNDERSCORE = str.maketrans(
    _UNSAFE_COLUMN_CHARS, "_" * len(_UNSAFE_COLUMN_CHARS)
)
_UNDERSCORE_RUN = re.compile(r"_{2,}")


class Processor:
    """Load a CSV into an in-memory SQLite table and format results for display.

    A successful ``load_csv`` call stores the data in a table named ``data``
    and records the dataset name and load time on the instance.
    """

    # Size limits above which ``load_csv`` returns a warning instead of loading.
    MAX_ROWS = 100_000
    MAX_COLUMNS = 50

    def __init__(self):
        self.conn: Optional[sqlite3.Connection] = None
        self.dataset_name = None
        self.load_timestamp: Optional[datetime] = None

    def load_csv(self, file) -> str:
        """Load a CSV into the in-memory ``data`` table.

        Args:
            file: Either an object exposing both ``name`` and ``read`` (its
                whole content is read and parsed) or anything
                ``pandas.read_csv`` accepts directly, such as a path.

        Returns:
            A human-readable status message. Failures are reported through
            the message (prefixed with ❌ or ⚠️); no exception is raised.

        Instance state (``conn``, ``dataset_name``, ``load_timestamp``) is
        only replaced after the data has been written successfully, so a
        rejected or failing file leaves the previously loaded dataset intact.
        """
        if not file:
            return "❌ No file provided. Please select a CSV file."

        try:
            df, name, file_size = self._read_source(file)

            if df.empty:
                return "❌ The CSV file appears to be empty. Please check your file."

            # NOTE(review): both size checks return without loading the data,
            # although the wording of the second message suggests the load
            # might have been meant to continue -- confirm intent.
            if len(df) > self.MAX_ROWS:
                return f"⚠️ Large dataset detected ({len(df):,} rows). Consider using a smaller sample for better performance."
            if len(df.columns) > self.MAX_COLUMNS:
                return f"⚠️ Wide dataset detected ({len(df.columns)} columns). Some features may be limited."

            original_columns = df.columns.tolist()
            df.columns = [self._clean_column_name(col) for col in original_columns]

            # Two different headers can collapse to the same cleaned name.
            if len(df.columns) != len(set(df.columns)):
                return "❌ Duplicate column names detected. Please ensure all columns have unique names."

            new_conn = sqlite3.connect(":memory:", check_same_thread=False)
            try:
                df.to_sql("data", new_conn, index=False, if_exists="replace")
            except Exception:
                # Do not leave a half-initialised connection behind; the
                # outer handlers turn the error into a status message.
                new_conn.close()
                raise

            # Commit the new state only now that everything has succeeded.
            self.conn = new_conn
            self.dataset_name = name
            self.load_timestamp = datetime.now()

            file_size_mb = file_size / (1024 * 1024) if file_size > 0 else 0
            success_msg = (
                " ✅ CSV loaded successfully!\n"
                f"📊 {len(df):,} rows × {len(df.columns)} columns\n"
                f"📁 {file_size_mb:.2f} MB • Loaded at {self.load_timestamp.strftime('%H:%M:%S')}\n"
                "🚀 Ready for queries! "
            )

            renamed_columns = [
                (orig, clean)
                for orig, clean in zip(original_columns, df.columns)
                if orig != clean
            ]
            if renamed_columns:
                success_msg += f"\nℹ️ {len(renamed_columns)} column(s) renamed for compatibility"

            return success_msg

        except pd.errors.EmptyDataError:
            return "❌ The CSV file is empty or contains no data."
        except pd.errors.ParserError as e:
            return f"❌ CSV parsing error: {str(e)}. Please check your file format."
        except UnicodeDecodeError:
            return "❌ File encoding error. Please ensure your CSV is in UTF-8 format."
        except MemoryError:
            return "❌ File too large to process. Please try with a smaller dataset."
        except Exception as e:
            # Top-level boundary: callers receive a message, never a traceback.
            return f"❌ Unexpected error loading CSV: {str(e)}"

    @staticmethod
    def _read_source(file):
        """Parse *file* and return ``(dataframe, dataset_name, size)``.

        ``size`` is the length of the content read from a file object, or the
        on-disk size for a path; it is 0 when the size cannot be determined.
        """
        if hasattr(file, "name") and hasattr(file, "read"):
            file.seek(0)  # the object may already have been read once
            content = file.read()
            size = len(content)
            if isinstance(content, bytes):
                # utf-8-sig decodes plain UTF-8 and also drops a leading
                # byte-order mark if one is present.
                content = content.decode("utf-8-sig")
            return pd.read_csv(io.StringIO(content)), file.name, size

        df = pd.read_csv(file)
        try:
            size = os.path.getsize(file)
        except (OSError, TypeError, ValueError):
            # Not a local path (or not a path at all): size is informational.
            size = 0
        return df, file, size

    def _clean_column_name(self, col_name: str) -> str:
        """Return *col_name* rewritten as a SQL-friendly identifier.

        Spaces and punctuation listed in ``_UNSAFE_COLUMN_CHARS`` become
        underscores, runs of underscores are collapsed, and leading/trailing
        underscores are removed. Missing or empty names become
        ``"unnamed_column"``; names starting with a digit get a ``col_`` prefix.
        """
        if pd.isna(col_name):
            return "unnamed_column"

        clean_name = str(col_name).strip().translate(_UNSAFE_TO_UNDERSCORE)
        clean_name = _UNDERSCORE_RUN.sub("_", clean_name).strip("_")

        if not clean_name:
            return "unnamed_column"
        if clean_name[0].isdigit():
            return f"col_{clean_name}"
        return clean_name

    def get_connection(self):
        """Return the current SQLite connection, or None if nothing is loaded."""
        return self.conn

    def get_dataset_info(self) -> dict:
        """Describe the loaded dataset.

        Returns:
            A dict with keys ``name``, ``rows``, ``columns``, ``column_types``
            and ``loaded_at``; an empty dict when no dataset is loaded or the
            database cannot be queried.
        """
        if not self.conn:
            return {}

        try:
            cursor = self.conn.cursor()
            # Each PRAGMA row is read as: index 1 = column name, index 2 = type.
            columns_info = cursor.execute("PRAGMA table_info(data)").fetchall()
            row_count = cursor.execute("SELECT COUNT(*) FROM data").fetchone()[0]
        except sqlite3.Error:
            return {}

        return {
            'name': self.dataset_name,
            'rows': row_count,
            'columns': [col[1] for col in columns_info],
            'column_types': {col[1]: col[2] for col in columns_info},
            'loaded_at': self.load_timestamp
        }

    @staticmethod
    def card_html(content: str, kind: str = "info") -> str:
        """Wrap *content* in a card, prefixed with the icon for *kind*.

        Args:
            content: Text or markup to place in the card; inserted as-is.
            kind: One of ``info``, ``success``, ``warning``, ``error``.
                Unknown kinds fall back to ``info``.
        """
        colors = {
            "info": {
                "border": "rgba(52, 152, 219, 0.4)",
                "bg": "rgba(52, 152, 219, 0.05)",
                "accent": "#3498db"
            },
            "success": {
                "border": "rgba(46, 204, 113, 0.4)",
                "bg": "rgba(46, 204, 113, 0.05)",
                "accent": "#2ecc71"
            },
            "warning": {
                "border": "rgba(241, 196, 15, 0.4)",
                "bg": "rgba(241, 196, 15, 0.05)",
                "accent": "#f1c40f"
            },
            "error": {
                "border": "rgba(231, 76, 60, 0.4)",
                "bg": "rgba(231, 76, 60, 0.05)",
                "accent": "#e74c3c"
            },
        }
        # NOTE(review): the palette entry is looked up but the template below
        # never references it -- presumably styling markup is missing; confirm.
        c = colors.get(kind, colors["info"])

        icons = {
            "info": "ℹ️",
            "success": "✅",
            "warning": "⚠️",
            "error": "❌"
        }
        icon = icons.get(kind, "ℹ️")

        return f"\n{icon}\n{content}\n"

    @staticmethod
    def format_error(error_msg: str, suggestion: Optional[str] = None) -> str:
        """Return ``"Error: <error_msg>"``, followed by a suggestion if given."""
        formatted_error = f"Error: {error_msg}"
        if suggestion:
            formatted_error += f"\n\n💡 Suggestion: {suggestion}"
        return formatted_error

    @staticmethod
    def format_data_table(
        df: pd.DataFrame,
        max_rows: int = 10,
        max_cols: int = 8,
        *,
        escape: bool = True,
    ) -> str:
        """Render the first rows and columns of *df* as an HTML table.

        Args:
            df: Data to preview.
            max_rows: Maximum number of rows to include.
            max_cols: Maximum number of columns to include.
            escape: Escape ``<``, ``>`` and ``&`` in the rendered values.
                Defaults to True because previewed values may come from an
                uploaded file; pass False only for content that is already
                trusted HTML.

        Returns:
            The table markup, followed by a "Showing ..." note when rows or
            columns were cut, or a placeholder message for an empty frame.
        """
        if df.empty:
            return "\nNo data to display\n"

        truncated = len(df.columns) > max_cols
        display_df = df.head(max_rows)
        if truncated:
            display_df = display_df.iloc[:, :max_cols]

        table_html = display_df.to_html(
            classes="custom-table",
            escape=escape,
            index=False,
            table_id="data-preview",
        )
        styled_table = f" {table_html} "

        if truncated or len(df) > max_rows:
            info_text = f"Showing {len(display_df)} of {len(df)} rows"
            if truncated:
                info_text += f", {max_cols} of {len(df.columns)} columns"
            styled_table += f"\n{info_text}\n"

        return styled_table