"""
EDA Analysis Module

This module handles all dataset processing and analysis, providing structured
information about the dataset that can be used for visualization and LLM prompting.
"""
import base64
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns


class DatasetAnalyzer:
    """Class for analyzing datasets and extracting key information."""

    def __init__(self, df: Optional[pd.DataFrame] = None):
        """Initialize with an optional dataframe."""
        self.df = df
        self.analysis_results: Dict[str, Any] = {}

    def load_dataframe(self, df: pd.DataFrame) -> None:
        """Load a dataframe for analysis and reset any previous results."""
        self.df = df
        self.analysis_results = {}

    def analyze_dataset(self) -> Dict[str, Any]:
        """
        Perform comprehensive analysis on the dataset.

        Returns:
            Dict: Dictionary containing all analysis results
        """
        if self.df is None:
            raise ValueError("No dataframe loaded. Please load a dataframe first.")

        # Basic structure: dimensions, column names, and dtypes
        self.analysis_results["shape"] = self.df.shape
        self.analysis_results["columns"] = list(self.df.columns)
        self.analysis_results["dtypes"] = {col: str(self.df[col].dtype) for col in self.df.columns}

        # Missing values per column
        self.analysis_results["missing_values"] = self._analyze_missing_values()

        # Descriptive statistics for numerical and categorical columns
        self.analysis_results["basic_stats"] = self._generate_basic_stats()

        # Pairwise correlations between numerical columns
        self.analysis_results["correlations"] = self._analyze_correlations()

        # A small sample of rows for context
        self.analysis_results["sample_data"] = self.df.head().to_string()

        # Column typing and cardinality
        self.analysis_results["categorical_columns"] = self._identify_categorical_columns()
        self.analysis_results["numerical_columns"] = self._identify_numerical_columns()
        self.analysis_results["unique_values"] = self._count_unique_values()

        return self.analysis_results
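
    # For reference, analyze_dataset() returns a dictionary shaped like the
    # following sketch (illustrative values only, not from a real dataset):
    #
    #   {
    #       "shape": (1000, 12),
    #       "columns": ["age", "income", ...],
    #       "dtypes": {"age": "int64", "income": "float64", ...},
    #       "missing_values": {"age": (14, 1.4), ...},
    #       "basic_stats": "...",            # describe() output as text
    #       "correlations": "...",           # top pairwise correlations as text
    #       "sample_data": "...",            # df.head() as text
    #       "categorical_columns": [...],
    #       "numerical_columns": [...],
    #       "unique_values": {"age": 63, ...},
    #   }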

    def _analyze_missing_values(self) -> Dict[str, Tuple[int, float]]:
        """
        Analyze missing values in the dataset.

        Returns:
            Dict: Column names as keys, tuples of (count, percentage) as values
        """
        missing_values = {}
        for col in self.df.columns:
            # Cast to a plain int so the result matches the declared Tuple[int, float]
            count = int(self.df[col].isna().sum())
            percentage = round((count / len(self.df)) * 100, 2)
            missing_values[col] = (count, percentage)

        return missing_values

    def _generate_basic_stats(self) -> str:
        """
        Generate basic statistics for the dataset.

        Returns:
            str: String representation of basic statistics
        """
        # Numerical summary (count, mean, std, quartiles, ...)
        num_stats = self.df.describe().to_string()

        # Top value counts for each categorical column
        cat_columns = self._identify_categorical_columns()
        cat_stats = ""
        if cat_columns:
            cat_stats = "\n\nCategorical columns statistics:\n"
            for col in cat_columns:
                value_counts = self.df[col].value_counts().head(10)
                cat_stats += f"\n{col} - Top values:\n{value_counts.to_string()}\n"

        return num_stats + cat_stats

    def _analyze_correlations(self) -> str:
        """
        Analyze correlations between numerical features.

        Returns:
            str: String representation of top correlations
        """
        num_columns = self._identify_numerical_columns()

        if len(num_columns) < 2:
            return "Not enough numerical columns for correlation analysis."

        corr_matrix = self.df[num_columns].corr()

        # Collect each unique column pair with its correlation value
        corr_pairs = []
        for i in range(len(num_columns)):
            for j in range(i + 1, len(num_columns)):
                col1, col2 = num_columns[i], num_columns[j]
                corr_value = corr_matrix.loc[col1, col2]
                if not np.isnan(corr_value):
                    corr_pairs.append((col1, col2, corr_value))

        # Strongest relationships (by absolute value) first
        corr_pairs.sort(key=lambda x: abs(x[2]), reverse=True)

        result = "Top correlations:\n"
        for col1, col2, corr in corr_pairs[:10]:
            result += f"{col1} -- {col2}: {corr:.4f}\n"

        return result

    def _identify_categorical_columns(self) -> List[str]:
        """
        Identify categorical columns in the dataset.

        Returns:
            List[str]: List of categorical column names
        """
        cat_columns = []
        for col in self.df.columns:
            # Object, category, and boolean dtypes are categorical by definition
            if self.df[col].dtype == 'object' or self.df[col].dtype == 'category' or self.df[col].dtype == 'bool':
                cat_columns.append(col)
            # Numeric columns with few unique values, both in absolute terms
            # and relative to the number of rows, are treated as categorical too
            elif pd.api.types.is_numeric_dtype(self.df[col]) and \
                    self.df[col].nunique() < 10 and self.df[col].nunique() / len(self.df) < 0.05:
                cat_columns.append(col)

        return cat_columns

    def _identify_numerical_columns(self) -> List[str]:
        """
        Identify numerical columns in the dataset.

        Returns:
            List[str]: List of numerical column names
        """
        num_columns = []
        cat_columns = self._identify_categorical_columns()

        for col in self.df.columns:
            if col not in cat_columns and pd.api.types.is_numeric_dtype(self.df[col].dtype):
                num_columns.append(col)

        return num_columns

    def _count_unique_values(self) -> Dict[str, int]:
        """
        Count unique values for each column.

        Returns:
            Dict: Column names as keys, unique count as values
        """
        return {col: self.df[col].nunique() for col in self.df.columns}

    def generate_eda_visualizations(self) -> Dict[str, str]:
        """
        Generate common EDA visualizations.

        Returns:
            Dict: Dictionary of visualization titles and their base64-encoded images
        """
        if self.df is None:
            raise ValueError("No dataframe loaded. Please load a dataframe first.")

        visualizations = {}

        # Missing values overview
        visualizations["missing_values_heatmap"] = self._plot_missing_values()

        # Distributions of the first few numerical columns
        num_columns = self._identify_numerical_columns()
        for col in num_columns[:5]:
            visualizations[f"distribution_{col}"] = self._plot_distribution(col)

        # Correlation heatmap across numerical columns
        visualizations["correlation_heatmap"] = self._plot_correlation_heatmap()

        # Distributions of the first few categorical columns
        cat_columns = self._identify_categorical_columns()
        for col in cat_columns[:5]:
            visualizations[f"categorical_{col}"] = self._plot_categorical_distribution(col)

        # Scatter plot of the most correlated pair of numerical columns
        if len(num_columns) >= 2:
            visualizations["scatter_plot"] = self._plot_scatter_correlation()

        return visualizations

    def _plot_missing_values(self) -> str:
        """Generate missing values heatmap."""
        plt.figure(figsize=(10, 6))
        sns.heatmap(self.df.isnull(), cmap='viridis', yticklabels=False,
                    cbar=True, cbar_kws={'label': 'Missing Data'})
        plt.title('Missing Values Heatmap')
        plt.tight_layout()

        return self._fig_to_base64(plt.gcf())

    def _plot_distribution(self, column: str) -> str:
        """Generate distribution plot for a numerical column."""
        plt.figure(figsize=(10, 6))

        # Histogram with a kernel density estimate overlay
        sns.histplot(data=self.df, x=column, kde=True)

        plt.title(f'Distribution of {column}')
        plt.xlabel(column)
        plt.ylabel('Frequency')
        plt.tight_layout()

        return self._fig_to_base64(plt.gcf())

    def _plot_correlation_heatmap(self) -> str:
        """Generate correlation heatmap."""
        num_columns = self._identify_numerical_columns()

        if len(num_columns) < 2:
            return ""

        plt.figure(figsize=(12, 10))
        corr_matrix = self.df[num_columns].corr()

        # Mask the upper triangle so each pair appears only once
        mask = np.triu(np.ones_like(corr_matrix, dtype=bool))

        # Diverging palette centered on zero correlation
        cmap = sns.diverging_palette(230, 20, as_cmap=True)

        sns.heatmap(corr_matrix, mask=mask, cmap=cmap, vmax=1, vmin=-1, center=0,
                    square=True, linewidths=.5, annot=True, fmt=".2f")

        plt.title('Correlation Heatmap')
        plt.tight_layout()

        return self._fig_to_base64(plt.gcf())

    def _plot_categorical_distribution(self, column: str) -> str:
        """Generate bar plot for a categorical column."""
        plt.figure(figsize=(10, 6))

        value_counts = self.df[column].value_counts()
        if len(value_counts) > 10:
            # Keep the 9 most frequent categories and lump the rest into 'Other'
            top_categories = value_counts.nlargest(9).index
            data = self.df.copy()
            data[column] = data[column].apply(lambda x: x if x in top_categories else 'Other')
            sns.countplot(y=column, data=data, order=data[column].value_counts().index)
        else:
            sns.countplot(y=column, data=self.df, order=value_counts.index)

        plt.title(f'Distribution of {column}')
        plt.xlabel('Count')
        plt.ylabel(column)
        plt.tight_layout()

        return self._fig_to_base64(plt.gcf())

    def _plot_scatter_correlation(self) -> str:
        """Generate scatter plot of the two most correlated features."""
        num_columns = self._identify_numerical_columns()

        if len(num_columns) < 2:
            return ""

        # Work with absolute correlations to find the strongest relationship
        corr_matrix = self.df[num_columns].corr().abs()

        # Mask the diagonal and upper triangle so each pair is considered once
        mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
        corr_matrix = corr_matrix.mask(mask)

        # Check for an all-NaN matrix before calling idxmax(): stack() drops
        # NaN values, so idxmax() on an empty series would raise a ValueError
        max_corr = corr_matrix.max().max()
        if pd.isna(max_corr):
            return ""

        col1, col2 = corr_matrix.stack().idxmax()

        plt.figure(figsize=(10, 6))

        # Scatter plot with a fitted regression line
        sns.regplot(x=col1, y=col2, data=self.df, scatter_kws={'alpha': 0.5})

        plt.title(f'Scatter plot of {col1} vs {col2} '
                  f'(|correlation|: {corr_matrix.loc[col1, col2]:.2f})')
        plt.tight_layout()

        return self._fig_to_base64(plt.gcf())

    def _fig_to_base64(self, fig) -> str:
        """Convert matplotlib figure to base64 string."""
        buf = BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight')
        buf.seek(0)
        img_str = base64.b64encode(buf.read()).decode('utf-8')
        plt.close(fig)
        return img_str
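
    # Note: the base64 string returned by _fig_to_base64() can be embedded
    # directly in an HTML page or notebook via the data-URI scheme, e.g.
    # (hypothetical snippet, not part of this module):
    #
    #   html = f'<img src="data:image/png;base64,{img_str}"/>'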

    def suggest_data_preprocessing(self) -> Dict[str, List[str]]:
        """
        Suggest preprocessing steps based on dataset analysis.

        Returns:
            Dict: Dictionary of preprocessing suggestions for each column type
        """
        if not self.analysis_results:
            self.analyze_dataset()

        suggestions = {
            "numerical": [],
            "categorical": [],
            "missing_values": [],
            "outliers": [],
            "general": []
        }

        # Missing value handling
        missing_cols = [col for col, (count, _) in self.analysis_results["missing_values"].items() if count > 0]
        if missing_cols:
            # Sort so the columns with the most missing values are listed first
            missing_cols.sort(key=lambda c: self.analysis_results["missing_values"][c][0], reverse=True)
            suggestions["missing_values"].append(f"Found {len(missing_cols)} columns with missing values.")
            if len(missing_cols) > 5:
                suggestions["missing_values"].append(f"Columns with highest missing values: {', '.join(missing_cols[:5])}...")
            else:
                suggestions["missing_values"].append(f"Columns with missing values: {', '.join(missing_cols)}")

            suggestions["missing_values"].append("Consider these strategies for handling missing values:")
            suggestions["missing_values"].append("- Imputation (mean/median for numerical, mode for categorical)")
            suggestions["missing_values"].append("- Creating missing value indicators as new features")
            suggestions["missing_values"].append("- Removing rows or columns with too many missing values")

        # Numerical columns
        num_cols = self.analysis_results["numerical_columns"]
        if num_cols:
            suggestions["numerical"].append(f"Found {len(num_cols)} numerical columns.")
            suggestions["numerical"].append("Consider these preprocessing steps:")
            suggestions["numerical"].append("- Scaling (StandardScaler or MinMaxScaler)")
            suggestions["numerical"].append("- Check for skewness and apply log or Box-Cox transformation if needed")
            suggestions["numerical"].append("- Create binned versions of continuous variables")

            # Flag outliers using Tukey's rule: values more than 1.5 * IQR
            # beyond the first or third quartile
            for col in num_cols:
                if col in self.df.columns:
                    q1 = self.df[col].quantile(0.25)
                    q3 = self.df[col].quantile(0.75)
                    iqr = q3 - q1
                    outlier_count = ((self.df[col] < (q1 - 1.5 * iqr)) | (self.df[col] > (q3 + 1.5 * iqr))).sum()

                    if outlier_count > 0:
                        percentage = round((outlier_count / len(self.df)) * 100, 2)
                        if percentage > 5:
                            suggestions["outliers"].append(f"Column '{col}' has {outlier_count} potential outliers ({percentage}%).")

        # Categorical columns
        cat_cols = self.analysis_results["categorical_columns"]
        if cat_cols:
            suggestions["categorical"].append(f"Found {len(cat_cols)} categorical columns.")

            # Flag high-cardinality columns (more than 10 unique values)
            high_cardinality = []
            for col in cat_cols:
                unique_count = self.analysis_results["unique_values"].get(col, 0)
                if unique_count > 10:
                    high_cardinality.append((col, unique_count))

            if high_cardinality:
                suggestions["categorical"].append("High cardinality columns (many unique values):")
                for col, count in sorted(high_cardinality, key=lambda x: x[1], reverse=True)[:5]:
                    suggestions["categorical"].append(f"- {col}: {count} unique values")

                suggestions["categorical"].append("For high cardinality columns, consider:")
                suggestions["categorical"].append("- Grouping less frequent categories")
                suggestions["categorical"].append("- Target encoding or embedding techniques")

            suggestions["categorical"].append("General categorical encoding strategies:")
            suggestions["categorical"].append("- One-hot encoding for low cardinality columns")
            suggestions["categorical"].append("- Label encoding for ordinal variables")

        # General recommendations
        suggestions["general"].append("General preprocessing recommendations:")
        suggestions["general"].append("- Check for duplicate rows and remove if necessary")
        suggestions["general"].append("- Normalize text fields (lowercase, remove special characters)")
        suggestions["general"].append("- Create feature interactions for highly correlated features")

        return suggestions

    def generate_feature_engineering_ideas(self) -> List[str]:
        """
        Generate feature engineering ideas based on dataset analysis.

        Returns:
            List[str]: List of feature engineering suggestions
        """
        if not self.analysis_results:
            self.analyze_dataset()

        ideas = []

        num_cols = self.analysis_results["numerical_columns"]
        cat_cols = self.analysis_results["categorical_columns"]

        # Numerical transformations
        if len(num_cols) >= 2:
            ideas.append("### Numerical Feature Transformations:")
            ideas.append("1. Create polynomial features for continuous variables")
            ideas.append("2. Apply mathematical transformations (log, sqrt, square) to handle skewed distributions")
            ideas.append("3. Create binned versions of continuous features to capture non-linear relationships")

        # Date/time features, detected heuristically by column name
        time_related_cols = [col for col in self.df.columns
                             if any(x in col.lower() for x in ['date', 'time', 'year', 'month', 'day'])]
        if time_related_cols:
            ideas.append("\n### Time-Based Features:")
            ideas.append(f"Detected potential date/time columns: {', '.join(time_related_cols)}")
            ideas.append("1. Extract components like year, month, day, weekday, quarter")
            ideas.append("2. Create cyclical features using sine/cosine transformations for periodic time components")
            ideas.append("3. Calculate time since specific events or time differences between dates")

        # Categorical engineering
        if len(cat_cols) >= 2:
            ideas.append("\n### Categorical Feature Engineering:")
            ideas.append("1. Create interaction features by combining categorical variables")
            ideas.append("2. Use target encoding for high cardinality categorical features")
            ideas.append("3. Combine rare categories into an 'Other' category to reduce dimensionality")

        # Numerical-categorical interactions
        if num_cols and cat_cols:
            ideas.append("\n### Feature Interactions:")
            ideas.append("1. Create group-based statistics (mean, median, min, max) of numerical features grouped by categorical features")
            ideas.append("2. Calculate the difference from group means for numerical features")
            ideas.append("3. Create ratio or difference features between related numerical columns")

        # Dimensionality reduction for wide datasets
        if len(num_cols) > 10:
            ideas.append("\n### Dimensionality Reduction:")
            ideas.append("1. Apply PCA to reduce dimensionality and create principal components")
            ideas.append("2. Use feature selection methods (information gain, chi-square, mutual information)")
            ideas.append("3. Try UMAP or t-SNE for non-linear dimensionality reduction")

        # Free-text columns: object columns where most values have more than 3 words
        text_cols = [col for col in self.df.columns if self.df[col].dtype == 'object' and
                     self.df[col].apply(lambda x: isinstance(x, str) and len(x.split()) > 3).mean() > 0.5]
        if text_cols:
            ideas.append("\n### Text Feature Engineering:")
            ideas.append(f"Detected potential text columns: {', '.join(text_cols)}")
            ideas.append("1. Create bag-of-words or TF-IDF representations")
            ideas.append("2. Extract text length, word count, and other statistical features")
            ideas.append("3. Consider pretrained word embeddings or sentence transformers")

        return ideas
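

# ---------------------------------------------------------------------------
# Usage sketch. This demo block is an illustrative addition (not part of the
# module's public API): it builds a small synthetic dataframe with made-up
# column names and values, then runs the analyzer end to end.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    demo_df = pd.DataFrame({
        "age": [23, 35, 41, 29, None, 52, 38, 27, 44, 31],
        "income": [42000, 58000, 61000, 39000, 75000, 88000, None, 45000, 67000, 52000],
        "city": ["NY", "LA", "NY", "SF", "LA", "NY", "SF", "NY", "LA", "SF"],
        "signup_date": pd.date_range("2023-01-01", periods=10, freq="W"),
    })

    analyzer = DatasetAnalyzer(demo_df)
    results = analyzer.analyze_dataset()
    print(f"Shape: {results['shape']}")
    print(f"Numerical columns: {results['numerical_columns']}")
    print(f"Categorical columns: {results['categorical_columns']}")

    # Suggestions and feature engineering ideas come back as plain text,
    # ready to drop into a report or an LLM prompt.
    for category, items in analyzer.suggest_data_preprocessing().items():
        if items:
            print(f"\n[{category}]")
            print("\n".join(items))

    print("\n" + "\n".join(analyzer.generate_feature_engineering_ideas()))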