import gradio as gr
import torch
import clip
from PIL import Image
import numpy as np
import os
import cv2
import gc  # Garbage collector
import logging
import random  # For annotator colors

# --- YOLOv8 Imports ---
from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator  # For drawing YOLO results

# --- Setup Logging ---
logging.getLogger("ultralytics").setLevel(logging.WARNING)  # Reduce YOLO logging noise
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Constants ---
# Damage segmentation classes (Order MUST match the training of 'best.pt')
DAMAGE_CLASSES = ['Cracked', 'Scratch', 'Flaking', 'Broken part', 'Corrosion', 'Dent', 'Paint chip', 'Missing part']
NUM_DAMAGE_CLASSES = len(DAMAGE_CLASSES)

# Part segmentation classes (Order MUST match the training of 'partdetection_yolobest.pt')
CAR_PART_CLASSES = [
    "Quarter-panel", "Front-wheel", "Back-window", "Trunk", "Front-door",
    "Rocker-panel", "Grille", "Windshield", "Front-window", "Back-door",
    "Headlight", "Back-wheel", "Back-windshield", "Hood", "Fender",
    "Tail-light", "License-plate", "Front-bumper", "Back-bumper", "Mirror",
    "Roof",
]
NUM_CAR_PART_CLASSES = len(CAR_PART_CLASSES)

# Paths within the Hugging Face Space repository
CLIP_TEXT_FEATURES_PATH = "./clip_text_features.pt"
DAMAGE_MODEL_WEIGHTS_PATH = "./best.pt"  # <--- Your YOLOv8 damage model weights
PART_MODEL_WEIGHTS_PATH = "./partdetection_yolobest.pt"  # <--- Your YOLOv8 part model weights
# Prediction Thresholds
DAMAGE_PRED_THRESHOLD = 0.4  # Threshold for showing damage masks
PART_PRED_THRESHOLD = 0.3  # Threshold for showing part masks

# --- Device Setup ---
if torch.cuda.is_available():
    DEVICE = "cuda"
    logger.info("CUDA available, using GPU.")
else:
    DEVICE = "cpu"
    logger.info("CUDA not available, using CPU.")
# --- MODEL LOADING (Load models globally ONCE on startup) ---
logger.info("Loading models...")
clip_model = None
clip_preprocess = None
clip_text_features = None
damage_model = None
part_model = None
# --- Load CLIP Model (Model 1) ---
try:
    logger.info("Loading CLIP model...")
    clip_model, clip_preprocess = clip.load("ViT-B/16", device=DEVICE)
    clip_model.eval()
    logger.info("CLIP model loaded.")
    logger.info(f"Loading CLIP text features from {CLIP_TEXT_FEATURES_PATH}...")
    if not os.path.exists(CLIP_TEXT_FEATURES_PATH):
        raise FileNotFoundError(f"CLIP text features not found: {CLIP_TEXT_FEATURES_PATH}.")
    clip_text_features = torch.load(CLIP_TEXT_FEATURES_PATH, map_location=DEVICE)
    logger.info("CLIP text features loaded.")
except Exception as e:
    logger.error(f"Error loading CLIP model or features: {e}", exc_info=True)
    # Allow app to continue; downstream functions check for None
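
# For reference: 'clip_text_features.pt' can be produced offline with a sketch like the
# one below. The exact prompts used for this Space are not recorded here, so the two
# shown are assumptions -- only the row order ([Car, Not Car]) and the L2-normalization
# matter, because classify_image_clip() relies on both.
#
#   prompts = ["a photo of a car", "a photo of something that is not a car"]
#   tokens = clip.tokenize(prompts).to(DEVICE)
#   with torch.no_grad():
#       text_features = clip_model.encode_text(tokens)
#       text_features /= text_features.norm(dim=-1, keepdim=True)
#   torch.save(text_features, CLIP_TEXT_FEATURES_PATH)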
# --- Load Damage Segmentation Model (Model 2 - YOLOv8) ---
try:
    logger.info(f"Loading Damage Segmentation (YOLOv8) model from {DAMAGE_MODEL_WEIGHTS_PATH}...")
    if not os.path.exists(DAMAGE_MODEL_WEIGHTS_PATH):
        raise FileNotFoundError(f"Damage model weights not found: {DAMAGE_MODEL_WEIGHTS_PATH}.")
    damage_model = YOLO(DAMAGE_MODEL_WEIGHTS_PATH)
    damage_model.to(DEVICE)  # Ensure model is on the correct device
    # Verify class names match
    loaded_damage_names = list(damage_model.names.values())
    if loaded_damage_names != DAMAGE_CLASSES:
        logger.warning(f"Mismatch between defined DAMAGE_CLASSES and names in {DAMAGE_MODEL_WEIGHTS_PATH}")
        logger.warning(f"  Model names: {loaded_damage_names}")
        DAMAGE_CLASSES = loaded_damage_names  # Use names from model file
        logger.warning(f"  Updated DAMAGE_CLASSES to: {DAMAGE_CLASSES}")
    logger.info("Damage Segmentation (YOLOv8) model loaded.")
except Exception as e:
    logger.error(f"Error loading Damage Segmentation (YOLOv8) model: {e}", exc_info=True)
    damage_model = None
# --- Load Part Segmentation Model (Model 3 - YOLOv8) ---
try:
    logger.info(f"Loading Part Segmentation (YOLOv8) model from {PART_MODEL_WEIGHTS_PATH}...")
    if not os.path.exists(PART_MODEL_WEIGHTS_PATH):
        raise FileNotFoundError(f"Part model weights not found: {PART_MODEL_WEIGHTS_PATH}.")
    part_model = YOLO(PART_MODEL_WEIGHTS_PATH)
    part_model.to(DEVICE)  # Ensure model is on the correct device
    # Verify class names match
    loaded_part_names = list(part_model.names.values())
    if loaded_part_names != CAR_PART_CLASSES:
        logger.warning(f"Mismatch between defined CAR_PART_CLASSES and names in {PART_MODEL_WEIGHTS_PATH}")
        logger.warning(f"  Model names: {loaded_part_names}")
        CAR_PART_CLASSES = loaded_part_names  # Use names from model file
        logger.warning(f"  Updated CAR_PART_CLASSES to: {CAR_PART_CLASSES}")
    logger.info("Part Segmentation (YOLOv8) model loaded.")
except Exception as e:
    logger.error(f"Error loading Part Segmentation (YOLOv8) model: {e}", exc_info=True)
    part_model = None

logger.info("Model loading complete.")
# --- Prediction Functions ---
def classify_image_clip(image_pil):
    """Classifies image using CLIP. Returns label and probabilities."""
    if clip_model is None or clip_text_features is None:
        return "Error: CLIP Model Not Loaded", {"Error": 1.0}
    try:
        # Ensure image is RGB PIL
        if image_pil.mode != "RGB":
            image_pil = image_pil.convert("RGB")
        image_input = clip_preprocess(image_pil).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            image_features = clip_model.encode_image(image_input)
            image_features /= image_features.norm(dim=-1, keepdim=True)
            logit_scale = clip_model.logit_scale.exp()
            similarity = (image_features @ clip_text_features.T) * logit_scale
            probs = similarity.softmax(dim=-1).squeeze().cpu()
        predicted_label = "Car" if probs[0] > probs[1] else "Not Car"
        # gr.Label expects float confidences, not preformatted strings
        prob_dict = {"Car": round(probs[0].item(), 3), "Not Car": round(probs[1].item(), 3)}
        return predicted_label, prob_dict
    except Exception as e:
        logger.error(f"Error during CLIP prediction: {e}", exc_info=True)
        return "Error during CLIP processing", {"Error": 1.0}
# --- Combined Processing and Overlap Logic ---
def process_car_image(image_np_bgr):
    """
    Runs damage and part segmentation (both YOLOv8), calculates overlap, and returns results.

    Returns:
        - combined_image_rgb: Image with both part and damage masks drawn.
        - assignment_text: String describing damage-part assignments.
    """
    if damage_model is None:
        logger.error("Damage YOLOv8 model not available.")
        return cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB), "Error: Damage model not loaded."
    if part_model is None:
        logger.error("Part YOLOv8 model not available.")
        return cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB), "Error: Part model not loaded."

    final_assignments = []
    # Use original BGR image for drawing, convert to RGB only for final display
    annotated_image_bgr = image_np_bgr.copy()
    img_h, img_w = image_np_bgr.shape[:2]
    try:
        # --- 1. Predict Damages (YOLOv8) ---
        logger.info("Running Damage Segmentation (YOLOv8)...")
        # Apply the confidence threshold at predict time
        damage_results = damage_model.predict(image_np_bgr, verbose=False, device=DEVICE, conf=DAMAGE_PRED_THRESHOLD)
        damage_result = damage_results[0]  # Result for the first image
        logger.info(f"Found {len(damage_result.boxes)} potential damages.")
        damage_masks_np = damage_result.masks.data.cpu().numpy().astype(bool) if damage_result.masks is not None else np.array([])
        damage_classes_ids = damage_result.boxes.cls.cpu().numpy().astype(int) if damage_result.boxes is not None else np.array([])
        damage_boxes = damage_result.boxes.xyxy.cpu().numpy() if damage_result.boxes is not None else np.array([])  # For drawing

        # --- 2. Predict Parts (YOLOv8) ---
        logger.info("Running Part Segmentation (YOLOv8)...")
        part_results = part_model.predict(image_np_bgr, verbose=False, device=DEVICE, conf=PART_PRED_THRESHOLD)
        part_result = part_results[0]  # Result for the first image
        logger.info(f"Found {len(part_result.boxes)} potential parts.")
        part_masks_np = part_result.masks.data.cpu().numpy().astype(bool) if part_result.masks is not None else np.array([])  # [N_part, H, W]
        part_classes_ids = part_result.boxes.cls.cpu().numpy().astype(int) if part_result.boxes is not None else np.array([])
        part_boxes = part_result.boxes.xyxy.cpu().numpy() if part_result.boxes is not None else np.array([])  # For drawing
        # --- 3. Resize Masks if Necessary ---
        # YOLO segmentation masks may be smaller than the original image; resize them
        def resize_masks(masks_np, target_h, target_w):
            if masks_np.shape[0] == 0 or (masks_np.shape[1] == target_h and masks_np.shape[2] == target_w):
                return masks_np  # Return if empty or already the correct size
            logger.info(f"Resizing {masks_np.shape[0]} masks from {masks_np.shape[1:]} to {(target_h, target_w)}")
            resized_masks = []
            for mask in masks_np:
                # Note: cv2.resize takes (width, height); INTER_NEAREST keeps the masks binary
                mask_resized = cv2.resize(mask.astype(np.uint8), (target_w, target_h), interpolation=cv2.INTER_NEAREST)
                resized_masks.append(mask_resized.astype(bool))
            return np.array(resized_masks)

        damage_masks_np = resize_masks(damage_masks_np, img_h, img_w)
        part_masks_np = resize_masks(part_masks_np, img_h, img_w)
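
        # Illustrative example (mask sizes depend on the YOLO input resolution, so the
        # numbers below are assumptions): three damage masks of shape (3, 384, 640)
        # resized against a 720x1280 source become (3, 720, 1280), matching the image
        # so the per-pixel logical_and with the part masks below is well-defined.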
        # --- 4. Calculate Overlap ---
        logger.info("Calculating overlap...")
        if damage_masks_np.shape[0] > 0 and part_masks_np.shape[0] > 0:
            overlap_threshold = 0.4  # Minimum overlap ratio
            for i in range(len(damage_masks_np)):  # Iterate through each detected damage
                damage_mask = damage_masks_np[i]
                damage_class_id = damage_classes_ids[i]
                try:
                    damage_name = DAMAGE_CLASSES[damage_class_id]
                except IndexError:
                    continue  # Skip if invalid class ID
                damage_area = np.sum(damage_mask)
                if damage_area < 10:
                    continue  # Skip tiny damage masks
                max_overlap = 0
                assigned_part_name = "Unknown / Outside Parts"
                for j in range(len(part_masks_np)):  # Iterate through each detected part
                    part_mask = part_masks_np[j]
                    part_class_id = part_classes_ids[j]
                    try:
                        part_name = CAR_PART_CLASSES[part_class_id]
                    except IndexError:
                        continue  # Skip if invalid class ID
                    intersection = np.logical_and(damage_mask, part_mask)
                    intersection_area = np.sum(intersection)
                    overlap_ratio = intersection_area / damage_area if damage_area > 0 else 0
                    if overlap_ratio > max_overlap:
                        max_overlap = overlap_ratio
                        if max_overlap >= overlap_threshold:
                            assigned_part_name = part_name
                assignment_desc = f"{damage_name} in {assigned_part_name}"
                if assigned_part_name == "Unknown / Outside Parts":
                    assignment_desc += f" (Overlap < {overlap_threshold*100:.0f}%)"
                final_assignments.append(assignment_desc)
                logger.info(f"Overlap result: {assignment_desc}")
        elif damage_masks_np.shape[0] > 0:
            final_assignments.append(f"{len(damage_masks_np)} damages found, but no parts detected/matched.")
        elif part_masks_np.shape[0] > 0:
            final_assignments.append("No damages detected (above threshold).")
        else:
            final_assignments.append("No damages or parts detected.")
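
        # Worked example of the assignment rule (illustrative numbers): a "Dent" mask
        # covering 100 px that shares 55 px with the "Hood" mask gives
        # overlap_ratio = 55 / 100 = 0.55 >= 0.4, so the report line is "Dent in Hood".
        # At 30 shared px the ratio (0.30) stays below the threshold and the line becomes
        # "Dent in Unknown / Outside Parts (Overlap < 40%)".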
        # --- 5. Visualization using YOLO Annotator ---
        logger.info("Visualizing results...")
        # Create annotator ONCE on the BGR image copy
        annotator = Annotator(annotated_image_bgr, line_width=2, example=part_model.names)  # Use part model names
        # Draw PART masks first (green boxes, light semi-transparent masks).
        # Color tuples are in BGR order, since the annotator draws on a BGR buffer.
        if part_result.masks is not None:
            colors_part = [(0, random.randint(100, 200), 0) for _ in part_classes_ids]  # Shades of green
            annotator.masks(part_result.masks.data, colors=colors_part, alpha=0.3)
            for box, cls_id in zip(part_boxes, part_classes_ids):
                try:
                    annotator.box_label(box, label=CAR_PART_CLASSES[cls_id], color=(0, 200, 0))  # Darker green box/text
                except IndexError:
                    continue
        # Draw DAMAGE masks second (red boxes, light semi-transparent masks)
        if damage_result.masks is not None:
            colors_dmg = [(0, 0, random.randint(100, 200)) for _ in damage_classes_ids]  # Shades of red (BGR)
            annotator.masks(damage_result.masks.data, colors=colors_dmg, alpha=0.4)  # Slightly more opaque
            for box, cls_id in zip(damage_boxes, damage_classes_ids):
                try:
                    annotator.box_label(box, label=DAMAGE_CLASSES[cls_id], color=(0, 0, 200))  # Darker red box/text
                except IndexError:
                    continue
        # Get the final annotated image (still BGR)
        annotated_image_bgr = annotator.result()
    except Exception as e:
        logger.error(f"Error during combined processing: {e}", exc_info=True)
        final_assignments.append("Error during processing.")
        # Fall through: annotated_image_bgr keeps whatever was drawn before the error

    # --- Prepare output ---
    assignment_text = "\n".join(final_assignments) if final_assignments else "No specific damage assignments."
    # Convert final annotated image to RGB for Gradio display
    final_output_image_rgb = cv2.cvtColor(annotated_image_bgr, cv2.COLOR_BGR2RGB)
    return final_output_image_rgb, assignment_text
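
# Example usage (hypothetical local file; expects a BGR array, e.g. from cv2.imread):
#   annotated_rgb, report = process_car_image(cv2.imread("damaged_car.jpg"))
#   print(report)  # e.g. "Scratch in Front-door\nDent in Hood"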
# --- Main Gradio Function ---
def predict_pipeline(image_np_input):
    """
    Main pipeline: Classify -> Segment -> Assign -> Visualize
    """
    if image_np_input is None:
        return "Please upload an image.", {}, None, "N/A"
    logger.info("Received image for processing...")
    final_output_image = None
    assignment_text = "Processing..."
    classification_result = "Error"
    probabilities = {}
    # Gradio delivers numpy images in RGB order; the cv2/YOLO drawing path expects BGR
    image_np_bgr = cv2.cvtColor(image_np_input, cv2.COLOR_RGB2BGR)
    # --- Stage 1: CLIP Classification ---
    try:
        image_pil = Image.fromarray(image_np_input)  # Input is already RGB
        classification_result, probabilities = classify_image_clip(image_pil)
        logger.info(f"CLIP Result: {classification_result}, Probs: {probabilities}")
    except Exception as e:
        logger.error(f"Error in CLIP stage: {e}", exc_info=True)
        assignment_text = "Error during classification."
        # Show original image in case of classification error
        final_output_image = image_np_input
    # --- Stage 2 & 3: Segmentation and Assignment (if 'Car') ---
    if classification_result == "Car":
        logger.info("Image classified as Car. Running segmentation and assignment...")
        try:
            # Pass the BGR numpy array
            final_output_image, assignment_text = process_car_image(image_np_bgr)
        except Exception as e:
            logger.error(f"Error in segmentation/assignment stage: {e}", exc_info=True)
            assignment_text = "Error during segmentation/assignment."
            # Show original image in case of processing error
            final_output_image = image_np_input
    elif classification_result == "Not Car":
        logger.info("Image classified as Not Car.")
        final_output_image = image_np_input  # Show original
        assignment_text = "Image classified as Not Car."
    # Else: CLIP error case (already logged); ensure an image is still returned
    elif final_output_image is None:
        final_output_image = image_np_input
    # --- Cleanup ---
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    # Return all results
    return classification_result, probabilities, final_output_image, assignment_text
# --- Gradio Interface ---
logger.info("Setting up Gradio interface...")
title = "🚗 Car Damage Analysis Pipeline (YOLOv8)"
description = """
1. **Upload** an image of a vehicle.
2. **Classification:** Determines if the image contains a car (using CLIP).
3. **Segmentation:** If it's a car, detects car parts and damages (using YOLOv8 for both).
4. **Assignment:** Assigns detected damages to the corresponding car part based on mask overlap.
5. **Output:** Shows the image with overlaid masks (Green=Part, Red=Damage) and lists the damage assignments.
"""
examples = []  # Add example image paths if uploaded

# Define Inputs and Outputs
input_image = gr.Image(type="numpy", label="Upload Car Image")
output_classification = gr.Textbox(label="1. Classification Result")
output_probabilities = gr.Label(label="Classification Probabilities")
output_image_display = gr.Image(type="numpy", label="2. Segmentation Visualization")
output_assignment = gr.Textbox(label="3. Damage Assignments", lines=5, interactive=False)
# Build the interface
iface = gr.Interface(
    fn=predict_pipeline,
    inputs=input_image,
    outputs=[output_classification, output_probabilities, output_image_display, output_assignment],
    title=title,
    description=description,
    examples=examples,
    allow_flagging="never",
)
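
# Optional: on Hugging Face Spaces, enabling Gradio's request queue can help under
# concurrent load. Uncomment before launching if needed:
#   iface.queue()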
if __name__ == "__main__":
    logger.info("Launching Gradio app...")
    iface.launch()