import io
import os
import gc
import re
import cv2
import time
import zipfile
import tempfile
import traceback
import numpy as np
import gradio as gr
import imgutils.detect.person as person_detector
import imgutils.detect.halfbody as halfbody_detector
import imgutils.detect.head as head_detector
import imgutils.detect.face as face_detector
import imgutils.metrics.ccip as ccip_analyzer
import imgutils.metrics.dbaesthetic as dbaesthetic_analyzer
import imgutils.metrics.lpips as lpips_module
from PIL import Image
from typing import List, Tuple, Dict, Any, Union, Optional, Iterator

# --- Constants for File Types ---
IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.webp', '.bmp', '.tiff', '.tif', '.gif')
VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov', '.mkv', '.flv', '.webm', '.mpeg', '.mpg')


# --- Helper Functions ---
def sanitize_filename(filename: str, max_len: int = 50) -> str:
    """Return a short, filesystem-safe stem derived from *filename*.

    The directory part and the extension are dropped, runs of whitespace and
    the characters ``\\ / * ? : " < > |`` are collapsed into a single
    underscore, and leading/trailing underscores and periods are removed.

    Args:
        filename: A file name or path.
        max_len: Maximum number of characters kept from the sanitized stem.

    Returns:
        The sanitized stem, or ``"file"`` when nothing usable remains.
    """
    # Keep only the last path component, without its extension.
    base_name = os.path.basename(filename)
    name_part, _ = os.path.splitext(base_name)
    # Replace spaces and problematic characters with underscores.
    sanitized = re.sub(r'[\\/*?:"<>|\s]+', '_', name_part)
    sanitized = sanitized.strip('._')
    # Limit length (important for temp paths and OS limits). Truncation can
    # expose a new trailing underscore/period, so strip the tail once more.
    sanitized = sanitized[:max_len].rstrip('._')
    # Never hand back an empty name.
    return sanitized or "file"


def convert_to_pil(frame: np.ndarray) -> Image.Image:
    """Convert an OpenCV frame (BGR) to a PIL Image (RGB).

    Args:
        frame: The frame to convert.

    Returns:
        The frame as a PIL image after a BGR -> RGB colour conversion.

    Raises:
        ValueError: If *frame* is ``None`` or has zero size.
        RuntimeError: If the colour conversion or the PIL construction fails;
            the underlying exception is attached as ``__cause__``.
    """
    if frame is None or frame.size == 0:
        raise ValueError("Cannot convert empty frame to PIL Image")
    try:
        return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    except Exception as e:
        # Re-raise with more context, keeping the original error as the cause.
        raise RuntimeError(f"Failed to convert frame to PIL Image: {e}") from e


def image_to_bytes(img: Image.Image, format: str = 'PNG') -> bytes:
    """Serialize a PIL Image into an in-memory byte string.

    Args:
        img: The image to encode.
        format: Format name passed to ``Image.save``.

    Returns:
        The encoded image bytes.

    Raises:
        ValueError: If *img* is ``None``.
    """
    if img is None:
        raise ValueError("Cannot convert None image to bytes")
    byte_arr = io.BytesIO()
    img.save(byte_arr, format=format)
    return byte_arr.getvalue()


