# Final-car-boss / app.py
# ishanprogs's picture
# Upload 5 files
# 2d078c6 verified
import gradio as gr
import torch
import clip
from PIL import Image
import numpy as np
import os
import cv2
import gc # Garbage collector
import logging
import random # For annotator colors
# --- YOLOv8 Imports ---
from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator # For drawing YOLO results
# --- Setup Logging ---
# Root configuration: INFO and above for everything that is not overridden.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
# The YOLO library logs a lot at INFO; only let its warnings and errors through.
logging.getLogger("ultralytics").setLevel(logging.WARNING)
# --- Constants ---
# Files shipped alongside this script in the Hugging Face Space repository.
CLIP_TEXT_FEATURES_PATH = "./clip_text_features.pt"
DAMAGE_MODEL_WEIGHTS_PATH = "./best.pt"  # YOLOv8 damage segmentation weights
PART_MODEL_WEIGHTS_PATH = "./partdetection_yolobest.pt"  # YOLOv8 part segmentation weights

# Confidence cut-offs handed to the YOLO predictors.
DAMAGE_PRED_THRESHOLD = 0.4  # damages below this confidence are not shown
PART_PRED_THRESHOLD = 0.3  # parts below this confidence are not shown

# Damage class names. A class ID is used as an index into this list, so the
# order must match the classes the damage weights were trained with.
DAMAGE_CLASSES = [
    'Cracked',
    'Scratch',
    'Flaking',
    'Broken part',
    'Corrosion',
    'Dent',
    'Paint chip',
    'Missing part',
]
NUM_DAMAGE_CLASSES = len(DAMAGE_CLASSES)

# Car-part class names. Same indexing rule: the order must match the classes
# the part weights were trained with.
CAR_PART_CLASSES = [
    "Quarter-panel",
    "Front-wheel",
    "Back-window",
    "Trunk",
    "Front-door",
    "Rocker-panel",
    "Grille",
    "Windshield",
    "Front-window",
    "Back-door",
    "Headlight",
    "Back-wheel",
    "Back-windshield",
    "Hood",
    "Fender",
    "Tail-light",
    "License-plate",
    "Front-bumper",
    "Back-bumper",
    "Mirror",
    "Roof",
]
NUM_CAR_PART_CLASSES = len(CAR_PART_CLASSES)
# --- Device Setup ---
# Run on the GPU whenever PyTorch reports CUDA; otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(
    "CUDA available, using GPU." if DEVICE == "cuda" else "CUDA not available, using CPU."
)
# --- MODEL LOADING (Load models globally ONCE on startup) ---
print("Loading models...")
# Every handle starts as None; the prediction functions check for None and
# report an error instead of crashing when a model failed to load.
clip_model = None
clip_preprocess = None
clip_text_features = None
damage_model = None
part_model = None


def _load_yolo_segmenter(weights_path, label, expected_names, names_constant):
    """Load one YOLOv8 segmentation model and reconcile its class names.

    Args:
        weights_path: Path of the weights file to load.
        label: Human-readable model kind used in log messages ("Damage" or "Part").
        expected_names: Class-name list hard-coded in this file.
        names_constant: Name of the module constant holding ``expected_names``;
            used only in log messages.

    Returns:
        ``(model, class_names)``. ``model`` is None when loading failed.
        ``class_names`` is the list stored in the weights when it differs from
        ``expected_names``, otherwise ``expected_names`` itself.
    """
    title = f"{label} Segmentation (YOLOv8)"
    try:
        logger.info(f"Loading {title} model from {weights_path}...")
        if not os.path.exists(weights_path):
            raise FileNotFoundError(f"{label} model weights not found: {weights_path}.")
        model = YOLO(weights_path)
        model.to(DEVICE)  # Ensure model is on the correct device
        # Class IDs predicted by the model index into the name list, so the
        # names stored in the weights win over the hard-coded ones.
        loaded_names = list(model.names.values())
        if loaded_names != expected_names:
            logger.warning(f"Mismatch between defined {names_constant} and names in {weights_path}")
            logger.warning(f" Model names: {loaded_names}")
            expected_names = loaded_names
            logger.warning(f" Updated {names_constant} to: {expected_names}")
        logger.info(f"{title} model loaded.")
        return model, expected_names
    except Exception as e:
        # Keep the app alive; callers see model=None and report the problem.
        logger.error(f"Error loading {title} model: {e}", exc_info=True)
        return None, expected_names


