import io
import os
import gc
import re
import cv2
import time
import zipfile
import tempfile
import traceback
import numpy as np
import gradio as gr
import imgutils.detect.person as person_detector
import imgutils.detect.halfbody as halfbody_detector
import imgutils.detect.head as head_detector
import imgutils.detect.face as face_detector
import imgutils.metrics.ccip as ccip_analyzer
import imgutils.metrics.dbaesthetic as dbaesthetic_analyzer
import imgutils.metrics.lpips as lpips_module
from PIL import Image
from typing import List, Tuple, Dict, Any, Union, Optional, Iterator
# --- Constants for File Types ---
IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.webp', '.bmp', '.tiff', '.tif', '.gif')
VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov', '.mkv', '.flv', '.webm', '.mpeg', '.mpg')
# --- Helper Functions ---
def sanitize_filename(filename: str, max_len: int = 50) -> str:
"""Removes invalid characters and shortens a filename for safe use."""
# Remove path components
base_name = os.path.basename(filename)
# Remove extension
name_part, _ = os.path.splitext(base_name)
# Replace spaces and problematic characters with underscores
sanitized = re.sub(r'[\\/*?:"<>|\s]+', '_', name_part)
# Remove leading/trailing underscores/periods
sanitized = sanitized.strip('._')
# Limit length (important for temp paths and OS limits)
sanitized = sanitized[:max_len]
# Ensure it's not empty after sanitization
if not sanitized:
return "file"
return sanitized
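# Illustrative behavior of sanitize_filename (hypothetical inputs, traced from the
# regex, strip and truncate steps above):
#   sanitize_filename("My Video: Part 1.mp4")  ->  "My_Video_Part_1"
#   sanitize_filename("???")                   ->  "file"  (everything stripped, fallback)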
def convert_to_pil(frame: np.ndarray) -> Image.Image:
"""Converts an OpenCV frame (BGR) to a PIL Image (RGB)."""
# Add error handling for potentially empty frames
if frame is None or frame.size == 0:
raise ValueError("Cannot convert empty frame to PIL Image")
try:
return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
except Exception as e:
        # Re-raise with more context if conversion fails
        raise RuntimeError(f"Failed to convert frame to PIL Image: {e}") from e
def image_to_bytes(img: Image.Image, format: str = 'PNG') -> bytes:
"""Converts a PIL Image to bytes."""
if img is None:
raise ValueError("Cannot convert None image to bytes")
byte_arr = io.BytesIO()
img.save(byte_arr, format=format)
return byte_arr.getvalue()
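# Quick sketch (hypothetical): image_to_bytes(Image.new("RGB", (8, 8))) returns the
# PNG-encoded bytes of an 8x8 black image, ready for zipfile.ZipFile.writestr().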
def create_zip_file(image_data: Dict[str, bytes], output_path: str) -> None:
"""
Creates a zip file containing the provided images directly at the output_path.
Args:
image_data: A dictionary where keys are filenames (including paths within zip)
and values are image bytes.
output_path: The full path where the zip file should be created.
"""
if not image_data:
raise ValueError("No image data provided to create zip file.")
if not output_path:
raise ValueError("No output path provided for the zip file.")
print(f"Creating zip file at: {output_path}")
try:
# Ensure parent directory exists (useful if output_path is nested)
# Though NamedTemporaryFile usually handles this for its own path.
parent_dir = os.path.dirname(output_path)
if parent_dir: # Check if there is a parent directory component
os.makedirs(parent_dir, exist_ok=True)
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
# Sort items for potentially better organization and predictability
for filename, img_bytes in sorted(image_data.items()):
zipf.writestr(filename, img_bytes)
print(f"Successfully created zip file with {len(image_data)} items at {output_path}.")
# No return value needed as we are writing to a path
except Exception as e:
print(f"Error creating zip file at {output_path}: {e}")
# If zip creation fails, attempt to remove the partially created file
if os.path.exists(output_path):
try:
os.remove(output_path)
print(f"Removed partially created/failed zip file: {output_path}")
except OSError as remove_err:
print(f"Warning: Could not remove failed zip file {output_path}: {remove_err}")
raise # Re-raise the original exception
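# Minimal usage sketch for create_zip_file (hypothetical names; dict keys may embed
# folder paths inside the archive):
#   create_zip_file(
#       {"clusterA/img_001.png": png_bytes_1, "img_002.png": png_bytes_2},
#       "/tmp/output.zip",
#   )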
def generate_filename(
base_name: str, # Should be the core identifier, e.g., "frame_X_person_Y_scoreZ"
aesthetic_label: Optional[str] = None,
ccip_cluster_id_for_lpips_logic: Optional[int] = None, # Original CCIP ID, used to decide if LPIPS is sub-cluster
ccip_folder_naming_index: Optional[int] = None, # The new 000, 001, ... index based on image count
source_prefix_for_ccip_folder: Optional[str] = None, # The source filename prefix for CCIP folder
lpips_folder_naming_index: Optional[Union[int, str]] = None, # New: Can be int (0,1,2...) or "noise"
file_extension: str = '.png',
# Suffix flags for this specific image:
is_halfbody_primary_target_type: bool = False, # If this image itself was a halfbody primary target
is_derived_head_crop: bool = False,
is_derived_face_crop: bool = False,
) -> str:
"""
Generates the final filename, incorporating aesthetic label, cluster directory,
and crop indicators. CCIP and LPIPS folder names are sorted by image count.
"""
filename_stem = base_name
    # Add suffixes for derived crops.
    # Half-body primary targets already carry "halfbody" in their base_name, so the
    # is_halfbody_primary_target_type flag currently adds no extra suffix; it is kept
    # for potential future naming rules.
if is_derived_head_crop:
filename_stem += "_headCrop"
if is_derived_face_crop:
filename_stem += "_faceCrop"
filename_with_extension = filename_stem + file_extension
path_parts = []
# New CCIP folder naming based on source prefix and sorted index
if ccip_folder_naming_index is not None and source_prefix_for_ccip_folder is not None:
path_parts.append(f"{source_prefix_for_ccip_folder}_ccip_{ccip_folder_naming_index:03d}")
# LPIPS folder naming based on the new sorted index or "noise"
if lpips_folder_naming_index is not None:
lpips_folder_name_part_str: Optional[str] = None
        if lpips_folder_naming_index == "noise":
lpips_folder_name_part_str = "noise"
elif isinstance(lpips_folder_naming_index, int):
lpips_folder_name_part_str = f"{lpips_folder_naming_index:03d}"
if lpips_folder_name_part_str is not None:
# Determine prefix based on whether the item was originally in a CCIP cluster
if ccip_cluster_id_for_lpips_logic is not None: # LPIPS is sub-cluster if item had an original CCIP ID
lpips_folder_name_base = "lpips_sub_"
else: # No CCIP, LPIPS is primary
lpips_folder_name_base = "lpips_"
path_parts.append(f"{lpips_folder_name_base}{lpips_folder_name_part_str}")
final_filename_part = filename_with_extension
if aesthetic_label:
final_filename_part = f"{aesthetic_label}_{filename_with_extension}"
if path_parts:
return f"{'/'.join(path_parts)}/{final_filename_part}"
else:
return final_filename_part
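# Traced example of generate_filename (hypothetical values, following the rules above):
#   generate_filename(
#       base_name="myvid_item_1200_person_0_score87",
#       aesthetic_label="best",
#       ccip_cluster_id_for_lpips_logic=3,      # had an original CCIP ID -> "lpips_sub_"
#       ccip_folder_naming_index=0,
#       source_prefix_for_ccip_folder="myvid",
#       lpips_folder_naming_index=1,
#       is_derived_head_crop=True,
#   )
#   -> "myvid_ccip_000/lpips_sub_001/best_myvid_item_1200_person_0_score87_headCrop.png"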
# --- Core Processing Function for a single source (video or image sequence) ---
def _process_input_source_frames(
source_file_prefix: str, # Sanitized name for this source (e.g., "myvideo" or "ImageGroup123")
    # Iterator yielding: (PIL.Image, frame_identifier (int), current_item_index, total_items_for_desc).
    # For videos, frame_identifier is the position in ms and current_item_index is the 1-based raw frame number.
    # For images, frame_identifier is the 0-based index and current_item_index is the 1-based image number.
frames_provider: Iterator[Tuple[Image.Image, int, int, int]],
is_video_source: bool, # To adjust some logging/stats messages
# Person Detection
enable_person_detection: bool,
min_target_width_person_percentage: float,
person_model_name: str,
person_conf_threshold: float,
person_iou_threshold: float,
# Half-Body Detection
enable_halfbody_detection: bool,
enable_halfbody_cropping: bool,
min_target_width_halfbody_percentage: float,
halfbody_model_name: str,
halfbody_conf_threshold: float,
halfbody_iou_threshold: float,
# Head Detection
enable_head_detection: bool,
enable_head_cropping: bool,
min_crop_width_head_percentage: float,
enable_head_filtering: bool,
head_model_name: str,
head_conf_threshold: float,
head_iou_threshold: float,
# Face Detection
enable_face_detection: bool,
enable_face_cropping: bool,
min_crop_width_face_percentage: float,
enable_face_filtering: bool,
face_model_name: str,
face_conf_threshold: float,
face_iou_threshold: float,
# CCIP Classification
enable_ccip_classification: bool,
ccip_model_name: str,
ccip_threshold: float,
# LPIPS Clustering
enable_lpips_clustering: bool,
lpips_threshold: float,
# Aesthetic Analysis
enable_aesthetic_analysis: bool,
aesthetic_model_name: str,
# Gradio Progress (specific to this source's processing)
progress_updater # Function: (progress_value: float, desc: str) -> None
) -> Tuple[Optional[str], str]:
"""
Processes frames from a given source (video or image sequence) according to the specified parameters.
    Order: Person => Half-Body (alternative) => Face Detection => Head Detection => CCIP => LPIPS => Aesthetic.
Returns:
A tuple containing:
- Path to the output zip file (or None if error).
- Status message string.
"""
# This list will hold data for images that pass all filters, BEFORE LPIPS and final zipping
images_pending_final_processing: List[Dict[str, Any]] = []
# CCIP specific data
ccip_clusters_info: List[Tuple[int, np.ndarray]] = []
next_ccip_cluster_id = 0
# Stats
processed_items_count = 0
total_persons_detected_raw, total_halfbodies_detected_raw = 0, 0
person_targets_processed_count, halfbody_targets_processed_count, fullframe_targets_processed_count = 0, 0, 0
total_faces_detected_on_targets, total_heads_detected_on_targets = 0, 0
# These count items added to images_pending_final_processing
main_targets_pending_count, face_crops_pending_count, head_crops_pending_count = 0, 0, 0
items_filtered_by_face_count, items_filtered_by_head_count = 0, 0
ccip_applied_count, aesthetic_applied_count = 0, 0
# LPIPS stats
lpips_images_subject_to_clustering, total_lpips_clusters_created, total_lpips_noise_samples = 0, 0, 0
gc_interval = 100 # items from provider
start_time = time.time()
# Progress update for initializing this specific video
progress_updater(0, desc=f"Initializing {source_file_prefix}...")
output_zip_path_temp = None
output_zip_path_final = None
try:
# --- Main Loop for processing items from the frames_provider ---
for pil_image_full_frame, frame_specific_index, current_item_index, total_items_for_desc in frames_provider:
progress_value_for_updater = (current_item_index) / total_items_for_desc if total_items_for_desc > 0 else 1.0
# The description string should reflect what current_item_index means
item_description = ""
if is_video_source:
                # For video, total_items_for_desc is the total raw frame count and
                # current_item_index is the raw frame index of the *sampled* frame;
                # processed_items_count tracks sampled frames for the "processed X" message.
item_description = f"Scanning frame {current_item_index}/{total_items_for_desc} (processed {processed_items_count + 1} sampled)"
else: # For images
item_description = f"image {current_item_index}/{total_items_for_desc}"
progress_updater(
min(progress_value_for_updater, 1.0), # Cap progress at 1.0
desc=f"Processing {item_description} for {source_file_prefix}"
)
# processed_items_count still counts how many items are yielded by the provider
# (i.e., how many sampled frames for video, or how many images for image sequence)
processed_items_count += 1
try:
full_frame_width = pil_image_full_frame.width # Store for percentage calculations
print(f"--- Processing item ID {frame_specific_index} (Width: {full_frame_width}px) for {source_file_prefix} ---")
# List to hold PIL images that are the primary subjects for this frame
# Each element: {'pil': Image, 'base_name': str, 'source_type': 'person'/'halfbody'/'fullframe'}
primary_targets_for_frame: List[Dict[str, Any]] = []
processed_primary_source_this_frame = False # Flag if Person or HalfBody yielded targets
# --- 1. Person Detection ---
if enable_person_detection and full_frame_width > 0:
print(" Attempting Person Detection...")
min_person_target_px_width = full_frame_width * min_target_width_person_percentage
person_detections = person_detector.detect_person(
pil_image_full_frame, model_name=person_model_name,
conf_threshold=person_conf_threshold, iou_threshold=person_iou_threshold
)
total_persons_detected_raw += len(person_detections)
if person_detections:
print(f" Detected {len(person_detections)} raw persons.")
valid_person_targets = 0
for i, (bbox, _, score) in enumerate(person_detections):
# Check width before full crop for minor optimization
detected_person_width = bbox[2] - bbox[0]
if detected_person_width >= min_person_target_px_width:
primary_targets_for_frame.append({
'pil': pil_image_full_frame.crop(bbox),
'base_name': f"{source_file_prefix}_item_{frame_specific_index}_person_{i}_score{int(score*100)}",
'source_type': 'person'})
person_targets_processed_count +=1
valid_person_targets +=1
else:
print(f" Person {i} width {detected_person_width}px < min {min_person_target_px_width:.0f}px. Skipping.")
if valid_person_targets > 0:
processed_primary_source_this_frame = True
print(f" Added {valid_person_targets} persons as primary targets.")
# --- 2. Half-Body Detection (if Person not processed and HBD enabled) ---
if not processed_primary_source_this_frame and enable_halfbody_detection and full_frame_width > 0:
print(" Attempting Half-Body Detection (on full item)...")
min_halfbody_target_px_width = full_frame_width * min_target_width_halfbody_percentage
halfbody_detections = halfbody_detector.detect_halfbody(
pil_image_full_frame, model_name=halfbody_model_name,
conf_threshold=halfbody_conf_threshold, iou_threshold=halfbody_iou_threshold
)
total_halfbodies_detected_raw += len(halfbody_detections)
if halfbody_detections:
print(f" Detected {len(halfbody_detections)} raw half-bodies.")
valid_halfbody_targets = 0
for i, (bbox, _, score) in enumerate(halfbody_detections):
detected_hb_width = bbox[2] - bbox[0]
# Cropping must be enabled and width must be sufficient for it to be a target
if enable_halfbody_cropping and detected_hb_width >= min_halfbody_target_px_width:
primary_targets_for_frame.append({
'pil': pil_image_full_frame.crop(bbox),
'base_name': f"{source_file_prefix}_item_{frame_specific_index}_halfbody_{i}_score{int(score*100)}",
'source_type': 'halfbody'})
halfbody_targets_processed_count +=1
valid_halfbody_targets +=1
elif enable_halfbody_cropping:
print(f" Half-body {i} width {detected_hb_width}px < min {min_halfbody_target_px_width:.0f}px. Skipping.")
if valid_halfbody_targets > 0:
processed_primary_source_this_frame = True
print(f" Added {valid_halfbody_targets} half-bodies as primary targets.")
# --- 3. Full Frame/Image (fallback) ---
if not processed_primary_source_this_frame:
print(" Processing Full Item as primary target.")
primary_targets_for_frame.append({
'pil': pil_image_full_frame.copy(),
'base_name': f"{source_file_prefix}_item_{frame_specific_index}_full",
'source_type': 'fullframe'})
fullframe_targets_processed_count += 1
# --- Process each identified primary_target_for_frame ---
for target_data in primary_targets_for_frame:
current_pil: Image.Image = target_data['pil']
current_base_name: str = target_data['base_name'] # Base name for this main target
current_source_type: str = target_data['source_type']
current_pil_width = current_pil.width # For sub-crop percentage calculations
print(f" Processing target: {current_base_name} (type: {current_source_type}, width: {current_pil_width}px)")
# Store PILs of successful crops from current_pil for this target
keep_this_target = True
item_area = current_pil_width * current_pil.height
potential_face_crops_pil: List[Image.Image] = []
potential_head_crops_pil: List[Image.Image] = []
# --- A. Face Detection ---
if keep_this_target and enable_face_detection and current_pil_width > 0:
print(f" Detecting faces in {current_base_name}...")
min_face_crop_px_width = current_pil_width * min_crop_width_face_percentage
face_detections = face_detector.detect_faces(
current_pil, model_name=face_model_name,
conf_threshold=face_conf_threshold, iou_threshold=face_iou_threshold
)
total_faces_detected_on_targets += len(face_detections)
if not face_detections and enable_face_filtering:
keep_this_target = False
items_filtered_by_face_count += 1
print(f" FILTERING TARGET {current_base_name} (no face).")
elif face_detections and enable_face_cropping:
for f_idx, (f_bbox, _, _) in enumerate(face_detections):
if (f_bbox[2]-f_bbox[0]) >= min_face_crop_px_width:
potential_face_crops_pil.append(current_pil.crop(f_bbox))
else:
print(f" Face {f_idx} too small. Skipping crop.")
# --- B. Head Detection ---
if keep_this_target and enable_head_detection and current_pil_width > 0:
print(f" Detecting heads in {current_base_name}...")
min_head_crop_px_width = current_pil_width * min_crop_width_head_percentage
head_detections = head_detector.detect_heads(
current_pil, model_name=head_model_name,
conf_threshold=head_conf_threshold, iou_threshold=head_iou_threshold
)
total_heads_detected_on_targets += len(head_detections)
if not head_detections and enable_head_filtering:
keep_this_target = False
items_filtered_by_head_count += 1
print(f" FILTERING TARGET {current_base_name} (no head).")
potential_face_crops_pil.clear() # Clear faces if head filter removed target
elif head_detections and enable_head_cropping:
for h_idx, (h_bbox, _, _) in enumerate(head_detections):
                                h_w = h_bbox[2] - h_bbox[0]
                                if h_w >= min_head_crop_px_width and item_area > 0:
                                    potential_head_crops_pil.append(current_pil.crop(h_bbox))
                                else:
                                    print(f"    Head {h_idx} too small. Skipping crop.")
# --- If target is filtered, clean up and skip to next target ---
if not keep_this_target:
print(f" Target {current_base_name} was filtered by face/head presence rules. Discarding it and its potential crops.")
if current_pil is not None:
del current_pil
potential_face_crops_pil.clear()
potential_head_crops_pil.clear()
continue # To the next primary_target_for_frame
# --- C. CCIP Classification (on current_pil, if it's kept) ---
assigned_ccip_id = None # This is the original CCIP ID
if enable_ccip_classification:
print(f" Classifying {current_base_name} with CCIP...")
try:
feature = ccip_analyzer.ccip_extract_feature(current_pil, model=ccip_model_name)
best_match_cid = None
min_diff = float('inf')
# Find the best potential match among existing clusters
if ccip_clusters_info: # Only loop if there are clusters to compare against
for cid, rep_f in ccip_clusters_info:
diff = ccip_analyzer.ccip_difference(feature, rep_f, model=ccip_model_name)
if diff < min_diff:
min_diff = diff
best_match_cid = cid
# Decide whether to use the best match or create a new cluster
if best_match_cid is not None and min_diff < ccip_threshold:
assigned_ccip_id = best_match_cid
print(f" -> Matched Cluster {assigned_ccip_id} (Diff: {min_diff:.6f} <= Threshold {ccip_threshold:.3f})")
else:
# No suitable match found (either no clusters existed, or the best match's diff was strictly greater than threshold)
# Create a new cluster
assigned_ccip_id = next_ccip_cluster_id
ccip_clusters_info.append((assigned_ccip_id, feature))
if not ccip_clusters_info or len(ccip_clusters_info) == 1:
print(f" -> New Cluster {assigned_ccip_id} (First item or no prior suitable clusters)")
else:
# MODIFIED: Log message reflecting that new cluster is formed if diff > threshold
print(f" -> New Cluster {assigned_ccip_id} (Min diff to others: {min_diff:.6f} > Threshold {ccip_threshold:.3f})")
next_ccip_cluster_id += 1
print(f" CCIP: Target {current_base_name} -> Original Cluster ID {assigned_ccip_id}")
del feature
ccip_applied_count += 1
except Exception as e_ccip:
print(f" Error CCIP: {e_ccip}")
# --- D. Aesthetic Analysis (on current_pil, if it's kept) ---
item_aesthetic_label = None
if enable_aesthetic_analysis:
print(f" Analyzing {current_base_name} for aesthetics...")
try:
res = dbaesthetic_analyzer.anime_dbaesthetic(current_pil, model_name=aesthetic_model_name)
if isinstance(res, tuple) and len(res) >= 1:
item_aesthetic_label = res[0]
print(f" Aesthetic: Target {current_base_name} -> {item_aesthetic_label}")
aesthetic_applied_count += 1
except Exception as e_aes:
print(f" Error Aesthetic: {e_aes}")
add_current_pil_to_pending_list = True
if current_source_type == 'fullframe':
can_skip_fullframe_target = False
if enable_face_detection or enable_head_detection:
found_valid_sub_crop_from_enabled_detector = False
if enable_face_detection and len(potential_face_crops_pil) > 0:
found_valid_sub_crop_from_enabled_detector = True
if not found_valid_sub_crop_from_enabled_detector and \
enable_head_detection and len(potential_head_crops_pil) > 0:
found_valid_sub_crop_from_enabled_detector = True
if not found_valid_sub_crop_from_enabled_detector: # No valid crops from any enabled sub-detector
can_skip_fullframe_target = True # All enabled sub-detectors failed
if can_skip_fullframe_target:
add_current_pil_to_pending_list = False
print(f" Skipping save of fullframe target '{current_base_name}' because all enabled sub-detectors (Face/Head) yielded no valid-width crops.")
if add_current_pil_to_pending_list:
# --- E. Save current_pil (if it passed all filters) ---
# Add main target to pending list
images_pending_final_processing.append({
'pil_image': current_pil.copy(), 'base_name_for_filename': current_base_name,
'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label,
'is_halfbody_primary_target_type': (current_source_type == 'halfbody'),
'is_derived_head_crop': False, 'is_derived_face_crop': False,
'lpips_cluster_id': None, # Will be filled by LPIPS clustering
'lpips_folder_naming_index': None # Will be filled by LPIPS renaming
})
main_targets_pending_count +=1
# --- F. Save Face Crops (derived from current_pil) ---
for i, fc_pil in enumerate(potential_face_crops_pil):
images_pending_final_processing.append({
'pil_image': fc_pil, 'base_name_for_filename': f"{current_base_name}_face{i}",
'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label,
'is_halfbody_primary_target_type': False,
'is_derived_head_crop': False, 'is_derived_face_crop': True,
'lpips_cluster_id': None,
'lpips_folder_naming_index': None
})
face_crops_pending_count+=1
potential_face_crops_pil.clear()
# --- G. Save Head Crops (derived from current_pil) ---
for i, hc_pil in enumerate(potential_head_crops_pil):
images_pending_final_processing.append({
'pil_image': hc_pil, 'base_name_for_filename': f"{current_base_name}_head{i}",
'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label,
'is_halfbody_primary_target_type': False,
'is_derived_head_crop': True, 'is_derived_face_crop': False,
'lpips_cluster_id': None,
'lpips_folder_naming_index': None
})
head_crops_pending_count+=1
potential_head_crops_pil.clear()
if current_pil is not None: # Ensure current_pil exists before attempting to delete
del current_pil # Clean up the PIL for this target_data
primary_targets_for_frame.clear()
except Exception as item_proc_err:
print(f"!! Major Error processing item ID {frame_specific_index} for {source_file_prefix}: {item_proc_err}")
traceback.print_exc()
# Cleanup local vars for this item if error
if 'primary_targets_for_frame' in locals():
primary_targets_for_frame.clear()
# Also ensure current_pil from inner loop is cleaned up if error happened mid-loop
if 'current_pil' in locals() and current_pil is not None:
del current_pil
if processed_items_count % gc_interval == 0:
gc.collect()
print(f" [GC triggered at {processed_items_count} items for {source_file_prefix}]")
# --- End of Main Item Processing Loop ---
print(f"\nRunning final GC before LPIPS/Zipping for {source_file_prefix}...")
gc.collect()
if not images_pending_final_processing:
status_message = f"Processing for {source_file_prefix} finished, but no images were generated or passed filters for LPIPS/Zipping."
print(status_message)
return None, status_message
# --- LPIPS Clustering Stage ---
print(f"\n--- LPIPS Clustering Stage for {source_file_prefix} (Images pending: {len(images_pending_final_processing)}) ---")
if enable_lpips_clustering:
print(f" LPIPS Clustering enabled with threshold: {lpips_threshold}")
lpips_images_subject_to_clustering = len(images_pending_final_processing)
if enable_ccip_classification and next_ccip_cluster_id > 0: # CCIP was used
print(" LPIPS clustering within CCIP clusters.")
images_by_ccip: Dict[Optional[int], List[int]] = {} # ccip_id -> list of original indices
for i, item_data in enumerate(images_pending_final_processing):
ccip_id = item_data['ccip_cluster_id'] # Original CCIP ID
if ccip_id not in images_by_ccip:
images_by_ccip[ccip_id] = []
images_by_ccip[ccip_id].append(i)
for ccip_id, indices_in_ccip_cluster in images_by_ccip.items():
pils_for_lpips_sub_cluster = [images_pending_final_processing[idx]['pil_image'] for idx in indices_in_ccip_cluster]
if len(pils_for_lpips_sub_cluster) > 1:
print(f" Clustering {len(pils_for_lpips_sub_cluster)} images in CCIP cluster {ccip_id}...")
try:
lpips_sub_ids = lpips_module.lpips_clustering(pils_for_lpips_sub_cluster, threshold=lpips_threshold)
for i_sub, lpips_id in enumerate(lpips_sub_ids):
original_idx = indices_in_ccip_cluster[i_sub]
images_pending_final_processing[original_idx]['lpips_cluster_id'] = lpips_id
except Exception as e_lpips_sub:
print(f" Error LPIPS sub-cluster CCIP {ccip_id}: {e_lpips_sub}")
elif len(pils_for_lpips_sub_cluster) == 1:
images_pending_final_processing[indices_in_ccip_cluster[0]]['lpips_cluster_id'] = 0 # type: ignore
del images_by_ccip
if 'pils_for_lpips_sub_cluster' in locals():
del pils_for_lpips_sub_cluster # Ensure cleanup
else: # LPIPS on all images globally
print(" LPIPS clustering on all collected images.")
all_pils_for_global_lpips = [item['pil_image'] for item in images_pending_final_processing]
if len(all_pils_for_global_lpips) > 1:
try:
lpips_global_ids = lpips_module.lpips_clustering(all_pils_for_global_lpips, threshold=lpips_threshold)
for i, lpips_id in enumerate(lpips_global_ids):
images_pending_final_processing[i]['lpips_cluster_id'] = lpips_id
except Exception as e_lpips_global:
print(f" Error LPIPS global: {e_lpips_global}")
elif len(all_pils_for_global_lpips) == 1:
images_pending_final_processing[0]['lpips_cluster_id'] = 0 # type: ignore
del all_pils_for_global_lpips
# Calculate LPIPS stats
all_final_lpips_ids = [item.get('lpips_cluster_id') for item in images_pending_final_processing if item.get('lpips_cluster_id') is not None]
if all_final_lpips_ids:
unique_lpips_clusters = set(filter(lambda x: x != -1, all_final_lpips_ids))
total_lpips_clusters_created = len(unique_lpips_clusters)
total_lpips_noise_samples = sum(1 for x in all_final_lpips_ids if x == -1)
else:
print(" LPIPS Clustering disabled.")
# --- CCIP Folder Renaming Logic ---
original_ccip_id_to_new_naming_index: Dict[int, int] = {}
if enable_ccip_classification:
print(f" Preparing CCIP folder renaming for {source_file_prefix}...")
ccip_image_counts: Dict[int, int] = {} # original_ccip_id -> count of images in it
for item_data_for_count in images_pending_final_processing:
original_ccip_id_val = item_data_for_count.get('ccip_cluster_id')
if original_ccip_id_val is not None:
ccip_image_counts[original_ccip_id_val] = ccip_image_counts.get(original_ccip_id_val, 0) + 1
if ccip_image_counts:
# Sort original ccip_ids by their counts in descending order
sorted_ccip_groups_by_count: List[Tuple[int, int]] = sorted(
ccip_image_counts.items(),
key=lambda item: item[1], # Sort by count
reverse=True
)
for new_idx, (original_id, count) in enumerate(sorted_ccip_groups_by_count):
original_ccip_id_to_new_naming_index[original_id] = new_idx
print(f" CCIP Remap for {source_file_prefix}: Original ID {original_id} (count: {count}) -> New Naming Index {new_idx:03d}")
else:
print(f" No CCIP-assigned images found for {source_file_prefix} to perform renaming.")
# --- LPIPS Folder Renaming Logic ---
if enable_lpips_clustering:
print(f" Preparing LPIPS folder renaming for {source_file_prefix}...")
# Initialize/Reset lpips_folder_naming_index for all items
for item_data in images_pending_final_processing:
item_data['lpips_folder_naming_index'] = None
if enable_ccip_classification and next_ccip_cluster_id > 0: # LPIPS within CCIP
print(f" LPIPS renaming within CCIP clusters for {source_file_prefix}.")
items_grouped_by_original_ccip: Dict[Optional[int], List[Dict[str, Any]]] = {}
for item_data in images_pending_final_processing:
original_ccip_id = item_data.get('ccip_cluster_id')
if original_ccip_id not in items_grouped_by_original_ccip: items_grouped_by_original_ccip[original_ccip_id] = []
items_grouped_by_original_ccip[original_ccip_id].append(item_data)
for original_ccip_id, items_in_ccip in items_grouped_by_original_ccip.items():
lpips_counts_in_ccip: Dict[int, int] = {} # original_lpips_id (non-noise) -> count
for item_data in items_in_ccip:
lpips_id = item_data.get('lpips_cluster_id')
if lpips_id is not None and lpips_id != -1:
lpips_counts_in_ccip[lpips_id] = lpips_counts_in_ccip.get(lpips_id, 0) + 1
lpips_id_to_naming_in_ccip: Dict[int, Union[int, str]] = {}
if lpips_counts_in_ccip:
sorted_lpips = sorted(lpips_counts_in_ccip.items(), key=lambda x: x[1], reverse=True)
for new_idx, (lpips_id, count) in enumerate(sorted_lpips):
lpips_id_to_naming_in_ccip[lpips_id] = new_idx
ccip_disp = f"OrigCCIP-{original_ccip_id}" if original_ccip_id is not None else "NoCCIP"
print(f" LPIPS Remap in {ccip_disp}: OrigLPIPS ID {lpips_id} (count: {count}) -> New Naming Index {new_idx:03d}")
for item_data in items_in_ccip:
lpips_id = item_data.get('lpips_cluster_id')
if lpips_id is not None:
if lpips_id == -1: item_data['lpips_folder_naming_index'] = "noise"
elif lpips_id in lpips_id_to_naming_in_ccip:
item_data['lpips_folder_naming_index'] = lpips_id_to_naming_in_ccip[lpips_id]
del items_grouped_by_original_ccip
else: # Global LPIPS
print(f" Global LPIPS renaming for {source_file_prefix}.")
global_lpips_counts: Dict[int, int] = {}
for item_data in images_pending_final_processing:
lpips_id = item_data.get('lpips_cluster_id')
if lpips_id is not None and lpips_id != -1:
global_lpips_counts[lpips_id] = global_lpips_counts.get(lpips_id, 0) + 1
global_lpips_id_to_naming: Dict[int, Union[int, str]] = {}
if global_lpips_counts:
sorted_global_lpips = sorted(global_lpips_counts.items(), key=lambda x: x[1], reverse=True)
for new_idx, (lpips_id, count) in enumerate(sorted_global_lpips):
global_lpips_id_to_naming[lpips_id] = new_idx
print(f" Global LPIPS Remap: OrigLPIPS ID {lpips_id} (count: {count}) -> New Naming Index {new_idx:03d}")
for item_data in images_pending_final_processing:
lpips_id = item_data.get('lpips_cluster_id')
if lpips_id is not None:
if lpips_id == -1: item_data['lpips_folder_naming_index'] = "noise"
elif lpips_id in global_lpips_id_to_naming:
item_data['lpips_folder_naming_index'] = global_lpips_id_to_naming[lpips_id]
gc.collect()
# --- Final Zipping Stage ---
images_to_zip: Dict[str, bytes] = {}
print(f"\n--- Final Zipping Stage for {source_file_prefix} ({len(images_pending_final_processing)} items) ---")
for item_data in images_pending_final_processing:
original_ccip_id_for_item = item_data.get('ccip_cluster_id')
current_ccip_naming_idx_for_folder: Optional[int] = None
if enable_ccip_classification and original_ccip_id_for_item is not None and \
original_ccip_id_for_item in original_ccip_id_to_new_naming_index:
current_ccip_naming_idx_for_folder = original_ccip_id_to_new_naming_index[original_ccip_id_for_item]
current_lpips_naming_idx_for_folder = item_data.get('lpips_folder_naming_index')
final_filename = generate_filename(
base_name=item_data['base_name_for_filename'],
aesthetic_label=item_data.get('aesthetic_label'),
ccip_cluster_id_for_lpips_logic=original_ccip_id_for_item,
ccip_folder_naming_index=current_ccip_naming_idx_for_folder,
source_prefix_for_ccip_folder=source_file_prefix if current_ccip_naming_idx_for_folder is not None else None,
lpips_folder_naming_index=current_lpips_naming_idx_for_folder,
is_halfbody_primary_target_type=item_data['is_halfbody_primary_target_type'],
is_derived_head_crop=item_data['is_derived_head_crop'],
is_derived_face_crop=item_data['is_derived_face_crop']
)
try:
images_to_zip[final_filename] = image_to_bytes(item_data['pil_image'])
except Exception as e_bytes:
print(f" Error converting/adding {final_filename} to zip: {e_bytes}")
finally:
if 'pil_image' in item_data and item_data['pil_image'] is not None:
del item_data['pil_image']
images_pending_final_processing.clear()
if not images_to_zip:
status_message = f"Processing for {source_file_prefix} finished, but no images were converted for zipping."
print(status_message)
return None, status_message
print(f"Preparing zip file for {source_file_prefix} with {len(images_to_zip)} images...")
progress_updater(1.0, desc=f"Creating Zip File for {source_file_prefix}...")
zip_start_time = time.time()
# Use NamedTemporaryFile with delete=False for the final output path
# This file will persist until manually cleaned or OS cleanup
temp_zip_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
output_zip_path_temp = temp_zip_file.name
temp_zip_file.close() # Close the handle, but file remains
try:
# Write data to the temporary file path
create_zip_file(images_to_zip, output_zip_path_temp)
zip_duration = time.time() - zip_start_time
print(f"Temporary zip file for {source_file_prefix} created in {zip_duration:.2f} seconds at {output_zip_path_temp}")
# Construct the new, desired filename
temp_dir = os.path.dirname(output_zip_path_temp)
timestamp = int(time.time())
desired_filename = f"{source_file_prefix}_processed_{timestamp}.zip"
output_zip_path_final = os.path.join(temp_dir, desired_filename)
# Rename the temporary file to the desired name
print(f"Renaming temp file for {source_file_prefix} to: {output_zip_path_final}")
os.rename(output_zip_path_temp, output_zip_path_final)
print("Rename successful.")
output_zip_path_temp = None # Clear temp path as it's been renamed
except Exception as zip_or_rename_err:
print(f"Error during zip creation or renaming for {source_file_prefix}: {zip_or_rename_err}")
# Clean up the *original* temp file if it still exists and renaming failed
if output_zip_path_temp and os.path.exists(output_zip_path_temp):
try:
os.remove(output_zip_path_temp)
except OSError:
pass
if output_zip_path_final and os.path.exists(output_zip_path_final): # Check if rename partially happened
try:
os.remove(output_zip_path_final)
except OSError:
pass
        raise  # Re-raise the original error, preserving its traceback
# --- Prepare Status Message ---
processing_duration = time.time() - start_time - zip_duration # Exclude zipping time from processing time
total_duration = time.time() - start_time # Includes zipping/renaming
# --- Build final status message ---
person_stats = "N/A"
if enable_person_detection:
person_stats = f"{total_persons_detected_raw} raw, {person_targets_processed_count} targets (>{min_target_width_person_percentage*100:.1f}% itemW)"
halfbody_stats = "N/A"
if enable_halfbody_detection:
halfbody_stats = f"{total_halfbodies_detected_raw} raw, {halfbody_targets_processed_count} targets (>{min_target_width_halfbody_percentage*100:.1f}% itemW)"
fullframe_stats = f"{fullframe_targets_processed_count} targets"
face_stats = "N/A"
if enable_face_detection:
face_stats = f"{total_faces_detected_on_targets} on targets, {face_crops_pending_count} crops pending (>{min_crop_width_face_percentage*100:.1f}% parentW)"
if enable_face_filtering:
face_stats += f", {items_filtered_by_face_count} targets filtered"
head_stats = "N/A"
if enable_head_detection:
head_stats = f"{total_heads_detected_on_targets} on targets, {head_crops_pending_count} crops pending (>{min_crop_width_head_percentage*100:.1f}% parentW)"
if enable_head_filtering:
head_stats += f", {items_filtered_by_head_count} targets filtered"
ccip_stats = "N/A"
if enable_ccip_classification:
ccip_stats = f"{next_ccip_cluster_id} original clusters created, on {ccip_applied_count} targets. Folders renamed by image count."
lpips_stats = "N/A"
if enable_lpips_clustering:
lpips_stats = f"{lpips_images_subject_to_clustering} images processed, {total_lpips_clusters_created} clusters, {total_lpips_noise_samples} noise. Folders renamed by image count."
aesthetic_stats = "N/A"
if enable_aesthetic_analysis:
aesthetic_stats = f"On {aesthetic_applied_count} targets"
        item_desc_for_stats = "Total Raw Frames" if is_video_source else "Items from Provider"
status_message = (
f"Processing for '{source_file_prefix}' Complete!\n"
f"Total time: {total_duration:.2f}s (Proc: {processing_duration:.2f}s, Zip: {zip_duration:.2f}s)\n"
f"{item_desc_for_stats}: {total_items_for_desc}, Processed Items: {processed_items_count}\n"
f"--- Primary Targets Processed ---\n"
f" Person Detection: {person_stats}\n"
f" Half-Body Detection: {halfbody_stats}\n"
f" Full Item Processing: {fullframe_stats}\n"
f"--- Items Pending Final Processing ({main_targets_pending_count} main, {face_crops_pending_count} face, {head_crops_pending_count} head) ---\n"
f" Face Detection: {face_stats}\n"
f" Head Detection: {head_stats}\n"
f" CCIP Classification: {ccip_stats}\n"
f" LPIPS Clustering: {lpips_stats}\n"
f" Aesthetic Analysis: {aesthetic_stats}\n"
f"Zip file contains {len(images_to_zip)} images.\n"
f"Output Zip: {output_zip_path_final}"
)
print(status_message)
progress_updater(1.0, desc=f"Finished {source_file_prefix}!")
# Return the path to the zip file
return output_zip_path_final, status_message
except Exception as e:
print(f"!! An unhandled error occurred during processing of {source_file_prefix}: {e}")
traceback.print_exc() # Print detailed traceback for debugging
# Clean up main data structures
images_pending_final_processing.clear()
ccip_clusters_info.clear()
gc.collect()
# Clean up temp file if it exists on general error
if output_zip_path_temp and os.path.exists(output_zip_path_temp):
try:
os.remove(output_zip_path_temp)
except OSError:
pass
# Clean up final file if it exists on general error (maybe renaming succeeded but later code failed)
if output_zip_path_final and os.path.exists(output_zip_path_final):
try:
os.remove(output_zip_path_final)
except OSError:
pass
return None, f"An error occurred with {source_file_prefix}: {e}"
# --- Main Processing Function for Input files ---
def process_inputs_main(
input_file_objects: List[Any], # Gradio File component gives list of tempfile._TemporaryFileWrapper
sample_interval_ms: int, # Relevant for videos only
# Person Detection
enable_person_detection: bool,
min_target_width_person_percentage: float,
person_model_name: str,
person_conf_threshold: float,
person_iou_threshold: float,
# Half-Body Detection
enable_halfbody_detection: bool,
enable_halfbody_cropping: bool,
min_target_width_halfbody_percentage: float,
halfbody_model_name: str,
halfbody_conf_threshold: float,
halfbody_iou_threshold: float,
# Head Detection
enable_head_detection: bool,
enable_head_cropping: bool,
min_crop_width_head_percentage: float,
enable_head_filtering: bool,
head_model_name: str,
head_conf_threshold: float,
head_iou_threshold: float,
# Face Detection
enable_face_detection: bool,
enable_face_cropping: bool,
min_crop_width_face_percentage: float,
enable_face_filtering: bool,
face_model_name: str,
face_conf_threshold: float,
face_iou_threshold: float,
# CCIP Classification
enable_ccip_classification: bool,
ccip_model_name: str,
ccip_threshold: float,
# LPIPS Clustering
enable_lpips_clustering: bool,
lpips_threshold: float,
# Aesthetic Analysis
enable_aesthetic_analysis: bool,
aesthetic_model_name: str,
progress=gr.Progress(track_tqdm=True) # Gradio progress for overall processing
) -> Tuple[List[str], str]:  # Returns list of ZIP paths and combined status
if not input_file_objects:
return [], "Error: No files provided."
video_file_temp_objects: List[Any] = []
image_file_temp_objects: List[Any] = []
for file_obj in input_file_objects:
# gr.Files returns a list of tempfile._TemporaryFileWrapper objects
# We need the .name attribute to get the actual file path
file_name = getattr(file_obj, 'orig_name', file_obj.name) # Use original name if available
if isinstance(file_name, str):
lower_file_name = file_name.lower()
if any(lower_file_name.endswith(ext) for ext in VIDEO_EXTENSIONS):
video_file_temp_objects.append(file_obj)
elif any(lower_file_name.endswith(ext) for ext in IMAGE_EXTENSIONS):
image_file_temp_objects.append(file_obj)
else:
print(f"Warning: File '{file_name}' has an unrecognized extension and will be skipped.")
else:
print(f"Warning: File object {file_obj} does not have a valid name and will be skipped.")
output_zip_paths_all_sources = []
all_status_messages = []
total_processing_tasks = (1 if image_file_temp_objects else 0) + len(video_file_temp_objects)
if total_processing_tasks == 0:
return [], "No processable video or image files found in the input."
tasks_completed_count = 0
# Print overall settings once
print(f"--- Overall Batch Processing Settings ---")
print(f" Number of image sequences to process: {1 if image_file_temp_objects else 0}")
print(f" Number of videos to process: {len(video_file_temp_objects)}")
print(f" Sample Interval (for videos): {sample_interval_ms}ms")
print(f" Detection Order: Person => Half-Body (alt) => Face => Head. Then: CCIP => LPIPS => Aesthetic.")
print(f" Person Detect = {enable_person_detection}" + (f" (MinW:{min_target_width_person_percentage*100:.1f}%, Mdl:{person_model_name}, Conf:{person_conf_threshold:.2f}, IoU:{person_iou_threshold:.2f})" if enable_person_detection else ""))
print(f" HalfBody Detect = {enable_halfbody_detection}" + (f" (FullFrameOnly, Crop:{enable_halfbody_cropping}, MinW:{min_target_width_halfbody_percentage*100:.1f}%, Mdl:{halfbody_model_name}, Conf:{halfbody_conf_threshold:.2f}, IoU:{halfbody_iou_threshold:.2f})" if enable_halfbody_detection else ""))
print(f" Face Detect = {enable_face_detection}" + (f" (Crop:{enable_face_cropping}, MinW:{min_crop_width_face_percentage*100:.1f}%, Filter:{enable_face_filtering}, Mdl:{face_model_name}, Conf:{face_conf_threshold:.2f}, IoU:{face_iou_threshold:.2f})" if enable_face_detection else ""))
print(f" Head Detect = {enable_head_detection}" + (f" (Crop:{enable_head_cropping}, MinW:{min_crop_width_head_percentage*100:.1f}%, Filter:{enable_head_filtering}, Mdl:{head_model_name}, Conf:{head_conf_threshold:.2f}, IoU:{head_iou_threshold:.2f})" if enable_head_detection else ""))
print(f" CCIP Classify = {enable_ccip_classification}" + (f" (Mdl:{ccip_model_name}, Thr:{ccip_threshold:.3f})" if enable_ccip_classification else ""))
print(f" LPIPS Clustering = {enable_lpips_clustering}" + (f" (Thr:{lpips_threshold:.3f})" if enable_lpips_clustering else ""))
print(f" Aesthetic Analyze = {enable_aesthetic_analysis}" + (f" (Mdl:{aesthetic_model_name})" if enable_aesthetic_analysis else ""))
print(f"--- End of Overall Settings ---")
# --- Process Image Sequence (if any) ---
if image_file_temp_objects:
image_group_label_base = "ImageGroup"
# Attempt to use first image name for more uniqueness, fallback to timestamp
try:
first_image_orig_name = getattr(image_file_temp_objects[0], 'orig_name', image_file_temp_objects[0].name)
image_group_label_base = sanitize_filename(first_image_orig_name, max_len=20)
        except Exception:
pass # Stick with "ImageGroup"
image_source_file_prefix = f"{image_group_label_base}_{int(time.time())}"
current_task_number = tasks_completed_count + 1
progress_description_prefix = f"Image Seq. {current_task_number}/{total_processing_tasks} ({image_source_file_prefix})"
progress(tasks_completed_count / total_processing_tasks, desc=f"{progress_description_prefix}: Starting...")
print(f"\n>>> Processing Image Sequence: {image_source_file_prefix} ({len(image_file_temp_objects)} images) <<<")
def image_frames_provider_generator() -> Iterator[Tuple[Image.Image, int, int, int]]:
num_images = len(image_file_temp_objects)
for idx, img_obj in enumerate(image_file_temp_objects):
try:
pil_img = Image.open(img_obj.name).convert('RGB')
yield pil_img, idx, idx + 1, num_images
except Exception as e_load:
print(f"Error loading image {getattr(img_obj, 'orig_name', img_obj.name)}: {e_load}. Skipping.")
# If we skip, the total_items_in_source for _process_input_source_frames might be off
# For simplicity, we'll proceed, but this could be refined to adjust total_items dynamically.
# Or, pre-filter loadable images. For now, just skip.
continue
def image_group_progress_updater(item_progress_value: float, desc: str):
overall_progress = (tasks_completed_count + item_progress_value) / total_processing_tasks
progress(overall_progress, desc=f"{progress_description_prefix}: {desc}")
try:
zip_file_path_single, status_message_single = _process_input_source_frames(
source_file_prefix=image_source_file_prefix,
frames_provider=image_frames_provider_generator(),
is_video_source=False,
enable_person_detection=enable_person_detection,
min_target_width_person_percentage=min_target_width_person_percentage,
person_model_name=person_model_name,
person_conf_threshold=person_conf_threshold,
person_iou_threshold=person_iou_threshold,
enable_halfbody_detection=enable_halfbody_detection,
enable_halfbody_cropping=enable_halfbody_cropping,
min_target_width_halfbody_percentage=min_target_width_halfbody_percentage,
halfbody_model_name=halfbody_model_name,
halfbody_conf_threshold=halfbody_conf_threshold,
halfbody_iou_threshold=halfbody_iou_threshold,
enable_head_detection=enable_head_detection,
enable_head_cropping=enable_head_cropping,
min_crop_width_head_percentage=min_crop_width_head_percentage,
enable_head_filtering=enable_head_filtering,
head_model_name=head_model_name,
head_conf_threshold=head_conf_threshold,
head_iou_threshold=head_iou_threshold,
enable_face_detection=enable_face_detection,
enable_face_cropping=enable_face_cropping,
min_crop_width_face_percentage=min_crop_width_face_percentage,
enable_face_filtering=enable_face_filtering,
face_model_name=face_model_name,
face_conf_threshold=face_conf_threshold,
face_iou_threshold=face_iou_threshold,
enable_ccip_classification=enable_ccip_classification,
ccip_model_name=ccip_model_name,
ccip_threshold=ccip_threshold,
enable_lpips_clustering=enable_lpips_clustering,
lpips_threshold=lpips_threshold,
enable_aesthetic_analysis=enable_aesthetic_analysis,
aesthetic_model_name=aesthetic_model_name,
progress_updater=image_group_progress_updater
)
if zip_file_path_single:
output_zip_paths_all_sources.append(zip_file_path_single)
all_status_messages.append(f"--- Image Sequence ({image_source_file_prefix}) Processing Succeeded ---\n{status_message_single}")
else:
all_status_messages.append(f"--- Image Sequence ({image_source_file_prefix}) Processing Failed ---\n{status_message_single}")
except Exception as e_img_seq:
error_msg = f"Critical error during processing of image sequence {image_source_file_prefix}: {e_img_seq}"
print(error_msg)
traceback.print_exc()
all_status_messages.append(f"--- Image Sequence ({image_source_file_prefix}) Processing CRITICALLY FAILED ---\n{error_msg}")
tasks_completed_count += 1
print(f">>> Finished attempt for Image Sequence: {image_source_file_prefix} <<<")
# --- Process Video Files (if any) ---
for video_idx, video_file_temp_obj in enumerate(video_file_temp_objects):
video_path_temp = video_file_temp_obj.name
video_original_filename = os.path.basename(getattr(video_file_temp_obj, 'orig_name', video_path_temp))
video_source_file_prefix = sanitize_filename(video_original_filename)
current_task_number = tasks_completed_count + 1
progress_description_prefix = f"Video {current_task_number}/{total_processing_tasks}"
print(f"\n>>> Processing Video: {video_original_filename} (Sanitized Prefix: {video_source_file_prefix}) <<<")
progress(tasks_completed_count / total_processing_tasks, desc=f"{progress_description_prefix}: Starting processing...")
        # The provider yields: (PIL.Image, frame position in ms (int), current_raw_frame_index_from_video, total_items_for_desc).
        # The third element is the 1-based raw frame number from CAP_PROP_POS_FRAMES,
        # used to align progress with total_items_for_desc (raw frame count).
def video_frames_provider_generator(video_path: str, interval_ms: int) -> Iterator[Tuple[Image.Image, int, int, int]]:
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"Error: Could not open video file for provider: {video_path}")
return
total_items_for_desc = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if total_items_for_desc <= 0:
print(f"Warning: Video {video_original_filename} reported {total_items_for_desc} frames. This might be inaccurate. Proceeding...")
# If it's 0, the progress in _process_input_source_frames might behave unexpectedly.
# Setting to 1 to avoid division by zero, but this means progress won't be very useful.
total_items_for_desc = 1 # Fallback to prevent division by zero
# processed_count_in_provider = 0 # Counts *sampled* frames, not used for progress index
last_processed_ms = -float('inf')
raw_frames_read_by_provider = 0 # Counts all frames read by cap.read()
try:
while True:
# For progress, use current_pos_ms or CAP_PROP_POS_FRAMES
# CAP_PROP_POS_FRAMES is a 0-based index of the next frame to be decoded/captured.
current_raw_frame_index = int(cap.get(cv2.CAP_PROP_POS_FRAMES)) # Use this for progress
current_pos_ms_in_provider = cap.get(cv2.CAP_PROP_POS_MSEC)
                    # End-of-stream is detected by the ret_frame check below; even if
                    # current_pos_ms stalls between reads (stuck videos), no extra
                    # guard is needed here.
should_process_this_frame = current_pos_ms_in_provider >= last_processed_ms + interval_ms - 1
ret_frame, frame_cv_data = cap.read()
if not ret_frame: # Primary exit point for the loop
break
raw_frames_read_by_provider +=1 # Incremented after successful read
if should_process_this_frame:
try:
pil_img = convert_to_pil(frame_cv_data)
last_processed_ms = current_pos_ms_in_provider
yield pil_img, int(current_pos_ms_in_provider), current_raw_frame_index + 1, total_items_for_desc # Yield 1-based raw frame index
                        except Exception as e_conv:
                            print(f"Error converting frame at {current_pos_ms_in_provider}ms (raw index {current_raw_frame_index}) for {video_original_filename}: {e_conv}. Skipping.")
finally:
if cap.isOpened():
cap.release()
print(f" Video capture for provider ({video_original_filename}) released.")
def video_progress_updater(item_progress_value: float, desc: str):
overall_progress = (tasks_completed_count + item_progress_value) / total_processing_tasks
progress(overall_progress, desc=f"{progress_description_prefix}: {desc}")
try:
zip_file_path_single, status_message_single = _process_input_source_frames(
source_file_prefix=video_source_file_prefix,
frames_provider=video_frames_provider_generator(video_path_temp, sample_interval_ms),
is_video_source=True,
enable_person_detection=enable_person_detection,
min_target_width_person_percentage=min_target_width_person_percentage,
person_model_name=person_model_name,
person_conf_threshold=person_conf_threshold,
person_iou_threshold=person_iou_threshold,
enable_halfbody_detection=enable_halfbody_detection,
enable_halfbody_cropping=enable_halfbody_cropping,
min_target_width_halfbody_percentage=min_target_width_halfbody_percentage,
halfbody_model_name=halfbody_model_name,
halfbody_conf_threshold=halfbody_conf_threshold,
halfbody_iou_threshold=halfbody_iou_threshold,
enable_head_detection=enable_head_detection,
enable_head_cropping=enable_head_cropping,
min_crop_width_head_percentage=min_crop_width_head_percentage,
enable_head_filtering=enable_head_filtering,
head_model_name=head_model_name,
head_conf_threshold=head_conf_threshold,
head_iou_threshold=head_iou_threshold,
enable_face_detection=enable_face_detection,
enable_face_cropping=enable_face_cropping,
min_crop_width_face_percentage=min_crop_width_face_percentage,
enable_face_filtering=enable_face_filtering,
face_model_name=face_model_name,
face_conf_threshold=face_conf_threshold,
face_iou_threshold=face_iou_threshold,
enable_ccip_classification=enable_ccip_classification,
ccip_model_name=ccip_model_name,
ccip_threshold=ccip_threshold,
enable_lpips_clustering=enable_lpips_clustering,
lpips_threshold=lpips_threshold,
enable_aesthetic_analysis=enable_aesthetic_analysis,
aesthetic_model_name=aesthetic_model_name,
progress_updater=video_progress_updater
)
if zip_file_path_single:
output_zip_paths_all_sources.append(zip_file_path_single)
all_status_messages.append(f"--- Video ({video_original_filename}) Processing Succeeded ---\n{status_message_single}")
else:
all_status_messages.append(f"--- Video ({video_original_filename}) Processing Failed ---\n{status_message_single}")
except Exception as e_vid:
            # This catches errors if _process_input_source_frames itself raises an
            # unhandled exception (though it has its own try-except).
error_msg = f"Critical error during processing of video {video_original_filename}: {e_vid}"
print(error_msg)
traceback.print_exc()
all_status_messages.append(f"--- Video ({video_original_filename}) Processing CRITICALLY FAILED ---\n{error_msg}")
tasks_completed_count += 1
print(f">>> Finished attempt for Video: {video_original_filename} <<<")
# Gradio manages the lifecycle of video_path_temp (the uploaded temp file)
final_summary_message = "\n\n==============================\n\n".join(all_status_messages)
successful_zips_count = len(output_zip_paths_all_sources)
if successful_zips_count == 0 and total_processing_tasks > 0:
final_summary_message = f"ALL {total_processing_tasks} INPUT SOURCE(S) FAILED TO PRODUCE A ZIP FILE.\n\n" + final_summary_message
elif total_processing_tasks > 0:
final_summary_message = f"Successfully processed {successful_zips_count} out of {total_processing_tasks} input source(s).\n\n" + final_summary_message
else: # Should be caught earlier by "No processable files"
final_summary_message = "No inputs were processed."
progress(1.0, desc="All processing attempts finished.")
# gr.Files output expects a list of file paths. An empty list is fine if no files.
return output_zip_paths_all_sources, final_summary_message
# --- Gradio Interface Setup ---
css = """
/* Default (Light Mode) Styles */
#warning {
background-color: #FFCCCB; /* Light red background */
padding: 10px;
border-radius: 5px;
color: #A00000; /* Dark red text */
border: 1px solid #E5B8B7; /* A slightly darker border for more definition */
}
/* Dark Mode Styles */
@media (prefers-color-scheme: dark) {
#warning {
background-color: #5C1A1A; /* Darker red background, suitable for dark mode */
color: #FFDDDD; /* Light pink text, for good contrast against the dark red background */
border: 1px solid #8B0000; /* A more prominent dark red border in dark mode */
}
}
#status_box {
white-space: pre-wrap !important; /* Ensure status messages show newlines */
font-family: monospace; /* Optional: Use monospace for better alignment */
}
"""
# --- Define Model Lists ---
person_models = ['person_detect_v1.3_s', 'person_detect_v1.2_s', 'person_detect_v1.1_s', 'person_detect_v1.1_m', 'person_detect_v1_m', 'person_detect_v1.1_n', 'person_detect_v0_s', 'person_detect_v0_m', 'person_detect_v0_x']
halfbody_models = ['halfbody_detect_v1.0_s', 'halfbody_detect_v1.0_n', 'halfbody_detect_v0.4_s', 'halfbody_detect_v0.3_s', 'halfbody_detect_v0.2_s']
head_models = ['head_detect_v2.0_s', 'head_detect_v2.0_m', 'head_detect_v2.0_n', 'head_detect_v2.0_x', 'head_detect_v2.0_s_yv11', 'head_detect_v2.0_m_yv11', 'head_detect_v2.0_n_yv11', 'head_detect_v2.0_x_yv11', 'head_detect_v2.0_l_yv11']
face_models = ['face_detect_v1.4_s', 'face_detect_v1.4_n', 'face_detect_v1.3_s', 'face_detect_v1.3_n', 'face_detect_v1.2_s', 'face_detect_v1.1_s', 'face_detect_v1.1_n', 'face_detect_v1_s', 'face_detect_v1_n', 'face_detect_v0_s', 'face_detect_v0_n']
ccip_models = ['ccip-caformer-24-randaug-pruned', 'ccip-caformer-6-randaug-pruned_fp32', 'ccip-caformer-5_fp32']
aesthetic_models = ['swinv2pv3_v0_448_ls0.2_x', 'swinv2pv3_v0_448_ls0.2', 'caformer_s36_v0_ls0.2']
with gr.Blocks(css=css) as demo:
gr.Markdown("# Video Processor using dghs-imgutils")
gr.Markdown("Upload one or more videos, or a sequence of images. Videos are processed individually, while multiple images are treated as a single sequence. Each processed source (video or image sequence) is then sequentially analyzed by [dghs-imgutils](https://github.com/deepghs/imgutils) to detect subjects, classify items, and process its content according to your settings, ultimately generating a ZIP file with the extracted images.")
gr.Markdown("**Detection Flow:** " +
"[Person](https://dghs-imgutils.deepghs.org/main/api_doc/detect/person.html) ⇒ " +
"[Half-Body](https://dghs-imgutils.deepghs.org/main/api_doc/detect/halfbody.html) (if no person) ⇒ " +
"[Face](https://dghs-imgutils.deepghs.org/main/api_doc/detect/face.html) (on target) ⇒ " +
"[Head](https://dghs-imgutils.deepghs.org/main/api_doc/detect/head.html) (on target).")
gr.Markdown("**Analysis Flow:** " +
"[CCIP](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/ccip.html) Clustering ⇒ " +
"[LPIPS](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/lpips.html) Clustering ⇒ " +
"[Aesthetic](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/dbaesthetic.html) Labeling.")
gr.Markdown("**Note on CCIP Folders:** CCIP cluster folders are named `{source_prefix}_ccip_XXX`, sorted by image count (most images = `_ccip_000`).")
gr.Markdown("**Note on LPIPS Folders:** LPIPS cluster folders (e.g., `lpips_XXX` or `lpips_sub_XXX`) are also sorted by image count within their scope. 'noise' folders are named explicitly.")
with gr.Row():
with gr.Column(scale=1):
# --- Input Components ---
process_button = gr.Button("Process Input(s) & Generate ZIP(s)", variant="primary")
input_files = gr.Files(label="Upload Videos or Image Sequences", file_types=['video', 'image'], file_count="multiple")
sample_interval_ms = gr.Number(label="Sample Interval (ms, for videos)", value=1000, minimum=1, step=100)
# --- Detection Options ---
gr.Markdown("**Detection Options**")
# --- Person Detection Block ---
with gr.Accordion("Person Detection Options", open=True):
enable_person_detection = gr.Checkbox(label="Enable Person Detection", value=True)
with gr.Group() as person_detection_params_group:
min_target_width_person_percentage_slider = gr.Slider(
minimum=0.0, maximum=1.0, value=0.25, step=0.01,
label="Min Target Width (% of Item Width)",
info="Minimum width for a detected person to be processed (e.g., 0.25 = 25%)."
)
person_model_name_dd = gr.Dropdown(person_models, label="PD Model", value=person_models[0])
person_conf_threshold = gr.Slider(0.0, 1.0, value=0.3, step=0.05, label="PD Conf")
person_iou_threshold = gr.Slider(0.0, 1.0, value=0.5, step=0.05, label="PD IoU")
enable_person_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_person_detection, outputs=person_detection_params_group)
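# Pattern note: the .change() handler above shows/hides the parameter group
# based on the checkbox value; every detection/analysis block below repeats
# this same checkbox-driven visibility wiring.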
# --- Half-Body Detection Block ---
with gr.Accordion("Half-Body Detection Options", open=True):
enable_halfbody_detection = gr.Checkbox(label="Enable Half-Body Detection", value=True)
with gr.Group() as halfbody_params_group:
gr.Markdown("<small>_Detects half-bodies in full items if Person Detection is off/fails._</small>")
enable_halfbody_cropping = gr.Checkbox(label="Use Half-Bodies as Targets", value=True)
min_target_width_halfbody_percentage_slider = gr.Slider(
minimum=0.0, maximum=1.0, value=0.25, step=0.01,
label="Min Target Width (% of Item Width)",
info="Minimum width for a detected half-body to be processed (e.g., 0.25 = 25%)."
)
halfbody_model_name_dd = gr.Dropdown(halfbody_models, label="HBD Model", value=halfbody_models[0])
halfbody_conf_threshold = gr.Slider(0.0, 1.0, value=0.5, step=0.05, label="HBD Conf")
halfbody_iou_threshold = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="HBD IoU")
enable_halfbody_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_halfbody_detection, outputs=halfbody_params_group)
# --- Face Detection Block ---
with gr.Accordion("Face Detection Options", open=True):
enable_face_detection = gr.Checkbox(label="Enable Face Detection", value=True)
with gr.Group() as face_params_group:
enable_face_filtering = gr.Checkbox(label="Filter Targets Without Detected Faces", value=True)
enable_face_cropping = gr.Checkbox(label="Crop Detected Faces", value=False)
min_crop_width_face_percentage_slider = gr.Slider(
minimum=0.0, maximum=1.0, value=0.2, step=0.01,
label="Min Crop Width (% of Parent Width)",
info="Minimum width for a face crop relative to its parent image's width (e.g., 0.2 = 20%)."
)
face_model_name_dd = gr.Dropdown(face_models, label="FD Model", value=face_models[0])
face_conf_threshold = gr.Slider(0.0, 1.0, value=0.25, step=0.05, label="FD Conf")
face_iou_threshold = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="FD IoU")
enable_face_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_face_detection, outputs=face_params_group)
# --- Head Detection Block ---
with gr.Accordion("Head Detection Options", open=True):
enable_head_detection = gr.Checkbox(label="Enable Head Detection", value=True)
with gr.Group() as head_params_group:
gr.Markdown("<small>_Detects heads in targets. Crops if meets width req._</small>")
enable_head_filtering = gr.Checkbox(label="Filter Targets Without Heads", value=True)
enable_head_cropping = gr.Checkbox(label="Crop Detected Heads", value=False)
min_crop_width_head_percentage_slider = gr.Slider(
minimum=0.0, maximum=1.0, value=0.2, step=0.01,
label="Min Crop Width (% of Parent Width)",
info="Minimum width for a head crop relative to its parent image's width (e.g., 0.2 = 20%)."
)
head_model_name_dd = gr.Dropdown(head_models, label="HD Model", value=head_models[0])
head_conf_threshold = gr.Slider(0.0, 1.0, value=0.4, step=0.05, label="HD Conf")
head_iou_threshold = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="HD IoU")
enable_head_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_head_detection, outputs=head_params_group)
# --- Analysis/Classification Options ---
gr.Markdown("**Analysis & Classification**")
# --- CCIP Classification Block ---
with gr.Accordion("CCIP Classification Options", open=True):
enable_ccip_classification = gr.Checkbox(label="Enable CCIP Classification", value=True)
with gr.Group() as ccip_params_group:
gr.Markdown("<small>_Clusters results by similarity. Folders sorted by image count._</small>")
ccip_model_name_dd = gr.Dropdown(ccip_models, label="CCIP Model", value=ccip_models[0])
ccip_threshold_slider = gr.Slider(0.0, 1.0, step=0.01, value=0.20, label="CCIP Similarity Threshold")
enable_ccip_classification.change(fn=lambda e: gr.update(visible=e), inputs=enable_ccip_classification, outputs=ccip_params_group)
# LPIPS Clustering Options
with gr.Accordion("LPIPS Clustering Options", open=True):
enable_lpips_clustering = gr.Checkbox(label="Enable LPIPS Clustering", value=True)
with gr.Group() as lpips_params_group:
gr.Markdown("<small>_Clusters images by LPIPS similarity. Applied after CCIP (if enabled) or globally. Folders sorted by image count._</small>")
lpips_threshold_slider = gr.Slider(0.0, 1.0, step=0.01, value=0.45, label="LPIPS Similarity Threshold")
enable_lpips_clustering.change(fn=lambda e: gr.update(visible=e), inputs=enable_lpips_clustering, outputs=lpips_params_group)
# --- Aesthetic Analysis Block ---
with gr.Accordion("Aesthetic Analysis Options", open=True):
enable_aesthetic_analysis = gr.Checkbox(label="Enable Aesthetic Analysis (Anime)", value=True)
with gr.Group() as aesthetic_params_group:
gr.Markdown("<small>_Prepends aesthetic label to filenames._</small>")
aesthetic_model_name_dd = gr.Dropdown(aesthetic_models, label="Aesthetic Model", value=aesthetic_models[0])
enable_aesthetic_analysis.change(fn=lambda e: gr.update(visible=e), inputs=enable_aesthetic_analysis, outputs=aesthetic_params_group)
gr.Markdown("---")
gr.Markdown("**Warning:** Complex combinations can be slow. Models downloaded on first use.", elem_id="warning")
with gr.Column(scale=1):
# --- Output Components ---
status_text = gr.Textbox(label="Processing Status", interactive=False, lines=20, elem_id="status_box")
output_zips = gr.Files(label="Download Processed Images (ZIPs)")
# Connect button click
process_button.click(
fn=process_inputs_main,
inputs=[
input_files, sample_interval_ms,
# Person Detect
enable_person_detection, min_target_width_person_percentage_slider,
person_model_name_dd, person_conf_threshold, person_iou_threshold,
# HalfBody Detect
enable_halfbody_detection, enable_halfbody_cropping, min_target_width_halfbody_percentage_slider,
halfbody_model_name_dd, halfbody_conf_threshold, halfbody_iou_threshold,
# Head Detect
enable_head_detection, enable_head_cropping, min_crop_width_head_percentage_slider,
enable_head_filtering, head_model_name_dd, head_conf_threshold, head_iou_threshold,
# Face Detect
enable_face_detection, enable_face_cropping, min_crop_width_face_percentage_slider,
enable_face_filtering, face_model_name_dd, face_conf_threshold, face_iou_threshold,
# CCIP
enable_ccip_classification, ccip_model_name_dd, ccip_threshold_slider,
# LPIPS
enable_lpips_clustering, lpips_threshold_slider,
# Aesthetic
enable_aesthetic_analysis, aesthetic_model_name_dd,
],
outputs=[output_zips, status_text]
)
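# NOTE: Gradio passes these component values to process_inputs_main positionally,
# so the order of the inputs list must match the function's parameter order exactly.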
# --- Launch Script ---
if __name__ == "__main__":
print("Starting Gradio App...")
# Model pre-check: touch each model once so downloads happen before the first request
try:
print("Checking/Downloading models (this might take a moment)...")
# Use simple, small images for checks
dummy_img_pil = Image.new('RGB', (64, 64), color='orange')
print(" - Person detection...")
_ = person_detector.detect_person(dummy_img_pil, model_name=person_models[0])
print(" - HalfBody detection...")
_ = halfbody_detector.detect_halfbody(dummy_img_pil, model_name=halfbody_models[0])
print(" - Head detection...")
_ = head_detector.detect_heads(dummy_img_pil, model_name=head_models[0])
print(" - Face detection...")
_ = face_detector.detect_faces(dummy_img_pil, model_name=face_models[0])
print(" - CCIP feature extraction...")
_ = ccip_analyzer.ccip_extract_feature(dummy_img_pil, size=384, model=ccip_models[0])
print(" - LPIPS feature extraction...")
_ = lpips_module.lpips_extract_feature(dummy_img_pil)
print(" - Aesthetic analysis...")
_ = dbaesthetic_analyzer.anime_dbaesthetic(dummy_img_pil, model_name=aesthetic_models[0])
print("Models seem ready or downloaded.")
del dummy_img_pil
gc.collect()
except Exception as model_err:
print(f"\n--- !!! WARNING !!! ---")
print(f"Could not pre-check/download all models: {model_err}")
print(f"Models will be downloaded when first used by the application, which may cause a delay on the first run.")
print(f"Check your internet connection and library installation (pip install \"dghs-imgutils[gpu]\").")
print(f"-----------------------\n")
# Launch the app
demo.launch(inbrowser=True)