# SPARKNOVA — data_engine.py
# (uploaded by Tamannathakur via "Upload 7 files", commit 75bedb4 verified)
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import os
import tempfile
def clean_numeric(df):
    """Return a copy of ``df`` with string columns that look numeric converted.

    Two conversions are attempted on each object/string column:

    * Percent columns ("12%") become fractions (0.12) when more than half
      of the rows parse as numbers.
    * Currency / thousands-separated columns ("1,000", "₹2,000", "$3")
      become floats under the same >50% threshold.

    Columns that do not meet the threshold are left untouched.
    """
    df = df.copy()
    n_rows = len(df)
    if n_rows == 0:
        # Bug fix: a frame with columns but zero rows previously raised
        # ZeroDivisionError in the parse-ratio checks below.
        return df
    for col in df.columns:
        if not (pd.api.types.is_string_dtype(df[col]) or df[col].dtype == object):
            continue
        s = df[col].astype(str).str.strip()
        if s.str.contains("%", na=False).any():
            numeric_vals = pd.to_numeric(s.str.replace("%", "", regex=False), errors="coerce")
            if numeric_vals.notna().sum() / n_rows > 0.5:
                # Store percents as fractions so downstream math is uniform.
                df[col] = numeric_vals / 100.0
            # A column containing '%' is never treated as currency.
            continue
        cleaned = (s.str.replace(",", "", regex=False)
                    .str.replace("₹", "", regex=False)
                    .str.replace("$", "", regex=False))
        numeric_vals = pd.to_numeric(cleaned, errors="coerce")
        if numeric_vals.notna().sum() / n_rows > 0.5:
            df[col] = numeric_vals
    return df