# --- Load CLIP Model (Model 1) ---
try:
    logger.info("Loading CLIP model...")
    clip_model, clip_preprocess = clip.load("ViT-B/16", device=DEVICE)
    clip_model.eval()
    logger.info("CLIP model loaded.")
    logger.info(f"Loading CLIP text features from {CLIP_TEXT_FEATURES_PATH}...")
    if not os.path.exists(CLIP_TEXT_FEATURES_PATH):
        raise FileNotFoundError(f"CLIP text features not found: {CLIP_TEXT_FEATURES_PATH}.")
    # NOTE(review): torch.load unpickles the file; only ship a features file you trust.
    clip_text_features = torch.load(CLIP_TEXT_FEATURES_PATH, map_location=DEVICE)
    logger.info("CLIP text features loaded.")
except Exception as e:
    logger.error(f"Error loading CLIP model or features: {e}", exc_info=True)
    # Classification needs the model, the preprocessing transform AND the text
    # features, so a partial load is unusable: reset all three, matching how
    # the YOLO loaders leave their handle at None on failure.
    clip_model = None
    clip_preprocess = None
    clip_text_features = None

# --- Load Damage Segmentation Model (Model 2 - YOLOv8) ---
damage_model, DAMAGE_CLASSES = _load_yolo_segmenter(
    DAMAGE_MODEL_WEIGHTS_PATH, "Damage", DAMAGE_CLASSES, "DAMAGE_CLASSES"
)
NUM_DAMAGE_CLASSES = len(DAMAGE_CLASSES)  # keep in sync if the names were replaced

# --- Load Part Segmentation Model (Model 3 - YOLOv8) ---
part_model, CAR_PART_CLASSES = _load_yolo_segmenter(
    PART_MODEL_WEIGHTS_PATH, "Part", CAR_PART_CLASSES, "CAR_PART_CLASSES"
)
NUM_CAR_PART_CLASSES = len(CAR_PART_CLASSES)  # keep in sync if the names were replaced

print("Model loading complete.")
# --- Prediction Functions ---
def classify_image_clip(image_pil):
    """Classify a PIL image as "Car" or "Not Car" with CLIP.

    The image embedding is compared against the precomputed text features;
    index 0 of the resulting distribution is read as "Car" and index 1 as
    "Not Car".

    Args:
        image_pil: PIL image; converted to RGB when it is in another mode.

    Returns:
        ``(label, probabilities)``. ``label`` is "Car", "Not Car" or an error
        message. ``probabilities`` maps each label to a float rounded to three
        decimals (or ``{"Error": 1.0}`` on failure), so it can be handed to a
        confidence display directly.
    """
    if clip_model is None or clip_preprocess is None or clip_text_features is None:
        return "Error: CLIP Model Not Loaded", {"Error": 1.0}
    try:
        # Ensure image is RGB PIL
        if image_pil.mode != "RGB":
            image_pil = image_pil.convert("RGB")
        image_input = clip_preprocess(image_pil).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            image_features = clip_model.encode_image(image_input)
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)
            # The stored text features may have been saved with another dtype or
            # device than the one the model runs with; align them before matmul.
            text_features = clip_text_features.to(
                device=image_features.device, dtype=image_features.dtype
            )
            logit_scale = clip_model.logit_scale.exp()
            similarity = (image_features @ text_features.T) * logit_scale
            probs = similarity.softmax(dim=-1).squeeze(0).float().cpu()
        car_prob = float(probs[0])
        not_car_prob = float(probs[1])
        predicted_label = "Car" if car_prob > not_car_prob else "Not Car"
        # Numeric confidences (not formatted strings), consistent with the
        # {"Error": 1.0} value returned on the failure paths.
        prob_dict = {"Car": round(car_prob, 3), "Not Car": round(not_car_prob, 3)}
        return predicted_label, prob_dict
    except Exception as e:
        logger.error(f"Error during CLIP prediction: {e}", exc_info=True)
        return "Error during CLIP processing", {"Error": 1.0}
