import io
import os
import gc
import re
import cv2
import time
import zipfile
import tempfile
import traceback
import numpy as np
import gradio as gr
import imgutils.detect.person as person_detector
import imgutils.detect.halfbody as halfbody_detector
import imgutils.detect.head as head_detector
import imgutils.detect.face as face_detector
import imgutils.metrics.ccip as ccip_analyzer
import imgutils.metrics.dbaesthetic as dbaesthetic_analyzer
import imgutils.metrics.lpips as lpips_module
from PIL import Image
from typing import List, Tuple, Dict, Any, Union, Optional, Iterator
# --- Constants for File Types ---
IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.webp', '.bmp', '.tiff', '.tif', '.gif')
VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov', '.mkv', '.flv', '.webm', '.mpeg', '.mpg')
# --- Helper Functions ---
def sanitize_filename(filename: str, max_len: int = 50) -> str:
"""Removes invalid characters and shortens a filename for safe use."""
# Remove path components
base_name = os.path.basename(filename)
# Remove extension
name_part, _ = os.path.splitext(base_name)
# Replace spaces and problematic characters with underscores
sanitized = re.sub(r'[\\/*?:"<>|\s]+', '_', name_part)
# Remove leading/trailing underscores/periods
sanitized = sanitized.strip('._')
# Limit length (important for temp paths and OS limits)
sanitized = sanitized[:max_len]
# Ensure it's not empty after sanitization
if not sanitized:
return "file"
return sanitized
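# Illustrative usage (hypothetical input, traced from the logic above):
#   sanitize_filename('My Video: "final"?.mkv') -> 'My_Video_final'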
def convert_to_pil(frame: np.ndarray) -> Image.Image:
"""Converts an OpenCV frame (BGR) to a PIL Image (RGB)."""
# Add error handling for potentially empty frames
if frame is None or frame.size == 0:
raise ValueError("Cannot convert empty frame to PIL Image")
try:
return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
except Exception as e:
# Re-raise with more context if conversion fails
raise RuntimeError(f"Failed to convert frame to PIL Image: {e}")
def image_to_bytes(img: Image.Image, format: str = 'PNG') -> bytes:
"""Converts a PIL Image to bytes."""
if img is None:
raise ValueError("Cannot convert None image to bytes")
byte_arr = io.BytesIO()
img.save(byte_arr, format=format)
return byte_arr.getvalue()
def create_zip_file(image_data: Dict[str, bytes], output_path: str) -> None:
"""
Creates a zip file containing the provided images directly at the output_path.
Args:
image_data: A dictionary where keys are filenames (including paths within zip)
and values are image bytes.
output_path: The full path where the zip file should be created.
"""
if not image_data:
raise ValueError("No image data provided to create zip file.")
if not output_path:
raise ValueError("No output path provided for the zip file.")
print(f"Creating zip file at: {output_path}")
try:
# Ensure parent directory exists (useful if output_path is nested)
# Though NamedTemporaryFile usually handles this for its own path.
parent_dir = os.path.dirname(output_path)
if parent_dir: # Check if there is a parent directory component
os.makedirs(parent_dir, exist_ok=True)
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
# Sort items for potentially better organization and predictability
for filename, img_bytes in sorted(image_data.items()):
zipf.writestr(filename, img_bytes)
print(f"Successfully created zip file with {len(image_data)} items at {output_path}.")
# No return value needed as we are writing to a path
except Exception as e:
print(f"Error creating zip file at {output_path}: {e}")
# If zip creation fails, attempt to remove the partially created file
if os.path.exists(output_path):
try:
os.remove(output_path)
print(f"Removed partially created/failed zip file: {output_path}")
except OSError as remove_err:
print(f"Warning: Could not remove failed zip file {output_path}: {remove_err}")
raise # Re-raise the original exception
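# Illustrative in-zip layout produced by generate_filename() below, assuming a
# source prefixed "myvideo" with CCIP, LPIPS, and aesthetic analysis all
# enabled (names are hypothetical):
#   myvideo_ccip_000/lpips_sub_001/masterpiece_myvideo_item_1200_person_0_score87_headCrop.png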
def generate_filename(
base_name: str, # Should be the core identifier, e.g., "frame_X_person_Y_scoreZ"
aesthetic_label: Optional[str] = None,
ccip_cluster_id_for_lpips_logic: Optional[int] = None, # Original CCIP ID, used to decide if LPIPS is sub-cluster
ccip_folder_naming_index: Optional[int] = None, # The new 000, 001, ... index based on image count
source_prefix_for_ccip_folder: Optional[str] = None, # The source filename prefix for CCIP folder
lpips_folder_naming_index: Optional[Union[int, str]] = None, # New: Can be int (0,1,2...) or "noise"
file_extension: str = '.png',
# Suffix flags for this specific image:
is_halfbody_primary_target_type: bool = False, # If this image itself was a halfbody primary target
is_derived_head_crop: bool = False,
is_derived_face_crop: bool = False,
) -> str:
"""
Generates the final filename, incorporating aesthetic label, cluster directory,
and crop indicators. CCIP and LPIPS folder names are sorted by image count.
"""
filename_stem = base_name
# Add suffixes for derived crops.
# For halfbody primary targets, the base_name already contains "halfbody",
# so is_halfbody_primary_target_type currently adds no extra suffix.
if is_derived_head_crop:
filename_stem += "_headCrop"
if is_derived_face_crop:
filename_stem += "_faceCrop"
filename_with_extension = filename_stem + file_extension
path_parts = []
# New CCIP folder naming based on source prefix and sorted index
if ccip_folder_naming_index is not None and source_prefix_for_ccip_folder is not None:
path_parts.append(f"{source_prefix_for_ccip_folder}_ccip_{ccip_folder_naming_index:03d}")
# LPIPS folder naming based on the new sorted index or "noise"
if lpips_folder_naming_index is not None:
lpips_folder_name_part_str: Optional[str] = None
if isinstance(lpips_folder_naming_index, str) and lpips_folder_naming_index == "noise":
lpips_folder_name_part_str = "noise"
elif isinstance(lpips_folder_naming_index, int):
lpips_folder_name_part_str = f"{lpips_folder_naming_index:03d}"
if lpips_folder_name_part_str is not None:
# Determine prefix based on whether the item was originally in a CCIP cluster
if ccip_cluster_id_for_lpips_logic is not None: # LPIPS is sub-cluster if item had an original CCIP ID
lpips_folder_name_base = "lpips_sub_"
else: # No CCIP, LPIPS is primary
lpips_folder_name_base = "lpips_"
path_parts.append(f"{lpips_folder_name_base}{lpips_folder_name_part_str}")
final_filename_part = filename_with_extension
if aesthetic_label:
final_filename_part = f"{aesthetic_label}_{filename_with_extension}"
if path_parts:
return f"{'/'.join(path_parts)}/{final_filename_part}"
else:
return final_filename_part
# --- Core Processing Function for a single source (video or image sequence) ---
def _process_input_source_frames(
source_file_prefix: str, # Sanitized name for this source (e.g., "myvideo" or "ImageGroup123")
# Iterator yielding: (PIL.Image, frame_identifier_string, current_item_index, total_items_for_desc)
# For videos, current_item_index is the 1-based raw frame number.
# For images, current_item_index is the 1-based image number in the sequence.
frames_provider: Iterator[Tuple[Image.Image, int, int, int]],
is_video_source: bool, # To adjust some logging/stats messages
# Person Detection
enable_person_detection: bool,
min_target_width_person_percentage: float,
person_model_name: str,
person_conf_threshold: float,
person_iou_threshold: float,
# Half-Body Detection
enable_halfbody_detection: bool,
enable_halfbody_cropping: bool,
min_target_width_halfbody_percentage: float,
halfbody_model_name: str,
halfbody_conf_threshold: float,
halfbody_iou_threshold: float,
# Head Detection
enable_head_detection: bool,
enable_head_cropping: bool,
min_crop_width_head_percentage: float,
enable_head_filtering: bool,
head_model_name: str,
head_conf_threshold: float,
head_iou_threshold: float,
# Face Detection
enable_face_detection: bool,
enable_face_cropping: bool,
min_crop_width_face_percentage: float,
enable_face_filtering: bool,
face_model_name: str,
face_conf_threshold: float,
face_iou_threshold: float,
# CCIP Classification
enable_ccip_classification: bool,
ccip_model_name: str,
ccip_threshold: float,
# LPIPS Clustering
enable_lpips_clustering: bool,
lpips_threshold: float,
# Aesthetic Analysis
enable_aesthetic_analysis: bool,
aesthetic_model_name: str,
# Gradio Progress (specific to this source's processing)
progress_updater # Function: (progress_value: float, desc: str) -> None
) -> Tuple[Optional[str], str]:
"""
Processes frames from a given source (video or image sequence) according to the specified parameters.
Order: Person => Half-Body (alternative) => Face Detection => Head Detection => CCIP => Aesthetic.
Returns:
A tuple containing:
- Path to the output zip file (or None if error).
- Status message string.
"""
# This list will hold data for images that pass all filters, BEFORE LPIPS and final zipping
images_pending_final_processing: List[Dict[str, Any]] = []
# CCIP specific data
ccip_clusters_info: List[Tuple[int, np.ndarray]] = []
next_ccip_cluster_id = 0
# Stats
processed_items_count = 0
total_persons_detected_raw, total_halfbodies_detected_raw = 0, 0
person_targets_processed_count, halfbody_targets_processed_count, fullframe_targets_processed_count = 0, 0, 0
total_faces_detected_on_targets, total_heads_detected_on_targets = 0, 0
# These count items added to images_pending_final_processing
main_targets_pending_count, face_crops_pending_count, head_crops_pending_count = 0, 0, 0
items_filtered_by_face_count, items_filtered_by_head_count = 0, 0
ccip_applied_count, aesthetic_applied_count = 0, 0
# LPIPS stats
lpips_images_subject_to_clustering, total_lpips_clusters_created, total_lpips_noise_samples = 0, 0, 0
gc_interval = 100 # items from provider
start_time = time.time()
# Progress update for initializing this specific video
progress_updater(0, desc=f"Initializing {source_file_prefix}...")
output_zip_path_temp = None
output_zip_path_final = None
try:
# --- Main Loop for processing items from the frames_provider ---
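# Per item: select primary targets (persons; else half-bodies; else the full
# frame), then per target run face/head detection (optional crops/filters),
# CCIP feature clustering, and aesthetic labeling. Surviving targets and crops
# are queued in images_pending_final_processing for LPIPS and zipping below.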
for pil_image_full_frame, frame_specific_index, current_item_index, total_items_for_desc in frames_provider:
progress_value_for_updater = current_item_index / total_items_for_desc if total_items_for_desc > 0 else 1.0
# The description string should reflect what current_item_index means
item_description = ""
if is_video_source:
# For video, total_items_for_desc is the total raw frame count and
# current_item_index is the raw frame index of the *sampled* frame;
# processed_items_count counts sampled frames only.
item_description = f"Scanning frame {current_item_index}/{total_items_for_desc} (processed {processed_items_count + 1} sampled)"
else: # For images
item_description = f"image {current_item_index}/{total_items_for_desc}"
progress_updater(
min(progress_value_for_updater, 1.0), # Cap progress at 1.0
desc=f"Processing {item_description} for {source_file_prefix}"
)
# processed_items_count still counts how many items are yielded by the provider
# (i.e., how many sampled frames for video, or how many images for image sequence)
processed_items_count += 1
try:
full_frame_width = pil_image_full_frame.width # Store for percentage calculations
print(f"--- Processing item ID {frame_specific_index} (Width: {full_frame_width}px) for {source_file_prefix} ---")
# List to hold PIL images that are the primary subjects for this frame
# Each element: {'pil': Image, 'base_name': str, 'source_type': 'person'/'halfbody'/'fullframe'}
primary_targets_for_frame: List[Dict[str, Any]] = []
processed_primary_source_this_frame = False # Flag if Person or HalfBody yielded targets
# --- 1. Person Detection ---
if enable_person_detection and full_frame_width > 0:
print(" Attempting Person Detection...")
min_person_target_px_width = full_frame_width * min_target_width_person_percentage
person_detections = person_detector.detect_person(
pil_image_full_frame, model_name=person_model_name,
conf_threshold=person_conf_threshold, iou_threshold=person_iou_threshold
)
total_persons_detected_raw += len(person_detections)
if person_detections:
print(f" Detected {len(person_detections)} raw persons.")
valid_person_targets = 0
for i, (bbox, _, score) in enumerate(person_detections):
# Check width before full crop for minor optimization
detected_person_width = bbox[2] - bbox[0]
if detected_person_width >= min_person_target_px_width:
primary_targets_for_frame.append({
'pil': pil_image_full_frame.crop(bbox),
'base_name': f"{source_file_prefix}_item_{frame_specific_index}_person_{i}_score{int(score*100)}",
'source_type': 'person'})
person_targets_processed_count +=1
valid_person_targets +=1
else:
print(f" Person {i} width {detected_person_width}px < min {min_person_target_px_width:.0f}px. Skipping.")
if valid_person_targets > 0:
processed_primary_source_this_frame = True
print(f" Added {valid_person_targets} persons as primary targets.")
# --- 2. Half-Body Detection (if Person not processed and HBD enabled) ---
if not processed_primary_source_this_frame and enable_halfbody_detection and full_frame_width > 0:
print(" Attempting Half-Body Detection (on full item)...")
min_halfbody_target_px_width = full_frame_width * min_target_width_halfbody_percentage
halfbody_detections = halfbody_detector.detect_halfbody(
pil_image_full_frame, model_name=halfbody_model_name,
conf_threshold=halfbody_conf_threshold, iou_threshold=halfbody_iou_threshold
)
total_halfbodies_detected_raw += len(halfbody_detections)
if halfbody_detections:
print(f" Detected {len(halfbody_detections)} raw half-bodies.")
valid_halfbody_targets = 0
for i, (bbox, _, score) in enumerate(halfbody_detections):
detected_hb_width = bbox[2] - bbox[0]
# Cropping must be enabled and width must be sufficient for it to be a target
if enable_halfbody_cropping and detected_hb_width >= min_halfbody_target_px_width:
primary_targets_for_frame.append({
'pil': pil_image_full_frame.crop(bbox),
'base_name': f"{source_file_prefix}_item_{frame_specific_index}_halfbody_{i}_score{int(score*100)}",
'source_type': 'halfbody'})
halfbody_targets_processed_count +=1
valid_halfbody_targets +=1
elif enable_halfbody_cropping:
print(f" Half-body {i} width {detected_hb_width}px < min {min_halfbody_target_px_width:.0f}px. Skipping.")
if valid_halfbody_targets > 0:
processed_primary_source_this_frame = True
print(f" Added {valid_halfbody_targets} half-bodies as primary targets.")
# --- 3. Full Frame/Image (fallback) ---
if not processed_primary_source_this_frame:
print(" Processing Full Item as primary target.")
primary_targets_for_frame.append({
'pil': pil_image_full_frame.copy(),
'base_name': f"{source_file_prefix}_item_{frame_specific_index}_full",
'source_type': 'fullframe'})
fullframe_targets_processed_count += 1
# --- Process each identified primary_target_for_frame ---
for target_data in primary_targets_for_frame:
current_pil: Image.Image = target_data['pil']
current_base_name: str = target_data['base_name'] # Base name for this main target
current_source_type: str = target_data['source_type']
current_pil_width = current_pil.width # For sub-crop percentage calculations
print(f" Processing target: {current_base_name} (type: {current_source_type}, width: {current_pil_width}px)")
# Store PILs of successful crops from current_pil for this target
keep_this_target = True
item_area = current_pil_width * current_pil.height
potential_face_crops_pil: List[Image.Image] = []
potential_head_crops_pil: List[Image.Image] = []
# --- A. Face Detection ---
if keep_this_target and enable_face_detection and current_pil_width > 0:
print(f" Detecting faces in {current_base_name}...")
min_face_crop_px_width = current_pil_width * min_crop_width_face_percentage
face_detections = face_detector.detect_faces(
current_pil, model_name=face_model_name,
conf_threshold=face_conf_threshold, iou_threshold=face_iou_threshold
)
total_faces_detected_on_targets += len(face_detections)
if not face_detections and enable_face_filtering:
keep_this_target = False
items_filtered_by_face_count += 1
print(f" FILTERING TARGET {current_base_name} (no face).")
elif face_detections and enable_face_cropping:
for f_idx, (f_bbox, _, _) in enumerate(face_detections):
if (f_bbox[2]-f_bbox[0]) >= min_face_crop_px_width:
potential_face_crops_pil.append(current_pil.crop(f_bbox))
else:
print(f" Face {f_idx} too small. Skipping crop.")
# --- B. Head Detection ---
if keep_this_target and enable_head_detection and current_pil_width > 0:
print(f" Detecting heads in {current_base_name}...")
min_head_crop_px_width = current_pil_width * min_crop_width_head_percentage
head_detections = head_detector.detect_heads(
current_pil, model_name=head_model_name,
conf_threshold=head_conf_threshold, iou_threshold=head_iou_threshold
)
total_heads_detected_on_targets += len(head_detections)
if not head_detections and enable_head_filtering:
keep_this_target = False
items_filtered_by_head_count += 1
print(f" FILTERING TARGET {current_base_name} (no head).")
potential_face_crops_pil.clear() # Clear faces if head filter removed target
elif head_detections and enable_head_cropping:
for h_idx, (h_bbox, _, _) in enumerate(head_detections):
h_w = h_bbox[2] - h_bbox[0]
if h_w >= min_head_crop_px_width and item_area > 0:
potential_head_crops_pil.append(current_pil.crop(h_bbox))
else:
print(f" Head {h_idx} width {h_w}px < min {min_head_crop_px_width:.0f}px (or zero-area parent). Skipping crop.")
# --- If target is filtered, clean up and skip to next target ---
if not keep_this_target:
print(f" Target {current_base_name} was filtered by face/head presence rules. Discarding it and its potential crops.")
if current_pil is not None:
del current_pil
potential_face_crops_pil.clear()
potential_head_crops_pil.clear()
continue # To the next primary_target_for_frame
# --- C. CCIP Classification (on current_pil, if it's kept) ---
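# Online greedy clustering: compare this target's CCIP feature against one
# representative feature per existing cluster; join the nearest cluster if
# the difference is below ccip_threshold, otherwise seed a new cluster.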
assigned_ccip_id = None # This is the original CCIP ID
if enable_ccip_classification:
print(f" Classifying {current_base_name} with CCIP...")
try:
feature = ccip_analyzer.ccip_extract_feature(current_pil, model=ccip_model_name)
best_match_cid = None
min_diff = float('inf')
# Find the best potential match among existing clusters
if ccip_clusters_info: # Only loop if there are clusters to compare against
for cid, rep_f in ccip_clusters_info:
diff = ccip_analyzer.ccip_difference(feature, rep_f, model=ccip_model_name)
if diff < min_diff:
min_diff = diff
best_match_cid = cid
# Decide whether to use the best match or create a new cluster
if best_match_cid is not None and min_diff < ccip_threshold:
assigned_ccip_id = best_match_cid
print(f" -> Matched Cluster {assigned_ccip_id} (Diff: {min_diff:.6f} < Threshold {ccip_threshold:.3f})")
else:
# No suitable match (no clusters existed yet, or the best match's diff
# was not below the threshold): create a new cluster with this feature
# as its representative.
assigned_ccip_id = next_ccip_cluster_id
ccip_clusters_info.append((assigned_ccip_id, feature))
if len(ccip_clusters_info) == 1: # list was empty before this append
print(f" -> New Cluster {assigned_ccip_id} (First item or no prior suitable clusters)")
else:
print(f" -> New Cluster {assigned_ccip_id} (Min diff to others: {min_diff:.6f} >= Threshold {ccip_threshold:.3f})")
next_ccip_cluster_id += 1
print(f" CCIP: Target {current_base_name} -> Original Cluster ID {assigned_ccip_id}")
del feature
ccip_applied_count += 1
except Exception as e_ccip:
print(f" Error CCIP: {e_ccip}")
# --- D. Aesthetic Analysis (on current_pil, if it's kept) ---
item_aesthetic_label = None
if enable_aesthetic_analysis:
print(f" Analyzing {current_base_name} for aesthetics...")
try:
res = dbaesthetic_analyzer.anime_dbaesthetic(current_pil, model_name=aesthetic_model_name)
if isinstance(res, tuple) and len(res) >= 1:
item_aesthetic_label = res[0]
print(f" Aesthetic: Target {current_base_name} -> {item_aesthetic_label}")
aesthetic_applied_count += 1
except Exception as e_aes:
print(f" Error Aesthetic: {e_aes}")
add_current_pil_to_pending_list = True
if current_source_type == 'fullframe':
can_skip_fullframe_target = False
if enable_face_detection or enable_head_detection:
found_valid_sub_crop_from_enabled_detector = False
if enable_face_detection and len(potential_face_crops_pil) > 0:
found_valid_sub_crop_from_enabled_detector = True
if not found_valid_sub_crop_from_enabled_detector and \
enable_head_detection and len(potential_head_crops_pil) > 0:
found_valid_sub_crop_from_enabled_detector = True
if not found_valid_sub_crop_from_enabled_detector: # No valid crops from any enabled sub-detector
can_skip_fullframe_target = True # All enabled sub-detectors failed
if can_skip_fullframe_target:
add_current_pil_to_pending_list = False
print(f" Skipping save of fullframe target '{current_base_name}' because all enabled sub-detectors (Face/Head) yielded no valid-width crops.")
if add_current_pil_to_pending_list:
# --- E. Save current_pil (if it passed all filters) ---
# Add main target to pending list
images_pending_final_processing.append({
'pil_image': current_pil.copy(), 'base_name_for_filename': current_base_name,
'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label,
'is_halfbody_primary_target_type': (current_source_type == 'halfbody'),
'is_derived_head_crop': False, 'is_derived_face_crop': False,
'lpips_cluster_id': None, # Will be filled by LPIPS clustering
'lpips_folder_naming_index': None # Will be filled by LPIPS renaming
})
main_targets_pending_count +=1
# --- F. Save Face Crops (derived from current_pil) ---
for i, fc_pil in enumerate(potential_face_crops_pil):
images_pending_final_processing.append({
'pil_image': fc_pil, 'base_name_for_filename': f"{current_base_name}_face{i}",
'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label,
'is_halfbody_primary_target_type': False,
'is_derived_head_crop': False, 'is_derived_face_crop': True,
'lpips_cluster_id': None,
'lpips_folder_naming_index': None
})
face_crops_pending_count+=1
potential_face_crops_pil.clear()
# --- G. Save Head Crops (derived from current_pil) ---
for i, hc_pil in enumerate(potential_head_crops_pil):
images_pending_final_processing.append({
'pil_image': hc_pil, 'base_name_for_filename': f"{current_base_name}_head{i}",
'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label,
'is_halfbody_primary_target_type': False,
'is_derived_head_crop': True, 'is_derived_face_crop': False,
'lpips_cluster_id': None,
'lpips_folder_naming_index': None
})
head_crops_pending_count+=1
potential_head_crops_pil.clear()
if current_pil is not None: # Ensure current_pil exists before attempting to delete
del current_pil # Clean up the PIL for this target_data
primary_targets_for_frame.clear()
except Exception as item_proc_err:
print(f"!! Major Error processing item ID {frame_specific_index} for {source_file_prefix}: {item_proc_err}")
traceback.print_exc()
# Cleanup local vars for this item if error
if 'primary_targets_for_frame' in locals():
primary_targets_for_frame.clear()
# Also ensure current_pil from inner loop is cleaned up if error happened mid-loop
if 'current_pil' in locals() and current_pil is not None:
del current_pil
if processed_items_count % gc_interval == 0:
gc.collect()
print(f" [GC triggered at {processed_items_count} items for {source_file_prefix}]")
# --- End of Main Item Processing Loop ---
print(f"\nRunning final GC before LPIPS/Zipping for {source_file_prefix}...")
gc.collect()
if not images_pending_final_processing:
status_message = f"Processing for {source_file_prefix} finished, but no images were generated or passed filters for LPIPS/Zipping."
print(status_message)
return None, status_message
# --- LPIPS Clustering Stage ---
print(f"\n--- LPIPS Clustering Stage for {source_file_prefix} (Images pending: {len(images_pending_final_processing)}) ---")
if enable_lpips_clustering:
print(f" LPIPS Clustering enabled with threshold: {lpips_threshold}")
lpips_images_subject_to_clustering = len(images_pending_final_processing)
if enable_ccip_classification and next_ccip_cluster_id > 0: # CCIP was used
print(" LPIPS clustering within CCIP clusters.")
images_by_ccip: Dict[Optional[int], List[int]] = {} # ccip_id -> list of original indices
for i, item_data in enumerate(images_pending_final_processing):
ccip_id = item_data['ccip_cluster_id'] # Original CCIP ID
if ccip_id not in images_by_ccip:
images_by_ccip[ccip_id] = []
images_by_ccip[ccip_id].append(i)
for ccip_id, indices_in_ccip_cluster in images_by_ccip.items():
pils_for_lpips_sub_cluster = [images_pending_final_processing[idx]['pil_image'] for idx in indices_in_ccip_cluster]
if len(pils_for_lpips_sub_cluster) > 1:
print(f" Clustering {len(pils_for_lpips_sub_cluster)} images in CCIP cluster {ccip_id}...")
try:
lpips_sub_ids = lpips_module.lpips_clustering(pils_for_lpips_sub_cluster, threshold=lpips_threshold)
for i_sub, lpips_id in enumerate(lpips_sub_ids):
original_idx = indices_in_ccip_cluster[i_sub]
images_pending_final_processing[original_idx]['lpips_cluster_id'] = lpips_id
except Exception as e_lpips_sub:
print(f" Error LPIPS sub-cluster CCIP {ccip_id}: {e_lpips_sub}")
elif len(pils_for_lpips_sub_cluster) == 1:
images_pending_final_processing[indices_in_ccip_cluster[0]]['lpips_cluster_id'] = 0 # type: ignore
del images_by_ccip
if 'pils_for_lpips_sub_cluster' in locals():
del pils_for_lpips_sub_cluster # Ensure cleanup
else: # LPIPS on all images globally
print(" LPIPS clustering on all collected images.")
all_pils_for_global_lpips = [item['pil_image'] for item in images_pending_final_processing]
if len(all_pils_for_global_lpips) > 1:
try:
lpips_global_ids = lpips_module.lpips_clustering(all_pils_for_global_lpips, threshold=lpips_threshold)
for i, lpips_id in enumerate(lpips_global_ids):
images_pending_final_processing[i]['lpips_cluster_id'] = lpips_id
except Exception as e_lpips_global:
print(f" Error LPIPS global: {e_lpips_global}")
elif len(all_pils_for_global_lpips) == 1:
images_pending_final_processing[0]['lpips_cluster_id'] = 0 # type: ignore
del all_pils_for_global_lpips
# Calculate LPIPS stats
all_final_lpips_ids = [item.get('lpips_cluster_id') for item in images_pending_final_processing if item.get('lpips_cluster_id') is not None]
if all_final_lpips_ids:
unique_lpips_clusters = set(filter(lambda x: x != -1, all_final_lpips_ids))
total_lpips_clusters_created = len(unique_lpips_clusters)
total_lpips_noise_samples = sum(1 for x in all_final_lpips_ids if x == -1)
else:
print(" LPIPS Clustering disabled.")
# --- CCIP Folder Renaming Logic ---
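# Worked example (hypothetical counts): if original CCIP clusters 0/1/2 hold
# 4/9/2 images, the naming indices become 1 -> 000, 0 -> 001, 2 -> 002
# (largest cluster first).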
original_ccip_id_to_new_naming_index: Dict[int, int] = {}
if enable_ccip_classification:
print(f" Preparing CCIP folder renaming for {source_file_prefix}...")
ccip_image_counts: Dict[int, int] = {} # original_ccip_id -> count of images in it
for item_data_for_count in images_pending_final_processing:
original_ccip_id_val = item_data_for_count.get('ccip_cluster_id')
if original_ccip_id_val is not None:
ccip_image_counts[original_ccip_id_val] = ccip_image_counts.get(original_ccip_id_val, 0) + 1
if ccip_image_counts:
# Sort original ccip_ids by their counts in descending order
sorted_ccip_groups_by_count: List[Tuple[int, int]] = sorted(
ccip_image_counts.items(),
key=lambda item: item[1], # Sort by count
reverse=True
)
for new_idx, (original_id, count) in enumerate(sorted_ccip_groups_by_count):
original_ccip_id_to_new_naming_index[original_id] = new_idx
print(f" CCIP Remap for {source_file_prefix}: Original ID {original_id} (count: {count}) -> New Naming Index {new_idx:03d}")
else:
print(f" No CCIP-assigned images found for {source_file_prefix} to perform renaming.")
# --- LPIPS Folder Renaming Logic ---
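# Mirrors the CCIP renaming above: within each group, non-noise LPIPS clusters
# are renumbered 000, 001, ... by descending image count, while noise (-1)
# items keep a dedicated "noise" folder.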
if enable_lpips_clustering:
print(f" Preparing LPIPS folder renaming for {source_file_prefix}...")
# Initialize/Reset lpips_folder_naming_index for all items
for item_data in images_pending_final_processing:
item_data['lpips_folder_naming_index'] = None
if enable_ccip_classification and next_ccip_cluster_id > 0: # LPIPS within CCIP
print(f" LPIPS renaming within CCIP clusters for {source_file_prefix}.")
items_grouped_by_original_ccip: Dict[Optional[int], List[Dict[str, Any]]] = {}
for item_data in images_pending_final_processing:
original_ccip_id = item_data.get('ccip_cluster_id')
if original_ccip_id not in items_grouped_by_original_ccip: items_grouped_by_original_ccip[original_ccip_id] = []
items_grouped_by_original_ccip[original_ccip_id].append(item_data)
for original_ccip_id, items_in_ccip in items_grouped_by_original_ccip.items():
lpips_counts_in_ccip: Dict[int, int] = {} # original_lpips_id (non-noise) -> count
for item_data in items_in_ccip:
lpips_id = item_data.get('lpips_cluster_id')
if lpips_id is not None and lpips_id != -1:
lpips_counts_in_ccip[lpips_id] = lpips_counts_in_ccip.get(lpips_id, 0) + 1
lpips_id_to_naming_in_ccip: Dict[int, Union[int, str]] = {}
if lpips_counts_in_ccip:
sorted_lpips = sorted(lpips_counts_in_ccip.items(), key=lambda x: x[1], reverse=True)
for new_idx, (lpips_id, count) in enumerate(sorted_lpips):
lpips_id_to_naming_in_ccip[lpips_id] = new_idx
ccip_disp = f"OrigCCIP-{original_ccip_id}" if original_ccip_id is not None else "NoCCIP"
print(f" LPIPS Remap in {ccip_disp}: OrigLPIPS ID {lpips_id} (count: {count}) -> New Naming Index {new_idx:03d}")
for item_data in items_in_ccip:
lpips_id = item_data.get('lpips_cluster_id')
if lpips_id is not None:
if lpips_id == -1: item_data['lpips_folder_naming_index'] = "noise"
elif lpips_id in lpips_id_to_naming_in_ccip:
item_data['lpips_folder_naming_index'] = lpips_id_to_naming_in_ccip[lpips_id]
del items_grouped_by_original_ccip
else: # Global LPIPS
print(f" Global LPIPS renaming for {source_file_prefix}.")
global_lpips_counts: Dict[int, int] = {}
for item_data in images_pending_final_processing:
lpips_id = item_data.get('lpips_cluster_id')
if lpips_id is not None and lpips_id != -1:
global_lpips_counts[lpips_id] = global_lpips_counts.get(lpips_id, 0) + 1
global_lpips_id_to_naming: Dict[int, Union[int, str]] = {}
if global_lpips_counts:
sorted_global_lpips = sorted(global_lpips_counts.items(), key=lambda x: x[1], reverse=True)
for new_idx, (lpips_id, count) in enumerate(sorted_global_lpips):
global_lpips_id_to_naming[lpips_id] = new_idx
print(f" Global LPIPS Remap: OrigLPIPS ID {lpips_id} (count: {count}) -> New Naming Index {new_idx:03d}")
for item_data in images_pending_final_processing:
lpips_id = item_data.get('lpips_cluster_id')
if lpips_id is not None:
if lpips_id == -1: item_data['lpips_folder_naming_index'] = "noise"
elif lpips_id in global_lpips_id_to_naming:
item_data['lpips_folder_naming_index'] = global_lpips_id_to_naming[lpips_id]
gc.collect()
# --- Final Zipping Stage ---
images_to_zip: Dict[str, bytes] = {}
print(f"\n--- Final Zipping Stage for {source_file_prefix} ({len(images_pending_final_processing)} items) ---")
for item_data in images_pending_final_processing:
original_ccip_id_for_item = item_data.get('ccip_cluster_id')
current_ccip_naming_idx_for_folder: Optional[int] = None
if enable_ccip_classification and original_ccip_id_for_item is not None and \
original_ccip_id_for_item in original_ccip_id_to_new_naming_index:
current_ccip_naming_idx_for_folder = original_ccip_id_to_new_naming_index[original_ccip_id_for_item]
current_lpips_naming_idx_for_folder = item_data.get('lpips_folder_naming_index')
final_filename = generate_filename(
base_name=item_data['base_name_for_filename'],
aesthetic_label=item_data.get('aesthetic_label'),
ccip_cluster_id_for_lpips_logic=original_ccip_id_for_item,
ccip_folder_naming_index=current_ccip_naming_idx_for_folder,
source_prefix_for_ccip_folder=source_file_prefix if current_ccip_naming_idx_for_folder is not None else None,
lpips_folder_naming_index=current_lpips_naming_idx_for_folder,
is_halfbody_primary_target_type=item_data['is_halfbody_primary_target_type'],
is_derived_head_crop=item_data['is_derived_head_crop'],
is_derived_face_crop=item_data['is_derived_face_crop']
)
try:
images_to_zip[final_filename] = image_to_bytes(item_data['pil_image'])
except Exception as e_bytes:
print(f" Error converting/adding {final_filename} to zip: {e_bytes}")
finally:
if 'pil_image' in item_data and item_data['pil_image'] is not None:
del item_data['pil_image']
images_pending_final_processing.clear()
if not images_to_zip:
status_message = f"Processing for {source_file_prefix} finished, but no images were converted for zipping."
print(status_message)
return None, status_message
print(f"Preparing zip file for {source_file_prefix} with {len(images_to_zip)} images...")
progress_updater(1.0, desc=f"Creating Zip File for {source_file_prefix}...")
zip_start_time = time.time()
# Use NamedTemporaryFile with delete=False for the final output path
# This file will persist until manually cleaned or OS cleanup
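# Writing to the temp name first and renaming afterwards gives the Gradio
# download a human-readable name, e.g. "myvideo_processed_1716912345.zip"
# (timestamp illustrative).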
temp_zip_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
output_zip_path_temp = temp_zip_file.name
temp_zip_file.close() # Close the handle, but file remains
try:
# Write data to the temporary file path
create_zip_file(images_to_zip, output_zip_path_temp)
zip_duration = time.time() - zip_start_time
print(f"Temporary zip file for {source_file_prefix} created in {zip_duration:.2f} seconds at {output_zip_path_temp}")
# Construct the new, desired filename
temp_dir = os.path.dirname(output_zip_path_temp)
timestamp = int(time.time())
desired_filename = f"{source_file_prefix}_processed_{timestamp}.zip"
output_zip_path_final = os.path.join(temp_dir, desired_filename)
# Rename the temporary file to the desired name
print(f"Renaming temp file for {source_file_prefix} to: {output_zip_path_final}")
os.rename(output_zip_path_temp, output_zip_path_final)
print("Rename successful.")
output_zip_path_temp = None # Clear temp path as it's been renamed
except Exception as zip_or_rename_err:
print(f"Error during zip creation or renaming for {source_file_prefix}: {zip_or_rename_err}")
# Clean up the *original* temp file if it still exists and renaming failed
if output_zip_path_temp and os.path.exists(output_zip_path_temp):
try:
os.remove(output_zip_path_temp)
except OSError:
pass
if output_zip_path_final and os.path.exists(output_zip_path_final): # Check if rename partially happened
try:
os.remove(output_zip_path_final)
except OSError:
pass
raise zip_or_rename_err # Re-raise the error
# --- Prepare Status Message ---
processing_duration = time.time() - start_time - zip_duration # Exclude zipping time from processing time
total_duration = time.time() - start_time # Includes zipping/renaming
# --- Build final status message ---
person_stats = "N/A"
if enable_person_detection:
person_stats = f"{total_persons_detected_raw} raw, {person_targets_processed_count} targets (>{min_target_width_person_percentage*100:.1f}% itemW)"
halfbody_stats = "N/A"
if enable_halfbody_detection:
halfbody_stats = f"{total_halfbodies_detected_raw} raw, {halfbody_targets_processed_count} targets (>{min_target_width_halfbody_percentage*100:.1f}% itemW)"
fullframe_stats = f"{fullframe_targets_processed_count} targets"
face_stats = "N/A"
if enable_face_detection:
face_stats = f"{total_faces_detected_on_targets} on targets, {face_crops_pending_count} crops pending (>{min_crop_width_face_percentage*100:.1f}% parentW)"
if enable_face_filtering:
face_stats += f", {items_filtered_by_face_count} targets filtered"
head_stats = "N/A"
if enable_head_detection:
head_stats = f"{total_heads_detected_on_targets} on targets, {head_crops_pending_count} crops pending (>{min_crop_width_head_percentage*100:.1f}% parentW)"
if enable_head_filtering:
head_stats += f", {items_filtered_by_head_count} targets filtered"
ccip_stats = "N/A"
if enable_ccip_classification:
ccip_stats = f"{next_ccip_cluster_id} original clusters created, on {ccip_applied_count} targets. Folders renamed by image count."
lpips_stats = "N/A"
if enable_lpips_clustering:
lpips_stats = f"{lpips_images_subject_to_clustering} images processed, {total_lpips_clusters_created} clusters, {total_lpips_noise_samples} noise. Folders renamed by image count."
aesthetic_stats = "N/A"
if enable_aesthetic_analysis:
aesthetic_stats = f"On {aesthetic_applied_count} targets"
item_desc_for_stats = "Items from Provider" if not is_video_source else "Sampled Frames"
status_message = (
f"Processing for '{source_file_prefix}' Complete!\n"
f"Total time: {total_duration:.2f}s (Proc: {processing_duration:.2f}s, Zip: {zip_duration:.2f}s)\n"
f"{item_desc_for_stats}: {total_items_for_desc}, Processed Items: {processed_items_count}\n"
f"--- Primary Targets Processed ---\n"
f" Person Detection: {person_stats}\n"
f" Half-Body Detection: {halfbody_stats}\n"
f" Full Item Processing: {fullframe_stats}\n"
f"--- Items Pending Final Processing ({main_targets_pending_count} main, {face_crops_pending_count} face, {head_crops_pending_count} head) ---\n"
f" Face Detection: {face_stats}\n"
f" Head Detection: {head_stats}\n"
f" CCIP Classification: {ccip_stats}\n"
f" LPIPS Clustering: {lpips_stats}\n"
f" Aesthetic Analysis: {aesthetic_stats}\n"
f"Zip file contains {len(images_to_zip)} images.\n"
f"Output Zip: {output_zip_path_final}"
)
print(status_message)
progress_updater(1.0, desc=f"Finished {source_file_prefix}!")
# Return the path to the zip file
return output_zip_path_final, status_message
except Exception as e:
print(f"!! An unhandled error occurred during processing of {source_file_prefix}: {e}")
traceback.print_exc() # Print detailed traceback for debugging
# Clean up main data structures
images_pending_final_processing.clear()
ccip_clusters_info.clear()
gc.collect()
# Clean up temp file if it exists on general error
if output_zip_path_temp and os.path.exists(output_zip_path_temp):
try:
os.remove(output_zip_path_temp)
except OSError:
pass
# Clean up final file if it exists on general error (maybe renaming succeeded but later code failed)
if output_zip_path_final and os.path.exists(output_zip_path_final):
try:
os.remove(output_zip_path_final)
except OSError:
pass
return None, f"An error occurred with {source_file_prefix}: {e}"
# --- Main Processing Function for Input files ---
def process_inputs_main(
input_file_objects: List[Any], # Gradio File component gives list of tempfile._TemporaryFileWrapper
sample_interval_ms: int, # Relevant for videos only
# Person Detection
enable_person_detection: bool,
min_target_width_person_percentage: float,
person_model_name: str,
person_conf_threshold: float,
person_iou_threshold: float,
# Half-Body Detection
enable_halfbody_detection: bool,
enable_halfbody_cropping: bool,
min_target_width_halfbody_percentage: float,
halfbody_model_name: str,
halfbody_conf_threshold: float,
halfbody_iou_threshold: float,
# Head Detection
enable_head_detection: bool,
enable_head_cropping: bool,
min_crop_width_head_percentage: float,
enable_head_filtering: bool,
head_model_name: str,
head_conf_threshold: float,
head_iou_threshold: float,
# Face Detection
enable_face_detection: bool,
enable_face_cropping: bool,
min_crop_width_face_percentage: float,
enable_face_filtering: bool,
face_model_name: str,
face_conf_threshold: float,
face_iou_threshold: float,
# CCIP Classification
enable_ccip_classification: bool,
ccip_model_name: str,
ccip_threshold: float,
# LPIPS Clustering
enable_lpips_clustering: bool,
lpips_threshold: float,
# Aesthetic Analysis
enable_aesthetic_analysis: bool,
aesthetic_model_name: str,
progress=gr.Progress(track_tqdm=True) # Gradio progress for overall processing
) -> Tuple[Optional[List[str]], str]: # Returns list of ZIP paths and combined status
if not input_file_objects:
return [], "Error: No files provided."
video_file_temp_objects: List[Any] = []
image_file_temp_objects: List[Any] = []
for file_obj in input_file_objects:
# gr.Files returns a list of tempfile._TemporaryFileWrapper objects
# We need the .name attribute to get the actual file path
file_name = getattr(file_obj, 'orig_name', file_obj.name) # Use original name if available
if isinstance(file_name, str):
lower_file_name = file_name.lower()
if any(lower_file_name.endswith(ext) for ext in VIDEO_EXTENSIONS):
video_file_temp_objects.append(file_obj)
elif any(lower_file_name.endswith(ext) for ext in IMAGE_EXTENSIONS):
image_file_temp_objects.append(file_obj)
else:
print(f"Warning: File '{file_name}' has an unrecognized extension and will be skipped.")
else:
print(f"Warning: File object {file_obj} does not have a valid name and will be skipped.")
output_zip_paths_all_sources = []
all_status_messages = []
total_processing_tasks = (1 if image_file_temp_objects else 0) + len(video_file_temp_objects)
if total_processing_tasks == 0:
return [], "No processable video or image files found in the input."
tasks_completed_count = 0
# Print overall settings once
print(f"--- Overall Batch Processing Settings ---")
print(f" Number of image sequences to process: {1 if image_file_temp_objects else 0}")
print(f" Number of videos to process: {len(video_file_temp_objects)}")
print(f" Sample Interval (for videos): {sample_interval_ms}ms")
print(f" Detection Order: Person => Half-Body (alt) => Face => Head. Then: CCIP => LPIPS => Aesthetic.")
print(f" Person Detect = {enable_person_detection}" + (f" (MinW:{min_target_width_person_percentage*100:.1f}%, Mdl:{person_model_name}, Conf:{person_conf_threshold:.2f}, IoU:{person_iou_threshold:.2f})" if enable_person_detection else ""))
print(f" HalfBody Detect = {enable_halfbody_detection}" + (f" (FullFrameOnly, Crop:{enable_halfbody_cropping}, MinW:{min_target_width_halfbody_percentage*100:.1f}%, Mdl:{halfbody_model_name}, Conf:{halfbody_conf_threshold:.2f}, IoU:{halfbody_iou_threshold:.2f})" if enable_halfbody_detection else ""))
print(f" Face Detect = {enable_face_detection}" + (f" (Crop:{enable_face_cropping}, MinW:{min_crop_width_face_percentage*100:.1f}%, Filter:{enable_face_filtering}, Mdl:{face_model_name}, Conf:{face_conf_threshold:.2f}, IoU:{face_iou_threshold:.2f})" if enable_face_detection else ""))
print(f" Head Detect = {enable_head_detection}" + (f" (Crop:{enable_head_cropping}, MinW:{min_crop_width_head_percentage*100:.1f}%, Filter:{enable_head_filtering}, Mdl:{head_model_name}, Conf:{head_conf_threshold:.2f}, IoU:{head_iou_threshold:.2f})" if enable_head_detection else ""))
print(f" CCIP Classify = {enable_ccip_classification}" + (f" (Mdl:{ccip_model_name}, Thr:{ccip_threshold:.3f})" if enable_ccip_classification else ""))
print(f" LPIPS Clustering = {enable_lpips_clustering}" + (f" (Thr:{lpips_threshold:.3f})" if enable_lpips_clustering else ""))
print(f" Aesthetic Analyze = {enable_aesthetic_analysis}" + (f" (Mdl:{aesthetic_model_name})" if enable_aesthetic_analysis else ""))
print(f"--- End of Overall Settings ---")
# --- Process Image Sequence (if any) ---
if image_file_temp_objects:
image_group_label_base = "ImageGroup"
# Attempt to use first image name for more uniqueness, fallback to timestamp
try:
first_image_orig_name = getattr(image_file_temp_objects[0], 'orig_name', image_file_temp_objects[0].name)
image_group_label_base = sanitize_filename(first_image_orig_name, max_len=20)
except Exception:
pass # Stick with "ImageGroup"
image_source_file_prefix = f"{image_group_label_base}_{int(time.time())}"
current_task_number = tasks_completed_count + 1
progress_description_prefix = f"Image Seq. {current_task_number}/{total_processing_tasks} ({image_source_file_prefix})"
progress(tasks_completed_count / total_processing_tasks, desc=f"{progress_description_prefix}: Starting...")
print(f"\n>>> Processing Image Sequence: {image_source_file_prefix} ({len(image_file_temp_objects)} images) <<<")
def image_frames_provider_generator() -> Iterator[Tuple[Image.Image, int, int, int]]:
num_images = len(image_file_temp_objects)
for idx, img_obj in enumerate(image_file_temp_objects):
try:
pil_img = Image.open(img_obj.name).convert('RGB')
yield pil_img, idx, idx + 1, num_images
except Exception as e_load:
print(f"Error loading image {getattr(img_obj, 'orig_name', img_obj.name)}: {e_load}. Skipping.")
# If we skip, the total_items_in_source for _process_input_source_frames might be off
# For simplicity, we'll proceed, but this could be refined to adjust total_items dynamically.
# Or, pre-filter loadable images. For now, just skip.
continue
def image_group_progress_updater(item_progress_value: float, desc: str):
overall_progress = (tasks_completed_count + item_progress_value) / total_processing_tasks
progress(overall_progress, desc=f"{progress_description_prefix}: {desc}")
try:
zip_file_path_single, status_message_single = _process_input_source_frames(
source_file_prefix=image_source_file_prefix,
frames_provider=image_frames_provider_generator(),
is_video_source=False,
enable_person_detection=enable_person_detection,
min_target_width_person_percentage=min_target_width_person_percentage,
person_model_name=person_model_name,
person_conf_threshold=person_conf_threshold,
person_iou_threshold=person_iou_threshold,
enable_halfbody_detection=enable_halfbody_detection,
enable_halfbody_cropping=enable_halfbody_cropping,
min_target_width_halfbody_percentage=min_target_width_halfbody_percentage,
halfbody_model_name=halfbody_model_name,
halfbody_conf_threshold=halfbody_conf_threshold,
halfbody_iou_threshold=halfbody_iou_threshold,
enable_head_detection=enable_head_detection,
enable_head_cropping=enable_head_cropping,
min_crop_width_head_percentage=min_crop_width_head_percentage,
enable_head_filtering=enable_head_filtering,
head_model_name=head_model_name,
head_conf_threshold=head_conf_threshold,
head_iou_threshold=head_iou_threshold,
enable_face_detection=enable_face_detection,
enable_face_cropping=enable_face_cropping,
min_crop_width_face_percentage=min_crop_width_face_percentage,
enable_face_filtering=enable_face_filtering,
face_model_name=face_model_name,
face_conf_threshold=face_conf_threshold,
face_iou_threshold=face_iou_threshold,
enable_ccip_classification=enable_ccip_classification,
ccip_model_name=ccip_model_name,
ccip_threshold=ccip_threshold,
enable_lpips_clustering=enable_lpips_clustering,
lpips_threshold=lpips_threshold,
enable_aesthetic_analysis=enable_aesthetic_analysis,
aesthetic_model_name=aesthetic_model_name,
progress_updater=image_group_progress_updater
)
if zip_file_path_single:
output_zip_paths_all_sources.append(zip_file_path_single)
all_status_messages.append(f"--- Image Sequence ({image_source_file_prefix}) Processing Succeeded ---\n{status_message_single}")
else:
all_status_messages.append(f"--- Image Sequence ({image_source_file_prefix}) Processing Failed ---\n{status_message_single}")
except Exception as e_img_seq:
error_msg = f"Critical error during processing of image sequence {image_source_file_prefix}: {e_img_seq}"
print(error_msg)
traceback.print_exc()
all_status_messages.append(f"--- Image Sequence ({image_source_file_prefix}) Processing CRITICALLY FAILED ---\n{error_msg}")
tasks_completed_count += 1
print(f">>> Finished attempt for Image Sequence: {image_source_file_prefix} <<<")
# --- Process Video Files (if any) ---
for video_idx, video_file_temp_obj in enumerate(video_file_temp_objects):
video_path_temp = video_file_temp_obj.name
video_original_filename = os.path.basename(getattr(video_file_temp_obj, 'orig_name', video_path_temp))
video_source_file_prefix = sanitize_filename(video_original_filename)
current_task_number = tasks_completed_count + 1
progress_description_prefix = f"Video {current_task_number}/{total_processing_tasks}"
print(f"\n>>> Processing Video: {video_original_filename} (Sanitized Prefix: {video_source_file_prefix}) <<<")
progress(tasks_completed_count / total_processing_tasks, desc=f"{progress_description_prefix}: Starting processing...")
# It yields: (PIL.Image, frame_identifier_string, current_raw_frame_index_from_video, total_items_for_desc)
# The third element will be the raw frame number based on CAP_PROP_POS_FRAMES or current_pos_ms
# to align progress with total_items_for_desc (raw frame count).
def video_frames_provider_generator(video_path: str, interval_ms: int) -> Iterator[Tuple[Image.Image, int, int, int]]:
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"Error: Could not open video file for provider: {video_path}")
return
total_items_for_desc = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if total_items_for_desc <= 0:
print(f"Warning: Video {video_original_filename} reported {total_items_for_desc} frames. This might be inaccurate. Proceeding...")
# If it's 0, the progress in _process_input_source_frames might behave unexpectedly.
# Setting to 1 to avoid division by zero, but this means progress won't be very useful.
total_items_for_desc = 1 # Fallback to prevent division by zero
# processed_count_in_provider = 0 # Counts *sampled* frames, not used for progress index
last_processed_ms = -float('inf')
raw_frames_read_by_provider = 0 # Counts all frames read by cap.read()
try:
while True:
# For progress, use current_pos_ms or CAP_PROP_POS_FRAMES
# CAP_PROP_POS_FRAMES is a 0-based index of the next frame to be decoded/captured.
current_raw_frame_index = int(cap.get(cv2.CAP_PROP_POS_FRAMES)) # Use this for progress
current_pos_ms_in_provider = cap.get(cv2.CAP_PROP_POS_MSEC)
# cap.read() returning False is the sole exit condition. POS_MSEC can stall
# on some containers, but CAP_PROP_FRAME_COUNT is too unreliable to use as
# a secondary break, so no timestamp-based exit is attempted here.
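# Sample by timestamp: process a frame once POS_MSEC has advanced at least
# interval_ms past the last processed frame; the "- 1" below adds a 1 ms
# tolerance for floating-point timestamp jitter.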
should_process_this_frame = current_pos_ms_in_provider >= last_processed_ms + interval_ms - 1
ret_frame, frame_cv_data = cap.read()
if not ret_frame: # Primary exit point for the loop
break
raw_frames_read_by_provider +=1 # Incremented after successful read
if should_process_this_frame:
try:
pil_img = convert_to_pil(frame_cv_data)
last_processed_ms = current_pos_ms_in_provider
yield pil_img, int(current_pos_ms_in_provider), current_raw_frame_index + 1, total_items_for_desc # Yield 1-based raw frame index
except Exception as e_conv:
print(f"Error converting frame at {current_pos_ms_in_provider}ms (raw index {current_raw_frame_index}) for {video_original_filename}: {e_conv}. Skipping.")
finally:
if cap.isOpened():
cap.release()
print(f" Video capture for provider ({video_original_filename}) released.")
def video_progress_updater(item_progress_value: float, desc: str):
overall_progress = (tasks_completed_count + item_progress_value) / total_processing_tasks
progress(overall_progress, desc=f"{progress_description_prefix}: {desc}")
try:
zip_file_path_single, status_message_single = _process_input_source_frames(
source_file_prefix=video_source_file_prefix,
frames_provider=video_frames_provider_generator(video_path_temp, sample_interval_ms),
is_video_source=True,
enable_person_detection=enable_person_detection,
min_target_width_person_percentage=min_target_width_person_percentage,
person_model_name=person_model_name,
person_conf_threshold=person_conf_threshold,
person_iou_threshold=person_iou_threshold,
enable_halfbody_detection=enable_halfbody_detection,
enable_halfbody_cropping=enable_halfbody_cropping,
min_target_width_halfbody_percentage=min_target_width_halfbody_percentage,
halfbody_model_name=halfbody_model_name,
halfbody_conf_threshold=halfbody_conf_threshold,
halfbody_iou_threshold=halfbody_iou_threshold,
enable_head_detection=enable_head_detection,
enable_head_cropping=enable_head_cropping,
min_crop_width_head_percentage=min_crop_width_head_percentage,
enable_head_filtering=enable_head_filtering,
head_model_name=head_model_name,
head_conf_threshold=head_conf_threshold,
head_iou_threshold=head_iou_threshold,
enable_face_detection=enable_face_detection,
enable_face_cropping=enable_face_cropping,
min_crop_width_face_percentage=min_crop_width_face_percentage,
enable_face_filtering=enable_face_filtering,
face_model_name=face_model_name,
face_conf_threshold=face_conf_threshold,
face_iou_threshold=face_iou_threshold,
enable_ccip_classification=enable_ccip_classification,
ccip_model_name=ccip_model_name,
ccip_threshold=ccip_threshold,
enable_lpips_clustering=enable_lpips_clustering,
lpips_threshold=lpips_threshold,
enable_aesthetic_analysis=enable_aesthetic_analysis,
aesthetic_model_name=aesthetic_model_name,
progress_updater=video_progress_updater
)
if zip_file_path_single:
output_zip_paths_all_sources.append(zip_file_path_single)
all_status_messages.append(f"--- Video ({video_original_filename}) Processing Succeeded ---\n{status_message_single}")
else:
all_status_messages.append(f"--- Video ({video_original_filename}) Processing Failed ---\n{status_message_single}")
except Exception as e_vid:
# This catches errors if _process_input_source_frames itself raises an unhandled
# exception (it also has its own internal try-except).
error_msg = f"Critical error during processing of video {video_original_filename}: {e_vid}"
print(error_msg)
traceback.print_exc()
all_status_messages.append(f"--- Video ({video_original_filename}) Processing CRITICALLY FAILED ---\n{error_msg}")
tasks_completed_count += 1
print(f">>> Finished attempt for Video: {video_original_filename} <<<")
# Gradio manages the lifecycle of video_path_temp (the uploaded temp file)
final_summary_message = "\n\n==============================\n\n".join(all_status_messages)
successful_zips_count = len(output_zip_paths_all_sources)
if successful_zips_count == 0 and total_processing_tasks > 0:
final_summary_message = f"ALL {total_processing_tasks} INPUT SOURCE(S) FAILED TO PRODUCE A ZIP FILE.\n\n" + final_summary_message
elif total_processing_tasks > 0:
final_summary_message = f"Successfully processed {successful_zips_count} out of {total_processing_tasks} input source(s).\n\n" + final_summary_message
else: # Should be caught earlier by "No processable files"
final_summary_message = "No inputs were processed."
progress(1.0, desc="All processing attempts finished.")
# gr.Files output expects a list of file paths. An empty list is fine if no files.
return output_zip_paths_all_sources, final_summary_message
# --- Gradio Interface Setup ---
css = """
/* Default (Light Mode) Styles */
#warning {
background-color: #FFCCCB; /* Light red background */
padding: 10px;
border-radius: 5px;
color: #A00000; /* Dark red text */
border: 1px solid #E5B8B7; /* A slightly darker border for more definition */
}
/* Dark Mode Styles */
@media (prefers-color-scheme: dark) {
#warning {
background-color: #5C1A1A; /* Darker red background, suitable for dark mode */
color: #FFDDDD; /* Light pink text, for good contrast against the dark red background */
border: 1px solid #8B0000; /* A more prominent dark red border in dark mode */
}
}
#status_box {
white-space: pre-wrap !important; /* Ensure status messages show newlines */
font-family: monospace; /* Optional: Use monospace for better alignment */
}
"""
# --- Define Model Lists ---
person_models = ['person_detect_v1.3_s', 'person_detect_v1.2_s', 'person_detect_v1.1_s', 'person_detect_v1.1_m', 'person_detect_v1_m', 'person_detect_v1.1_n', 'person_detect_v0_s', 'person_detect_v0_m', 'person_detect_v0_x']
halfbody_models = ['halfbody_detect_v1.0_s', 'halfbody_detect_v1.0_n', 'halfbody_detect_v0.4_s', 'halfbody_detect_v0.3_s', 'halfbody_detect_v0.2_s']
head_models = ['head_detect_v2.0_s', 'head_detect_v2.0_m', 'head_detect_v2.0_n', 'head_detect_v2.0_x', 'head_detect_v2.0_s_yv11', 'head_detect_v2.0_m_yv11', 'head_detect_v2.0_n_yv11', 'head_detect_v2.0_x_yv11', 'head_detect_v2.0_l_yv11']
face_models = ['face_detect_v1.4_s', 'face_detect_v1.4_n', 'face_detect_v1.3_s', 'face_detect_v1.3_n', 'face_detect_v1.2_s', 'face_detect_v1.1_s', 'face_detect_v1.1_n', 'face_detect_v1_s', 'face_detect_v1_n', 'face_detect_v0_s', 'face_detect_v0_n']
ccip_models = ['ccip-caformer-24-randaug-pruned', 'ccip-caformer-6-randaug-pruned_fp32', 'ccip-caformer-5_fp32']
aesthetic_models = ['swinv2pv3_v0_448_ls0.2_x', 'swinv2pv3_v0_448_ls0.2', 'caformer_s36_v0_ls0.2']
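# These identifiers are passed through as the model/model_name argument of the
# corresponding dghs-imgutils calls; the first entry of each list is the UI default.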
with gr.Blocks(css=css) as demo:
gr.Markdown("# Video Processor using dghs-imgutils")
gr.Markdown("Upload one or more videos, or a sequence of images. Videos are processed individually, while multiple images are treated as a single sequence. Each processed source (video or image sequence) is then sequentially analyzed by [dghs-imgutils](https://github.com/deepghs/imgutils) to detect subjects, classify items, and process its content according to your settings, ultimately generating a ZIP file with the extracted images.")
gr.Markdown("**Detection Flow:** " +
"[Person](https://dghs-imgutils.deepghs.org/main/api_doc/detect/person.html) ⇒ " +
"[Half-Body](https://dghs-imgutils.deepghs.org/main/api_doc/detect/halfbody.html) (if no person) ⇒ " +
"[Face](https://dghs-imgutils.deepghs.org/main/api_doc/detect/face.html) (on target) ⇒ " +
"[Head](https://dghs-imgutils.deepghs.org/main/api_doc/detect/head.html) (on target).")
gr.Markdown("**Analysis Flow:** " +
"[CCIP](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/ccip.html) Clustering ⇒ " +
"[LPIPS](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/lpips.html) Clustering ⇒ " +
"[Aesthetic](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/dbaesthetic.html) Labeling.")
gr.Markdown("**Note on CCIP Folders:** CCIP cluster folders are named `{source_prefix}_ccip_XXX`, sorted by image count (most images = `_ccip_000`).")
gr.Markdown("**Note on LPIPS Folders:** LPIPS cluster folders (e.g., `lpips_XXX` or `lpips_sub_XXX`) are also sorted by image count within their scope. 'noise' folders are named explicitly.")
with gr.Row():
with gr.Column(scale=1):
# --- Input Components ---
process_button = gr.Button("Process Input(s) & Generate ZIP(s)", variant="primary")
input_files = gr.Files(label="Upload Videos or Image Sequences", file_types=['video', 'image'], file_count="multiple")
sample_interval_ms = gr.Number(label="Sample Interval (ms, for videos)", value=1000, minimum=1, step=100)
# --- Detection Options ---
gr.Markdown("**Detection Options**")
# --- Person Detection Block ---
with gr.Accordion("Person Detection Options", open=True):
enable_person_detection = gr.Checkbox(label="Enable Person Detection", value=True)
with gr.Group() as person_detection_params_group:
min_target_width_person_percentage_slider = gr.Slider(
minimum=0.0, maximum=1.0, value=0.25, step=0.01,
label="Min Target Width (% of Item Width)",
info="Minimum width for a detected person to be processed (e.g., 0.25 = 25%)."
)
person_model_name_dd = gr.Dropdown(person_models, label="PD Model", value=person_models[0])
person_conf_threshold = gr.Slider(0.0, 1.0, value=0.3, step=0.05, label="PD Conf")
person_iou_threshold = gr.Slider(0.0, 1.0, value=0.5, step=0.05, label="PD IoU")
enable_person_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_person_detection, outputs=person_detection_params_group)
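# The checkbox-driven show/hide pattern above (checkbox.change -> gr.update(visible=...))
# is reused for every option group below.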
# --- Half-Body Detection Block ---
with gr.Accordion("Half-Body Detection Options", open=True):
enable_halfbody_detection = gr.Checkbox(label="Enable Half-Body Detection", value=True)
with gr.Group() as halfbody_params_group:
gr.Markdown("<small>_Detects half-bodies in full items if Person Detection is off/fails._</small>")
enable_halfbody_cropping = gr.Checkbox(label="Use Half-Bodies as Targets", value=True)
min_target_width_halfbody_percentage_slider = gr.Slider(
minimum=0.0, maximum=1.0, value=0.25, step=0.01,
label="Min Target Width (% of Item Width)",
info="Minimum width for a detected half-body to be processed (e.g., 0.25 = 25%)."
)
halfbody_model_name_dd = gr.Dropdown(halfbody_models, label="HBD Model", value=halfbody_models[0])
halfbody_conf_threshold = gr.Slider(0.0, 1.0, value=0.5, step=0.05, label="HBD Conf")
halfbody_iou_threshold = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="HBD IoU")
enable_halfbody_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_halfbody_detection, outputs=halfbody_params_group)
# --- Face Detection Block ---
with gr.Accordion("Face Detection Options", open=True):
enable_face_detection = gr.Checkbox(label="Enable Face Detection", value=True)
with gr.Group() as face_params_group:
enable_face_filtering = gr.Checkbox(label="Filter Targets Without Detected Faces", value=True)
enable_face_cropping = gr.Checkbox(label="Crop Detected Faces", value=False)
min_crop_width_face_percentage_slider = gr.Slider(
minimum=0.0, maximum=1.0, value=0.2, step=0.01,
label="Min Crop Width (% of Parent Width)",
info="Minimum width for a face crop relative to its parent image's width (e.g., 0.2 = 20%)."
)
face_model_name_dd = gr.Dropdown(face_models, label="FD Model", value=face_models[0])
face_conf_threshold = gr.Slider(0.0, 1.0, value=0.25, step=0.05, label="FD Conf")
face_iou_threshold = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="FD IoU")
enable_face_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_face_detection, outputs=face_params_group)
# --- Head Detection Block ---
with gr.Accordion("Head Detection Options", open=True):
enable_head_detection = gr.Checkbox(label="Enable Head Detection", value=True)
with gr.Group() as head_params_group:
gr.Markdown("<small>_Detects heads in targets. Crops if meets width req._</small>")
enable_head_filtering = gr.Checkbox(label="Filter Targets Without Heads", value=True)
enable_head_cropping = gr.Checkbox(label="Crop Detected Heads", value=False)
min_crop_width_head_percentage_slider = gr.Slider(
minimum=0.0, maximum=1.0, value=0.2, step=0.01,
label="Min Crop Width (% of Parent Width)",
info="Minimum width for a head crop relative to its parent image's width (e.g., 0.2 = 20%)."
)
head_model_name_dd = gr.Dropdown(head_models, label="HD Model", value=head_models[0])
head_conf_threshold = gr.Slider(0.0, 1.0, value=0.4, step=0.05, label="HD Conf")
head_iou_threshold = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="HD IoU")
enable_head_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_head_detection, outputs=head_params_group)
# --- Analysis/Classification Options ---
gr.Markdown("**Analysis & Classification**")
# --- CCIP Classification Block ---
with gr.Accordion("CCIP Classification Options", open=True):
enable_ccip_classification = gr.Checkbox(label="Enable CCIP Classification", value=True)
with gr.Group() as ccip_params_group:
gr.Markdown("<small>_Clusters results by similarity. Folders sorted by image count._</small>")
ccip_model_name_dd = gr.Dropdown(ccip_models, label="CCIP Model", value=ccip_models[0])
ccip_threshold_slider = gr.Slider(0.0, 1.0, step=0.01, value=0.20, label="CCIP Similarity Threshold")
enable_ccip_classification.change(fn=lambda e: gr.update(visible=e), inputs=enable_ccip_classification, outputs=ccip_params_group)
# LPIPS Clustering Options
with gr.Accordion("LPIPS Clustering Options", open=True):
enable_lpips_clustering = gr.Checkbox(label="Enable LPIPS Clustering", value=True)
with gr.Group() as lpips_params_group:
gr.Markdown("<small>_Clusters images by LPIPS similarity. Applied after CCIP (if enabled) or globally. Folders sorted by image count._</small>")
lpips_threshold_slider = gr.Slider(0.0, 1.0, step=0.01, value=0.45, label="LPIPS Similarity Threshold")
enable_lpips_clustering.change(fn=lambda e: gr.update(visible=e), inputs=enable_lpips_clustering, outputs=lpips_params_group)
# --- Aesthetic Analysis Block ---
with gr.Accordion("Aesthetic Analysis Options", open=True):
enable_aesthetic_analysis = gr.Checkbox(label="Enable Aesthetic Analysis (Anime)", value=True)
with gr.Group() as aesthetic_params_group:
gr.Markdown("<small>_Prepends aesthetic label to filenames._</small>")
aesthetic_model_name_dd = gr.Dropdown(aesthetic_models, label="Aesthetic Model", value=aesthetic_models[0])
enable_aesthetic_analysis.change(fn=lambda e: gr.update(visible=e), inputs=enable_aesthetic_analysis, outputs=aesthetic_params_group)
gr.Markdown("---")
gr.Markdown("**Warning:** Complex combinations can be slow. Models downloaded on first use.", elem_id="warning")
with gr.Column(scale=1):
# --- Output Components ---
status_text = gr.Textbox(label="Processing Status", interactive=False, lines=20, elem_id="status_box")
output_zips = gr.Files(label="Download Processed Images (ZIPs)")
# Connect button click
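# NOTE: Gradio passes the component values positionally, so the order of the
# inputs list below must match the parameter order of process_inputs_main exactly.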
process_button.click(
fn=process_inputs_main,
inputs=[
input_files, sample_interval_ms,
# Person Detect
enable_person_detection, min_target_width_person_percentage_slider,
person_model_name_dd, person_conf_threshold, person_iou_threshold,
# HalfBody Detect
enable_halfbody_detection, enable_halfbody_cropping, min_target_width_halfbody_percentage_slider,
halfbody_model_name_dd, halfbody_conf_threshold, halfbody_iou_threshold,
# Head Detect
enable_head_detection, enable_head_cropping, min_crop_width_head_percentage_slider,
enable_head_filtering, head_model_name_dd, head_conf_threshold, head_iou_threshold,
# Face Detect
enable_face_detection, enable_face_cropping, min_crop_width_face_percentage_slider,
enable_face_filtering, face_model_name_dd, face_conf_threshold, face_iou_threshold,
# CCIP
enable_ccip_classification, ccip_model_name_dd, ccip_threshold_slider,
# LPIPS
enable_lpips_clustering, lpips_threshold_slider,
# Aesthetic
enable_aesthetic_analysis, aesthetic_model_name_dd,
],
outputs=[output_zips, status_text]
)
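# The two outputs receive, in order, the (zip_paths_list, summary_string) tuple
# returned by process_inputs_main.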
# --- Launch Script ---
if __name__ == "__main__":
print("Starting Gradio App...")
# Model pre-check
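# Running each model family once on a tiny dummy image forces any model
# downloads to happen up front rather than during the first user request.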
try:
print("Checking/Downloading models (this might take a moment)...")
# Use simple, small images for checks
dummy_img_pil = Image.new('RGB', (64, 64), color = 'orange')
print(" - Person detection...")
_ = person_detector.detect_person(dummy_img_pil, model_name=person_models[0])
print(" - HalfBody detection...")
_ = halfbody_detector.detect_halfbody(dummy_img_pil, model_name=halfbody_models[0])
print(" - Head detection...")
_ = head_detector.detect_heads(dummy_img_pil, model_name=head_models[0])
print(" - Face detection...")
_ = face_detector.detect_faces(dummy_img_pil, model_name=face_models[0])
print(" - CCIP feature extraction...")
_ = ccip_analyzer.ccip_extract_feature(dummy_img_pil, size=384, model=ccip_models[0])
print(" - LPIPS feature extraction...")
_ = lpips_module.lpips_extract_feature(dummy_img_pil)
print(" - Aesthetic analysis...")
_ = dbaesthetic_analyzer.anime_dbaesthetic(dummy_img_pil, model_name=aesthetic_models[0])
print("Models seem ready or downloaded.")
del dummy_img_pil
gc.collect()
except Exception as model_err:
print(f"\n--- !!! WARNING !!! ---")
print(f"Could not pre-check/download all models: {model_err}")
print(f"Models will be downloaded when first used by the application, which may cause a delay on the first run.")
print(f"Check your internet connection and library installation (pip install \"dghs-imgutils[gpu]\").")
print(f"-----------------------\n")
# Launch the app
demo.launch(inbrowser=True)