def run_analysis(analysis_type, selected_columns, uploaded_df):
    """Run one of the canned analyses and return ``(report_text, table)``.

    Parameters
    ----------
    analysis_type : str or None
        One of "Summary", "Describe", "Top 5 Rows", "Bottom 5 Rows",
        "Missing Values", "Highest Correlation", "Group & Aggregate",
        "Calculate Expressions", or "None".
    selected_columns : list[str]
        Columns to analyze.  Ignored by the whole-dataset analyses.
    uploaded_df : pandas.DataFrame or None
        The currently loaded dataset.

    Returns
    -------
    tuple[str, pandas.DataFrame or None]
        A human-readable report and an optional DataFrame to display
        (only the head/tail analyses return one).
    """
    if uploaded_df is None:
        return "Please upload a dataset first.", None
    if analysis_type == "None" or analysis_type is None:
        return "", None

    # These analyses always operate on the full dataset, regardless of the
    # column selection in the UI.
    whole_dataset_analyses = ["Summary", "Top 5 Rows", "Bottom 5 Rows", "Missing Values"]
    if analysis_type in whole_dataset_analyses:
        df_to_analyze = uploaded_df
    else:
        if not selected_columns:
            return f"Please select columns for {analysis_type} analysis.", None
        df_to_analyze = uploaded_df[selected_columns]

    try:
        if analysis_type == "Summary":
            numeric_cols = uploaded_df.select_dtypes(include=[np.number]).columns
            categorical_cols = uploaded_df.select_dtypes(include=['object', 'category']).columns
            result = (f"Dataset Summary:\nRows: {len(uploaded_df):,}\n"
                      f"Columns: {len(uploaded_df.columns)}\n"
                      f"Numeric Columns: {len(numeric_cols)}\n"
                      f"Text Columns: {len(categorical_cols)}\n\n")
            if len(numeric_cols) > 0:
                result += "Numeric Columns: " + ", ".join(numeric_cols.tolist()) + "\n"
            if len(categorical_cols) > 0:
                result += "Text Columns: " + ", ".join(categorical_cols.tolist())
            return result, None

        elif analysis_type == "Describe":
            result = "Column Description:\n" + "=" * 30 + "\n\n"
            for col in selected_columns:
                if col not in df_to_analyze.columns:
                    continue
                result += f"Column: {col}\n"
                if pd.api.types.is_numeric_dtype(df_to_analyze[col]):
                    stats = df_to_analyze[col].describe()
                    result += (f" Type: Numeric\n Count: {stats['count']:.0f}\n Mean: {stats['mean']:.3f}\n"
                               f" Std: {stats['std']:.3f}\n Min: {stats['min']:.3f}\n 25%: {stats['25%']:.3f}\n"
                               f" 50%: {stats['50%']:.3f}\n 75%: {stats['75%']:.3f}\n Max: {stats['max']:.3f}\n\n")
                else:
                    unique_count = df_to_analyze[col].nunique()
                    null_count = df_to_analyze[col].isnull().sum()
                    # mode() can be empty when the column is all-NaN.
                    mode_series = df_to_analyze[col].mode()
                    most_common = mode_series.iloc[0] if len(mode_series) > 0 else "N/A"
                    result += (f" Type: Categorical/Text\n Unique Values: {unique_count}\n"
                               f" Missing Values: {null_count}\n Most Common: {most_common}\n")
                    top_values = df_to_analyze[col].value_counts().head(5)
                    result += " Top Values:\n"
                    for val, count in top_values.items():
                        result += f" {val}: {count} times\n"
                    result += "\n"
            return result, None

        elif analysis_type == "Top 5 Rows":
            return "Top 5 Rows - See data table below", df_to_analyze.head(5)

        elif analysis_type == "Bottom 5 Rows":
            return "Bottom 5 Rows - See data table below", df_to_analyze.tail(5)

        elif analysis_type == "Missing Values":
            result = "Missing Values Analysis:\n" + "=" * 30 + "\n\n"
            # Placeholder strings that should count as missing even though
            # pandas does not treat them as NaN.
            patterns = ['UNKNOWN', 'unknown', 'ERROR', 'error', 'NULL', 'null', 'NA', 'na', 'N/A',
                        'Not Given', 'not given', 'NOT GIVEN', '', ' ', '-', '?', 'NaN', 'nan',
                        'None', 'none', 'NONE', '#N/A', 'n/a', 'N.A.', 'n.a.']
            # Bug fix: the old per-pattern equality loop counted a cell once
            # per case variant ('UNKNOWN' and 'unknown' both matched the same
            # lowered value), inflating the totals.  A lowered set counts
            # each cell at most once.
            lowered_patterns = {p.lower() for p in patterns if p != ''}
            total_rows = len(uploaded_df)
            for col in uploaded_df.columns:
                nan_count = uploaded_df[col].isnull().sum()
                pseudo_missing_count = 0
                non_null_data = uploaded_df[col].dropna()
                if len(non_null_data) > 0:
                    col_str = non_null_data.astype(str).str.strip()
                    empty_count = (col_str == '').sum()
                    pattern_count = col_str.str.lower().isin(lowered_patterns).sum()
                    pseudo_missing_count = empty_count + pattern_count
                total_missing = nan_count + pseudo_missing_count
                # Bug fix: guard against a zero-row frame (division by zero).
                missing_percent = (total_missing / total_rows * 100) if total_rows else 0.0
                if total_missing > 0:
                    details = []
                    if nan_count > 0:
                        details.append(f"{nan_count} NaN")
                    if pseudo_missing_count > 0:
                        details.append(f"{pseudo_missing_count} text-missing")
                    detail_str = f" ({', '.join(details)})"
                else:
                    detail_str = ""
                result += f"{col}: {total_missing} missing ({missing_percent:.2f}%){detail_str}\n"
            return result, None

        elif analysis_type == "Highest Correlation":
            numeric_cols = df_to_analyze.select_dtypes(include=[np.number]).columns
            if len(numeric_cols) < 2:
                return "Need at least 2 numeric columns for correlation analysis.", None
            corr_matrix = df_to_analyze[numeric_cols].corr()
            result = "Highest Correlations:\n" + "=" * 25 + "\n\n"
            correlations = []
            # Upper triangle only — each pair reported once.
            for i in range(len(corr_matrix.columns)):
                for j in range(i + 1, len(corr_matrix.columns)):
                    col1, col2 = corr_matrix.columns[i], corr_matrix.columns[j]
                    corr_val = corr_matrix.iloc[i, j]
                    # Constant columns yield NaN correlations; skip them so
                    # the sort stays well-defined.
                    if pd.notna(corr_val):
                        correlations.append((abs(corr_val), col1, col2, corr_val))
            correlations.sort(reverse=True)
            for _, col1, col2, corr_val in correlations[:10]:
                # Bug fix: names were concatenated with no separator.
                result += f"{col1} vs {col2}: {corr_val:.3f}\n"
            return result, None

        elif analysis_type == "Group & Aggregate":
            # selected_columns is guaranteed non-empty here (checked above).
            categorical_cols = [col for col in selected_columns if not pd.api.types.is_numeric_dtype(df_to_analyze[col])]
            numeric_cols = [col for col in selected_columns if pd.api.types.is_numeric_dtype(df_to_analyze[col])]
            if categorical_cols and numeric_cols:
                group_col = categorical_cols[0]
                agg_col = numeric_cols[0]
                grouped = df_to_analyze.groupby(group_col)[agg_col].agg(['count', 'mean', 'sum']).round(2)
                result = "Group & Aggregate Analysis:\n" + "=" * 35 + "\n\n"
                result += f"Grouped by: {group_col}\nAggregated: {agg_col}\n\n"
                result += grouped.to_string()
            elif categorical_cols:
                group_col = categorical_cols[0]
                grouped = df_to_analyze[group_col].value_counts()
                result = "Group Count Analysis:\n" + "=" * 25 + "\n\n"
                result += grouped.to_string()
            else:
                result = "Please select at least one categorical column for grouping."
            return result, None

        elif analysis_type == "Calculate Expressions":
            numeric_cols = df_to_analyze.select_dtypes(include=[np.number]).columns
            if len(numeric_cols) >= 2:
                col1, col2 = numeric_cols[0], numeric_cols[1]
                df_calc = df_to_analyze.copy()
                df_calc['Sum'] = df_calc[col1] + df_calc[col2]
                df_calc['Difference'] = df_calc[col1] - df_calc[col2]
                result = "Calculated Expressions:\n" + "=" * 30 + "\n\n"
                result += f"Using columns: {col1} and {col2}\n\n"
                result += f"New calculated columns:\nSum = {col1} + {col2}\nDifference = {col1} - {col2}\n\n"
                result += "Sample results:\n"
                result += df_calc[['Sum', 'Difference']].head().to_string()
            else:
                result = "Need at least 2 numeric columns for calculations."
            return result, None

        else:
            return f"Analysis type '{analysis_type}' is under development.", None
    except Exception as e:
        return f"Error in analysis: {str(e)}", None
