import html
from datetime import datetime

import pandas as pd

from processor import Processor
from model import Model


class Controller:
    """Glue between the CSV processor, the SQL-generating model and the UI.

    The controller owns a ``Processor`` (CSV loading, connection access and
    ``card_html`` rendering) and, once a CSV has been loaded, a ``Model``
    bound to the processor's connection. It also keeps a summary of the
    loaded dataset and an in-memory history of answered questions.
    """

    # Display limits used when rendering summaries.
    _MAX_LISTED_COLUMNS = 10   # columns listed in the dataset summary
    _MAX_SAMPLE_ROWS = 2       # sample rows shown in the dataset summary
    _MAX_SAMPLE_FIELDS = 4     # fields shown per sample row
    _MAX_VALUE_CHARS = 30      # characters shown per sample value
    _PREVIEW_ROWS = 10         # result rows shown in the data preview
    _PREVIEW_COLUMNS = 6       # result columns shown in the data preview
    _HISTORY_SHOWN = 5         # most recent queries shown in the history
    _QUESTION_CHARS = 60       # characters of a question shown in the history

    def __init__(self):
        self.processor = Processor()
        self.model = None  # will be initialized after CSV load
        self.query_history = []
        self.dataset_stats = {}

    def load_csv(self, file) -> str:
        """Load a CSV through the processor and initialize the model.

        The model and the dataset statistics are (re)built only when the
        processor exposes a truthy connection after loading.

        Args:
            file: Passed through unchanged to ``Processor.load_csv``.

        Returns:
            The status value returned by ``Processor.load_csv``.
        """
        status = self.processor.load_csv(file)
        connection = self.processor.get_connection()
        if connection:
            self.model = Model(connection)
            self._update_dataset_stats()
        return status

    def _update_dataset_stats(self):
        """Refresh ``self.dataset_stats`` from the ``data`` table.

        Does nothing when no model is loaded. Any failure while querying is
        recorded as ``{'error': <message>}`` instead of being raised, so the
        UI can show it via ``get_dataset_info``.
        """
        if self.model is None:
            return

        try:
            connection = self.processor.get_connection()
            sample = pd.read_sql("SELECT * FROM data LIMIT 1000", connection)
            count_result = pd.read_sql("SELECT COUNT(*) as count FROM data", connection)

            self.dataset_stats = {
                # COUNT(*) yields a single-row frame; the count is the cell
                # value, not the length of that frame (which is always 1).
                'total_rows': int(count_result.iloc[0, 0]),
                'total_columns': len(sample.columns),
                'columns': list(sample.columns),
                'sample_data': sample.head(3).to_dict('records') if not sample.empty else [],
                'dtypes': sample.dtypes.to_dict() if not sample.empty else {}
            }
        except Exception as e:
            # Deliberate best-effort: surface the problem to the user
            # rather than breaking the CSV upload flow.
            self.dataset_stats = {'error': str(e)}

    def get_dataset_info(self) -> str:
        """Return the formatted dataset summary.

        Returns:
            A placeholder text when nothing is loaded, an error text when
            collecting statistics failed, otherwise a summary with the row
            and column totals, the first columns with their dtypes and the
            first sample rows. Dataset-derived text is HTML-escaped.
        """
        if not self.dataset_stats:
            return "\nNo dataset loaded\n"

        if 'error' in self.dataset_stats:
            return f"\nError loading dataset info: {html.escape(str(self.dataset_stats['error']))}\n"

        stats = self.dataset_stats
        columns = stats.get('columns', [])
        dtypes = stats.get('dtypes', {})

        # Column names come from the uploaded file, so escape them.
        column_parts = [
            f"{html.escape(str(col))} ({str(dtypes.get(col, 'unknown'))})"
            for col in columns[:self._MAX_LISTED_COLUMNS]
        ]
        if len(columns) > self._MAX_LISTED_COLUMNS:
            column_parts.append(f"... and {len(columns) - self._MAX_LISTED_COLUMNS} more")
        columns_html = "".join(column_parts)

        sample_rows = stats.get('sample_data', [])[:self._MAX_SAMPLE_ROWS]
        sample_html = "".join(
            self._format_sample_row(position, row)
            for position, row in enumerate(sample_rows, start=1)
        )

        return (
            f"\n{stats.get('total_rows', 0):,}\n"
            "Total Rows\n"
            f"{stats.get('total_columns', 0)}\n"
            "Columns\n"
            "Ready\n"
            "\n"
            "📋 Columns:\n"
            "\n"
            f"{columns_html}\n"
            "\n"
            "👀 Sample Data:\n"
            "\n"
            f"{sample_html}\n"
        )

    def _format_sample_row(self, position: int, row: dict) -> str:
        """Render one sample row as ``Row N: key: value | ...``."""
        cells = []
        for key, value in list(row.items())[:self._MAX_SAMPLE_FIELDS]:
            text = str(value)
            # Truncate first, then escape, so an entity is never cut in half.
            cell = f"{html.escape(str(key))}: {html.escape(text[:self._MAX_VALUE_CHARS])}"
            if len(text) > self._MAX_VALUE_CHARS:
                cell += "..."
            cells.append(cell)

        line = " | ".join(cells)
        if len(row) > self._MAX_SAMPLE_FIELDS:
            line += " | ..."
        return f"\nRow {position}: {line}\n"

    def handle_question(self, question: str) -> str:
        """Handle question and return HTML result.

        Args:
            question: Natural-language question about the loaded data.

        Returns:
            The rendered card: an error/warning card when no CSV is loaded,
            the question is blank, no SQL could be generated or the query
            returned no rows; otherwise the formatted answer.
        """
        result_html, _ = self._answer_question(
            question,
            "🔍 Query executed successfully but returned no results. Try a different question.",
        )
        return result_html

    def handle_question_with_sql(self, question: str) -> tuple:
        """Handle question and return both result HTML and SQL query.

        Args:
            question: Natural-language question about the loaded data.

        Returns:
            ``(result_html, sql)``. ``sql`` is ``""`` when no query was
            generated (no CSV, blank question, generation failure) and the
            generated query otherwise, including when it returned no rows.
        """
        return self._answer_question(
            question,
            "🔍 Query executed successfully but returned no results. \nTry a different question.",
        )

    def _answer_question(self, question: str, no_results_message: str) -> tuple:
        """Run the question pipeline shared by both public handlers.

        Steps: validate state and input, generate SQL, execute it, turn the
        result into natural language, record the query in the history and
        format the answer.

        Args:
            question: Natural-language question about the loaded data.
            no_results_message: Warning text used when the query is empty.

        Returns:
            ``(result_html, sql)`` as described on
            ``handle_question_with_sql``.
        """
        if self.model is None:
            return self.processor.card_html("❌ No CSV loaded yet. Please upload a CSV file first.", "error"), ""

        if not question.strip():
            return self.processor.card_html("❓ Please enter a question about your data.", "warning"), ""

        sql = self.model.generate_sql(question)
        if not sql:
            return self.processor.card_html("⚠️ Could not generate SQL query. Try rephrasing your question.", "warning"), ""

        df = self.model.execute_sql(sql)
        if df.empty:
            return self.processor.card_html(no_results_message, "warning"), sql

        answer_text = self.model.results_to_natural_language(question, df)

        # Only successful, non-empty queries are recorded.
        self.query_history.append({
            'question': question,
            'sql': sql,
            'timestamp': datetime.now().strftime("%H:%M:%S"),
            'result_count': len(df)
        })

        return self._format_enhanced_result(answer_text, df, sql), sql

    def _format_enhanced_result(self, answer_text: str, df: pd.DataFrame, sql: str) -> str:
        """Format result with result statistics and a data preview.

        Args:
            answer_text: Natural-language answer to display.
            df: Query result; at most the first rows/columns are previewed.
            sql: The executed query (currently not rendered).

        Returns:
            The value of ``Processor.card_html`` for the combined content
            with the ``"success"`` style.
        """
        table_html = ""
        if len(df) > 0:
            display_df = df.head(self._PREVIEW_ROWS)
            truncated_cols = len(df.columns) > self._PREVIEW_COLUMNS
            if truncated_cols:
                display_df = display_df.iloc[:, :self._PREVIEW_COLUMNS]

            truncation_note = ' (columns truncated)' if truncated_cols else ''
            # Cell values originate from the uploaded CSV, so they must be
            # escaped; escape=False would let the file inject markup.
            table_markup = display_df.to_html(
                classes='table table-striped',
                table_id='results-table',
                escape=True,
                index=False,
            )
            table_html = (
                "\n\n📊 Data Preview\n\n"
                f"Showing {len(display_df)} of {len(df)} rows {truncation_note}\n"
                f"{table_markup}\n"
            )

        stats_html = (
            f"\n{len(df)} rows returned\n"
            f"{len(df.columns)} columns\n"
            "Query executed successfully ✓\n"
        )

        # NOTE(review): answer_text is inserted unescaped because it may
        # carry intentional markup from the model - confirm and escape it
        # if it is plain text.
        full_content = f"\n💡 {answer_text}\n{stats_html} {table_html} "

        return self.processor.card_html(full_content, "success")

    def add_to_history(self, question: str) -> str:
        """Return formatted query history.

        Despite its name this method does not add anything; entries are
        appended by the question handlers. It renders the most recent
        queries, newest first.

        Args:
            question: Unused; kept for interface compatibility.

        Returns:
            A placeholder text when no query has been recorded, otherwise
            the recent queries with timestamp and result count, followed by
            a note about how many older queries are not shown.
        """
        if not self.query_history:
            return "\nNo queries yet\n"

        parts = ["\n"]
        for entry in reversed(self.query_history[-self._HISTORY_SHOWN:]):
            asked = entry['question']
            # The question is user input: truncate, then escape.
            shown = html.escape(asked[:self._QUESTION_CHARS])
            suffix = '...' if len(asked) > self._QUESTION_CHARS else ''
            timestamp = entry['timestamp']
            result_count = entry['result_count']
            parts.append(f"\n\"{shown}{suffix}\"\n{timestamp} • {result_count} results\n")

        hidden = len(self.query_history) - self._HISTORY_SHOWN
        if hidden > 0:
            parts.append(f"\n... and {hidden} more queries\n")

        parts.append("\n")
        return "".join(parts)