import asyncio
from pathlib import Path
from typing import Optional

import pandas as pd
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    precision_score,
    recall_score,
)
from tqdm.asyncio import tqdm as async_tqdm

from src.modules.data_processing import image_to_base64
from src.modules.vlm_inference import analyze_product_image_async

DATA_PATH = Path(__file__).parents[2] / "data"


async def _process_single_row(
    row_data: dict,
    model: str,
    api_key: Optional[str],
    provider: str,
    semaphore: asyncio.Semaphore,
) -> dict:
    """
    Process a single row with semaphore control.

    Args:
        row_data: Dictionary with 'image' and 'id' keys
        model: Model to use for inference
        api_key: API key for the provider
        provider: Provider to use
        semaphore: Asyncio semaphore for rate limiting

    Returns:
        dict: Prediction result
    """
    async with semaphore:
        try:
            img_b64 = image_to_base64(row_data["image"])
            prediction = await analyze_product_image_async(
                image_url=img_b64,
                model=model,
                api_key=api_key,
                provider=provider,
            )
            return {
                "id": row_data["id"],
                "pred_masterCategory": prediction.master_category,
                "pred_gender": prediction.gender,
                "pred_subCategory": prediction.sub_category,
                "pred_description": prediction.description,
            }
        except Exception as e:
            # Keep the row with null predictions so failed inferences can be
            # filtered out later instead of aborting the whole run.
            return {
                "id": row_data["id"],
                "pred_masterCategory": None,
                "pred_gender": None,
                "pred_subCategory": None,
                "pred_description": f"Error: {str(e)}",
            }
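
# Illustrative note (not part of the pipeline): failed calls come back as rows
# with None predictions and an "Error: ..." message in pred_description (see
# the except branch above). Assuming the results were assembled into a
# dataframe df_pred, failures can be inspected with:
#
#     failed = df_pred[df_pred["pred_masterCategory"].isna()]
#     print(failed["pred_description"].value_counts())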


async def run_inference_on_dataframe_async(
    df: pd.DataFrame,
    model: str = "accounts/fireworks/models/qwen2p5-vl-72b-instruct",
    api_key: Optional[str] = None,
    provider: str = "Fireworks",
    max_concurrent_requests: int = 10,
) -> pd.DataFrame:
    """
    Run VLM inference on an entire dataframe of images with concurrent requests.

    Args:
        df: DataFrame containing images
        model: Model to use for inference
        api_key: API key for the provider
        provider: Provider to use (Fireworks or OpenAI)
        max_concurrent_requests: Maximum number of concurrent API requests (default: 10)

    Returns:
        pd.DataFrame: Results with columns:
            - id: Image ID
            - pred_masterCategory: Predicted master category
            - pred_gender: Predicted gender
            - pred_subCategory: Predicted sub-category
            - pred_description: Predicted description
    """
    # Create semaphore for rate limiting
    semaphore = asyncio.Semaphore(max_concurrent_requests)

    # Prepare all rows as dictionaries
    rows_data = [
        {"image": row.image, "id": row.id}
        for row in df.itertuples(index=False, name="columns")
    ]

    # Create all coroutines (not awaited yet)
    tasks = [
        _process_single_row(row_data, model, api_key, provider, semaphore)
        for row_data in rows_data
    ]

    # Short model name used in the output file name
    _model = model.split("/")[-1]

    # Run all tasks concurrently with a progress bar, checkpointing every 10 results
    results = []
    for task in async_tqdm.as_completed(tasks, total=len(tasks)):
        result = await task
        results.append(result)
        if len(results) % 10 == 0:
            df_pred = pd.DataFrame(results)
            file_name = DATA_PATH / f"df_pred_{provider}_{_model}.csv"
            df_pred.to_csv(file_name, index=False)

    # Final save
    df_pred = pd.DataFrame(results)
    file_name = DATA_PATH / f"df_pred_{provider}_{_model}.csv"
    df_pred.to_csv(file_name, index=False)
    print(f"\nPrediction successful, dataset saved to {file_name}")

    return df_pred
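
# Hedged usage sketch: in an environment that already runs an event loop
# (e.g. a Jupyter notebook), the async runner can be awaited directly instead
# of going through asyncio.run(). The dataframe and the FIREWORKS_API_KEY
# environment variable are assumptions, not part of this module; the only
# requirement from the code above is that df exposes `image` and `id` columns.
#
#     import os
#     df_pred = await run_inference_on_dataframe_async(
#         df,
#         api_key=os.environ["FIREWORKS_API_KEY"],
#         provider="Fireworks",
#         max_concurrent_requests=5,
#     )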


def run_inference_on_dataframe(
    df: pd.DataFrame,
    model: str = "accounts/fireworks/models/qwen2p5-vl-72b-instruct",
    api_key: Optional[str] = None,
    provider: str = "Fireworks",
    max_concurrent_requests: int = 10,
) -> pd.DataFrame:
    """
    Run VLM inference on an entire dataframe of images (sync wrapper for the async function).

    Args:
        df: DataFrame containing images
        model: Model to use for inference
        api_key: API key for the provider
        provider: Provider to use (Fireworks or OpenAI)
        max_concurrent_requests: Maximum number of concurrent API requests (default: 10)

    Returns:
        pd.DataFrame: Results with columns:
            - id: Image ID
            - pred_masterCategory: Predicted master category
            - pred_gender: Predicted gender
            - pred_subCategory: Predicted sub-category
            - pred_description: Predicted description
    """
    return asyncio.run(
        run_inference_on_dataframe_async(
            df, model, api_key, provider, max_concurrent_requests
        )
    )
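
# Hedged usage sketch for the sync wrapper, e.g. from a plain script. The
# parquet file name is hypothetical; the real requirement from the code above
# is a dataframe with `image` and `id` columns.
#
#     import os
#     df = pd.read_parquet(DATA_PATH / "df_test.parquet")  # hypothetical file
#     df_pred = run_inference_on_dataframe(
#         df,
#         api_key=os.environ.get("FIREWORKS_API_KEY"),
#         provider="Fireworks",
#         max_concurrent_requests=10,
#     )
#     print(df_pred.head())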


def calculate_metrics(
    df_ground_truth: pd.DataFrame,
    df_predictions: pd.DataFrame,
    column: str,
    id_col: str = "id",
    average: str = "weighted",
) -> dict:
    """
    Calculate precision, recall, and accuracy for a specific column.

    Args:
        df_ground_truth: DataFrame with ground truth labels
        df_predictions: DataFrame with predictions
        column: Column name to evaluate (e.g., 'masterCategory', 'gender', 'subCategory')
        id_col: Column name for joining the dataframes
        average: Averaging method for multi-class metrics ('weighted', 'macro', 'micro')

    Returns:
        dict: Dictionary containing:
            - accuracy: Overall accuracy
            - precision: Precision score
            - recall: Recall score
            - classification_report: Detailed classification report
            - num_samples: Number of samples actually evaluated
    """
    # Merge ground truth and predictions on ID
    merged = df_ground_truth[[id_col, column]].merge(
        df_predictions[[id_col, f"pred_{column}"]], on=id_col, how="inner"
    )

    # Remove any rows with None predictions (failed inferences)
    merged = merged.dropna(subset=[f"pred_{column}"])

    if len(merged) == 0:
        raise ValueError("No valid predictions found after merging and removing nulls")

    # Get ground truth and predictions
    y_true = merged[column]
    y_pred = merged[f"pred_{column}"]

    # Calculate metrics
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average=average, zero_division=0)
    recall = recall_score(y_true, y_pred, average=average, zero_division=0)

    # Get detailed classification report
    report = classification_report(y_true, y_pred, zero_division=0)

    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "classification_report": report,
        "num_samples": len(merged),
    }
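
# Hedged example: evaluating a single label column. The ground-truth dataframe
# is assumed to hold an `id` column plus the raw label columns (masterCategory,
# gender, subCategory), while df_pred holds the matching pred_* columns
# produced by the inference runner; the CSV file name is hypothetical.
#
#     df_gt = pd.read_csv(DATA_PATH / "styles.csv")  # hypothetical ground-truth file
#     gender_metrics = calculate_metrics(
#         df_ground_truth=df_gt,
#         df_predictions=df_pred,
#         column="gender",
#         average="weighted",
#     )
#     print(gender_metrics["accuracy"], gender_metrics["num_samples"])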


def evaluate_all_categories(
    df_ground_truth: pd.DataFrame,
    df_predictions: pd.DataFrame,
    id_col: str = "id",
    categories: Optional[list[str]] = None,
) -> dict:
    """
    Evaluate predictions for all category types.

    Args:
        df_ground_truth: DataFrame with ground truth labels
        df_predictions: DataFrame with predictions
        id_col: Column name for joining
        categories: List of category columns to evaluate (defaults to
            ["masterCategory", "gender", "subCategory"])

    Returns:
        dict: Dictionary with metrics for each category
    """
    if categories is None:
        categories = ["masterCategory", "gender", "subCategory"]

    results = {}
    for category in categories:
        print(f"\n{'=' * 60}")
        print(f"Evaluating: {category}")
        print(f"{'=' * 60}")
        try:
            metrics = calculate_metrics(
                df_ground_truth=df_ground_truth,
                df_predictions=df_predictions,
                column=category,
                id_col=id_col,
            )
            results[category] = metrics

            # Print summary
            print(f"Accuracy: {metrics['accuracy']:.4f}")
            print(f"Precision: {metrics['precision']:.4f}")
            print(f"Recall: {metrics['recall']:.4f}")
            print(f"Samples: {metrics['num_samples']}")
            print(f"\nClassification Report:\n{metrics['classification_report']}")
        except Exception as e:
            print(f"Error evaluating {category}: {e}")
            results[category] = {"error": str(e)}

    return results
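
# Hedged example: running the full evaluation over the default three columns.
# Reading predictions back from the CSV written by the inference runner (file
# name derived from provider and short model name, as in the code above) is an
# assumption about how the pieces are wired together; df_gt is the hypothetical
# ground-truth dataframe from the sketch above.
#
#     df_pred = pd.read_csv(DATA_PATH / "df_pred_Fireworks_qwen2p5-vl-72b-instruct.csv")
#     results = evaluate_all_categories(
#         df_ground_truth=df_gt,
#         df_predictions=df_pred,
#     )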


def create_evaluation_summary(results: dict) -> pd.DataFrame:
    """
    Create a summary DataFrame from evaluation results.

    Args:
        results: Dictionary of evaluation results from evaluate_all_categories

    Returns:
        pd.DataFrame: Summary table with metrics for each category
    """
    summary_data = []
    for category, metrics in results.items():
        if "error" not in metrics:
            summary_data.append(
                {
                    "Category": category,
                    "Accuracy": f"{metrics['accuracy']:.4f}",
                    "Precision": f"{metrics['precision']:.4f}",
                    "Recall": f"{metrics['recall']:.4f}",
                    "Samples": metrics["num_samples"],
                }
            )
        else:
            summary_data.append(
                {
                    "Category": category,
                    "Accuracy": "Error",
                    "Precision": "Error",
                    "Recall": "Error",
                    "Samples": 0,
                }
            )
    return pd.DataFrame(summary_data)
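
# Hedged example: turning the per-category results into a one-row-per-category
# table, assuming `results` comes from evaluate_all_categories above.
#
#     summary_df = create_evaluation_summary(results)
#     print(summary_df.to_string(index=False))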


def extract_metrics(results_dict: dict, model_name: str) -> list[dict]:
    """
    Extract accuracy, precision, and recall for each category.

    Args:
        results_dict: Dictionary of evaluation results from evaluate_all_categories
        model_name: Name of the model for identification

    Returns:
        list[dict]: One dictionary of metrics per category
    """
    metrics_list = []
    for category, metrics in results_dict.items():
        # Skip categories whose evaluation failed (they only carry an "error" key)
        if "error" in metrics:
            continue
        metrics_list.append(
            {
                "model": model_name,
                "category": category,
                "accuracy": metrics["accuracy"],
                "precision": metrics["precision"],
                "recall": metrics["recall"],
                "num_samples": metrics["num_samples"],
            }
        )
    return metrics_list
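
# Hedged example: comparing two models by flattening their evaluation results
# into a single dataframe. The second results dict and both model labels are
# assumptions for illustration.
#
#     comparison_rows = (
#         extract_metrics(results, "qwen2p5-vl-72b-instruct")
#         + extract_metrics(results_gpt4o, "gpt-4o")  # hypothetical second run
#     )
#     df_comparison = pd.DataFrame(comparison_rows)
#     print(df_comparison.pivot(index="category", columns="model", values="accuracy"))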