def create_chart_explanation(viz_type, df_to_plot, selected_columns, fig_data=None):
    """Build a short textual explanation for a generated chart.

    Handles "Bar Chart" (two-column form) and "Line Chart" (when the melted
    counts frame ``fig_data`` is supplied); every other case falls through
    to a generic one-liner.  Explanation building is best-effort: any
    data error is swallowed so it can never break chart rendering.
    """
    try:
        if viz_type == "Bar Chart" and len(selected_columns) >= 2:
            x_col, y_col = selected_columns[0], selected_columns[1]
            if pd.api.types.is_numeric_dtype(df_to_plot[y_col]):
                max_val_idx = df_to_plot[y_col].idxmax()
                max_category = df_to_plot.loc[max_val_idx, x_col]
                max_value = df_to_plot[y_col].max()
                y_mean = df_to_plot[y_col].mean()
            else:
                # Non-numeric Y: summarize per-category counts instead.
                grouped = df_to_plot.groupby(x_col)[y_col].count()
                max_category = grouped.idxmax()
                max_value = grouped.max()
                y_mean = grouped.mean()
            return (f"BAR CHART: {y_col} by {x_col}\n"
                    f"Highest: {max_category} ({max_value:.2f})\n"
                    f"Average: {y_mean:.2f}\n"
                    f"Categories: {df_to_plot[x_col].nunique()}")
        elif viz_type == "Line Chart" and fig_data is not None:
            max_combo = fig_data.loc[fig_data['Count'].idxmax()]
            min_combo = fig_data.loc[fig_data['Count'].idxmin()]
            return (f"LINE CHART: Distribution\n"
                    f"Highest: {max_combo[selected_columns[1]]} in {max_combo[selected_columns[0]]} ({max_combo['Count']})\n"
                    f"Lowest: {min_combo[selected_columns[1]]} in {min_combo[selected_columns[0]]} ({min_combo['Count']})\n"
                    f"Total: {len(df_to_plot)}")
    except Exception:
        # Bug fix: was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt.  Data errors still fall back to the generic text.
        pass
    return f"{viz_type} visualization\nShows data patterns and relationships"
