# processor.py
"""
Main processing orchestrator that ties together all the manufacturing priority logic.
This module provides high-level functions that both the CLI and Gradio interfaces can use.
"""
import os
from datetime import datetime
from typing import Dict, Optional, Tuple

import pandas as pd

from config import WEIGHTS
from sheet_reader import list_sheets, read_sheet
from priority_logic import compute_priority
from output_writer import save_with_instructions


class ManufacturingProcessor:
    """
    Main processor class for manufacturing priority calculations.
    Encapsulates all the logic needed to process Excel files and generate priority rankings.
    """

    def __init__(self, weights: Optional[Dict[str, int]] = None):
        """Initialize the processor with weights (defaults to config.WEIGHTS)."""
        self.weights = weights or WEIGHTS.copy()
        self.validate_weights()

    def validate_weights(self) -> None:
        """Ensure weights sum to 100."""
        total = sum(self.weights.values())
        if total != 100:
            raise ValueError(f"Weights must sum to 100, got {total}")

    def get_file_info(self, file_path: str) -> Dict:
        """Get information about the Excel file and its sheets."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        try:
            sheets = list_sheets(file_path)
            file_size = os.path.getsize(file_path)
            return {
                "file_path": file_path,
                "file_name": os.path.basename(file_path),
                "file_size": file_size,
                "sheets": sheets,
                "sheet_count": len(sheets),
            }
        except Exception as e:
            raise RuntimeError(f"Error reading file info: {e}") from e

    def validate_sheet_data(self, df: pd.DataFrame) -> Dict:
        """Validate that the sheet has the required columns and usable data."""
        from priority_logic import REQUIRED_COLS

        # Normalize column names (strip stray whitespace from headers)
        df_norm = df.copy()
        df_norm.columns = [str(c).strip() for c in df_norm.columns]

        # Check required columns against the normalized names
        missing_cols = [col for col in REQUIRED_COLS if col not in df_norm.columns]

        # Basic data validation
        validation_result = {
            "valid": len(missing_cols) == 0,
            "missing_columns": missing_cols,
            "available_columns": list(df_norm.columns),
            "row_count": len(df_norm),
            "empty_rows": int(df_norm.isnull().all(axis=1).sum()),
            "data_issues": [],
        }

        if validation_result["valid"]:
            # Check for data quality issues
            try:
                # Dates that cannot be parsed
                date_col = "Oldest Product Required First"
                date_issues = pd.to_datetime(df_norm[date_col], errors="coerce").isnull().sum()
                if date_issues > 0:
                    validation_result["data_issues"].append(
                        f"{date_issues} invalid dates in '{date_col}'"
                    )

                # Quantities that are not numeric
                qty_col = "Quantity of Each Component"
                qty_issues = pd.to_numeric(df_norm[qty_col], errors="coerce").isnull().sum()
                if qty_issues > 0:
                    validation_result["data_issues"].append(
                        f"{qty_issues} non-numeric values in '{qty_col}'"
                    )

                # Required columns that are completely empty
                for col in REQUIRED_COLS:
                    if col in df_norm.columns and df_norm[col].isnull().all():
                        validation_result["data_issues"].append(
                            f"Column '{col}' is completely empty"
                        )
            except Exception as e:
                validation_result["data_issues"].append(f"Data validation error: {e}")

        return validation_result
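
    # Example of the dict this returns for a sheet missing one required
    # column (values are illustrative, not taken from a real file):
    #
    #     {
    #         "valid": False,
    #         "missing_columns": ["Quantity of Each Component"],
    #         "available_columns": ["Oldest Product Required First", ...],
    #         "row_count": 120,
    #         "empty_rows": 2,
    #         "data_issues": [],
    #     }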

    def process_file(
        self,
        file_path: str,
        sheet_name: str,
        min_qty: int = 50,
        custom_weights: Optional[Dict[str, int]] = None,
    ) -> Tuple[pd.DataFrame, Dict]:
        """
        Process a single sheet from an Excel file and return prioritized results.

        Returns:
            Tuple of (processed_dataframe, processing_info)
        """
        # Use custom weights if provided, falling back to the instance weights
        if custom_weights:
            if sum(custom_weights.values()) != 100:
                raise ValueError("Custom weights must sum to 100")
            weights = custom_weights.copy()
        else:
            weights = self.weights

        # Read the data
        df = read_sheet(file_path, sheet_name)
        if df is None or df.empty:
            raise ValueError("Sheet is empty or could not be read")

        # Validate data
        validation = self.validate_sheet_data(df)
        if not validation["valid"]:
            raise ValueError(
                f"Data validation failed: missing columns {validation['missing_columns']}"
            )

        # Compute priority scores
        try:
            processed_df = compute_priority(df, min_qty=min_qty, weights=weights)
        except Exception as e:
            raise RuntimeError(f"Priority calculation failed: {e}") from e

        # Generate processing info
        processing_info = {
            "timestamp": datetime.now().isoformat(),
            "file_name": os.path.basename(file_path),
            "sheet_name": sheet_name,
            "weights_used": weights,
            "min_quantity": min_qty,
            "total_products": len(df),
            "products_above_threshold": int(processed_df["QtyThresholdOK"].sum()),
            "highest_priority_score": processed_df["PriorityScore"].max(),
            "lowest_priority_score": processed_df["PriorityScore"].min(),
            "validation_info": validation,
        }
        return processed_df, processing_info

    def save_results(
        self,
        processed_df: pd.DataFrame,
        output_path: str,
        processing_info: Dict,
    ) -> str:
        """Save processed results with full documentation."""
        try:
            save_with_instructions(
                processed_df,
                output_path,
                min_qty=processing_info["min_quantity"],
                weights=processing_info["weights_used"],
            )
            # Add a processing-log sheet alongside the main results
            self._add_processing_log(output_path, processing_info)
            return output_path
        except Exception as e:
            raise RuntimeError(f"Failed to save results: {e}") from e

    def _add_processing_log(self, output_path: str, processing_info: Dict) -> None:
        """Add a 'Processing_Log' sheet to the output file."""
        try:
            # Append to the existing workbook, replacing any previous log sheet
            with pd.ExcelWriter(
                output_path, mode="a", engine="openpyxl", if_sheet_exists="replace"
            ) as writer:
                weights = processing_info["weights_used"]
                # Pad every row to two cells so the DataFrame shape is uniform
                log_data = [
                    ["PROCESSING LOG", ""],
                    ["", ""],
                    ["Processing Timestamp", processing_info["timestamp"]],
                    ["Source File", processing_info["file_name"]],
                    ["Sheet Processed", processing_info["sheet_name"]],
                    ["", ""],
                    ["SETTINGS USED", ""],
                    ["Age Weight", f"{weights['AGE_WEIGHT']}%"],
                    ["Component Weight", f"{weights['COMPONENT_WEIGHT']}%"],
                    ["Manual Weight", f"{weights['MANUAL_WEIGHT']}%"],
                    ["Minimum Quantity", processing_info["min_quantity"]],
                    ["", ""],
                    ["RESULTS SUMMARY", ""],
                    ["Total Products", processing_info["total_products"]],
                    ["Above Threshold", processing_info["products_above_threshold"]],
                    ["Highest Priority Score", f"{processing_info['highest_priority_score']:.4f}"],
                    ["Lowest Priority Score", f"{processing_info['lowest_priority_score']:.4f}"],
                ]
                if processing_info["validation_info"]["data_issues"]:
                    log_data.append(["", ""])
                    log_data.append(["DATA ISSUES FOUND", ""])
                    for issue in processing_info["validation_info"]["data_issues"]:
                        log_data.append(["", issue])

                log_df = pd.DataFrame(log_data, columns=["Parameter", "Value"])
                log_df.to_excel(writer, sheet_name="Processing_Log", index=False)
        except Exception as e:
            # If adding the log fails, don't fail the whole operation
            print(f"Warning: Could not add processing log: {e}")


# Convenience functions for easy import

def quick_process(
    file_path: str,
    sheet_name: str,
    output_path: Optional[str] = None,
    min_qty: int = 50,
    weights: Optional[Dict[str, int]] = None,
) -> str:
    """
    Quick processing function that handles the full workflow.

    Args:
        file_path: Path to the Excel file
        sheet_name: Name of the sheet to process
        output_path: Where to save results (auto-generated next to the input if omitted)
        min_qty: Minimum quantity threshold
        weights: Custom weights dict (optional)

    Returns:
        Path to the generated output file
    """
    processor = ManufacturingProcessor(weights)

    # Process the data
    processed_df, processing_info = processor.process_file(
        file_path, sheet_name, min_qty, weights
    )

    # Generate an output path next to the input file if none was provided
    if output_path is None:
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        output_dir = os.path.dirname(file_path)
        output_path = os.path.join(output_dir, f"{base_name}_PRIORITY.xlsx")

    # Save results
    return processor.save_results(processed_df, output_path, processing_info)


def get_file_preview(file_path: str, sheet_name: str, max_rows: int = 5) -> Dict:
    """
    Get a preview of the file data for validation purposes.

    Returns:
        Dict containing preview info and sample data
    """
    processor = ManufacturingProcessor()

    # Get file info
    file_info = processor.get_file_info(file_path)

    # Read sample data
    df = read_sheet(file_path, sheet_name)
    sample_df = df.head(max_rows) if df is not None else pd.DataFrame()

    # Validate data (flag an unreadable sheet instead of raising)
    if df is not None:
        validation = processor.validate_sheet_data(df)
    else:
        validation = {"valid": False, "missing_columns": [], "data_issues": []}

    return {
        "file_info": file_info,
        "sample_data": sample_df,
        "validation": validation,
        "preview_rows": len(sample_df),
    }
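

# A minimal usage sketch. The workbook path and sheet name below are
# hypothetical placeholders, not files shipped with this project;
# quick_process() writes the prioritized results next to the input
# and returns the output path.
if __name__ == "__main__":
    result_path = quick_process(
        file_path="orders.xlsx",  # hypothetical input workbook
        sheet_name="Sheet1",      # hypothetical sheet name
        min_qty=50,
    )
    print(f"Priority rankings written to {result_path}")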