# --- Combined Processing and Overlap Logic ---
def _masks_to_image_size(masks_np, target_h, target_w):
    """Resize a stack of boolean masks to (target_h, target_w) using nearest-neighbour sampling."""
    if masks_np.shape[0] == 0 or tuple(masks_np.shape[1:3]) == (target_h, target_w):
        return masks_np  # Empty, or already at the image size
    logger.info(f"Resizing {masks_np.shape[0]} masks from {masks_np.shape[1:]} to {(target_h, target_w)}")
    return np.array([
        cv2.resize(mask.astype(np.uint8), (target_w, target_h), interpolation=cv2.INTER_NEAREST).astype(bool)
        for mask in masks_np
    ])


def _unpack_segmentation(result, target_h, target_w):
    """Return (masks, class_ids, boxes_xyxy) of one YOLO result as NumPy arrays; empty arrays when absent."""
    masks = np.array([])
    class_ids = np.array([], dtype=int)
    boxes = np.array([])
    if result.masks is not None:
        masks = result.masks.data.cpu().numpy().astype(bool)
    if result.boxes is not None:
        class_ids = result.boxes.cls.cpu().numpy().astype(int)
        boxes = result.boxes.xyxy.cpu().numpy()
    return _masks_to_image_size(masks, target_h, target_w), class_ids, boxes


def _assign_damages_to_parts(damage_masks, damage_ids, part_masks, part_ids,
                             overlap_threshold=0.4, min_damage_area=10):
    """Describe, for each damage mask, the part covering the largest share of it."""
    descriptions = []
    for damage_mask, damage_id in zip(damage_masks, damage_ids):
        if not 0 <= damage_id < len(DAMAGE_CLASSES):
            continue  # Skip if invalid class ID
        damage_area = int(np.sum(damage_mask))
        if damage_area < min_damage_area:
            continue  # Skip tiny damage masks
        best_overlap = 0.0
        best_part_name = None
        for part_mask, part_id in zip(part_masks, part_ids):
            if not 0 <= part_id < len(CAR_PART_CLASSES):
                continue  # Skip if invalid class ID
            # Fraction of the damage area that lies inside this part.
            overlap_ratio = np.sum(np.logical_and(damage_mask, part_mask)) / damage_area
            if overlap_ratio > best_overlap:
                best_overlap = overlap_ratio
                best_part_name = CAR_PART_CLASSES[part_id]
        damage_name = DAMAGE_CLASSES[damage_id]
        if best_part_name is not None and best_overlap >= overlap_threshold:
            assignment_desc = f"{damage_name} in {best_part_name}"
        else:
            assignment_desc = (
                f"{damage_name} in Unknown / Outside Parts"
                f" (Overlap < {overlap_threshold*100:.0f}%)"
            )
        descriptions.append(assignment_desc)
        logger.info(f"Overlap result: {assignment_desc}")
    return descriptions


def _blend_masks(image_bgr, masks, colors_bgr, alpha):
    """Alpha-blend each boolean mask onto image_bgr in place with its BGR color."""
    for mask, color in zip(masks, colors_bgr):
        if mask.shape != image_bgr.shape[:2]:
            continue  # Cannot index the image with a mask of another size
        tinted = (
            image_bgr[mask].astype(np.float32) * (1.0 - alpha)
            + np.array(color, dtype=np.float32) * alpha
        )
        image_bgr[mask] = tinted.astype(np.uint8)


def _label_boxes(annotator, boxes, class_ids, class_names, color_bgr):
    """Draw one labelled box per detection, skipping class IDs without a name."""
    for box, cls_id in zip(boxes, class_ids):
        if 0 <= cls_id < len(class_names):
            annotator.box_label(box, label=f"{class_names[cls_id]}", color=color_bgr)