def create_visualization(viz_type, selected_columns, uploaded_df):
    """Create a plotly figure for the requested visualization type.

    Parameters
    ----------
    viz_type : str
        One of "Bar Chart", "Pie Chart", "Scatter Plot", "Line Chart",
        "Histogram", "Heat Map", "Box Plot", or "None".
    selected_columns : list[str]
        Columns to plot; branch behavior depends on how many are given
        and their dtypes.
    uploaded_df : pandas.DataFrame or None
        The currently loaded dataset.

    Returns
    -------
    tuple
        (figure, explanation_text, figure) on success — the figure appears
        twice because the caller binds it to two outputs (presumably a
        display component and a state slot; confirm against the UI wiring).
        (None, message, None) on invalid input or error.
    """
    if uploaded_df is None or viz_type == "None":
        return None, "", None
    if not selected_columns:
        return None, "Please select columns for visualization.", None
    df_to_plot = uploaded_df[selected_columns]
    try:
        if viz_type == "Bar Chart":
            if len(selected_columns) >= 2:
                x_col, y_col = selected_columns[0], selected_columns[1]
                # Optional third column drives the color grouping.
                color_col = selected_columns[2] if len(selected_columns) > 2 else None
                # Handle different data type combinations
                if pd.api.types.is_numeric_dtype(df_to_plot[y_col]):
                    # Numeric Y-axis: use as-is (only the first 100 rows are
                    # plotted, so very large datasets are truncated here).
                    plot_data = df_to_plot.head(100)
                    fig = px.bar(plot_data, x=x_col, y=y_col, color=color_col, title=f"{y_col} by {x_col}")
                else:
                    # Non-numeric Y-axis: count occurrences
                    if pd.api.types.is_numeric_dtype(df_to_plot[x_col]):
                        # If X is numeric, group and count Y values
                        grouped = df_to_plot.groupby(x_col)[y_col].count().reset_index()
                        grouped.columns = [x_col, f'Count of {y_col}']
                        fig = px.bar(grouped, x=x_col, y=f'Count of {y_col}', title=f"Count of {y_col} by {x_col}")
                    else:
                        # Both categorical: cross-tabulation
                        crosstab = pd.crosstab(df_to_plot[x_col], df_to_plot[y_col])
                        crosstab_reset = crosstab.reset_index().melt(id_vars=[x_col], var_name=y_col, value_name='Count')
                        fig = px.bar(crosstab_reset, x=x_col, y='Count', color=y_col, title=f"{y_col} distribution by {x_col}")
                explanation = create_chart_explanation(viz_type, df_to_plot, selected_columns)
            else:
                # Single column: histogram for numeric data, otherwise the
                # 15 most frequent values as a bar chart.
                col = selected_columns[0]
                if pd.api.types.is_numeric_dtype(df_to_plot[col]):
                    fig = px.histogram(df_to_plot, x=col, title=f"Distribution of {col}")
                else:
                    value_counts = df_to_plot[col].value_counts().head(15)
                    fig = px.bar(x=value_counts.index, y=value_counts.values, title=f"Top Values in {col}")
                explanation = f"Chart showing distribution of {col}"
            fig.update_layout(width=800, height=500)
            return fig, explanation, fig
        elif viz_type == "Pie Chart":
            col = selected_columns[0]
            if len(selected_columns) >= 2 and pd.api.types.is_numeric_dtype(df_to_plot[selected_columns[1]]):
                # Two columns with numeric second: pie of summed values per category.
                grouped_data = df_to_plot.groupby(col)[selected_columns[1]].sum().reset_index()
                fig = px.pie(grouped_data, values=selected_columns[1], names=col, title=f"Total {selected_columns[1]} by {col}")
                legend_title = f"{col} Categories"
            else:
                # Single column: pie of the 10 most frequent values.
                value_counts = df_to_plot[col].value_counts().head(10)
                fig = px.pie(values=value_counts.values, names=value_counts.index, title=f"Distribution of {col}")
                legend_title = f"{col} Values"
            # Legend is placed outside the pie, vertically centered on the right.
            fig.update_layout(
                width=800,
                height=500,
                showlegend=True,
                legend=dict(
                    title=dict(text=legend_title, font=dict(size=14, color="black")),
                    orientation="v",
                    yanchor="middle",
                    y=0.5,
                    xanchor="left",
                    x=1.05,
                    font=dict(size=12)
                )
            )
            explanation = f"PIE CHART: {col} Distribution\nShows proportion of each category\nUse to understand category distribution patterns"
            return fig, explanation, fig
        elif viz_type == "Scatter Plot":
            if len(selected_columns) >= 2:
                x_col, y_col = selected_columns[0], selected_columns[1]
                color_col = selected_columns[2] if len(selected_columns) > 2 else None
                # Check if both columns are suitable for scatter plot
                if not (pd.api.types.is_numeric_dtype(df_to_plot[x_col]) and pd.api.types.is_numeric_dtype(df_to_plot[y_col])):
                    return None, f"Scatter plot requires numeric data. {x_col} and {y_col} must be numeric.", None
                fig = px.scatter(df_to_plot, x=x_col, y=y_col, color=color_col, title=f"{y_col} vs {x_col}")
                explanation = f"Scatter plot showing relationship between {x_col} and {y_col}"
            else:
                return None, "Scatter plot requires at least 2 columns.", None
            fig.update_layout(width=800, height=500)
            return fig, explanation, fig
        elif viz_type == "Line Chart":
            if len(selected_columns) >= 2:
                x_col, y_col = selected_columns[0], selected_columns[1]
                if pd.api.types.is_numeric_dtype(df_to_plot[y_col]):
                    # Numeric Y: sort by X and plot trend
                    sorted_data = df_to_plot.sort_values(x_col)
                    fig = px.line(sorted_data, x=x_col, y=y_col, title=f"Trend of {y_col} over {x_col}", markers=True)
                    explanation = f"Line chart showing trend of {y_col} over {x_col}"
                else:
                    # Non-numeric Y: create cross-tabulation
                    crosstab = pd.crosstab(df_to_plot[x_col], df_to_plot[y_col])
                    melted = pd.melt(crosstab.reset_index(), id_vars=[x_col], var_name=y_col, value_name='Count')
                    fig = px.line(melted, x=x_col, y='Count', color=y_col, title=f"Distribution of {y_col} across {x_col}", markers=True)
                    # The melted counts frame feeds the detailed explanation text.
                    explanation = create_chart_explanation(viz_type, df_to_plot, selected_columns, melted)
            else:
                return None, "Line chart requires at least 2 columns.", None
            fig.update_layout(width=800, height=500)
            return fig, explanation, fig
        elif viz_type == "Histogram":
            col = selected_columns[0]
            if pd.api.types.is_numeric_dtype(df_to_plot[col]):
                fig = px.histogram(df_to_plot, x=col, title=f"Distribution of {col}", nbins=30)
                explanation = f"Histogram showing distribution of {col}"
            else:
                return None, f"Histogram requires numeric data. Try Bar Chart instead.", None
            fig.update_layout(width=800, height=500)
            return fig, explanation, fig
        elif viz_type == "Heat Map":
            if len(selected_columns) >= 2:
                numeric_cols = [col for col in selected_columns if pd.api.types.is_numeric_dtype(df_to_plot[col])]
                if len(numeric_cols) >= 2:
                    # Two or more numeric columns: correlation heatmap.
                    corr_matrix = df_to_plot[numeric_cols].corr()
                    fig = px.imshow(corr_matrix, text_auto=True, aspect="auto", title="Correlation Heatmap", color_continuous_scale='RdBu')
                    explanation = f"Heatmap showing correlations between numeric columns"
                else:
                    # Otherwise fall back to a categorical cross-tabulation heatmap.
                    x_col, y_col = selected_columns[0], selected_columns[1]
                    crosstab = pd.crosstab(df_to_plot[x_col], df_to_plot[y_col])
                    fig = px.imshow(crosstab.values, x=crosstab.columns, y=crosstab.index, text_auto=True, aspect="auto", title=f"Cross-tabulation: {y_col} vs {x_col}")
                    explanation = f"Heatmap showing cross-tabulation between {x_col} and {y_col}"
            else:
                return None, "Heat map requires at least 2 columns.", None
            fig.update_layout(width=800, height=500)
            return fig, explanation, fig
        elif viz_type == "Box Plot":
            if len(selected_columns) >= 1:
                y_col = selected_columns[0]
                if not pd.api.types.is_numeric_dtype(df_to_plot[y_col]):
                    return None, f"Box plot requires numeric Y-axis. {y_col} is not numeric.", None
                # Optional second column groups the boxes.
                x_col = selected_columns[1] if len(selected_columns) > 1 else None
                fig = px.box(df_to_plot, x=x_col, y=y_col, title=f"Box Plot of {y_col}" + (f" by {x_col}" if x_col else ""))
                explanation = f"Box plot showing distribution of {y_col}" + (f" grouped by {x_col}" if x_col else "")
            else:
                return None, "Box plot requires at least 1 column.", None
            fig.update_layout(width=800, height=500)
            return fig, explanation, fig
        else:
            return None, f"Visualization type '{viz_type}' is under development.", None
    except Exception as e:
        return None, f"Error creating visualization: {str(e)}", None
