import pandas as pd
from typing import Optional
from sklearn.metrics import (
precision_score,
recall_score,
accuracy_score,
classification_report,
)
from tqdm.asyncio import tqdm as async_tqdm
import asyncio
from src.modules.vlm_inference import analyze_product_image_async
from src.modules.data_processing import image_to_base64
from pathlib import Path
DATA_PATH = Path(__file__).parents[2] / "data"
async def _process_single_row(
row_data: dict,
model: str,
api_key: str,
provider: str,
semaphore: asyncio.Semaphore,
) -> dict:
"""
Process a single row with semaphore control
Args:
row_data: Dictionary with 'image' and 'id' keys
model: Model to use for inference
api_key: API key for the provider
provider: Provider to use
semaphore: Asyncio semaphore for rate limiting
Returns:
dict: Prediction result
"""
async with semaphore:
try:
img_b64 = image_to_base64(row_data["image"])
prediction = await analyze_product_image_async(
image_url=img_b64,
model=model,
api_key=api_key,
provider=provider,
)
return {
"id": row_data["id"],
"pred_masterCategory": prediction.master_category,
"pred_gender": prediction.gender,
"pred_subCategory": prediction.sub_category,
"pred_description": prediction.description,
}
except Exception as e:
return {
"id": row_data["id"],
"pred_masterCategory": None,
"pred_gender": None,
"pred_subCategory": None,
"pred_description": f"Error: {str(e)}",
}
async def run_inference_on_dataframe_async(
df: pd.DataFrame,
model: str = "accounts/fireworks/models/qwen2p5-vl-72b-instruct",
api_key: Optional[str] = None,
provider: str = "Fireworks",
max_concurrent_requests: int = 10,
) -> pd.DataFrame:
"""
    Run VLM inference on an entire dataframe of images using concurrent requests
Args:
df: DataFrame containing images
model: Model to use for inference
api_key: API key for the provider
provider: Provider to use (Fireworks or OpenAI)
max_concurrent_requests: Maximum number of concurrent API requests (default: 10)
Returns:
pd.DataFrame: Results with columns:
- id: Image ID
- pred_masterCategory: Predicted master category
- pred_gender: Predicted gender
- pred_subCategory: Predicted sub-category
- pred_description: Predicted description
"""
# Create semaphore for rate limiting
semaphore = asyncio.Semaphore(max_concurrent_requests)
# Prepare all rows as dictionaries
rows_data = [
{"image": row.image, "id": row.id}
for row in df.itertuples(index=False, name="columns")
]
# Create all tasks (coroutines, not awaited yet)
tasks = [
_process_single_row(row_data, model, api_key, provider, semaphore)
for row_data in rows_data
]
_model = model.split("/")[-1]
# Run all tasks concurrently with progress bar
results = []
for task in async_tqdm.as_completed(tasks, total=len(tasks)):
result = await task
results.append(result)
if len(results) % 10 == 0:
df_pred = pd.DataFrame(results)
file_name = DATA_PATH / f"df_pred_{provider}_{_model}.csv"
df_pred.to_csv(file_name, index=False)
# Final save
df_pred = pd.DataFrame(results)
file_name = DATA_PATH / f"df_pred_{provider}_{_model}.csv"
df_pred.to_csv(file_name, index=False)
print(f"\nPrediction successful, dataset saved to {file_name}")
return df_pred
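# Usage sketch for the async variant: in a Jupyter notebook an event loop is already
# running, so it can be awaited directly instead of going through asyncio.run.
# Assumes a dataframe `df_items` (hypothetical name) with `image` and `id` columns
# and a Fireworks API key exposed via an environment variable:
#
#     import os
#     df_pred = await run_inference_on_dataframe_async(
#         df_items,
#         api_key=os.environ["FIREWORKS_API_KEY"],
#         provider="Fireworks",
#         max_concurrent_requests=5,
#     )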
def run_inference_on_dataframe(
df: pd.DataFrame,
model: str = "accounts/fireworks/models/qwen2p5-vl-72b-instruct",
api_key: Optional[str] = None,
provider: str = "Fireworks",
max_concurrent_requests: int = 10,
) -> pd.DataFrame:
"""
    Run VLM inference on an entire dataframe of images (sync wrapper around the async function)
Args:
df: DataFrame containing images
model: Model to use for inference
api_key: API key for the provider
provider: Provider to use (Fireworks or OpenAI)
max_concurrent_requests: Maximum number of concurrent API requests (default: 10)
Returns:
pd.DataFrame: Results with columns:
- id: Image ID
- pred_masterCategory: Predicted master category
- pred_gender: Predicted gender
- pred_subCategory: Predicted sub-category
- pred_description: Predicted description
"""
return asyncio.run(
run_inference_on_dataframe_async(df, model, api_key, provider, max_concurrent_requests)
)
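# Example usage of the sync wrapper from a plain script. It calls asyncio.run, so it
# should not be invoked from code that is already running inside an event loop
# (use the async variant there). A minimal sketch; `df_items` and the environment
# variable name are assumptions:
#
#     import os
#     df_pred = run_inference_on_dataframe(
#         df_items,
#         model="accounts/fireworks/models/qwen2p5-vl-72b-instruct",
#         api_key=os.environ["FIREWORKS_API_KEY"],
#         provider="Fireworks",
#         max_concurrent_requests=10,
#     )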
def calculate_metrics(
df_ground_truth: pd.DataFrame,
df_predictions: pd.DataFrame,
column: str,
id_col: str = "id",
average: str = "weighted",
) -> dict:
"""
Calculate precision, recall, and accuracy for a specific column
Args:
df_ground_truth: DataFrame with ground truth labels
df_predictions: DataFrame with predictions
        column: Column name to evaluate (e.g., 'masterCategory', 'gender', 'subCategory')
id_col: Column name for joining the dataframes
average: Averaging method for multi-class metrics ('weighted', 'macro', 'micro')
Returns:
dict: Dictionary containing:
- accuracy: Overall accuracy
- precision: Precision score
- recall: Recall score
- classification_report: Detailed classification report
"""
# Merge ground truth and predictions on ID
merged = df_ground_truth[[id_col, column]].merge(
df_predictions[[id_col, f"pred_{column}"]], on=id_col, how="inner"
)
# Remove any rows with None predictions (failed inferences)
merged = merged.dropna(subset=[f"pred_{column}"])
if len(merged) == 0:
raise ValueError("No valid predictions found after merging and removing nulls")
# Get ground truth and predictions
y_true = merged[column]
y_pred = merged[f"pred_{column}"]
# Calculate metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average=average, zero_division=0)
recall = recall_score(y_true, y_pred, average=average, zero_division=0)
# Get detailed classification report
report = classification_report(y_true, y_pred, zero_division=0)
return {
"accuracy": accuracy,
"precision": precision,
"recall": recall,
"classification_report": report,
"num_samples": len(merged),
}
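# Example: evaluating a single label column. A minimal sketch, assuming a ground-truth
# dataframe `df_gt` with `id` and `masterCategory` columns and a predictions dataframe
# `df_pred` with `id` and `pred_masterCategory` (the column names mirror the output of
# run_inference_on_dataframe; the dataframe variable names are assumptions):
#
#     metrics = calculate_metrics(df_gt, df_pred, column="masterCategory")
#     print(f"accuracy={metrics['accuracy']:.3f} over {metrics['num_samples']} samples")
#     print(metrics["classification_report"])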
def evaluate_all_categories(
df_ground_truth: pd.DataFrame,
df_predictions: pd.DataFrame,
id_col: str = "id",
    categories: Optional[list[str]] = None,
) -> dict:
"""
Evaluate predictions for all category types
Args:
df_ground_truth: DataFrame with ground truth labels
df_predictions: DataFrame with predictions
id_col: Column name for joining
categories: List of category columns to evaluate
Returns:
dict: Dictionary with metrics for each category
"""
if categories is None:
categories = ["masterCategory", "gender", "subCategory"]
results = {}
for category in categories:
print(f"\n{'='*60}")
print(f"Evaluating: {category}")
print(f"{'='*60}")
try:
metrics = calculate_metrics(
df_ground_truth=df_ground_truth,
df_predictions=df_predictions,
column=category,
id_col=id_col,
)
results[category] = metrics
# Print summary
print(f"Accuracy: {metrics['accuracy']:.4f}")
print(f"Precision: {metrics['precision']:.4f}")
print(f"Recall: {metrics['recall']:.4f}")
print(f"Samples: {metrics['num_samples']}")
print(f"\nClassification Report:\n{metrics['classification_report']}")
except Exception as e:
print(f"Error evaluating {category}: {e}")
results[category] = {"error": str(e)}
return results
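# Example: running the full evaluation over the default label columns
# (masterCategory, gender, subCategory). A sketch reusing the assumed `df_gt` and
# `df_pred` dataframes from above:
#
#     results = evaluate_all_categories(df_gt, df_pred)
#     # results["gender"]["accuracy"], results["gender"]["precision"], ... are floats;
#     # a category that failed to evaluate maps to {"error": "<message>"} instead.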
def create_evaluation_summary(results: dict) -> pd.DataFrame:
"""
Create a summary DataFrame from evaluation results
Args:
results: Dictionary of evaluation results from evaluate_all_categories
Returns:
pd.DataFrame: Summary table with metrics for each category
"""
summary_data = []
for category, metrics in results.items():
if "error" not in metrics:
summary_data.append(
{
"Category": category,
"Accuracy": f"{metrics['accuracy']:.4f}",
"Precision": f"{metrics['precision']:.4f}",
"Recall": f"{metrics['recall']:.4f}",
"Samples": metrics["num_samples"],
}
)
else:
summary_data.append(
{
"Category": category,
"Accuracy": "Error",
"Precision": "Error",
"Recall": "Error",
"Samples": 0,
}
)
return pd.DataFrame(summary_data)
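# Example: collapsing the per-category results into a single summary table. A sketch
# reusing the `results` dict from evaluate_all_categories above:
#
#     summary = create_evaluation_summary(results)
#     print(summary.to_string(index=False))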
def extract_metrics(results_dict: dict, model_name: str) -> list[dict]:
    """
    Extract accuracy, precision, and recall for each category.
    Args:
        results_dict: Dictionary of evaluation results from evaluate_all_categories
        model_name: Name of the model for identification
    Returns:
        List of dictionaries with metrics per category
    """
    metrics_list = []
    for category, metrics in results_dict.items():
        # Skip categories whose evaluation failed (they only carry an "error" key)
        if "error" in metrics:
            continue
        metrics_list.append(
            {
                "model": model_name,
                "category": category,
                "accuracy": metrics["accuracy"],
                "precision": metrics["precision"],
                "recall": metrics["recall"],
                "num_samples": metrics["num_samples"],
            }
        )
    return metrics_list
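# Example: comparing several models by stacking their extracted metrics into one
# dataframe. A sketch; `results_qwen` and `results_gpt4o` are assumed to be outputs of
# evaluate_all_categories for two separate prediction runs:
#
#     rows = extract_metrics(results_qwen, "qwen2p5-vl-72b-instruct")
#     rows += extract_metrics(results_gpt4o, "gpt-4o")
#     df_compare = pd.DataFrame(rows)
#     print(df_compare.pivot(index="category", columns="model", values="accuracy"))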