import asyncio
from pathlib import Path
from typing import Optional

import pandas as pd
from sklearn.metrics import (
    precision_score,
    recall_score,
    accuracy_score,
    classification_report,
)
from tqdm.asyncio import tqdm as async_tqdm

from src.modules.vlm_inference import analyze_product_image_async
from src.modules.data_processing import image_to_base64

DATA_PATH = Path(__file__).parents[2] / "data"


async def _process_single_row(
    row_data: dict,
    model: str,
    api_key: Optional[str],
    provider: str,
    semaphore: asyncio.Semaphore,
) -> dict:
    """
    Process a single row with semaphore control

    Args:
        row_data: Dictionary with 'image' and 'id' keys
        model: Model to use for inference
        api_key: API key for the provider
        provider: Provider to use
        semaphore: Asyncio semaphore for rate limiting

    Returns:
        dict: Prediction result
    """
    async with semaphore:
        try:
            img_b64 = image_to_base64(row_data["image"])
            prediction = await analyze_product_image_async(
                image_url=img_b64,
                model=model,
                api_key=api_key,
                provider=provider,
            )
            return {
                "id": row_data["id"],
                "pred_masterCategory": prediction.master_category,
                "pred_gender": prediction.gender,
                "pred_subCategory": prediction.sub_category,
                "pred_description": prediction.description,
            }
        except Exception as e:
            return {
                "id": row_data["id"],
                "pred_masterCategory": None,
                "pred_gender": None,
                "pred_subCategory": None,
                "pred_description": f"Error: {str(e)}",
            }


async def run_inference_on_dataframe_async(
    df: pd.DataFrame,
    model: str = "accounts/fireworks/models/qwen2p5-vl-72b-instruct",
    api_key: Optional[str] = None,
    provider: str = "Fireworks",
    max_concurrent_requests: int = 10,
) -> pd.DataFrame:
    """
    Run VLM inference on an entire dataframe of images with concurrent requests

    Args:
        df: DataFrame containing images
        model: Model to use for inference
        api_key: API key for the provider
        provider: Provider to use (Fireworks or OpenAI)
        max_concurrent_requests: Maximum number of concurrent API requests (default: 10)

    Returns:
        pd.DataFrame: Results with columns:
            - id: Image ID
            - pred_masterCategory: Predicted master category
            - pred_gender: Predicted gender
            - pred_subCategory: Predicted sub-category
            - pred_description: Predicted description
    """
    # Create semaphore for rate limiting
    semaphore = asyncio.Semaphore(max_concurrent_requests)

    # Prepare all rows as dictionaries
    rows_data = [
        {"image": row.image, "id": row.id}
        for row in df.itertuples(index=False, name="columns")
    ]

    # Create all tasks (coroutines, not awaited yet)
    tasks = [
        _process_single_row(row_data, model, api_key, provider, semaphore)
        for row_data in rows_data
    ]

    _model = model.split("/")[-1]

    # Run all tasks concurrently with a progress bar, checkpointing to CSV
    # every 10 completed results
    results = []
    for task in async_tqdm.as_completed(tasks, total=len(tasks)):
        result = await task
        results.append(result)
        if len(results) % 10 == 0:
            df_pred = pd.DataFrame(results)
            file_name = DATA_PATH / f"df_pred_{provider}_{_model}.csv"
            df_pred.to_csv(file_name, index=False)

    # Final save
    df_pred = pd.DataFrame(results)
    file_name = DATA_PATH / f"df_pred_{provider}_{_model}.csv"
    df_pred.to_csv(file_name, index=False)
    print(f"\nPrediction successful, dataset saved to {file_name}")

    return df_pred


def run_inference_on_dataframe(
    df: pd.DataFrame,
    model: str = "accounts/fireworks/models/qwen2p5-vl-72b-instruct",
    api_key: Optional[str] = None,
    provider: str = "Fireworks",
    max_concurrent_requests: int = 10,
) -> pd.DataFrame:
    """
    Run VLM inference on an entire dataframe of images (sync wrapper for the async function)

    Args:
        df: DataFrame containing images
        model: Model to use for inference
        api_key: API key for the provider
        provider: Provider to use (Fireworks or OpenAI)
        max_concurrent_requests: Maximum number of concurrent API requests (default: 10)

    Returns:
        pd.DataFrame: Results with columns:
            - id: Image ID
            - pred_masterCategory: Predicted master category
            - pred_gender: Predicted gender
            - pred_subCategory: Predicted sub-category
            - pred_description: Predicted description
    """
    return asyncio.run(
        run_inference_on_dataframe_async(
            df, model, api_key, provider, max_concurrent_requests
        )
    )


def calculate_metrics(
    df_ground_truth: pd.DataFrame,
    df_predictions: pd.DataFrame,
    column: str,
    id_col: str = "id",
    average: str = "weighted",
) -> dict:
    """
    Calculate precision, recall, and accuracy for a specific column

    Args:
        df_ground_truth: DataFrame with ground truth labels
        df_predictions: DataFrame with predictions (columns prefixed with 'pred_')
        column: Column name to evaluate (e.g., 'masterCategory', 'gender', 'subCategory')
        id_col: Column name for joining the dataframes
        average: Averaging method for multi-class metrics ('weighted', 'macro', 'micro')

    Returns:
        dict: Dictionary containing:
            - accuracy: Overall accuracy
            - precision: Precision score
            - recall: Recall score
            - classification_report: Detailed classification report
            - num_samples: Number of samples evaluated
    """
    # Merge ground truth and predictions on ID
    merged = df_ground_truth[[id_col, column]].merge(
        df_predictions[[id_col, f"pred_{column}"]], on=id_col, how="inner"
    )

    # Remove any rows with None predictions (failed inferences)
    merged = merged.dropna(subset=[f"pred_{column}"])

    if len(merged) == 0:
        raise ValueError("No valid predictions found after merging and removing nulls")

    # Get ground truth and predictions
    y_true = merged[column]
    y_pred = merged[f"pred_{column}"]

    # Calculate metrics
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average=average, zero_division=0)
    recall = recall_score(y_true, y_pred, average=average, zero_division=0)

    # Get detailed classification report
    report = classification_report(y_true, y_pred, zero_division=0)

    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "classification_report": report,
        "num_samples": len(merged),
    }


def evaluate_all_categories(
    df_ground_truth: pd.DataFrame,
    df_predictions: pd.DataFrame,
    id_col: str = "id",
    categories: Optional[list[str]] = None,
) -> dict:
    """
    Evaluate predictions for all category types

    Args:
        df_ground_truth: DataFrame with ground truth labels
        df_predictions: DataFrame with predictions
        id_col: Column name for joining
        categories: List of category columns to evaluate (defaults to
            ['masterCategory', 'gender', 'subCategory'])

    Returns:
        dict: Dictionary with metrics for each category
    """
    if categories is None:
        categories = ["masterCategory", "gender", "subCategory"]

    results = {}

    for category in categories:
        print(f"\n{'='*60}")
        print(f"Evaluating: {category}")
        print(f"{'='*60}")

        try:
            metrics = calculate_metrics(
                df_ground_truth=df_ground_truth,
                df_predictions=df_predictions,
                column=category,
                id_col=id_col,
            )
            results[category] = metrics

            # Print summary
            print(f"Accuracy: {metrics['accuracy']:.4f}")
            print(f"Precision: {metrics['precision']:.4f}")
            print(f"Recall: {metrics['recall']:.4f}")
            print(f"Samples: {metrics['num_samples']}")
            print(f"\nClassification Report:\n{metrics['classification_report']}")

        except Exception as e:
            print(f"Error evaluating {category}: {e}")
            results[category] = {"error": str(e)}

    return results


def create_evaluation_summary(results: dict) -> pd.DataFrame:
    """
    Create a summary DataFrame from evaluation results

    Args:
        results: Dictionary of evaluation results from evaluate_all_categories

    Returns:
        pd.DataFrame: Summary table with metrics for each category
    """
    summary_data = []

    for category, metrics in results.items():
        if "error" not in metrics:
            summary_data.append(
                {
                    "Category": category,
                    "Accuracy": f"{metrics['accuracy']:.4f}",
                    "Precision": f"{metrics['precision']:.4f}",
                    "Recall": f"{metrics['recall']:.4f}",
                    "Samples": metrics["num_samples"],
                }
            )
        else:
            summary_data.append(
                {
                    "Category": category,
                    "Accuracy": "Error",
                    "Precision": "Error",
                    "Recall": "Error",
                    "Samples": 0,
                }
            )

    return pd.DataFrame(summary_data)


def extract_metrics(results_dict: dict, model_name: str) -> list[dict]:
    """
    Extract accuracy, precision, and recall for each category.

    Args:
        results_dict: Dictionary of evaluation results from evaluate_all_categories
        model_name: Name of the model for identification

    Returns:
        List of dictionaries with metrics per category
    """
    metrics_list = []
    for category, metrics in results_dict.items():
        # Skip categories whose evaluation failed (they only carry an 'error' key)
        if "error" in metrics:
            continue
        metrics_list.append(
            {
                "model": model_name,
                "category": category,
                "accuracy": metrics["accuracy"],
                "precision": metrics["precision"],
                "recall": metrics["recall"],
                "num_samples": metrics["num_samples"],
            }
        )
    return metrics_list
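
# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the module's API): ties inference and
# evaluation together end to end. The "styles.csv" file name and the
# FIREWORKS_API_KEY environment variable are assumptions for illustration;
# the dataframe is expected to have 'id' and 'image' columns (with 'image' in
# whatever form image_to_base64 accepts) plus the ground-truth label columns.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import os

    # Load ground truth (hypothetical file name; adjust to your dataset)
    df_ground_truth = pd.read_csv(DATA_PATH / "styles.csv")

    # Run concurrent VLM inference; intermediate CSV checkpoints land in DATA_PATH
    df_predictions = run_inference_on_dataframe(
        df_ground_truth,
        api_key=os.environ.get("FIREWORKS_API_KEY"),  # assumed env var
        provider="Fireworks",
        max_concurrent_requests=10,
    )

    # Score each category column and print a compact summary table
    results = evaluate_all_categories(df_ground_truth, df_predictions)
    print(create_evaluation_summary(results))

    # Flatten the metrics into rows suitable for cross-model comparison
    comparison_rows = extract_metrics(results, model_name="qwen2p5-vl-72b-instruct")
    print(pd.DataFrame(comparison_rows))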