| import pandas as pd |
| import numpy as np |
| from typing import Dict, List, Tuple, Any, Optional |
| from utils.logger import setup_logger |
| import io |
|
|
| logger = setup_logger(__name__) |
|
|
class DataProcessor:
    """Core data processing class for handling file operations and data analysis"""

    def __init__(self):
        # File extensions (lowercase, no dot) accepted by read_file.
        self.supported_formats = ['csv', 'xlsx', 'xls']

    def read_file(self, file_content: bytes, filename: str) -> pd.DataFrame:
        """
        Read uploaded file and return pandas DataFrame

        Args:
            file_content: File content as bytes
            filename: Name of the uploaded file (used to infer the format)

        Returns:
            pandas DataFrame

        Raises:
            ValueError: If file format is not supported or the CSV cannot be
                decoded with any of the attempted encodings
            Exception: If file cannot be read
        """
        try:
            file_extension = filename.lower().split('.')[-1]

            if file_extension not in self.supported_formats:
                raise ValueError(f"Unsupported file format: {file_extension}")

            if file_extension == 'csv':
                # Try common encodings in order; a mismatched encoding makes
                # read_csv raise UnicodeDecodeError, so fall through to the next.
                for encoding in ['utf-8', 'latin-1', 'cp1252']:
                    try:
                        df = pd.read_csv(io.BytesIO(file_content), encoding=encoding)
                        logger.info("Successfully read CSV file with %s encoding", encoding)
                        break
                    except UnicodeDecodeError:
                        continue
                else:
                    # for/else: no encoding succeeded.
                    raise ValueError("Could not decode CSV file with any supported encoding")

            elif file_extension in ['xlsx', 'xls']:
                df = pd.read_excel(io.BytesIO(file_content))
                logger.info("Successfully read Excel file: %s", filename)

            logger.info("Loaded dataset with shape: %s", df.shape)
            return df

        except Exception as e:
            # Log with the offending filename, then re-raise for the caller.
            logger.error("Error reading file %s: %s", filename, e)
            raise

    def analyze_data_quality(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Analyze data quality and return comprehensive statistics

        Args:
            df: pandas DataFrame to analyze

        Returns:
            Dictionary containing data quality metrics: overall shape, columns,
            dtypes, per-column missing counts, duplicate-row count, and
            per-column statistics (numeric or text, by dtype)
        """
        row_count = len(df)
        analysis: Dict[str, Any] = {
            'shape': df.shape,
            'columns': list(df.columns),
            'dtypes': df.dtypes.to_dict(),
            'missing_values': {},
            # int(...) so the value is a plain Python int, not numpy.int64.
            'duplicates': int(df.duplicated().sum()),
            'column_stats': {}
        }

        for col in df.columns:
            missing_count = int(df[col].isnull().sum())
            col_analysis: Dict[str, Any] = {
                'dtype': str(df[col].dtype),
                'missing_count': missing_count,
                # Guard against division by zero on an empty DataFrame.
                'missing_percentage': (missing_count / row_count) * 100 if row_count else 0.0,
                'unique_values': df[col].nunique(),
                'sample_values': df[col].dropna().head(5).tolist()
            }

            # Numeric columns: accept any numeric dtype (int32, float32, ...),
            # not just the 64-bit ones; exclude bool, which pandas treats as
            # numeric but which has no meaningful mean/outlier statistics here.
            if pd.api.types.is_numeric_dtype(df[col]) and not pd.api.types.is_bool_dtype(df[col]):
                col_analysis.update({
                    'mean': df[col].mean(),
                    'median': df[col].median(),
                    'std': df[col].std(),
                    'min': df[col].min(),
                    'max': df[col].max(),
                    'outliers': self._detect_outliers(df[col])
                })
            elif df[col].dtype == 'object':
                col_analysis.update({
                    'most_common': df[col].value_counts().head(3).to_dict(),
                    'avg_length': df[col].astype(str).str.len().mean()
                })

            analysis['column_stats'][col] = col_analysis
            analysis['missing_values'][col] = col_analysis['missing_count']

        logger.info("Data quality analysis completed for %s rows, %s columns",
                    df.shape[0], df.shape[1])
        return analysis

    def _detect_outliers(self, series: pd.Series) -> int:
        """
        Detect outliers using the IQR (interquartile range) method

        Args:
            series: pandas Series with numeric data

        Returns:
            Number of outliers detected (0 for non-numeric or empty series)
        """
        # Accept any numeric dtype; bool is excluded for the same reason as in
        # analyze_data_quality.
        if not pd.api.types.is_numeric_dtype(series) or pd.api.types.is_bool_dtype(series):
            return 0

        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        # An empty series yields NaN bounds; comparisons with NaN are False,
        # so the sum is safely 0.
        return int(((series < lower_bound) | (series > upper_bound)).sum())

    def detect_data_types(self, df: pd.DataFrame) -> Dict[str, str]:
        """
        Detect and suggest optimal data types for each column

        Args:
            df: pandas DataFrame

        Returns:
            Dictionary mapping column names to suggested data types
        """
        suggestions: Dict[str, str] = {}

        for col in df.columns:
            current_type = str(df[col].dtype)

            # Already a specific (non-object) dtype: keep it as-is.
            if current_type in ['int64', 'float64', 'bool', 'datetime64[ns]']:
                suggestions[col] = current_type
                continue

            non_null_series = df[col].dropna()

            if len(non_null_series) == 0:
                # Nothing to infer from; keep the generic object dtype.
                suggestions[col] = 'object'
                continue

            # Numeric? Decide int-vs-float from the dtype pd.to_numeric actually
            # produces rather than searching the strings for '.', so values in
            # scientific notation ("1e5") are correctly suggested as float64.
            try:
                converted = pd.to_numeric(non_null_series)
            except (ValueError, TypeError):
                pass
            else:
                suggestions[col] = (
                    'float64' if pd.api.types.is_float_dtype(converted) else 'int64'
                )
                continue

            # Datetime?
            try:
                pd.to_datetime(non_null_series)
            except (ValueError, TypeError):
                pass
            else:
                suggestions[col] = 'datetime64[ns]'
                continue

            # Boolean-like textual values?
            unique_vals = set(non_null_series.astype(str).str.lower())
            if unique_vals.issubset({'true', 'false', '1', '0', 'yes', 'no'}):
                suggestions[col] = 'bool'
                continue

            suggestions[col] = 'object'

        return suggestions
|
|