|
from typing import Dict, List, Any, Optional, Tuple
|
|
import pandas as pd
|
|
import numpy as np
|
|
from pathlib import Path
|
|
import os
|
|
import chardet
|
|
import csv
|
|
|
|
class CSVHelpers:
    """Helper utilities for CSV preprocessing and analysis."""

    @staticmethod
    def detect_encoding(file_path: str, sample_size: int = 10000) -> str:
        """Detect the character encoding of a CSV file.

        Reads up to ``sample_size`` bytes from the start of the file and
        runs chardet over the raw sample.

        Args:
            file_path: Path to the file to inspect.
            sample_size: Maximum number of bytes to sample.

        Returns:
            The detected encoding name, falling back to ``'utf-8'`` when
            chardet cannot decide (it reports ``None`` for e.g. empty files).
        """
        with open(file_path, 'rb') as f:
            raw_data = f.read(sample_size)
        result = chardet.detect(raw_data)
        # chardet's 'encoding' key may be None; never return None to callers
        # annotated to receive a str.
        return result['encoding'] or 'utf-8'
|
|
|
|
@staticmethod
|
|
def detect_delimiter(file_path: str, encoding: str = 'utf-8') -> str:
|
|
"""Detect the delimiter used in a CSV file."""
|
|
with open(file_path, 'r', encoding=encoding) as csvfile:
|
|
sample = csvfile.read(4096)
|
|
|
|
|
|
for delimiter in [',', ';', '\t', '|']:
|
|
sniffer = csv.Sniffer()
|
|
try:
|
|
if delimiter in sample:
|
|
dialect = sniffer.sniff(sample, delimiters=delimiter)
|
|
return dialect.delimiter
|
|
except:
|
|
continue
|
|
|
|
|
|
return ','
|
|
|
|
@staticmethod
|
|
def preprocess_csv(file_path: str) -> Tuple[pd.DataFrame, Dict[str, Any]]:
|
|
"""
|
|
Preprocess a CSV file with automatic encoding and delimiter detection.
|
|
Returns the DataFrame and metadata about the preprocessing.
|
|
"""
|
|
|
|
try:
|
|
encoding = CSVHelpers.detect_encoding(file_path)
|
|
except:
|
|
encoding = 'utf-8'
|
|
|
|
|
|
try:
|
|
delimiter = CSVHelpers.detect_delimiter(file_path, encoding)
|
|
except:
|
|
delimiter = ','
|
|
|
|
|
|
df = pd.read_csv(file_path, encoding=encoding, delimiter=delimiter, low_memory=False)
|
|
|
|
|
|
metadata = {
|
|
"original_shape": df.shape,
|
|
"encoding": encoding,
|
|
"delimiter": delimiter,
|
|
"columns": list(df.columns),
|
|
"dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()}
|
|
}
|
|
|
|
|
|
missing_counts = df.isna().sum()
|
|
metadata["missing_values"] = {col: int(count) for col, count in missing_counts.items() if count > 0}
|
|
|
|
|
|
duplicates = df.duplicated().sum()
|
|
metadata["duplicate_rows"] = int(duplicates)
|
|
|
|
return df, metadata
|
|
|
|
@staticmethod
|
|
def infer_column_types(df: pd.DataFrame) -> Dict[str, str]:
|
|
"""
|
|
Infer semantic types of columns (beyond pandas dtypes).
|
|
Examples: date, categorical, numeric, text, etc.
|
|
"""
|
|
column_types = {}
|
|
|
|
for column in df.columns:
|
|
|
|
if df[column].isna().all():
|
|
column_types[column] = "unknown"
|
|
continue
|
|
|
|
|
|
dtype = df[column].dtype
|
|
|
|
|
|
if pd.api.types.is_datetime64_dtype(df[column]):
|
|
column_types[column] = "datetime"
|
|
|
|
|
|
elif dtype == 'object':
|
|
try:
|
|
|
|
sample = df[column].dropna().head(10)
|
|
pd.to_datetime(sample)
|
|
column_types[column] = "potential_datetime"
|
|
except:
|
|
|
|
unique_ratio = df[column].nunique() / len(df)
|
|
if unique_ratio < 0.1:
|
|
column_types[column] = "categorical"
|
|
else:
|
|
column_types[column] = "text"
|
|
|
|
|
|
elif pd.api.types.is_numeric_dtype(dtype):
|
|
|
|
if df[column].nunique() == len(df) and df[column].min() >= 0:
|
|
column_types[column] = "id"
|
|
|
|
elif df[column].nunique() <= 2:
|
|
column_types[column] = "binary"
|
|
|
|
elif pd.api.types.is_integer_dtype(dtype):
|
|
column_types[column] = "integer"
|
|
else:
|
|
column_types[column] = "float"
|
|
|
|
|
|
elif pd.api.types.is_bool_dtype(dtype):
|
|
column_types[column] = "boolean"
|
|
|
|
|
|
else:
|
|
column_types[column] = "unknown"
|
|
|
|
return column_types
|
|
|
|
@staticmethod
|
|
def suggest_visualizations(df: pd.DataFrame) -> List[Dict[str, Any]]:
|
|
"""
|
|
Suggest appropriate visualizations based on data types.
|
|
Returns a list of visualization suggestions.
|
|
"""
|
|
suggestions = []
|
|
column_types = CSVHelpers.infer_column_types(df)
|
|
numeric_columns = [col for col, type in column_types.items()
|
|
if type in ["integer", "float"]]
|
|
categorical_columns = [col for col, type in column_types.items()
|
|
if type in ["categorical", "binary"]]
|
|
datetime_columns = [col for col, type in column_types.items()
|
|
if type in ["datetime", "potential_datetime"]]
|
|
|
|
|
|
for col in numeric_columns[:3]:
|
|
suggestions.append({
|
|
"chart_type": "histogram",
|
|
"column": col,
|
|
"title": f"Distribution of {col}"
|
|
})
|
|
|
|
|
|
for col in categorical_columns[:3]:
|
|
suggestions.append({
|
|
"chart_type": "bar",
|
|
"x_column": col,
|
|
"y_column": "count",
|
|
"title": f"Count by {col}"
|
|
})
|
|
|
|
|
|
if datetime_columns and numeric_columns:
|
|
suggestions.append({
|
|
"chart_type": "line",
|
|
"x_column": datetime_columns[0],
|
|
"y_column": numeric_columns[0],
|
|
"title": f"{numeric_columns[0]} over Time"
|
|
})
|
|
|
|
|
|
if len(numeric_columns) >= 2:
|
|
suggestions.append({
|
|
"chart_type": "scatter",
|
|
"x_column": numeric_columns[0],
|
|
"y_column": numeric_columns[1],
|
|
"title": f"{numeric_columns[1]} vs {numeric_columns[0]}"
|
|
})
|
|
|
|
return suggestions
|
|
|