Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| CSV Utilities for robust loading of CSV files | |
| """ | |
| import pandas as pd | |
| import os | |
| import tempfile | |
def is_excel_file(filepath):
    """
    Tell whether a path points at an Excel workbook, judging by extension only.

    The file is never opened; only the (case-insensitive) suffix is inspected.

    Args:
        filepath: Path to the file

    Returns:
        True if the suffix is one of .xls, .xlsx, .xlsm, .xlsb, .xltx or
        .xltm, False otherwise
    """
    _, suffix = os.path.splitext(filepath)
    return suffix.lower() in ('.xls', '.xlsx', '.xlsm', '.xlsb', '.xltx', '.xltm')
def convert_excel_to_csv(excel_filepath):
    """
    Convert an Excel file to CSV format.

    The workbook is read with pandas, trying the 'openpyxl' engine first,
    then 'xlrd', and finally pandas' own default engine selection. The
    resulting DataFrame is written (without the index) to a freshly created
    temporary ``.csv`` file. The caller is responsible for deleting that
    file once it is no longer needed.

    Args:
        excel_filepath: Path to the Excel file

    Returns:
        Path to the converted CSV file (temporary file)

    Raises:
        Exception: If the workbook cannot be read or the CSV cannot be
            written. The underlying error is attached as ``__cause__``.
    """
    print(f"Detected Excel file: {os.path.basename(excel_filepath)}")
    print("Converting to CSV format...")

    temp_csv_path = None
    try:
        # Try reading with different engines for compatibility; engine=None
        # lets pandas choose one itself as the last resort.
        df = None
        last_error = None
        for engine in ('openpyxl', 'xlrd', None):
            try:
                df = pd.read_excel(excel_filepath, engine=engine)
                break
            except Exception as read_error:
                last_error = read_error
        if df is None:
            # Every engine failed: surface the error from the final attempt.
            raise last_error

        # Reserve a temporary file name; delete=False keeps the file on disk
        # after the handle is closed so pandas can write to it by path.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as temp_csv:
            temp_csv_path = temp_csv.name

        df.to_csv(temp_csv_path, index=False)
    except Exception as e:
        # Do not leave a half-written temporary file behind on failure.
        if temp_csv_path is not None:
            try:
                os.remove(temp_csv_path)
            except OSError:
                pass
        raise Exception(f"Failed to convert Excel file to CSV: {str(e)}") from e

    print(f"Successfully converted to temporary CSV: {temp_csv_path}")
    return temp_csv_path
def robust_csv_loader(filepath, required_columns=None, max_skip_rows=3):
    """
    Robustly load a CSV file by trying different skiprows values.
    Also handles Excel files by converting them to CSV first.

    The file is read with ``skiprows`` = 0, 1, ..., ``max_skip_rows`` in
    turn. The first attempt that parses and (if requested) contains every
    required column wins. Column matching is case-insensitive. Any temporary
    CSV produced from an Excel input is removed before returning or raising.

    Args:
        filepath: Path to the CSV or Excel file
        required_columns: List of column names that must be present
        max_skip_rows: Maximum number of rows to try skipping (must be >= 0)

    Returns:
        DataFrame if successful, raises exception if all attempts fail

    Raises:
        ValueError: If max_skip_rows is negative (no attempt could be made).
        Exception: If every attempt fails; the last underlying error is
            attached as ``__cause__``.
    """
    # A negative limit would make the loop below run zero times and the
    # function would silently return None instead of a DataFrame.
    if max_skip_rows < 0:
        raise ValueError(f"max_skip_rows must be >= 0, got {max_skip_rows}")

    original_filepath = filepath
    display_name = os.path.basename(original_filepath)

    # Convert required_columns to lowercase for case-insensitive matching
    wanted = [col.lower() for col in required_columns] if required_columns else []

    # Check if file is Excel format and convert if necessary
    temp_csv_path = None
    if is_excel_file(filepath):
        temp_csv_path = convert_excel_to_csv(filepath)
        filepath = temp_csv_path

    try:
        for skip_rows in range(max_skip_rows + 1):
            try:
                if skip_rows == 0:
                    print(f"Trying to read {display_name} without skipping rows...")
                else:
                    print(f"Trying to read {display_name} with skiprows={skip_rows}...")

                df = pd.read_csv(filepath, skiprows=skip_rows if skip_rows > 0 else None)

                # Print columns for debugging
                print(f"Columns found: {df.columns.tolist()}")

                # Check if required columns exist; build the lowercase lookup
                # once per attempt rather than once per required column.
                if wanted:
                    present = {str(c).lower() for c in df.columns}
                    missing_cols = [col for col in wanted if col not in present]
                    if missing_cols:
                        raise ValueError(f"Missing required columns: {missing_cols}")

                print(f"Successfully loaded {display_name}")
                return df
            except Exception as e:
                # Only give up after the last permitted skiprows value.
                if skip_rows == max_skip_rows:
                    raise Exception(
                        f"Failed to load {original_filepath} after trying {max_skip_rows + 1} skiprows values"
                    ) from e
                continue
    finally:
        # Clean up temporary CSV file if it was created. Cleanup stays
        # best-effort, but a failure is reported instead of being hidden.
        if temp_csv_path and os.path.exists(temp_csv_path):
            try:
                os.remove(temp_csv_path)
                print("Cleaned up temporary CSV file")
            except OSError as cleanup_error:
                print(f"Warning: could not remove temporary CSV file {temp_csv_path}: {cleanup_error}")
def find_email_column(df):
    """Return the first column whose name contains 'email' (any case), or None."""
    matches = (name for name in df.columns if 'email' in name.lower())
    return next(matches, None)
def find_ipm_columns(df):
    """
    Find FY IPM columns in the dataframe.

    A column is matched when its name contains both 'IPM' and the fiscal
    year tag ('FY24/25' or 'FY23/24'). If several columns match the same
    year, the last one wins. When a year has no matching column, a warning
    and the list of candidate columns are printed and the expected default
    name is returned for that year only; a column that was detected is kept.

    Args:
        df: DataFrame whose column names are searched

    Returns:
        Tuple ``(fy2425_col, fy2324_col)`` of column names
    """
    fy2425_col = None
    fy2324_col = None
    for col in df.columns:
        if 'IPM' not in col:
            continue
        if 'FY24/25' in col:
            fy2425_col = col
        if 'FY23/24' in col:
            fy2324_col = col

    if not fy2425_col or not fy2324_col:
        print("\nWarning: Could not find IPM columns automatically")
        print("Available columns containing 'FY' and 'IPM':")
        for col in df.columns:
            if 'FY' in col and 'IPM' in col:
                print(f" - {col}")
        # Use the expected column name only for the year that was not found,
        # so a successfully detected column is not thrown away.
        fy2425_col = fy2425_col or 'FY24/25 全年IPM'
        fy2324_col = fy2324_col or 'FY23/24 全年IPM'

    return fy2425_col, fy2324_col