def process_car_image(image_np_bgr):
    """
    Runs damage and part segmentation (both YOLOv8), calculates overlap, and returns results.

    Args:
        image_np_bgr: Image as a NumPy array in BGR channel order (OpenCV convention).

    Returns:
        - combined_image_rgb: RGB image with part masks/boxes (green) and
          damage masks/boxes (red) drawn. When a model is missing the
          unannotated input (converted to RGB) is returned; when processing
          fails midway, whatever had been drawn so far is returned.
        - assignment_text: String describing damage-part assignments, one per
          line, or an error / "nothing detected" message.
    """
    if damage_model is None:
        logger.error("Damage YOLOv8 model not available.")
        return cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB), "Error: Damage model not loaded."
    if part_model is None:
        logger.error("Part YOLOv8 model not available.")
        return cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB), "Error: Part model not loaded."

    final_assignments = []
    # Draw on a BGR copy; convert to RGB only for the final display.
    annotated_image_bgr = image_np_bgr.copy()
    img_h, img_w = image_np_bgr.shape[:2]
    try:
        # --- 1. Predict Damages (YOLOv8) ---
        logger.info("Running Damage Segmentation (YOLOv8)...")
        # retina_masks=True asks Ultralytics for masks at the original image
        # resolution; the resize in _unpack_segmentation is only a fallback.
        damage_result = damage_model.predict(
            image_np_bgr, verbose=False, device=DEVICE,
            conf=DAMAGE_PRED_THRESHOLD, retina_masks=True,
        )[0]  # Result for the first (only) image
        damage_masks_np, damage_classes_ids, damage_boxes = _unpack_segmentation(damage_result, img_h, img_w)
        logger.info(f"Found {len(damage_classes_ids)} potential damages.")

        # --- 2. Predict Parts (YOLOv8) ---
        logger.info("Running Part Segmentation (YOLOv8)...")
        part_result = part_model.predict(
            image_np_bgr, verbose=False, device=DEVICE,
            conf=PART_PRED_THRESHOLD, retina_masks=True,
        )[0]
        part_masks_np, part_classes_ids, part_boxes = _unpack_segmentation(part_result, img_h, img_w)
        logger.info(f"Found {len(part_classes_ids)} potential parts.")

        # --- 3. Calculate Overlap ---
        logger.info("Calculating overlap...")
        num_damages = len(damage_masks_np)
        num_parts = len(part_masks_np)
        if num_damages and num_parts:
            final_assignments.extend(
                _assign_damages_to_parts(damage_masks_np, damage_classes_ids, part_masks_np, part_classes_ids)
            )
        elif num_damages:
            final_assignments.append(f"{num_damages} damages found, but no parts detected/matched.")
        elif num_parts:
            final_assignments.append("No damages detected (above threshold).")
        else:
            final_assignments.append("No damages or parts detected.")

        # --- 4. Visualization ---
        logger.info("Visualizing results...")
        # Masks are blended with NumPy from the image-sized boolean masks rather
        # than via Annotator.masks, which expects an accompanying `im_gpu`
        # tensor for torch masks (see the Ultralytics plotting reference).
        # All blending happens before the Annotator is created so it works on
        # the already-tinted pixels. Colors are BGR because the canvas is BGR.
        part_colors = [(0, random.randint(100, 200), 0) for _ in part_masks_np]  # Shades of green
        _blend_masks(annotated_image_bgr, part_masks_np, part_colors, alpha=0.3)
        damage_colors = [(0, 0, random.randint(100, 200)) for _ in damage_masks_np]  # Shades of red
        _blend_masks(annotated_image_bgr, damage_masks_np, damage_colors, alpha=0.4)  # Slightly more opaque

        annotator = Annotator(annotated_image_bgr, line_width=2, example=part_model.names)
        _label_boxes(annotator, part_boxes, part_classes_ids, CAR_PART_CLASSES, (0, 200, 0))  # Green
        _label_boxes(annotator, damage_boxes, damage_classes_ids, DAMAGE_CLASSES, (0, 0, 200))  # Red in BGR
        # Get the final annotated image (still BGR)
        annotated_image_bgr = annotator.result()
    except Exception as e:
        logger.error(f"Error during combined processing: {e}", exc_info=True)
        final_assignments.append("Error during processing.")
        # annotated_image_bgr keeps whatever was drawn before the failure.

    # --- Prepare output ---
    assignment_text = "\n".join(final_assignments) if final_assignments else "No specific damage assignments."
    # Convert final annotated image to RGB for Gradio display
    final_output_image_rgb = cv2.cvtColor(annotated_image_bgr, cv2.COLOR_BGR2RGB)
    return final_output_image_rgb, assignment_text
