from processor import Processor
from model import Model
import pandas as pd
from datetime import datetime
class Controller:
def __init__(self):
self.processor = Processor()
self.model = None # will be initialized after CSV load
self.query_history = []
self.dataset_stats = {}
def load_csv(self, file) -> str:
"""Load CSV and initialize model"""
status = self.processor.load_csv(file)
if self.processor.get_connection():
self.model = Model(self.processor.get_connection())
self._update_dataset_stats()
return status
def _update_dataset_stats(self):
"""Update dataset statistics after loading"""
if self.model is None:
return
try:
# Get basic stats
df = pd.read_sql("SELECT * FROM data LIMIT 1000", self.processor.get_connection())
self.dataset_stats = {
'total_rows': len(pd.read_sql("SELECT COUNT(*) as count FROM data", self.processor.get_connection())),
'total_columns': len(df.columns),
'columns': list(df.columns),
'sample_data': df.head(3).to_dict('records') if not df.empty else [],
'dtypes': df.dtypes.to_dict() if not df.empty else {}
}
except Exception as e:
self.dataset_stats = {'error': str(e)}
def get_dataset_info(self) -> str:
"""Return formatted dataset information"""
if not self.dataset_stats:
return "
No dataset loaded
"
if 'error' in self.dataset_stats:
return f"Error loading dataset info: {self.dataset_stats['error']}
"
stats = self.dataset_stats
# Format column information
columns_html = ""
for col in stats.get('columns', [])[:10]: # Show first 10 columns
dtype = str(stats.get('dtypes', {}).get(col, 'unknown'))
columns_html += f"{col} ({dtype})"
if len(stats.get('columns', [])) > 10:
columns_html += f"... and {len(stats['columns']) - 10} more"
# Format sample data
sample_html = ""
for i, row in enumerate(stats.get('sample_data', [])[:2]): # Show first 2 rows
sample_html += f""
sample_html += f"Row {i+1}: "
# Show first 4 columns of each row
row_items = list(row.items())[:4]
for j, (key, value) in enumerate(row_items):
sample_html += f"{key}: {str(value)[:30]}"
if len(str(value)) > 30:
sample_html += "..."
if j < len(row_items) - 1:
sample_html += " | "
if len(row) > 4:
sample_html += " | ..."
sample_html += "
"
return f"""
{stats.get('total_rows', 0):,}
Total Rows
{stats.get('total_columns', 0)}
Columns
📋 Columns:
{columns_html}
👀 Sample Data:
{sample_html}
"""
def handle_question(self, question: str) -> str:
"""Handle question and return HTML result"""
if self.model is None:
return self.processor.card_html("❌ No CSV loaded yet. Please upload a CSV file first.", "error")
if not question.strip():
return self.processor.card_html("❓ Please enter a question about your data.", "warning")
# Generate SQL
sql = self.model.generate_sql(question)
if not sql:
return self.processor.card_html("⚠️ Could not generate SQL query. Try rephrasing your question.", "warning")
# Execute SQL
df = self.model.execute_sql(sql)
if df.empty:
return self.processor.card_html("🔍 Query executed successfully but returned no results. Try a different question.", "warning")
# Convert results to natural language
answer_text = self.model.results_to_natural_language(question, df)
# Add to history
self.query_history.append({
'question': question,
'sql': sql,
'timestamp': datetime.now().strftime("%H:%M:%S"),
'result_count': len(df)
})
# Format result with data table if applicable
result_html = self._format_enhanced_result(answer_text, df, sql)
return result_html
def handle_question_with_sql(self, question: str) -> tuple:
"""Handle question and return both result HTML and SQL query"""
if self.model is None:
return self.processor.card_html("❌ No CSV loaded yet. Please upload a CSV file first.", "error"), ""
if not question.strip():
return self.processor.card_html("❓ Please enter a question about your data.", "warning"), ""
# Generate SQL
sql = self.model.generate_sql(question)
if not sql:
return self.processor.card_html("⚠️ Could not generate SQL query. Try rephrasing your question.", "warning"), ""
# Execute SQL
df = self.model.execute_sql(sql)
if df.empty:
return self.processor.card_html("🔍 Query executed successfully but returned no results. Try a different question.", "warning"), sql
# Convert results to natural language
answer_text = self.model.results_to_natural_language(question, df)
# Add to history
self.query_history.append({
'question': question,
'sql': sql,
'timestamp': datetime.now().strftime("%H:%M:%S"),
'result_count': len(df)
})
# Format result with data table
result_html = self._format_enhanced_result(answer_text, df, sql)
return result_html, sql
def _format_enhanced_result(self, answer_text: str, df: pd.DataFrame, sql: str) -> str:
"""Format result with enhanced styling and data preview"""
# Create a small data table if results are tabular
table_html = ""
if len(df) > 0:
# Limit display to first 10 rows and reasonable number of columns
display_df = df.head(10)
if len(df.columns) > 6:
display_df = display_df.iloc[:, :6]
truncated_cols = True
else:
truncated_cols = False
table_html = f"""
📊 Data Preview
Showing {len(display_df)} of {len(df)} rows
{' (columns truncated)' if truncated_cols else ''}
{display_df.to_html(classes='table table-striped', table_id='results-table', escape=False, index=False)}
"""
# Stats about the result
stats_html = f"""
{len(df)} rows returned
{len(df.columns)} columns
Query executed successfully ✓
"""
full_content = f"""
💡 {answer_text}
{stats_html}
{table_html}
"""
return self.processor.card_html(full_content, "success")
def add_to_history(self, question: str) -> str:
"""Return formatted query history"""
if not self.query_history:
return "No queries yet
"
history_html = ""
# Show last 5 queries
recent_queries = self.query_history[-5:]
for i, query in enumerate(reversed(recent_queries)):
history_html += f"""
"{query['question'][:60]}{'...' if len(query['question']) > 60 else ''}"
{query['timestamp']} • {query['result_count']} results
"""
if len(self.query_history) > 5:
history_html += f"
... and {len(self.query_history) - 5} more queries
"
history_html += "
"
return history_html