def create_zip_file(image_data: Dict[str, bytes], output_path: str) -> None:
    """Write the given images into a deflate-compressed zip at *output_path*.

    Args:
        image_data: Maps archive member names (which may include folders
            inside the zip) to the image bytes to store.
        output_path: Full path of the zip file to create.

    Raises:
        ValueError: If *image_data* or *output_path* is empty.
        Exception: Any error raised while creating the archive is re-raised
            after a best-effort removal of the partially written file.
    """
    if not image_data:
        raise ValueError("No image data provided to create zip file.")
    if not output_path:
        raise ValueError("No output path provided for the zip file.")

    print(f"Creating zip file at: {output_path}")
    try:
        # Make sure the parent directory exists when the path is nested.
        parent_dir = os.path.dirname(output_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # Sorted member order keeps the archive layout predictable.
            for filename, img_bytes in sorted(image_data.items()):
                zipf.writestr(filename, img_bytes)
        print(f"Successfully created zip file with {len(image_data)} items at {output_path}.")
    except Exception as e:
        print(f"Error creating zip file at {output_path}: {e}")
        # Do not leave a partially written archive behind.
        if os.path.exists(output_path):
            try:
                os.remove(output_path)
                print(f"Removed partially created/failed zip file: {output_path}")
            except OSError as remove_err:
                print(f"Warning: Could not remove failed zip file {output_path}: {remove_err}")
        raise  # Re-raise the original exception


def generate_filename(
    base_name: str,  # Should be the core identifier, e.g., "frame_X_person_Y_scoreZ"
    aesthetic_label: Optional[str] = None,
    ccip_cluster_id_for_lpips_logic: Optional[int] = None,  # Original CCIP ID, used to decide if LPIPS is sub-cluster
    ccip_folder_naming_index: Optional[int] = None,  # The new 000, 001, ... index based on image count
    source_prefix_for_ccip_folder: Optional[str] = None,  # The source filename prefix for CCIP folder
    lpips_folder_naming_index: Optional[Union[int, str]] = None,  # Can be int (0,1,2...) or "noise"
    file_extension: str = '.png',
    # Suffix flags for this specific image:
    is_halfbody_primary_target_type: bool = False,  # If this image itself was a halfbody primary target
    is_derived_head_crop: bool = False,
    is_derived_face_crop: bool = False,
) -> str:
    """Build the archive-relative file name for one output image.

    The result has the shape ``[<ccip folder>/][<lpips folder>/][<label>_]<stem><ext>``:

    * ``<stem>`` is *base_name* plus ``_headCrop`` and/or ``_faceCrop`` when
      the corresponding flag is set.
    * The CCIP folder ``<prefix>_ccip_NNN`` is added only when both
      *ccip_folder_naming_index* and *source_prefix_for_ccip_folder* are given.
    * The LPIPS folder is added when *lpips_folder_naming_index* is the string
      ``"noise"`` or an int (rendered as ``NNN``); any other value adds no
      folder. It is prefixed ``lpips_sub_`` when
      *ccip_cluster_id_for_lpips_logic* is not ``None``, otherwise ``lpips_``.
    * A truthy *aesthetic_label* is prepended to the file name itself.

    Args:
        base_name: Core identifier of the image.
        aesthetic_label: Optional label prepended to the file name.
        ccip_cluster_id_for_lpips_logic: Original CCIP cluster id; only its
            presence is used, to choose the LPIPS folder prefix.
        ccip_folder_naming_index: Index used in the CCIP folder name.
        source_prefix_for_ccip_folder: Prefix used in the CCIP folder name.
        lpips_folder_naming_index: Int index or ``"noise"`` for the LPIPS folder.
        file_extension: Extension appended to the stem, including the dot.
        is_halfbody_primary_target_type: Accepted for interface compatibility;
            it currently has no effect on the generated name.
        is_derived_head_crop: Append ``_headCrop`` to the stem.
        is_derived_face_crop: Append ``_faceCrop`` to the stem.

    Returns:
        The file name, with ``/``-separated folders when any apply.
    """
    filename_stem = base_name
    # Suffixes for derived crops. Half-body primary targets get no suffix:
    # their base_name is expected to already contain "halfbody".
    if is_derived_head_crop:
        filename_stem += "_headCrop"
    if is_derived_face_crop:
        filename_stem += "_faceCrop"
    filename_with_extension = filename_stem + file_extension

    path_parts = []

    # CCIP folder naming based on source prefix and sorted index.
    if ccip_folder_naming_index is not None and source_prefix_for_ccip_folder is not None:
        path_parts.append(f"{source_prefix_for_ccip_folder}_ccip_{ccip_folder_naming_index:03d}")

    # LPIPS folder naming based on the sorted index or "noise".
    if lpips_folder_naming_index is not None:
        lpips_folder_name_part_str: Optional[str] = None
        if isinstance(lpips_folder_naming_index, str) and lpips_folder_naming_index == "noise":
            lpips_folder_name_part_str = "noise"
        elif isinstance(lpips_folder_naming_index, int):
            lpips_folder_name_part_str = f"{lpips_folder_naming_index:03d}"

        if lpips_folder_name_part_str is not None:
            # An item that had an original CCIP id gets an LPIPS *sub*-cluster
            # folder; without CCIP, LPIPS is the primary grouping.
            if ccip_cluster_id_for_lpips_logic is not None:
                lpips_folder_name_base = "lpips_sub_"
            else:
                lpips_folder_name_base = "lpips_"
            path_parts.append(f"{lpips_folder_name_base}{lpips_folder_name_part_str}")

    final_filename_part = filename_with_extension
    if aesthetic_label:
        final_filename_part = f"{aesthetic_label}_{filename_with_extension}"

    if path_parts:
        return f"{'/'.join(path_parts)}/{final_filename_part}"
    return final_filename_part


# NOTE(review): the definition that starts on the next line continues on the
# following lines of the file; its opening text is kept exactly as found.
# --- Core Processing Function for a single source (video or image sequence) --- def _process_input_source_frames( source_file_prefix: str, # Sanitized name for this source (e.g., "myvideo" or "ImageGroup123") # Iterator yielding: (PIL.Image, frame_identifier_string, current_item_index, total_items_for_desc) # For videos, current_item_index is the 1-based raw frame number. # For images, current_item_index is the 1-based image number in the sequence. 
frames_provider: Iterator[Tuple[Image.Image, int, int, int]], is_video_source: bool, # To adjust some logging/stats messages # Person Detection enable_person_detection: bool, min_target_width_person_percentage: float, person_model_name: str, person_conf_threshold: float, person_iou_threshold: float, # Half-Body Detection enable_halfbody_detection: bool, enable_halfbody_cropping: bool, min_target_width_halfbody_percentage: float, halfbody_model_name: str, halfbody_conf_threshold: float, halfbody_iou_threshold: float, # Head Detection enable_head_detection: bool, enable_head_cropping: bool, min_crop_width_head_percentage: float, enable_head_filtering: bool, head_model_name: str, head_conf_threshold: float, head_iou_threshold: float, # Face Detection enable_face_detection: bool, enable_face_cropping: bool, min_crop_width_face_percentage: float, enable_face_filtering: bool, face_model_name: str, face_conf_threshold: float, face_iou_threshold: float, # CCIP Classification enable_ccip_classification: bool, ccip_model_name: str, ccip_threshold: float, # LPIPS Clustering enable_lpips_clustering: bool, lpips_threshold: float, # Aesthetic Analysis enable_aesthetic_analysis: bool, aesthetic_model_name: str, # Gradio Progress (specific to this source's processing) progress_updater # Function: (progress_value: float, desc: str) -> None ) -> Tuple[str | None, str]: """ Processes frames from a given source (video or image sequence) according to the specified parameters. Order: Person => Half-Body (alternative) => Face Detection => Head Detection => CCIP => Aesthetic. Returns: A tuple containing: - Path to the output zip file (or None if error). - Status message string. 
""" # This list will hold data for images that pass all filters, BEFORE LPIPS and final zipping images_pending_final_processing: List[Dict[str, Any]] = [] # CCIP specific data ccip_clusters_info: List[Tuple[int, np.ndarray]] = [] next_ccip_cluster_id = 0 # Stats processed_items_count = 0 total_persons_detected_raw, total_halfbodies_detected_raw = 0, 0 person_targets_processed_count, halfbody_targets_processed_count, fullframe_targets_processed_count = 0, 0, 0 total_faces_detected_on_targets, total_heads_detected_on_targets = 0, 0 # These count items added to images_pending_final_processing main_targets_pending_count, face_crops_pending_count, head_crops_pending_count = 0, 0, 0 items_filtered_by_face_count, items_filtered_by_head_count = 0, 0 ccip_applied_count, aesthetic_applied_count = 0, 0 # LPIPS stats lpips_images_subject_to_clustering, total_lpips_clusters_created, total_lpips_noise_samples = 0, 0, 0 gc_interval = 100 # items from provider start_time = time.time() # Progress update for initializing this specific video progress_updater(0, desc=f"Initializing {source_file_prefix}...") output_zip_path_temp = None output_zip_path_final = None try: # --- Main Loop for processing items from the frames_provider --- for pil_image_full_frame, frame_specific_index, current_item_index, total_items_for_desc in frames_provider: progress_value_for_updater = (current_item_index) / total_items_for_desc if total_items_for_desc > 0 else 1.0 # The description string should reflect what current_item_index means item_description = "" if is_video_source: # For video, total_items_in_source_for_description is total raw frames. # current_item_index is the raw frame index of the *sampled* frame. # We also need a counter for *sampled* frames for a "processed X of Y (sampled)" message. # processed_items_count counts sampled frames. 
item_description = f"Scanning frame {current_item_index}/{total_items_for_desc} (processed {processed_items_count + 1} sampled)" else: # For images item_description = f"image {current_item_index}/{total_items_for_desc}" progress_updater( min(progress_value_for_updater, 1.0), # Cap progress at 1.0 desc=f"Processing {item_description} for {source_file_prefix}" ) # processed_items_count still counts how many items are yielded by the provider # (i.e., how many sampled frames for video, or how many images for image sequence) processed_items_count += 1 try: full_frame_width = pil_image_full_frame.width # Store for percentage calculations print(f"--- Processing item ID {frame_specific_index} (Width: {full_frame_width}px) for {source_file_prefix} ---") # List to hold PIL images that are the primary subjects for this frame # Each element: {'pil': Image, 'base_name': str, 'source_type': 'person'/'halfbody'/'fullframe'} primary_targets_for_frame: List[Dict[str, Any]] = [] processed_primary_source_this_frame = False # Flag if Person or HalfBody yielded targets # --- 1. 
Person Detection --- if enable_person_detection and full_frame_width > 0: print(" Attempting Person Detection...") min_person_target_px_width = full_frame_width * min_target_width_person_percentage person_detections = person_detector.detect_person( pil_image_full_frame, model_name=person_model_name, conf_threshold=person_conf_threshold, iou_threshold=person_iou_threshold ) total_persons_detected_raw += len(person_detections) if person_detections: print(f" Detected {len(person_detections)} raw persons.") valid_person_targets = 0 for i, (bbox, _, score) in enumerate(person_detections): # Check width before full crop for minor optimization detected_person_width = bbox[2] - bbox[0] if detected_person_width >= min_person_target_px_width: primary_targets_for_frame.append({ 'pil': pil_image_full_frame.crop(bbox), 'base_name': f"{source_file_prefix}_item_{frame_specific_index}_person_{i}_score{int(score*100)}", 'source_type': 'person'}) person_targets_processed_count +=1 valid_person_targets +=1 else: print(f" Person {i} width {detected_person_width}px < min {min_person_target_px_width:.0f}px. Skipping.") if valid_person_targets > 0: processed_primary_source_this_frame = True print(f" Added {valid_person_targets} persons as primary targets.") # --- 2. 
Half-Body Detection (if Person not processed and HBD enabled) --- if not processed_primary_source_this_frame and enable_halfbody_detection and full_frame_width > 0: print(" Attempting Half-Body Detection (on full item)...") min_halfbody_target_px_width = full_frame_width * min_target_width_halfbody_percentage halfbody_detections = halfbody_detector.detect_halfbody( pil_image_full_frame, model_name=halfbody_model_name, conf_threshold=halfbody_conf_threshold, iou_threshold=halfbody_iou_threshold ) total_halfbodies_detected_raw += len(halfbody_detections) if halfbody_detections: print(f" Detected {len(halfbody_detections)} raw half-bodies.") valid_halfbody_targets = 0 for i, (bbox, _, score) in enumerate(halfbody_detections): detected_hb_width = bbox[2] - bbox[0] # Cropping must be enabled and width must be sufficient for it to be a target if enable_halfbody_cropping and detected_hb_width >= min_halfbody_target_px_width: primary_targets_for_frame.append({ 'pil': pil_image_full_frame.crop(bbox), 'base_name': f"{source_file_prefix}_item_{frame_specific_index}_halfbody_{i}_score{int(score*100)}", 'source_type': 'halfbody'}) halfbody_targets_processed_count +=1 valid_halfbody_targets +=1 elif enable_halfbody_cropping: print(f" Half-body {i} width {detected_hb_width}px < min {min_halfbody_target_px_width:.0f}px. Skipping.") if valid_halfbody_targets > 0: processed_primary_source_this_frame = True print(f" Added {valid_halfbody_targets} half-bodies as primary targets.") # --- 3. 
Full Frame/Image (fallback) --- if not processed_primary_source_this_frame: print(" Processing Full Item as primary target.") primary_targets_for_frame.append({ 'pil': pil_image_full_frame.copy(), 'base_name': f"{source_file_prefix}_item_{frame_specific_index}_full", 'source_type': 'fullframe'}) fullframe_targets_processed_count += 1 # --- Process each identified primary_target_for_frame --- for target_data in primary_targets_for_frame: current_pil: Image.Image = target_data['pil'] current_base_name: str = target_data['base_name'] # Base name for this main target current_source_type: str = target_data['source_type'] current_pil_width = current_pil.width # For sub-crop percentage calculations print(f" Processing target: {current_base_name} (type: {current_source_type}, width: {current_pil_width}px)") # Store PILs of successful crops from current_pil for this target keep_this_target = True item_area = current_pil_width * current_pil.height potential_face_crops_pil: List[Image.Image] = [] potential_head_crops_pil: List[Image.Image] = [] # --- A. Face Detection --- if keep_this_target and enable_face_detection and current_pil_width > 0: print(f" Detecting faces in {current_base_name}...") min_face_crop_px_width = current_pil_width * min_crop_width_face_percentage face_detections = face_detector.detect_faces( current_pil, model_name=face_model_name, conf_threshold=face_conf_threshold, iou_threshold=face_iou_threshold ) total_faces_detected_on_targets += len(face_detections) if not face_detections and enable_face_filtering: keep_this_target = False items_filtered_by_face_count += 1 print(f" FILTERING TARGET {current_base_name} (no face).") elif face_detections and enable_face_cropping: for f_idx, (f_bbox, _, _) in enumerate(face_detections): if (f_bbox[2]-f_bbox[0]) >= min_face_crop_px_width: potential_face_crops_pil.append(current_pil.crop(f_bbox)) else: print(f" Face {f_idx} too small. Skipping crop.") # --- B. 
Head Detection --- if keep_this_target and enable_head_detection and current_pil_width > 0: print(f" Detecting heads in {current_base_name}...") min_head_crop_px_width = current_pil_width * min_crop_width_head_percentage head_detections = head_detector.detect_heads( current_pil, model_name=head_model_name, conf_threshold=head_conf_threshold, iou_threshold=head_iou_threshold ) total_heads_detected_on_targets += len(head_detections) if not head_detections and enable_head_filtering: keep_this_target = False items_filtered_by_head_count += 1 print(f" FILTERING TARGET {current_base_name} (no head).") potential_face_crops_pil.clear() # Clear faces if head filter removed target elif head_detections and enable_head_cropping: for h_idx, (h_bbox, _, _) in enumerate(head_detections): h_w = h_bbox[2]-h_bbox[0] # h_h = h_bbox[3]-h_bbox[1] if h_w >= min_head_crop_px_width and item_area > 0: potential_head_crops_pil.append(current_pil.crop(h_bbox)) else: print(f" Head {h_idx} too small or too large relative to parent. Skipping crop.") # --- If target is filtered, clean up and skip to next target --- if not keep_this_target: print(f" Target {current_base_name} was filtered by face/head presence rules. Discarding it and its potential crops.") if current_pil is not None: del current_pil potential_face_crops_pil.clear() potential_head_crops_pil.clear() continue # To the next primary_target_for_frame # --- C. 
CCIP Classification (on current_pil, if it's kept) --- assigned_ccip_id = None # This is the original CCIP ID if enable_ccip_classification: print(f" Classifying {current_base_name} with CCIP...") try: feature = ccip_analyzer.ccip_extract_feature(current_pil, model=ccip_model_name) best_match_cid = None min_diff = float('inf') # Find the best potential match among existing clusters if ccip_clusters_info: # Only loop if there are clusters to compare against for cid, rep_f in ccip_clusters_info: diff = ccip_analyzer.ccip_difference(feature, rep_f, model=ccip_model_name) if diff < min_diff: min_diff = diff best_match_cid = cid # Decide whether to use the best match or create a new cluster if best_match_cid is not None and min_diff < ccip_threshold: assigned_ccip_id = best_match_cid print(f" -> Matched Cluster {assigned_ccip_id} (Diff: {min_diff:.6f} <= Threshold {ccip_threshold:.3f})") else: # No suitable match found (either no clusters existed, or the best match's diff was strictly greater than threshold) # Create a new cluster assigned_ccip_id = next_ccip_cluster_id ccip_clusters_info.append((assigned_ccip_id, feature)) if not ccip_clusters_info or len(ccip_clusters_info) == 1: print(f" -> New Cluster {assigned_ccip_id} (First item or no prior suitable clusters)") else: # MODIFIED: Log message reflecting that new cluster is formed if diff > threshold print(f" -> New Cluster {assigned_ccip_id} (Min diff to others: {min_diff:.6f} > Threshold {ccip_threshold:.3f})") next_ccip_cluster_id += 1 print(f" CCIP: Target {current_base_name} -> Original Cluster ID {assigned_ccip_id}") del feature ccip_applied_count += 1 except Exception as e_ccip: print(f" Error CCIP: {e_ccip}") # --- D. 
Aesthetic Analysis (on current_pil, if it's kept) --- item_aesthetic_label = None if enable_aesthetic_analysis: print(f" Analyzing {current_base_name} for aesthetics...") try: res = dbaesthetic_analyzer.anime_dbaesthetic(current_pil, model_name=aesthetic_model_name) if isinstance(res, tuple) and len(res) >= 1: item_aesthetic_label = res[0] print(f" Aesthetic: Target {current_base_name} -> {item_aesthetic_label}") aesthetic_applied_count += 1 except Exception as e_aes: print(f" Error Aesthetic: {e_aes}") add_current_pil_to_pending_list = True if current_source_type == 'fullframe': can_skip_fullframe_target = False if enable_face_detection or enable_head_detection: found_valid_sub_crop_from_enabled_detector = False if enable_face_detection and len(potential_face_crops_pil) > 0: found_valid_sub_crop_from_enabled_detector = True if not found_valid_sub_crop_from_enabled_detector and \ enable_head_detection and len(potential_head_crops_pil) > 0: found_valid_sub_crop_from_enabled_detector = True if not found_valid_sub_crop_from_enabled_detector: # No valid crops from any enabled sub-detector can_skip_fullframe_target = True # All enabled sub-detectors failed if can_skip_fullframe_target: add_current_pil_to_pending_list = False print(f" Skipping save of fullframe target '{current_base_name}' because all enabled sub-detectors (Face/Head) yielded no valid-width crops.") if add_current_pil_to_pending_list: # --- E. 
Save current_pil (if it passed all filters) --- # Add main target to pending list images_pending_final_processing.append({ 'pil_image': current_pil.copy(), 'base_name_for_filename': current_base_name, 'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label, 'is_halfbody_primary_target_type': (current_source_type == 'halfbody'), 'is_derived_head_crop': False, 'is_derived_face_crop': False, 'lpips_cluster_id': None, # Will be filled by LPIPS clustering 'lpips_folder_naming_index': None # Will be filled by LPIPS renaming }) main_targets_pending_count +=1 # --- F. Save Face Crops (derived from current_pil) --- for i, fc_pil in enumerate(potential_face_crops_pil): images_pending_final_processing.append({ 'pil_image': fc_pil, 'base_name_for_filename': f"{current_base_name}_face{i}", 'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label, 'is_halfbody_primary_target_type': False, 'is_derived_head_crop': False, 'is_derived_face_crop': True, 'lpips_cluster_id': None, 'lpips_folder_naming_index': None }) face_crops_pending_count+=1 potential_face_crops_pil.clear() # --- G. Save Head Crops (derived from current_pil) --- for i, hc_pil in enumerate(potential_head_crops_pil): images_pending_final_processing.append({ 'pil_image': hc_pil, 'base_name_for_filename': f"{current_base_name}_head{i}", 'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label, 'is_halfbody_primary_target_type': False, 'is_derived_head_crop': True, 'is_derived_face_crop': False, 'lpips_cluster_id': None, 'lpips_folder_naming_index': None }) head_crops_pending_count+=1 potential_head_crops_pil.clear() if current_pil is not None: # Ensure current_pil exists before attempting to delete del current_pil # Clean up the PIL for this target_data primary_targets_for_frame.clear() except Exception as item_proc_err: print(f"!! 
Major Error processing item ID {frame_specific_index} for {source_file_prefix}: {item_proc_err}") traceback.print_exc() # Cleanup local vars for this item if error if 'primary_targets_for_frame' in locals(): primary_targets_for_frame.clear() # Also ensure current_pil from inner loop is cleaned up if error happened mid-loop if 'current_pil' in locals() and current_pil is not None: del current_pil if processed_items_count % gc_interval == 0: gc.collect() print(f" [GC triggered at {processed_items_count} items for {source_file_prefix}]") # --- End of Main Item Processing Loop --- print(f"\nRunning final GC before LPIPS/Zipping for {source_file_prefix}...") gc.collect() if not images_pending_final_processing: status_message = f"Processing for {source_file_prefix} finished, but no images were generated or passed filters for LPIPS/Zipping." print(status_message) return None, status_message # --- LPIPS Clustering Stage --- print(f"\n--- LPIPS Clustering Stage for {source_file_prefix} (Images pending: {len(images_pending_final_processing)}) ---") if enable_lpips_clustering: print(f" LPIPS Clustering enabled with threshold: {lpips_threshold}") lpips_images_subject_to_clustering = len(images_pending_final_processing) if enable_ccip_classification and next_ccip_cluster_id > 0: # CCIP was used print(" LPIPS clustering within CCIP clusters.") images_by_ccip: Dict[Optional[int], List[int]] = {} # ccip_id -> list of original indices for i, item_data in enumerate(images_pending_final_processing): ccip_id = item_data['ccip_cluster_id'] # Original CCIP ID if ccip_id not in images_by_ccip: images_by_ccip[ccip_id] = [] images_by_ccip[ccip_id].append(i) for ccip_id, indices_in_ccip_cluster in images_by_ccip.items(): pils_for_lpips_sub_cluster = [images_pending_final_processing[idx]['pil_image'] for idx in indices_in_ccip_cluster] if len(pils_for_lpips_sub_cluster) > 1: print(f" Clustering {len(pils_for_lpips_sub_cluster)} images in CCIP cluster {ccip_id}...") try: lpips_sub_ids = 
lpips_module.lpips_clustering(pils_for_lpips_sub_cluster, threshold=lpips_threshold) for i_sub, lpips_id in enumerate(lpips_sub_ids): original_idx = indices_in_ccip_cluster[i_sub] images_pending_final_processing[original_idx]['lpips_cluster_id'] = lpips_id except Exception as e_lpips_sub: print(f" Error LPIPS sub-cluster CCIP {ccip_id}: {e_lpips_sub}") elif len(pils_for_lpips_sub_cluster) == 1: images_pending_final_processing[indices_in_ccip_cluster[0]]['lpips_cluster_id'] = 0 # type: ignore del images_by_ccip if 'pils_for_lpips_sub_cluster' in locals(): del pils_for_lpips_sub_cluster # Ensure cleanup else: # LPIPS on all images globally print(" LPIPS clustering on all collected images.") all_pils_for_global_lpips = [item['pil_image'] for item in images_pending_final_processing] if len(all_pils_for_global_lpips) > 1: try: lpips_global_ids = lpips_module.lpips_clustering(all_pils_for_global_lpips, threshold=lpips_threshold) for i, lpips_id in enumerate(lpips_global_ids): images_pending_final_processing[i]['lpips_cluster_id'] = lpips_id except Exception as e_lpips_global: print(f" Error LPIPS global: {e_lpips_global}") elif len(all_pils_for_global_lpips) == 1: images_pending_final_processing[0]['lpips_cluster_id'] = 0 # type: ignore del all_pils_for_global_lpips # Calculate LPIPS stats all_final_lpips_ids = [item.get('lpips_cluster_id') for item in images_pending_final_processing if item.get('lpips_cluster_id') is not None] if all_final_lpips_ids: unique_lpips_clusters = set(filter(lambda x: x != -1, all_final_lpips_ids)) total_lpips_clusters_created = len(unique_lpips_clusters) total_lpips_noise_samples = sum(1 for x in all_final_lpips_ids if x == -1) else: print(" LPIPS Clustering disabled.") # --- CCIP Folder Renaming Logic --- original_ccip_id_to_new_naming_index: Dict[int, int] = {} if enable_ccip_classification: print(f" Preparing CCIP folder renaming for {source_file_prefix}...") ccip_image_counts: Dict[int, int] = {} # original_ccip_id -> count of images in 
it for item_data_for_count in images_pending_final_processing: original_ccip_id_val = item_data_for_count.get('ccip_cluster_id') if original_ccip_id_val is not None: ccip_image_counts[original_ccip_id_val] = ccip_image_counts.get(original_ccip_id_val, 0) + 1 if ccip_image_counts: # Sort original ccip_ids by their counts in descending order sorted_ccip_groups_by_count: List[Tuple[int, int]] = sorted( ccip_image_counts.items(), key=lambda item: item[1], # Sort by count reverse=True ) for new_idx, (original_id, count) in enumerate(sorted_ccip_groups_by_count): original_ccip_id_to_new_naming_index[original_id] = new_idx print(f" CCIP Remap for {source_file_prefix}: Original ID {original_id} (count: {count}) -> New Naming Index {new_idx:03d}") else: print(f" No CCIP-assigned images found for {source_file_prefix} to perform renaming.") # --- LPIPS Folder Renaming Logic --- if enable_lpips_clustering: print(f" Preparing LPIPS folder renaming for {source_file_prefix}...") # Initialize/Reset lpips_folder_naming_index for all items for item_data in images_pending_final_processing: item_data['lpips_folder_naming_index'] = None if enable_ccip_classification and next_ccip_cluster_id > 0: # LPIPS within CCIP print(f" LPIPS renaming within CCIP clusters for {source_file_prefix}.") items_grouped_by_original_ccip: Dict[Optional[int], List[Dict[str, Any]]] = {} for item_data in images_pending_final_processing: original_ccip_id = item_data.get('ccip_cluster_id') if original_ccip_id not in items_grouped_by_original_ccip: items_grouped_by_original_ccip[original_ccip_id] = [] items_grouped_by_original_ccip[original_ccip_id].append(item_data) for original_ccip_id, items_in_ccip in items_grouped_by_original_ccip.items(): lpips_counts_in_ccip: Dict[int, int] = {} # original_lpips_id (non-noise) -> count for item_data in items_in_ccip: lpips_id = item_data.get('lpips_cluster_id') if lpips_id is not None and lpips_id != -1: lpips_counts_in_ccip[lpips_id] = lpips_counts_in_ccip.get(lpips_id, 
0) + 1 lpips_id_to_naming_in_ccip: Dict[int, Union[int, str]] = {} if lpips_counts_in_ccip: sorted_lpips = sorted(lpips_counts_in_ccip.items(), key=lambda x: x[1], reverse=True) for new_idx, (lpips_id, count) in enumerate(sorted_lpips): lpips_id_to_naming_in_ccip[lpips_id] = new_idx ccip_disp = f"OrigCCIP-{original_ccip_id}" if original_ccip_id is not None else "NoCCIP" print(f" LPIPS Remap in {ccip_disp}: OrigLPIPS ID {lpips_id} (count: {count}) -> New Naming Index {new_idx:03d}") for item_data in items_in_ccip: lpips_id = item_data.get('lpips_cluster_id') if lpips_id is not None: if lpips_id == -1: item_data['lpips_folder_naming_index'] = "noise" elif lpips_id in lpips_id_to_naming_in_ccip: item_data['lpips_folder_naming_index'] = lpips_id_to_naming_in_ccip[lpips_id] del items_grouped_by_original_ccip else: # Global LPIPS print(f" Global LPIPS renaming for {source_file_prefix}.") global_lpips_counts: Dict[int, int] = {} for item_data in images_pending_final_processing: lpips_id = item_data.get('lpips_cluster_id') if lpips_id is not None and lpips_id != -1: global_lpips_counts[lpips_id] = global_lpips_counts.get(lpips_id, 0) + 1 global_lpips_id_to_naming: Dict[int, Union[int, str]] = {} if global_lpips_counts: sorted_global_lpips = sorted(global_lpips_counts.items(), key=lambda x: x[1], reverse=True) for new_idx, (lpips_id, count) in enumerate(sorted_global_lpips): global_lpips_id_to_naming[lpips_id] = new_idx print(f" Global LPIPS Remap: OrigLPIPS ID {lpips_id} (count: {count}) -> New Naming Index {new_idx:03d}") for item_data in images_pending_final_processing: lpips_id = item_data.get('lpips_cluster_id') if lpips_id is not None: if lpips_id == -1: item_data['lpips_folder_naming_index'] = "noise" elif lpips_id in global_lpips_id_to_naming: item_data['lpips_folder_naming_index'] = global_lpips_id_to_naming[lpips_id] gc.collect() # --- Final Zipping Stage --- images_to_zip: Dict[str, bytes] = {} print(f"\n--- Final Zipping Stage for {source_file_prefix} 
({len(images_pending_final_processing)} items) ---") for item_data in images_pending_final_processing: original_ccip_id_for_item = item_data.get('ccip_cluster_id') current_ccip_naming_idx_for_folder: Optional[int] = None if enable_ccip_classification and original_ccip_id_for_item is not None and \ original_ccip_id_for_item in original_ccip_id_to_new_naming_index: current_ccip_naming_idx_for_folder = original_ccip_id_to_new_naming_index[original_ccip_id_for_item] current_lpips_naming_idx_for_folder = item_data.get('lpips_folder_naming_index') final_filename = generate_filename( base_name=item_data['base_name_for_filename'], aesthetic_label=item_data.get('aesthetic_label'), ccip_cluster_id_for_lpips_logic=original_ccip_id_for_item, ccip_folder_naming_index=current_ccip_naming_idx_for_folder, source_prefix_for_ccip_folder=source_file_prefix if current_ccip_naming_idx_for_folder is not None else None, lpips_folder_naming_index=current_lpips_naming_idx_for_folder, is_halfbody_primary_target_type=item_data['is_halfbody_primary_target_type'], is_derived_head_crop=item_data['is_derived_head_crop'], is_derived_face_crop=item_data['is_derived_face_crop'] ) try: images_to_zip[final_filename] = image_to_bytes(item_data['pil_image']) except Exception as e_bytes: print(f" Error converting/adding {final_filename} to zip: {e_bytes}") finally: if 'pil_image' in item_data and item_data['pil_image'] is not None: del item_data['pil_image'] images_pending_final_processing.clear() if not images_to_zip: status_message = f"Processing for {source_file_prefix} finished, but no images were converted for zipping." 
print(status_message) return None, status_message print(f"Preparing zip file for {source_file_prefix} with {len(images_to_zip)} images...") progress_updater(1.0, desc=f"Creating Zip File for {source_file_prefix}...") zip_start_time = time.time() # Use NamedTemporaryFile with delete=False for the final output path # This file will persist until manually cleaned or OS cleanup temp_zip_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip") output_zip_path_temp = temp_zip_file.name temp_zip_file.close() # Close the handle, but file remains try: # Write data to the temporary file path create_zip_file(images_to_zip, output_zip_path_temp) zip_duration = time.time() - zip_start_time print(f"Temporary zip file for {source_file_prefix} created in {zip_duration:.2f} seconds at {output_zip_path_temp}") # Construct the new, desired filename temp_dir = os.path.dirname(output_zip_path_temp) timestamp = int(time.time()) desired_filename = f"{source_file_prefix}_processed_{timestamp}.zip" output_zip_path_final = os.path.join(temp_dir, desired_filename) # Rename the temporary file to the desired name print(f"Renaming temp file for {source_file_prefix} to: {output_zip_path_final}") os.rename(output_zip_path_temp, output_zip_path_final) print("Rename successful.") output_zip_path_temp = None # Clear temp path as it's been renamed except Exception as zip_or_rename_err: print(f"Error during zip creation or renaming for {source_file_prefix}: {zip_or_rename_err}") # Clean up the *original* temp file if it still exists and renaming failed if output_zip_path_temp and os.path.exists(output_zip_path_temp): try: os.remove(output_zip_path_temp) except OSError: pass if output_zip_path_final and os.path.exists(output_zip_path_final): # Check if rename partially happened try: os.remove(output_zip_path_final) except OSError: pass raise zip_or_rename_err # Re-raise the error # --- Prepare Status Message --- processing_duration = time.time() - start_time - zip_duration # Exclude zipping time 
from processing time total_duration = time.time() - start_time # Includes zipping/renaming # --- Build final status message --- person_stats = "N/A" if enable_person_detection: person_stats = f"{total_persons_detected_raw} raw, {person_targets_processed_count} targets (>{min_target_width_person_percentage*100:.1f}% itemW)" halfbody_stats = "N/A" if enable_halfbody_detection: halfbody_stats = f"{total_halfbodies_detected_raw} raw, {halfbody_targets_processed_count} targets (>{min_target_width_halfbody_percentage*100:.1f}% itemW)" fullframe_stats = f"{fullframe_targets_processed_count} targets" face_stats = "N/A" if enable_face_detection: face_stats = f"{total_faces_detected_on_targets} on targets, {face_crops_pending_count} crops pending (>{min_crop_width_face_percentage*100:.1f}% parentW)" if enable_face_filtering: face_stats += f", {items_filtered_by_face_count} targets filtered" head_stats = "N/A" if enable_head_detection: head_stats = f"{total_heads_detected_on_targets} on targets, {head_crops_pending_count} crops pending (>{min_crop_width_head_percentage*100:.1f}% parentW)" if enable_head_filtering: head_stats += f", {items_filtered_by_head_count} targets filtered" ccip_stats = "N/A" if enable_ccip_classification: ccip_stats = f"{next_ccip_cluster_id} original clusters created, on {ccip_applied_count} targets. Folders renamed by image count." lpips_stats = "N/A" if enable_lpips_clustering: lpips_stats = f"{lpips_images_subject_to_clustering} images processed, {total_lpips_clusters_created} clusters, {total_lpips_noise_samples} noise. Folders renamed by image count." 
aesthetic_stats = "N/A" if enable_aesthetic_analysis: aesthetic_stats = f"On {aesthetic_applied_count} targets" item_desc_for_stats = "Items from Provider" if not is_video_source else "Sampled Frames" status_message = ( f"Processing for '{source_file_prefix}' Complete!\n" f"Total time: {total_duration:.2f}s (Proc: {processing_duration:.2f}s, Zip: {zip_duration:.2f}s)\n" f"{item_desc_for_stats}: {total_items_for_desc}, Processed Items: {processed_items_count}\n" f"--- Primary Targets Processed ---\n" f" Person Detection: {person_stats}\n" f" Half-Body Detection: {halfbody_stats}\n" f" Full Item Processing: {fullframe_stats}\n" f"--- Items Pending Final Processing ({main_targets_pending_count} main, {face_crops_pending_count} face, {head_crops_pending_count} head) ---\n" f" Face Detection: {face_stats}\n" f" Head Detection: {head_stats}\n" f" CCIP Classification: {ccip_stats}\n" f" LPIPS Clustering: {lpips_stats}\n" f" Aesthetic Analysis: {aesthetic_stats}\n" f"Zip file contains {len(images_to_zip)} images.\n" f"Output Zip: {output_zip_path_final}" ) print(status_message) progress_updater(1.0, desc=f"Finished {source_file_prefix}!") # Return the path to the zip file return output_zip_path_final, status_message except Exception as e: print(f"!! 
# --- Main Processing Function for Input files ---
def _input_file_path(file_obj: Any) -> Optional[str]:
    """Return the on-disk path for an uploaded file object, or None if unknown.

    Accepts a plain string path as well as any object exposing a ``name``
    attribute (such as ``tempfile._TemporaryFileWrapper``).
    """
    if isinstance(file_obj, str):
        return file_obj
    return getattr(file_obj, 'name', None)


def _input_file_display_name(file_obj: Any) -> Optional[str]:
    """Return the user-facing name of an uploaded file object.

    Prefers a non-empty string ``orig_name`` attribute and falls back to the
    file path. Unlike ``getattr(obj, 'orig_name', obj.name)``, this never
    touches ``obj.name`` unless it is needed, so objects lacking ``name``
    do not raise ``AttributeError``.
    """
    orig_name = getattr(file_obj, 'orig_name', None)
    if isinstance(orig_name, str) and orig_name:
        return orig_name
    return _input_file_path(file_obj)


def _iter_image_sequence_frames(image_file_objects: List[Any]) -> Iterator[Tuple[Image.Image, int, int, int]]:
    """Yield ``(rgb_image, index, index + 1, total_images)`` for each loadable image.

    Images that fail to load are reported and skipped; the reported total is
    still the number of input objects, so it may overstate the yielded count.
    """
    num_images = len(image_file_objects)
    for idx, img_obj in enumerate(image_file_objects):
        try:
            # Image.open is lazy and keeps the file open; the context manager
            # closes it once convert() has produced an independent RGB copy.
            with Image.open(_input_file_path(img_obj)) as opened_img:
                pil_img = opened_img.convert('RGB')
        except Exception as e_load:
            print(f"Error loading image {_input_file_display_name(img_obj)}: {e_load}. Skipping.")
            continue
        yield pil_img, idx, idx + 1, num_images


def _iter_sampled_video_frames(
    video_path: str,
    interval_ms: int,
    display_name: str,
) -> Iterator[Tuple[Image.Image, int, int, int]]:
    """Yield sampled frames of a video as ``(rgb_image, pos_ms, raw_frame_no, total_frames)``.

    A frame is sampled when its timestamp is at least ``interval_ms - 1`` ms
    past the previously sampled frame. ``raw_frame_no`` is the 1-based index
    of the frame as reported by ``CAP_PROP_POS_FRAMES`` before the read, and
    ``total_frames`` is ``CAP_PROP_FRAME_COUNT`` (forced to 1 when the
    container reports a non-positive count, so consumers can divide by it).
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video file for provider: {video_path}")
        return

    total_items_for_desc = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if total_items_for_desc <= 0:
        print(f"Warning: Video {display_name} reported {total_items_for_desc} frames. This might be inaccurate. Proceeding...")
        total_items_for_desc = 1  # Fallback to prevent division by zero

    last_processed_ms = -float('inf')
    try:
        while True:
            # Both properties describe the frame that the next read() returns.
            current_raw_frame_index = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
            current_pos_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
            should_process_this_frame = current_pos_ms >= last_processed_ms + interval_ms - 1

            ret_frame, frame_cv_data = cap.read()
            if not ret_frame:  # End of stream (or read failure) is the only exit.
                break
            if not should_process_this_frame:
                continue

            try:
                pil_img = convert_to_pil(frame_cv_data)
            except Exception as e_conv:
                print(f"Error converting frame at {current_pos_ms}ms (raw index {current_raw_frame_index}) for {display_name}: {e_conv}. Skipping.")
                continue

            # Yield outside the try so consumer-side errors are never
            # misreported as conversion failures.
            last_processed_ms = current_pos_ms
            yield pil_img, int(current_pos_ms), current_raw_frame_index + 1, total_items_for_desc
    finally:
        if cap.isOpened():
            cap.release()
        print(f" Video capture for provider ({display_name}) released.")


def process_inputs_main(
    input_file_objects: List[Any],  # Uploaded file objects (or plain path strings)
    sample_interval_ms: int,  # Relevant for videos only
    # Person Detection
    enable_person_detection: bool,
    min_target_width_person_percentage: float,
    person_model_name: str,
    person_conf_threshold: float,
    person_iou_threshold: float,
    # Half-Body Detection
    enable_halfbody_detection: bool,
    enable_halfbody_cropping: bool,
    min_target_width_halfbody_percentage: float,
    halfbody_model_name: str,
    halfbody_conf_threshold: float,
    halfbody_iou_threshold: float,
    # Head Detection
    enable_head_detection: bool,
    enable_head_cropping: bool,
    min_crop_width_head_percentage: float,
    enable_head_filtering: bool,
    head_model_name: str,
    head_conf_threshold: float,
    head_iou_threshold: float,
    # Face Detection
    enable_face_detection: bool,
    enable_face_cropping: bool,
    min_crop_width_face_percentage: float,
    enable_face_filtering: bool,
    face_model_name: str,
    face_conf_threshold: float,
    face_iou_threshold: float,
    # CCIP Classification
    enable_ccip_classification: bool,
    ccip_model_name: str,
    ccip_threshold: float,
    # LPIPS Clustering
    enable_lpips_clustering: bool,
    lpips_threshold: float,
    # Aesthetic Analysis
    enable_aesthetic_analysis: bool,
    aesthetic_model_name: str,
    progress=gr.Progress(track_tqdm=True)  # Gradio progress for overall processing
) -> Tuple[Optional[List[str]], str]:
    """Process every uploaded input and return the produced ZIP paths plus a status report.

    Inputs are split by file extension: all images together form ONE image
    sequence (a single task), and each video is its own task. Every task is
    handed to ``_process_input_source_frames`` with the same detector/analysis
    settings; a failure in one task is recorded and does not stop the others.

    Args:
        input_file_objects: Uploaded files; each is either a path string or an
            object with a ``name`` (path) and optionally ``orig_name``.
        sample_interval_ms: Minimum spacing between sampled video frames.
        progress: Callable ``progress(fraction, desc=...)`` for overall progress.
        (remaining parameters): Forwarded unchanged, by keyword, to
            ``_process_input_source_frames``.

    Returns:
        ``(zip_paths, summary)``: the list of ZIP paths for sources that
        succeeded (possibly empty) and a combined human-readable status message.
    """
    if not input_file_objects:
        return [], "Error: No files provided."

    # --- Classify inputs by extension ---
    video_file_temp_objects: List[Any] = []
    image_file_temp_objects: List[Any] = []
    for file_obj in input_file_objects:
        file_name = _input_file_display_name(file_obj)  # Use original name if available
        if not isinstance(file_name, str):
            print(f"Warning: File object {file_obj} does not have a valid name and will be skipped.")
            continue
        lower_file_name = file_name.lower()
        # str.endswith accepts a tuple, so one call checks every extension.
        if lower_file_name.endswith(VIDEO_EXTENSIONS):
            video_file_temp_objects.append(file_obj)
        elif lower_file_name.endswith(IMAGE_EXTENSIONS):
            image_file_temp_objects.append(file_obj)
        else:
            print(f"Warning: File '{file_name}' has an unrecognized extension and will be skipped.")

    output_zip_paths_all_sources: List[str] = []
    all_status_messages: List[str] = []

    total_processing_tasks = (1 if image_file_temp_objects else 0) + len(video_file_temp_objects)
    if total_processing_tasks == 0:
        return [], "No processable video or image files found in the input."

    tasks_completed_count = 0

    # Print overall settings once
    print("--- Overall Batch Processing Settings ---")
    print(f" Number of image sequences to process: {1 if image_file_temp_objects else 0}")
    print(f" Number of videos to process: {len(video_file_temp_objects)}")
    print(f" Sample Interval (for videos): {sample_interval_ms}ms")
    print(" Detection Order: Person => Half-Body (alt) => Face => Head. Then: CCIP => LPIPS => Aesthetic.")
    print(f" Person Detect = {enable_person_detection}" + (f" (MinW:{min_target_width_person_percentage*100:.1f}%, Mdl:{person_model_name}, Conf:{person_conf_threshold:.2f}, IoU:{person_iou_threshold:.2f})" if enable_person_detection else ""))
    print(f" HalfBody Detect = {enable_halfbody_detection}" + (f" (FullFrameOnly, Crop:{enable_halfbody_cropping}, MinW:{min_target_width_halfbody_percentage*100:.1f}%, Mdl:{halfbody_model_name}, Conf:{halfbody_conf_threshold:.2f}, IoU:{halfbody_iou_threshold:.2f})" if enable_halfbody_detection else ""))
    print(f" Face Detect = {enable_face_detection}" + (f" (Crop:{enable_face_cropping}, MinW:{min_crop_width_face_percentage*100:.1f}%, Filter:{enable_face_filtering}, Mdl:{face_model_name}, Conf:{face_conf_threshold:.2f}, IoU:{face_iou_threshold:.2f})" if enable_face_detection else ""))
    print(f" Head Detect = {enable_head_detection}" + (f" (Crop:{enable_head_cropping}, MinW:{min_crop_width_head_percentage*100:.1f}%, Filter:{enable_head_filtering}, Mdl:{head_model_name}, Conf:{head_conf_threshold:.2f}, IoU:{head_iou_threshold:.2f})" if enable_head_detection else ""))
    print(f" CCIP Classify = {enable_ccip_classification}" + (f" (Mdl:{ccip_model_name}, Thr:{ccip_threshold:.3f})" if enable_ccip_classification else ""))
    print(f" LPIPS Clustering = {enable_lpips_clustering}" + (f" (Thr:{lpips_threshold:.3f})" if enable_lpips_clustering else ""))
    print(f" Aesthetic Analyze = {enable_aesthetic_analysis}" + (f" (Mdl:{aesthetic_model_name})" if enable_aesthetic_analysis else ""))
    print("--- End of Overall Settings ---")

    # Settings shared by every source; built once instead of repeated per call.
    shared_settings: Dict[str, Any] = dict(
        enable_person_detection=enable_person_detection,
        min_target_width_person_percentage=min_target_width_person_percentage,
        person_model_name=person_model_name,
        person_conf_threshold=person_conf_threshold,
        person_iou_threshold=person_iou_threshold,
        enable_halfbody_detection=enable_halfbody_detection,
        enable_halfbody_cropping=enable_halfbody_cropping,
        min_target_width_halfbody_percentage=min_target_width_halfbody_percentage,
        halfbody_model_name=halfbody_model_name,
        halfbody_conf_threshold=halfbody_conf_threshold,
        halfbody_iou_threshold=halfbody_iou_threshold,
        enable_head_detection=enable_head_detection,
        enable_head_cropping=enable_head_cropping,
        min_crop_width_head_percentage=min_crop_width_head_percentage,
        enable_head_filtering=enable_head_filtering,
        head_model_name=head_model_name,
        head_conf_threshold=head_conf_threshold,
        head_iou_threshold=head_iou_threshold,
        enable_face_detection=enable_face_detection,
        enable_face_cropping=enable_face_cropping,
        min_crop_width_face_percentage=min_crop_width_face_percentage,
        enable_face_filtering=enable_face_filtering,
        face_model_name=face_model_name,
        face_conf_threshold=face_conf_threshold,
        face_iou_threshold=face_iou_threshold,
        enable_ccip_classification=enable_ccip_classification,
        ccip_model_name=ccip_model_name,
        ccip_threshold=ccip_threshold,
        enable_lpips_clustering=enable_lpips_clustering,
        lpips_threshold=lpips_threshold,
        enable_aesthetic_analysis=enable_aesthetic_analysis,
        aesthetic_model_name=aesthetic_model_name,
    )

    def make_progress_updater(tasks_done: int, description_prefix: str):
        """Build a per-source progress callback mapped onto the overall progress bar."""
        def update(item_progress_value: float, desc: str) -> None:
            overall_progress = (tasks_done + item_progress_value) / total_processing_tasks
            progress(overall_progress, desc=f"{description_prefix}: {desc}")
        return update

    def run_source(status_label: str, error_subject: str, **source_kwargs: Any) -> None:
        """Process one source and record its ZIP path and/or status message."""
        try:
            zip_file_path_single, status_message_single = _process_input_source_frames(
                **source_kwargs, **shared_settings
            )
        except Exception as err:
            # Safety net: the callee has its own handler, but one bad source
            # must never abort the remaining ones.
            error_msg = f"Critical error during processing of {error_subject}: {err}"
            print(error_msg)
            traceback.print_exc()
            all_status_messages.append(f"--- {status_label} Processing CRITICALLY FAILED ---\n{error_msg}")
            return
        if zip_file_path_single:
            output_zip_paths_all_sources.append(zip_file_path_single)
            all_status_messages.append(f"--- {status_label} Processing Succeeded ---\n{status_message_single}")
        else:
            all_status_messages.append(f"--- {status_label} Processing Failed ---\n{status_message_single}")

    # --- Process Image Sequence (if any) ---
    if image_file_temp_objects:
        image_group_label_base = "ImageGroup"
        # Attempt to use first image name for more uniqueness; keep the default on failure
        try:
            image_group_label_base = sanitize_filename(
                _input_file_display_name(image_file_temp_objects[0]), max_len=20
            )
        except Exception:
            pass  # Best effort only: stick with "ImageGroup"

        image_source_file_prefix = f"{image_group_label_base}_{int(time.time())}"
        current_task_number = tasks_completed_count + 1
        progress_description_prefix = f"Image Seq. {current_task_number}/{total_processing_tasks} ({image_source_file_prefix})"
        progress(tasks_completed_count / total_processing_tasks, desc=f"{progress_description_prefix}: Starting...")
        print(f"\n>>> Processing Image Sequence: {image_source_file_prefix} ({len(image_file_temp_objects)} images) <<<")

        run_source(
            f"Image Sequence ({image_source_file_prefix})",
            f"image sequence {image_source_file_prefix}",
            source_file_prefix=image_source_file_prefix,
            frames_provider=_iter_image_sequence_frames(image_file_temp_objects),
            is_video_source=False,
            progress_updater=make_progress_updater(tasks_completed_count, progress_description_prefix),
        )

        tasks_completed_count += 1
        print(f">>> Finished attempt for Image Sequence: {image_source_file_prefix} <<<")

    # --- Process Video Files (if any) ---
    for video_file_temp_obj in video_file_temp_objects:
        video_path_temp = _input_file_path(video_file_temp_obj)
        video_original_filename = os.path.basename(_input_file_display_name(video_file_temp_obj))
        video_source_file_prefix = sanitize_filename(video_original_filename)

        current_task_number = tasks_completed_count + 1
        progress_description_prefix = f"Video {current_task_number}/{total_processing_tasks}"

        print(f"\n>>> Processing Video: {video_original_filename} (Sanitized Prefix: {video_source_file_prefix}) <<<")
        progress(tasks_completed_count / total_processing_tasks, desc=f"{progress_description_prefix}: Starting processing...")

        run_source(
            f"Video ({video_original_filename})",
            f"video {video_original_filename}",
            source_file_prefix=video_source_file_prefix,
            frames_provider=_iter_sampled_video_frames(video_path_temp, sample_interval_ms, video_original_filename),
            is_video_source=True,
            progress_updater=make_progress_updater(tasks_completed_count, progress_description_prefix),
        )

        tasks_completed_count += 1
        print(f">>> Finished attempt for Video: {video_original_filename} <<<")
        # The uploaded temp file itself is not deleted here.

    # --- Combine per-source reports ---
    final_summary_message = "\n\n==============================\n\n".join(all_status_messages)
    successful_zips_count = len(output_zip_paths_all_sources)
    # total_processing_tasks is > 0 here; the zero case returned early above.
    if successful_zips_count == 0:
        final_summary_message = f"ALL {total_processing_tasks} INPUT SOURCE(S) FAILED TO PRODUCE A ZIP FILE.\n\n" + final_summary_message
    else:
        final_summary_message = f"Successfully processed {successful_zips_count} out of {total_processing_tasks} input source(s).\n\n" + final_summary_message

    progress(1.0, desc="All processing attempts finished.")

    # gr.Files output expects a list of file paths. An empty list is fine if no files.
    return output_zip_paths_all_sources, final_summary_message
# --- Gradio Interface Setup ---
# Stylesheet for the warning banner (light and dark variants) and the status box.
css = """
/* Default (Light Mode) Styles */
#warning {
    background-color: #FFCCCB; /* Light red background */
    padding: 10px;
    border-radius: 5px;
    color: #A00000; /* Dark red text */
    border: 1px solid #E5B8B7; /* A slightly darker border for more definition */
}
/* Dark Mode Styles */
@media (prefers-color-scheme: dark) {
    #warning {
        background-color: #5C1A1A; /* Darker red background, suitable for dark mode */
        color: #FFDDDD; /* Light pink text, for good contrast against the dark red background */
        border: 1px solid #8B0000; /* A more prominent dark red border in dark mode */
    }
}
#status_box {
    white-space: pre-wrap !important; /* Ensure status messages show newlines */
    font-family: monospace; /* Optional: Use monospace for better alignment */
}
"""

# --- Define Model Lists ---
# The first entry of each list is the default selected in the UI and the
# model exercised by the start-up pre-check.
person_models = [
    'person_detect_v1.3_s', 'person_detect_v1.2_s', 'person_detect_v1.1_s',
    'person_detect_v1.1_m', 'person_detect_v1_m', 'person_detect_v1.1_n',
    'person_detect_v0_s', 'person_detect_v0_m', 'person_detect_v0_x',
]
halfbody_models = [
    'halfbody_detect_v1.0_s', 'halfbody_detect_v1.0_n', 'halfbody_detect_v0.4_s',
    'halfbody_detect_v0.3_s', 'halfbody_detect_v0.2_s',
]
head_models = [
    'head_detect_v2.0_s', 'head_detect_v2.0_m', 'head_detect_v2.0_n',
    'head_detect_v2.0_x', 'head_detect_v2.0_s_yv11', 'head_detect_v2.0_m_yv11',
    'head_detect_v2.0_n_yv11', 'head_detect_v2.0_x_yv11', 'head_detect_v2.0_l_yv11',
]
face_models = [
    'face_detect_v1.4_s', 'face_detect_v1.4_n', 'face_detect_v1.3_s',
    'face_detect_v1.3_n', 'face_detect_v1.2_s', 'face_detect_v1.1_s',
    'face_detect_v1.1_n', 'face_detect_v1_s', 'face_detect_v1_n',
    'face_detect_v0_s', 'face_detect_v0_n',
]
ccip_models = [
    'ccip-caformer-24-randaug-pruned',
    'ccip-caformer-6-randaug-pruned_fp32',
    'ccip-caformer-5_fp32',
]
aesthetic_models = [
    'swinv2pv3_v0_448_ls0.2_x',
    'swinv2pv3_v0_448_ls0.2',
    'caformer_s36_v0_ls0.2',
]


def _show_group_when_checked(checkbox, group):
    """Make *group* visible exactly when *checkbox* is ticked."""
    checkbox.change(fn=lambda e: gr.update(visible=e), inputs=checkbox, outputs=group)


with gr.Blocks(css=css) as demo:
    # --- Header / explanatory text ---
    gr.Markdown("# Video Processor using dghs-imgutils")
    gr.Markdown(
        "Upload one or more videos, or a sequence of images. Videos are processed individually, "
        "while multiple images are treated as a single sequence. Each processed source (video or "
        "image sequence) is then sequentially analyzed by "
        "[dghs-imgutils](https://github.com/deepghs/imgutils) to detect subjects, classify items, "
        "and process its content according to your settings, ultimately generating a ZIP file "
        "with the extracted images."
    )
    gr.Markdown(
        "**Detection Flow:** "
        "[Person](https://dghs-imgutils.deepghs.org/main/api_doc/detect/person.html) ⇒ "
        "[Half-Body](https://dghs-imgutils.deepghs.org/main/api_doc/detect/halfbody.html) (if no person) ⇒ "
        "[Face](https://dghs-imgutils.deepghs.org/main/api_doc/detect/face.html) (on target) ⇒ "
        "[Head](https://dghs-imgutils.deepghs.org/main/api_doc/detect/head.html) (on target)."
    )
    gr.Markdown(
        "**Analysis Flow:** "
        "[CCIP](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/ccip.html) Clustering ⇒ "
        "[LPIPS](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/lpips.html) Clustering ⇒ "
        "[Aesthetic](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/dbaesthetic.html) Labeling."
    )
    gr.Markdown("**Note on CCIP Folders:** CCIP cluster folders are named `{source_prefix}_ccip_XXX`, sorted by image count (most images = `_ccip_000`).")
    gr.Markdown("**Note on LPIPS Folders:** LPIPS cluster folders (e.g., `lpips_XXX` or `lpips_sub_XXX`) are also sorted by image count within their scope. 'noise' folders are named explicitly.")

    with gr.Row():
        # Left column: inputs and every option group.
        with gr.Column(scale=1):
            # --- Input Components ---
            process_button = gr.Button("Process Input(s) & Generate ZIP(s)", variant="primary")
            input_files = gr.Files(
                label="Upload Videos or Image Sequences",
                file_types=['video', 'image'],
                file_count="multiple",
            )
            sample_interval_ms = gr.Number(
                label="Sample Interval (ms, for videos)",
                value=1000,
                minimum=1,
                step=100,
            )

            # --- Detection Options ---
            gr.Markdown("**Detection Options**")

            # --- Person Detection Block ---
            with gr.Accordion("Person Detection Options", open=True):
                enable_person_detection = gr.Checkbox(label="Enable Person Detection", value=True)
                with gr.Group() as person_detection_params_group:
                    min_target_width_person_percentage_slider = gr.Slider(
                        minimum=0.0, maximum=1.0, value=0.25, step=0.01,
                        label="Min Target Width (% of Item Width)",
                        info="Minimum width for a detected person to be processed (e.g., 0.25 = 25%).",
                    )
                    person_model_name_dd = gr.Dropdown(choices=person_models, label="PD Model", value=person_models[0])
                    person_conf_threshold = gr.Slider(minimum=0.0, maximum=1.0, value=0.3, step=0.05, label="PD Conf")
                    person_iou_threshold = gr.Slider(minimum=0.0, maximum=1.0, value=0.5, step=0.05, label="PD IoU")
                _show_group_when_checked(enable_person_detection, person_detection_params_group)

            # --- Half-Body Detection Block ---
            with gr.Accordion("Half-Body Detection Options", open=True):
                enable_halfbody_detection = gr.Checkbox(label="Enable Half-Body Detection", value=True)
                with gr.Group() as halfbody_params_group:
                    gr.Markdown("_Detects half-bodies in full items if Person Detection is off/fails._")
                    enable_halfbody_cropping = gr.Checkbox(label="Use Half-Bodies as Targets", value=True)
                    min_target_width_halfbody_percentage_slider = gr.Slider(
                        minimum=0.0, maximum=1.0, value=0.25, step=0.01,
                        label="Min Target Width (% of Item Width)",
                        info="Minimum width for a detected half-body to be processed (e.g., 0.25 = 25%).",
                    )
                    halfbody_model_name_dd = gr.Dropdown(choices=halfbody_models, label="HBD Model", value=halfbody_models[0])
                    halfbody_conf_threshold = gr.Slider(minimum=0.0, maximum=1.0, value=0.5, step=0.05, label="HBD Conf")
                    halfbody_iou_threshold = gr.Slider(minimum=0.0, maximum=1.0, value=0.7, step=0.05, label="HBD IoU")
                _show_group_when_checked(enable_halfbody_detection, halfbody_params_group)

            # --- Face Detection Block ---
            with gr.Accordion("Face Detection Options", open=True):
                enable_face_detection = gr.Checkbox(label="Enable Face Detection", value=True)
                with gr.Group() as face_params_group:
                    enable_face_filtering = gr.Checkbox(label="Filter Targets Without Detected Faces", value=True)
                    enable_face_cropping = gr.Checkbox(label="Crop Detected Faces", value=False)
                    min_crop_width_face_percentage_slider = gr.Slider(
                        minimum=0.0, maximum=1.0, value=0.2, step=0.01,
                        label="Min Crop Width (% of Parent Width)",
                        info="Minimum width for a face crop relative to its parent image's width (e.g., 0.2 = 20%).",
                    )
                    face_model_name_dd = gr.Dropdown(choices=face_models, label="FD Model", value=face_models[0])
                    face_conf_threshold = gr.Slider(minimum=0.0, maximum=1.0, value=0.25, step=0.05, label="FD Conf")
                    face_iou_threshold = gr.Slider(minimum=0.0, maximum=1.0, value=0.7, step=0.05, label="FD IoU")
                _show_group_when_checked(enable_face_detection, face_params_group)

            # --- Head Detection Block ---
            with gr.Accordion("Head Detection Options", open=True):
                enable_head_detection = gr.Checkbox(label="Enable Head Detection", value=True)
                with gr.Group() as head_params_group:
                    gr.Markdown("_Detects heads in targets. Crops if meets width req._")
                    enable_head_filtering = gr.Checkbox(label="Filter Targets Without Heads", value=True)
                    enable_head_cropping = gr.Checkbox(label="Crop Detected Heads", value=False)
                    min_crop_width_head_percentage_slider = gr.Slider(
                        minimum=0.0, maximum=1.0, value=0.2, step=0.01,
                        label="Min Crop Width (% of Parent Width)",
                        info="Minimum width for a head crop relative to its parent image's width (e.g., 0.2 = 20%).",
                    )
                    head_model_name_dd = gr.Dropdown(choices=head_models, label="HD Model", value=head_models[0])
                    head_conf_threshold = gr.Slider(minimum=0.0, maximum=1.0, value=0.4, step=0.05, label="HD Conf")
                    head_iou_threshold = gr.Slider(minimum=0.0, maximum=1.0, value=0.7, step=0.05, label="HD IoU")
                _show_group_when_checked(enable_head_detection, head_params_group)

            # --- Analysis/Classification Options ---
            gr.Markdown("**Analysis & Classification**")

            # --- CCIP Classification Block ---
            with gr.Accordion("CCIP Classification Options", open=True):
                enable_ccip_classification = gr.Checkbox(label="Enable CCIP Classification", value=True)
                with gr.Group() as ccip_params_group:
                    gr.Markdown("_Clusters results by similarity. Folders sorted by image count._")
                    ccip_model_name_dd = gr.Dropdown(choices=ccip_models, label="CCIP Model", value=ccip_models[0])
                    ccip_threshold_slider = gr.Slider(minimum=0.0, maximum=1.0, step=0.01, value=0.20, label="CCIP Similarity Threshold")
                _show_group_when_checked(enable_ccip_classification, ccip_params_group)

            # --- LPIPS Clustering Block ---
            with gr.Accordion("LPIPS Clustering Options", open=True):
                enable_lpips_clustering = gr.Checkbox(label="Enable LPIPS Clustering", value=True)
                with gr.Group() as lpips_params_group:
                    gr.Markdown("_Clusters images by LPIPS similarity. Applied after CCIP (if enabled) or globally. Folders sorted by image count._")
                    lpips_threshold_slider = gr.Slider(minimum=0.0, maximum=1.0, step=0.01, value=0.45, label="LPIPS Similarity Threshold")
                _show_group_when_checked(enable_lpips_clustering, lpips_params_group)

            # --- Aesthetic Analysis Block ---
            with gr.Accordion("Aesthetic Analysis Options", open=True):
                enable_aesthetic_analysis = gr.Checkbox(label="Enable Aesthetic Analysis (Anime)", value=True)
                with gr.Group() as aesthetic_params_group:
                    gr.Markdown("_Prepends aesthetic label to filenames._")
                    aesthetic_model_name_dd = gr.Dropdown(choices=aesthetic_models, label="Aesthetic Model", value=aesthetic_models[0])
                _show_group_when_checked(enable_aesthetic_analysis, aesthetic_params_group)

            gr.Markdown("---")
            gr.Markdown("**Warning:** Complex combinations can be slow. Models downloaded on first use.", elem_id="warning")

        # Right column: results.
        with gr.Column(scale=1):
            # --- Output Components ---
            status_text = gr.Textbox(label="Processing Status", interactive=False, lines=20, elem_id="status_box")
            output_zips = gr.Files(label="Download Processed Images (ZIPs)")

    # Connect button click. The order below must match the positional
    # parameter order of process_inputs_main.
    process_button.click(
        fn=process_inputs_main,
        inputs=[
            input_files,
            sample_interval_ms,
            # Person Detect
            enable_person_detection,
            min_target_width_person_percentage_slider,
            person_model_name_dd,
            person_conf_threshold,
            person_iou_threshold,
            # HalfBody Detect
            enable_halfbody_detection,
            enable_halfbody_cropping,
            min_target_width_halfbody_percentage_slider,
            halfbody_model_name_dd,
            halfbody_conf_threshold,
            halfbody_iou_threshold,
            # Head Detect
            enable_head_detection,
            enable_head_cropping,
            min_crop_width_head_percentage_slider,
            enable_head_filtering,
            head_model_name_dd,
            head_conf_threshold,
            head_iou_threshold,
            # Face Detect
            enable_face_detection,
            enable_face_cropping,
            min_crop_width_face_percentage_slider,
            enable_face_filtering,
            face_model_name_dd,
            face_conf_threshold,
            face_iou_threshold,
            # CCIP
            enable_ccip_classification,
            ccip_model_name_dd,
            ccip_threshold_slider,
            # LPIPS
            enable_lpips_clustering,
            lpips_threshold_slider,
            # Aesthetic
            enable_aesthetic_analysis,
            aesthetic_model_name_dd,
        ],
        outputs=[output_zips, status_text],
    )


def _precheck_models():
    """Run each default model once on a tiny dummy image.

    Any failure is reported as a warning and otherwise ignored, so the app
    still launches when a check cannot complete.
    """
    try:
        print("Checking/Downloading models (this might take a moment)...")
        # Use a simple, small image for the checks
        dummy_img_pil = Image.new('RGB', (64, 64), color='orange')
        checks = (
            (" - Person detection...",
             lambda img: person_detector.detect_person(img, model_name=person_models[0])),
            (" - HalfBody detection...",
             lambda img: halfbody_detector.detect_halfbody(img, model_name=halfbody_models[0])),
            (" - Head detection...",
             lambda img: head_detector.detect_heads(img, model_name=head_models[0])),
            (" - Face detection...",
             lambda img: face_detector.detect_faces(img, model_name=face_models[0])),
            (" - CCIP feature extraction...",
             lambda img: ccip_analyzer.ccip_extract_feature(img, size=384, model=ccip_models[0])),
            (" - LPIPS feature extraction...",
             lambda img: lpips_module.lpips_extract_feature(img)),
            (" - Aesthetic analysis...",
             lambda img: dbaesthetic_analyzer.anime_dbaesthetic(img, model_name=aesthetic_models[0])),
        )
        for announcement, run_check in checks:
            print(announcement)
            run_check(dummy_img_pil)
        print("Models seem ready or downloaded.")
        del dummy_img_pil
        gc.collect()
    except Exception as model_err:
        print("\n--- !!! WARNING !!! ---")
        print(f"Could not pre-check/download all models: {model_err}")
        print("Models will be downloaded when first used by the application, which may cause a delay on the first run.")
        print('Check your internet connection and library installation (pip install "dghs-imgutils[gpu]").')
        print("-----------------------\n")


# --- Launch Script ---
if __name__ == "__main__":
    print("Starting Gradio App...")
    # Model pre-check
    _precheck_models()
    # Launch the app
    demo.launch(inbrowser=True)