def handle_missing_data(method, selected_columns, constant_value, uploaded_df, change_history):
    """Apply a missing-data strategy to the selected columns.

    Parameters
    ----------
    method : str
        One of "Forward Fill", "Backward Fill", "Constant Fill", "Mean Fill",
        "Median Fill", "Mode Fill", "Drop Columns", "Clean All Missing",
        or "None".
    selected_columns : list[str]
        Columns to process; unknown names are silently skipped.
    constant_value : str
        Fill value for "Constant Fill" (defaults to "Unknown" when blank).
    uploaded_df : pandas.DataFrame or None
    change_history : list[pandas.DataFrame]
        Undo stack; a snapshot of the pre-change frame is appended.

    Returns
    -------
    tuple[str, pandas.DataFrame, list]
        (status message, updated DataFrame, updated history).
    """
    if uploaded_df is None:
        return "Please upload a dataset first.", uploaded_df, change_history
    if method == "None":
        return "", uploaded_df, change_history
    if not selected_columns:
        return "Please select columns to apply data handling.", uploaded_df, change_history

    # Placeholder strings treated as missing in object columns.  Defined once
    # instead of being duplicated per method as before.
    patterns = ['UNKNOWN', 'unknown', 'ERROR', 'error', 'NULL', 'null', 'NA', 'na', 'N/A',
                'Not Given', 'not given', 'NOT GIVEN', '', ' ', '-', '?', 'NaN', 'nan',
                'None', 'none', 'NONE', '#N/A', 'n/a', 'N.A.', 'n.a.']

    def _blank_out_pseudo_missing(series):
        """Convert placeholder strings in an object column to NaN."""
        if series.dtype == 'object':
            series = series.replace(patterns, np.nan).replace('', np.nan)
        return series

    try:
        if method == "Clean All Missing":
            # Bug fix: checked before snapshotting so an unavailable method
            # no longer pushes a no-op entry onto the undo history.
            return "Clean All Missing is not available", uploaded_df, change_history

        change_history.append(uploaded_df.copy())  # snapshot for undo
        df_copy = uploaded_df.copy()
        processed_columns = []
        dropped_columns = []

        for col in selected_columns:
            if col not in df_copy.columns:
                continue
            if method == "Forward Fill":
                # NOTE(review): a debugging hack that silently skipped any
                # column literally named 'title' was removed here.
                df_copy[col] = _blank_out_pseudo_missing(df_copy[col]).ffill()
                processed_columns.append(col)
            elif method == "Backward Fill":
                df_copy[col] = _blank_out_pseudo_missing(df_copy[col]).bfill()
                processed_columns.append(col)
            elif method == "Constant Fill":
                fill_val = constant_value.strip() if constant_value else "Unknown"
                df_copy[col] = _blank_out_pseudo_missing(df_copy[col]).fillna(fill_val)
                processed_columns.append(col)
            elif method in ("Mean Fill", "Median Fill"):
                # Non-numeric columns are coerced; unparseable cells become
                # NaN and are then filled with the statistic.
                if pd.api.types.is_numeric_dtype(df_copy[col]):
                    numeric_col = df_copy[col]
                else:
                    numeric_col = pd.to_numeric(df_copy[col], errors='coerce')
                if not numeric_col.isna().all():
                    stat = numeric_col.mean() if method == "Mean Fill" else numeric_col.median()
                    df_copy[col] = numeric_col.fillna(stat)
                    processed_columns.append(col)
            elif method == "Mode Fill":
                # Mode is computed over values that are neither NaN nor a
                # placeholder, then used to replace both.
                valid_values = df_copy[col][~df_copy[col].isin(patterns) & df_copy[col].notna()]
                if len(valid_values) > 0:
                    mode_value = valid_values.mode()
                    if len(mode_value) > 0:
                        most_common = mode_value.iloc[0]
                        df_copy[col] = df_copy[col].replace(patterns, most_common).fillna(most_common)
                        processed_columns.append(col)
            elif method == "Drop Columns":
                df_copy = df_copy.drop(columns=[col])
                dropped_columns.append(col)

        uploaded_df = df_copy
        if processed_columns:
            result = f"Applied {method} to: {', '.join(processed_columns)}"
            for col in processed_columns:
                if col in uploaded_df.columns:
                    after_missing = uploaded_df[col].isnull().sum()
                    result += f"\n- {col}: {after_missing} missing values remaining"
        elif dropped_columns:
            result = f"Dropped columns: {', '.join(dropped_columns)}"
        else:
            result = "No columns processed - check column selection or data types"
        return result, uploaded_df, change_history
    except Exception as e:
        return f"Error: {str(e)}", uploaded_df, change_history