# --- Main Gradio Function ---
def predict_pipeline(image_np_input):
    """
    Main pipeline: Classify -> Segment -> Assign -> Visualize

    Args:
        image_np_input: Image as delivered by the Gradio ``Image`` input with
            ``type="numpy"``, i.e. an array in RGB channel order, or None when
            nothing was uploaded.

    Returns:
        Tuple ``(classification_result, probabilities, output_image_rgb,
        assignment_text)`` matching the four Gradio outputs. The image is the
        annotated picture for cars and the unchanged input otherwise.
    """
    if image_np_input is None:
        return "Please upload an image.", {}, None, "N/A"
    logger.info("Received image for processing...")

    # Gradio hands over RGB. PIL/CLIP and the display want RGB as well; only
    # the YOLO stage expects OpenCV-style BGR, so convert just for that call.
    image_rgb = image_np_input
    final_output_image = image_rgb  # Default: show the original image
    classification_result = "Error"
    probabilities = {}
    # Shown when the classifier reports an error label instead of raising.
    assignment_text = "Classification failed; segmentation was skipped."

    # --- Stage 1: CLIP Classification ---
    try:
        image_pil = Image.fromarray(image_rgb)
        classification_result, probabilities = classify_image_clip(image_pil)
        logger.info(f"CLIP Result: {classification_result}, Probs: {probabilities}")
    except Exception as e:
        logger.error(f"Error in CLIP stage: {e}", exc_info=True)
        assignment_text = "Error during classification."

    # --- Stage 2 & 3: Segmentation and Assignment (if 'Car') ---
    if classification_result == "Car":
        logger.info("Image classified as Car. Running segmentation and assignment...")
        try:
            image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
            final_output_image, assignment_text = process_car_image(image_bgr)
        except Exception as e:
            logger.error(f"Error in segmentation/assignment stage: {e}", exc_info=True)
            assignment_text = "Error during segmentation/assignment."
            # Show original image in case of processing error
            final_output_image = image_rgb
    elif classification_result == "Not Car":
        logger.info("Image classified as Not Car.")
        assignment_text = "Image classified as Not Car."
    # Else: classification error (already logged); the defaults set above apply.

    # --- Cleanup ---
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # Return all results
    return classification_result, probabilities, final_output_image, assignment_text
# --- Gradio Interface ---
logger.info("Setting up Gradio interface...")
# The leading emoji was stored as mis-decoded bytes; restored to the car emoji.
title = "🚗 Car Damage Analysis Pipeline (YOLOv8)"
description = """
1. **Upload** an image of a vehicle.
2. **Classification:** Determines if the image contains a car (using CLIP).
3. **Segmentation:** If it's a car, detects car parts and damages (using YOLOv8 for both).
4. **Assignment:** Assigns detected damages to the corresponding car part based on mask overlap.
5. **Output:** Shows the image with overlaid masks (Green=Part, Red=Damage) and lists the damage assignments.
"""
examples = []  # Add example image paths if uploaded

# Define Inputs and Outputs
# The output components are listed in the same order as the tuple returned by
# predict_pipeline: classification, probabilities, image, assignment text.
input_image = gr.Image(type="numpy", label="Upload Car Image")
output_classification = gr.Textbox(label="1. Classification Result")
output_probabilities = gr.Label(label="Classification Probabilities")
output_image_display = gr.Image(type="numpy", label="3. Segmentation Visualization")
output_assignment = gr.Textbox(label="2. Damage Assignments", lines=5, interactive=False)

# Build the interface; it is launched only when the file is run as a script.
iface = gr.Interface(
    fn=predict_pipeline,
    inputs=input_image,
    outputs=[output_classification, output_probabilities, output_image_display, output_assignment],
    title=title,
    description=description,
    examples=examples,
    allow_flagging="never"
)

if __name__ == "__main__":
    logger.info("Launching Gradio app...")
    iface.launch()