"""Data loading, cleaning, and filtering helpers for the BI dashboard."""

from __future__ import annotations

from dataclasses import dataclass
from io import BytesIO
from pathlib import Path
from typing import Dict, Iterable, List, Mapping, Optional, Tuple

import pandas as pd

from utils import (
    ColumnTypes,
    PREVIEW_ROWS,
    coerce_datetime_columns,
    ensure_unique_columns,
    infer_column_types,
    is_supported_file,
)

SAMPLE_DATA_DIR = Path(__file__).resolve().parent / "data"
SAMPLE_DESCRIPTIONS = {
    "train.csv": "Weekly Walmart sales with markdowns and holidays (training set).",
    "test.csv": "Companion test set without weekly sales labels.",
    "features.csv": "Store-level features such as markdowns, CPI, unemployment.",
    "stores.csv": "Store metadata including type and size.",
}


@dataclass(frozen=True)
class DatasetBundle:
    """Container storing the dataset and metadata required by the UI."""

    dataframe: pd.DataFrame
    column_types: ColumnTypes
    source_name: str


def load_dataset(file_obj) -> DatasetBundle:
    """Load an uploaded file into a pandas DataFrame.

    Parameters
    ----------
    file_obj:
        File-like object produced by the Gradio upload widget.

    Returns
    -------
    DatasetBundle
        Loaded dataset alongside inferred column metadata.

    Raises
    ------
    ValueError
        If the file cannot be read or uses an unsupported format.
    """
    if file_obj is None:
        raise ValueError("Please upload a CSV or Excel file.")

    file_name = getattr(file_obj, "name", None)
    original_name = getattr(file_obj, "orig_name", file_name)

    if not original_name or not is_supported_file(original_name):
        raise ValueError("Unsupported file type. Please upload a CSV or Excel file.")

    path_candidate = Path(str(file_name)) if file_name else None
    dataframe: Optional[pd.DataFrame] = None

    try:
        if path_candidate and path_candidate.exists():
            dataframe = _read_from_path(path_candidate, original_name)
        else:
            dataframe = _read_from_buffer(file_obj, original_name)
    except Exception as exc:
        raise ValueError(f"Unable to load dataset: {exc}") from exc

    if dataframe is None:
        raise ValueError("Failed to load dataset. The file may be empty or corrupted.")

    dataframe = ensure_unique_columns(dataframe)
    dataframe, datetime_cols = coerce_datetime_columns(dataframe)
    column_types = infer_column_types(dataframe)

    column_types = ColumnTypes(
        numeric=column_types.numeric,
        categorical=column_types.categorical,
        datetime=tuple(sorted(set(column_types.datetime + tuple(datetime_cols)))),
    )

    return DatasetBundle(
        dataframe=dataframe,
        column_types=column_types,
        source_name=Path(original_name).name,
    )


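# Illustrative usage (a sketch, not part of the app): ``load_dataset`` only relies
# on the upload object's ``name`` (temp-file path) and, when present, ``orig_name``
# attributes, so a simple stand-in object works for local testing. The path below
# is hypothetical and assumed to exist on disk.
#
#     from types import SimpleNamespace
#
#     upload = SimpleNamespace(name="/tmp/upload.csv", orig_name="sales.csv")
#     bundle = load_dataset(upload)
#     print(bundle.source_name, bundle.dataframe.shape, bundle.column_types)

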
def _read_from_path(path: Path, original_name: str) -> pd.DataFrame:
    """Read a dataset from disk."""
    suffix = path.suffix.lower()
    if suffix == ".csv":
        return pd.read_csv(path)
    if suffix in {".xlsx", ".xls"}:
        return pd.read_excel(path)
    raise ValueError(f"Unsupported file extension in {original_name}.")


def _read_from_buffer(file_obj, original_name: str) -> pd.DataFrame:
    """Read a dataset from an in-memory buffer."""
    bytes_data = getattr(file_obj, "read", lambda: b"")()
    if isinstance(bytes_data, str):
        # Some file wrappers are opened in text mode; encode so BytesIO accepts it.
        bytes_data = bytes_data.encode("utf-8")
    if not bytes_data:
        raise ValueError(f"The uploaded file '{original_name}' is empty.")

    buffer = BytesIO(bytes_data)
    lowered = original_name.lower()
    if lowered.endswith(".csv"):
        return pd.read_csv(buffer)
    if lowered.endswith((".xlsx", ".xls")):
        return pd.read_excel(buffer)

    raise ValueError("Only CSV and Excel files are supported.")


def dataset_overview(df: pd.DataFrame) -> Dict[str, object]:
    """Return basic information about the dataset."""
    info = {
        "Rows": int(df.shape[0]),
        "Columns": int(df.shape[1]),
        "Memory Usage (MB)": round(df.memory_usage(deep=True).sum() / (1024**2), 2),
    }
    dtypes = pd.DataFrame({"Column": df.columns, "Type": df.dtypes.astype(str)})
    return {"info": info, "dtypes": dtypes}


def dataset_preview(df: pd.DataFrame, rows: int = PREVIEW_ROWS) -> Dict[str, pd.DataFrame]:
    """Return head and tail previews of the dataset."""
    return {
        "head": df.head(rows),
        "tail": df.tail(rows),
    }


def numeric_summary(df: pd.DataFrame) -> pd.DataFrame:
    """Compute descriptive statistics for numeric columns."""
    numeric_df = df.select_dtypes(include=["number"])
    if numeric_df.empty:
        return pd.DataFrame()

    summary = pd.DataFrame(
        {
            "count": numeric_df.count(),
            "mean": numeric_df.mean(),
            "median": numeric_df.median(),
            "std": numeric_df.std(),
            "min": numeric_df.min(),
            "25%": numeric_df.quantile(0.25),
            "75%": numeric_df.quantile(0.75),
            "max": numeric_df.max(),
        }
    )

    summary.index.name = "column"
    return summary.round(3)


def categorical_summary(df: pd.DataFrame, top_values: int = 5) -> pd.DataFrame:
    """Compute summary statistics for categorical columns."""
    categorical_cols = df.select_dtypes(
        exclude=["number", "datetime64[ns]", "datetime64[ns, UTC]"]
    )
    if categorical_cols.empty:
        return pd.DataFrame()

    rows: List[Dict[str, object]] = []
    for column in categorical_cols:
        series = categorical_cols[column]
        mode_series = series.mode(dropna=True)
        mode_value = mode_series.iloc[0] if not mode_series.empty else None
        counts = series.value_counts(dropna=True).head(top_values)
        top_repr = ", ".join(f"{idx} ({count})" for idx, count in counts.items())
        rows.append(
            {
                "column": column,
                "unique_values": int(series.nunique(dropna=True)),
                "mode": mode_value,
                "mode_count": int(counts.iloc[0]) if not counts.empty else 0,
                f"top_{top_values}": top_repr,
            }
        )

    return pd.DataFrame(rows)


def missing_value_report(df: pd.DataFrame) -> pd.DataFrame:
    """Return the count and percentage of missing values per column."""
    missing_counts = df.isna().sum()
    if missing_counts.sum() == 0:
        return pd.DataFrame(columns=["column", "missing_count", "missing_pct"])

    missing_pct = (missing_counts / len(df)) * 100
    report = pd.DataFrame(
        {
            "column": missing_counts.index,
            "missing_count": missing_counts.values,
            "missing_pct": missing_pct.values,
        }
    )
    return (
        report.sort_values(by="missing_pct", ascending=False)
        .reset_index(drop=True)
        .round({"missing_pct": 2})
    )


def correlation_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """Compute the correlation matrix for numeric columns."""
    numeric_df = df.select_dtypes(include=["number"])
    if numeric_df.empty or numeric_df.shape[1] < 2:
        return pd.DataFrame()
    corr = numeric_df.corr()
    return corr.round(3)


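# Illustrative EDA flow (a sketch; ``bundle`` is assumed to come from
# ``load_dataset`` or ``load_sample_dataset``):
#
#     overview = dataset_overview(bundle.dataframe)    # {"info": ..., "dtypes": ...}
#     stats = numeric_summary(bundle.dataframe)        # one row per numeric column
#     cats = categorical_summary(bundle.dataframe)     # mode / top values per column
#     corr = correlation_matrix(bundle.dataframe)      # empty if < 2 numeric columns

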
def filter_dataframe(
    df: pd.DataFrame,
    numeric_filters: Mapping[str, Tuple[Optional[float], Optional[float]]],
    categorical_filters: Mapping[str, Iterable[str]],
    date_filters: Mapping[str, Tuple[Optional[str], Optional[str]]],
) -> pd.DataFrame:
    """Filter the dataset according to the provided filter definitions."""
    filtered = df.copy()

    for column, bounds in numeric_filters.items():
        if column not in filtered.columns or bounds is None:
            continue
        lower, upper = bounds
        series = filtered[column]
        # Combine both bounds into a single mask so the boolean indexer always
        # matches the current frame's index.
        mask = pd.Series(True, index=filtered.index)
        if lower is not None:
            mask &= series >= lower
        if upper is not None:
            mask &= series <= upper
        filtered = filtered[mask]

    for column, values in categorical_filters.items():
        if column not in filtered.columns:
            continue
        values = list(values)
        if not values:
            continue
        filtered = filtered[filtered[column].isin(values)]

    for column, bounds in date_filters.items():
        if column not in filtered.columns or bounds is None:
            continue
        start, end = bounds
        series = pd.to_datetime(filtered[column], errors="coerce")
        mask = pd.Series(True, index=filtered.index)
        if start:
            mask &= series >= pd.to_datetime(start)
        if end:
            mask &= series <= pd.to_datetime(end)
        filtered = filtered[mask]

    return filtered


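# Illustrative filter payloads (a sketch; the column names assume the bundled
# Walmart sample data, and the dict shapes mirror the signature above):
#
#     filtered = filter_dataframe(
#         bundle.dataframe,
#         numeric_filters={"Weekly_Sales": (0.0, 50_000.0)},
#         categorical_filters={"Type": ["A", "B"]},
#         date_filters={"Date": ("2011-01-01", "2011-12-31")},
#     )

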
def filter_metadata(
    df: pd.DataFrame,
    column_types: ColumnTypes,
    categorical_limit: int = 200,
) -> Dict[str, object]:
    """Pre-compute useful metadata for rendering filter controls."""
    metadata: Dict[str, object] = {"numeric": {}, "categorical": {}, "datetime": {}}

    for column in column_types.numeric:
        series = df[column].dropna()
        if series.empty:
            continue
        metadata["numeric"][column] = {
            "min": float(series.min()),
            "max": float(series.max()),
        }

    for column in column_types.categorical:
        series = df[column].dropna().astype(str)
        unique_values = series.unique().tolist()
        if len(unique_values) > categorical_limit:
            unique_values = unique_values[:categorical_limit]
        metadata["categorical"][column] = unique_values

    for column in column_types.datetime:
        series = pd.to_datetime(df[column], errors="coerce")
        series = series.dropna()
        if series.empty:
            continue
        metadata["datetime"][column] = {
            "min": series.min().date(),
            "max": series.max().date(),
        }

    return metadata


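# The returned mapping has this shape (illustrative placeholders, not real values):
#
#     {
#         "numeric": {"<column>": {"min": <float>, "max": <float>}},
#         "categorical": {"<column>": ["<value>", ...]},  # at most ``categorical_limit`` entries
#         "datetime": {"<column>": {"min": <date>, "max": <date>}},
#     }

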
def sample_dataset_options() -> Dict[str, str]:
    """Return available bundled datasets and their descriptions."""
    options: Dict[str, str] = {}
    if not SAMPLE_DATA_DIR.exists():
        return options

    for path in sorted(SAMPLE_DATA_DIR.iterdir()):
        if not path.is_file():
            continue
        if path.suffix.lower() not in {".csv", ".xlsx", ".xls"}:
            continue
        description = SAMPLE_DESCRIPTIONS.get(
            path.name, f"Sample dataset sourced from '{path.name}'."
        )
        options[path.name] = description
    return options


def load_sample_dataset(selection: str) -> DatasetBundle:
    """Load a dataset bundled inside the local data directory."""
    if not selection:
        raise ValueError("Please select a sample dataset from the dropdown.")

    path = SAMPLE_DATA_DIR / selection
    if not path.exists():
        raise ValueError(
            f"Sample dataset '{selection}' was not found in the 'data/' directory. "
            "Ensure the file exists and try again."
        )

    dataframe = _read_from_path(path, selection)
    dataframe = ensure_unique_columns(dataframe)
    dataframe, datetime_cols = coerce_datetime_columns(dataframe)
    column_types = infer_column_types(dataframe)
    column_types = ColumnTypes(
        numeric=column_types.numeric,
        categorical=column_types.categorical,
        datetime=tuple(sorted(set(column_types.datetime + tuple(datetime_cols)))),
    )

    return DatasetBundle(
        dataframe=dataframe,
        column_types=column_types,
        source_name=selection,
    )


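# Optional smoke test (illustrative; not used by the dashboard UI). Running this
# module directly loads the first bundled sample, if any, and prints a few of the
# summaries defined above. It assumes at least one CSV/Excel file in ``data/``.
if __name__ == "__main__":
    available = sample_dataset_options()
    if not available:
        print("No bundled sample datasets found in", SAMPLE_DATA_DIR)
    else:
        name = next(iter(available))
        bundle = load_sample_dataset(name)
        print(f"Loaded {bundle.source_name}: {bundle.dataframe.shape}")
        print(dataset_overview(bundle.dataframe)["info"])
        print(numeric_summary(bundle.dataframe).head())
        print(missing_value_report(bundle.dataframe).head())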