def undo_last_change(uploaded_df, change_history):
    """Pop the most recent snapshot off the undo stack and restore it."""
    if not change_history:
        return "No changes to undo.", uploaded_df, change_history
    restored = change_history.pop()
    rows, cols = restored.shape
    message = f"Undid last change. Dataset now has {rows} rows × {cols} columns"
    return message, restored, change_history
def undo_all_changes(original_df, change_history):
    """Restore the pristine dataset copy and wipe the undo stack."""
    if original_df is None:
        return "No original dataset to restore.", None, change_history
    restored = original_df.copy()
    rows, cols = restored.shape
    message = f"Dataset restored to original state ({rows} rows × {cols} columns)"
    return message, restored, []
def download_dataset(uploaded_df, dataset_name):
    """Write the current dataset to a CSV in the temp dir and return its path.

    The output name is ``<base>_modified.csv`` where ``<base>`` is the
    dataset name with a *trailing* .csv/.xlsx/.xls extension removed.
    Returns None when no dataset is loaded.
    """
    if uploaded_df is None:
        return None
    if dataset_name:
        # Bug fix: the old str.replace-based stripping removed ".csv" etc.
        # anywhere in the name (mangling e.g. "my.csv.backup"); only a
        # trailing known extension is stripped now.
        base_name, ext = os.path.splitext(dataset_name)
        if ext.lower() not in ('.csv', '.xlsx', '.xls'):
            base_name = dataset_name
        filename = f"{base_name}_modified.csv"
    else:
        filename = "modified_dataset.csv"
    filepath = os.path.join(tempfile.gettempdir(), filename)
    uploaded_df.to_csv(filepath, index=False)
    return filepath
def display_data_format(format_type, selected_columns, uploaded_df):
    """Return up to the first 100 rows as a DataFrame preview.

    Only the "DataFrame" format yields output; any other format type (or a
    missing dataset) yields None.  When columns are selected, the preview
    is restricted to them.
    """
    if uploaded_df is None or format_type == "None":
        return None
    has_selection = selected_columns and len(selected_columns) > 0
    subset = uploaded_df[selected_columns] if has_selection else uploaded_df
    if format_type != "DataFrame":
        return None
    return subset.head(100)
def display_text_format(format_type, selected_columns, uploaded_df):
    """Render the first 20 rows as JSON or a Python-dict string.

    Returns "" for unsupported format types and missing datasets.  (Bug
    fix: the function previously fell off the end and returned None for
    unknown formats while every other path returned a string.)
    """
    if uploaded_df is None or format_type == "None":
        return ""
    if selected_columns and len(selected_columns) > 0:
        df_to_show = uploaded_df[selected_columns]
    else:
        df_to_show = uploaded_df
    if format_type == "JSON":
        return df_to_show.head(20).to_json(orient='records', indent=2)
    if format_type == "Dictionary":
        return str(df_to_show.head(20).to_dict(orient='records'))
    return ""