""" Feedback Viewer Module This module provides functions for viewing and displaying feedback entries from Hugging Face Datasets. """ import logging import math import os from typing import List, Optional from io import BytesIO from datetime import datetime import tempfile import uuid import base64 from datasets import Dataset import gradio as gr from PIL import Image from ipadapter_model import create_image_grid # Hugging Face Datasets imports try: from datasets import load_dataset # type: ignore from huggingface_hub import login # type: ignore HF_DATASETS_AVAILABLE = True except ImportError: load_dataset = None # type: ignore login = None # type: ignore HF_DATASETS_AVAILABLE = False logging.warning("Hugging Face datasets not available. Feedback viewer will not work.") # Configuration - can be overridden HF_FEEDBACK_DATASET_REPO = os.getenv("HF_FEEDBACK_DATASET_REPO", None) HF_TOKEN = os.getenv("HF_TOKEN", None) def store_feedback_to_hf_dataset( rating: str, feedback_text: str, alpha_start: float, alpha_end: float, n_steps: int, input1_image: Optional[Image.Image], input2_image: Optional[Image.Image], extra_images: Optional[List[Image.Image]], negative_images: Optional[List[Image.Image]], blending_result_images: Optional[List[Image.Image]], is_public: bool = True, dataset_repo: Optional[str] = None, token: Optional[str] = None ) -> bool: """ Store feedback and images to a Hugging Face Dataset. Uses concatenate_datasets to append new entries without re-processing existing ones. This preserves existing images correctly. Args: rating: User rating (1-5) feedback_text: User feedback text alpha_start: Start alpha value used alpha_end: End alpha value used n_steps: Number of output images input1_image: First input image (PIL Image) input2_image: Second input image (PIL Image) extra_images: List of extra images (PIL Images) negative_images: List of negative images (PIL Images) blending_result_images: List of blending result images (PIL Images) is_public: Whether the feedback should be publicly visible (default True) dataset_repo: Hugging Face dataset repository (username/dataset-name) token: Hugging Face token (if None, will try to use HF_TOKEN env var) Returns: True if feedback was stored successfully, False otherwise """ if not HF_DATASETS_AVAILABLE: logging.warning("Hugging Face datasets library not available") return False if dataset_repo is None: dataset_repo = HF_FEEDBACK_DATASET_REPO if dataset_repo is None: logging.warning("HF_FEEDBACK_DATASET_REPO not set. Set it to your Hugging Face username/dataset-name") return False # Validate that input1 and input2 images are not empty if input1_image is None: error_msg = "Input 1 image cannot be empty. Please provide a valid image." logging.error(error_msg) raise ValueError(error_msg) if input2_image is None: error_msg = "Input 2 image cannot be empty. Please provide a valid image." logging.error(error_msg) raise ValueError(error_msg) # Check if image is actually empty (size 0x0) if hasattr(input1_image, 'size') and input1_image.size == (0, 0): error_msg = "Input 1 image is empty (0x0 size). Please provide a valid image." logging.error(error_msg) raise ValueError(error_msg) if hasattr(input2_image, 'size') and input2_image.size == (0, 0): error_msg = "Input 2 image is empty (0x0 size). Please provide a valid image." logging.error(error_msg) raise ValueError(error_msg) try: # Type guards - these should not be None if HF_DATASETS_AVAILABLE is True if Dataset is None or load_dataset is None or login is None: logging.error("Hugging Face datasets libraries not properly imported") return False from datasets import Features, Image as ImageFeature, Value, Sequence, concatenate_datasets # Use token from parameter, environment variable, or try to login if token is None: token = HF_TOKEN if token: login(token=token, add_to_git_credential=True) # Define features schema features = Features({ "uuid": Value("string"), "timestamp": Value("string"), "rating": Value("int64"), "feedback": Value("string"), "alpha_start": Value("float64"), "alpha_end": Value("float64"), "n_steps": Value("int64"), "is_public": Value("bool"), # Whether feedback is publicly visible "input1": ImageFeature(), "input2": ImageFeature(), "extra_images": Sequence(ImageFeature()), "negative_images": Sequence(ImageFeature()), "blending_results": Sequence(ImageFeature()), # List of blending result images }) # Ensure images are RGB PIL Images def prepare_image(img): if img is None: return None if isinstance(img, Image.Image): return img.convert("RGB") if img.mode != "RGB" else img return None # Generate a new UUID for this feedback entry new_uuid = str(uuid.uuid4()) logging.info(f"Generated UUID for new feedback entry: {new_uuid}") # Create feedback entry with PIL Images directly # ImageFeature handles PIL Images and uploads them properly feedback_entry = { "uuid": new_uuid, "timestamp": datetime.now().isoformat(), "rating": int(rating) if rating else 0, "feedback": feedback_text or "", "alpha_start": float(alpha_start), "alpha_end": float(alpha_end), "n_steps": int(n_steps), "is_public": bool(is_public), # Store public visibility flag "input1": prepare_image(input1_image), "input2": prepare_image(input2_image), "extra_images": [prepare_image(img) for img in (extra_images or []) if prepare_image(img) is not None], "negative_images": [prepare_image(img) for img in (negative_images or []) if prepare_image(img) is not None], "blending_results": [prepare_image(img) for img in (blending_result_images or []) if prepare_image(img) is not None], } # Create a new dataset with just the new entry new_entry_dataset = Dataset.from_list([feedback_entry], features=features) # Try to load existing dataset and concatenate try: existing_dataset = load_dataset(dataset_repo, split="train") logging.info(f"Loaded existing dataset with {len(existing_dataset)} entries") # Check if existing dataset has UUID field, if not add it if "uuid" not in existing_dataset.column_names: logging.info("Existing dataset missing UUID field, adding UUIDs to existing entries") def add_uuid(example): if "uuid" not in example or not example.get("uuid"): example["uuid"] = str(uuid.uuid4()) return example existing_dataset = existing_dataset.map(add_uuid) # Check if existing dataset has is_public field, if not add it (default to True for old entries) if "is_public" not in existing_dataset.column_names: logging.info("Existing dataset missing is_public field, adding is_public=True to existing entries") def add_is_public(example): if "is_public" not in example: example["is_public"] = True return example existing_dataset = existing_dataset.map(add_is_public) # Ensure the existing dataset has the UUID and is_public fields in its schema # Add fields to schema if missing, then cast to match new schema try: existing_features = existing_dataset.features from datasets import Features as DatasetFeatures updated_features_dict = dict(existing_features) schema_updated = False if "uuid" not in existing_features: # Create new features dict with UUID added updated_features_dict["uuid"] = Value("string") schema_updated = True if "is_public" not in existing_features: # Add is_public field to schema updated_features_dict["is_public"] = Value("bool") schema_updated = True if schema_updated: updated_features = DatasetFeatures(updated_features_dict) existing_dataset = existing_dataset.cast(updated_features) # Cast to match new schema (this ensures all fields match) existing_dataset = existing_dataset.cast(features) except Exception as cast_error: logging.warning(f"Could not cast existing dataset schema: {cast_error}. Attempting to proceed anyway.") # Try to add UUID field manually if cast failed if "uuid" not in existing_dataset.column_names: def ensure_uuid(example): if "uuid" not in example or not example.get("uuid"): example["uuid"] = str(uuid.uuid4()) return example existing_dataset = existing_dataset.map(ensure_uuid) # Concatenate: existing entries stay untouched, new entry is appended combined_dataset = concatenate_datasets([existing_dataset, new_entry_dataset]) logging.info(f"Combined dataset has {len(combined_dataset)} entries") except Exception as e: logging.info(f"Dataset not found or empty, creating new one: {e}") combined_dataset = new_entry_dataset # Verify UUID is present in all entries before pushing if "uuid" not in combined_dataset.column_names: logging.warning("UUID column missing from combined dataset, adding UUIDs to all entries") def ensure_all_have_uuid(example): if "uuid" not in example or not example.get("uuid"): example["uuid"] = str(uuid.uuid4()) return example combined_dataset = combined_dataset.map(ensure_all_have_uuid) # Ensure schema includes UUID try: combined_dataset = combined_dataset.cast(features) except Exception as cast_err: logging.warning(f"Could not cast combined dataset to features schema: {cast_err}") # Verify the new entry has UUID if len(combined_dataset) > 0: last_entry = combined_dataset[-1] if last_entry.get("uuid") == new_uuid: logging.info(f"Verified UUID {new_uuid} is present in the new entry") else: logging.warning(f"UUID mismatch! Expected {new_uuid}, got {last_entry.get('uuid')}") # Push to hub combined_dataset.push_to_hub( dataset_repo, private=True, token=token ) logging.info(f"Feedback stored successfully to {dataset_repo} with UUID: {new_uuid}") return True except Exception as e: logging.error(f"Error storing feedback to Hugging Face Dataset: {e}") import traceback traceback.print_exc() return False def convert_dataset_image_to_pil(image_data): """ Convert image data from Hugging Face Dataset format to PIL Image. Dataset images can be: - PIL Image (already correct) - datasets.Image object (from Hugging Face datasets library) - dict with 'path' key (file path) - dict with 'bytes' key (image bytes) - dict with 'image' key (nested PIL Image) - str (file path, filename, or hub reference) - None """ if image_data is None or image_data == "": return None # Already a PIL Image if isinstance(image_data, Image.Image): return image_data.convert("RGB") if image_data.mode != "RGB" else image_data # Handle datasets.Image type (from Hugging Face datasets library) # This is the primary format when images are loaded from hub with ImageFeature try: from datasets import Image as DatasetImage # Check if it's a DatasetImage (using isinstance or checking class name) is_dataset_image = isinstance(image_data, DatasetImage) if DatasetImage else False if not is_dataset_image and hasattr(image_data, '__class__'): class_name = str(type(image_data)) is_dataset_image = 'Image' in class_name and 'datasets' in class_name if is_dataset_image: # Convert datasets.Image to PIL Image # DatasetImage objects decode lazily when accessed try: # Method 1: Try accessing .image attribute (most common) if hasattr(image_data, 'image'): pil_img = image_data.image if isinstance(pil_img, Image.Image): return pil_img.convert("RGB") if pil_img.mode != "RGB" else pil_img # Method 2: Try calling as function (some versions) if callable(image_data): try: pil_img = image_data() if isinstance(pil_img, Image.Image): return pil_img.convert("RGB") if pil_img.mode != "RGB" else pil_img except Exception: pass # Method 3: Try convert method if hasattr(image_data, 'convert'): try: pil_img = image_data.convert("RGB") if isinstance(pil_img, Image.Image): return pil_img except Exception: pass # Method 4: Try direct access (some versions store PIL Image directly) if isinstance(image_data, Image.Image): return image_data.convert("RGB") if image_data.mode != "RGB" else image_data # Method 5: Try to get bytes and decode if hasattr(image_data, 'bytes'): try: bytes_data = image_data.bytes if bytes_data: return Image.open(BytesIO(bytes_data)).convert("RGB") except Exception: pass # Method 6: Try path attribute and load from hub cache if hasattr(image_data, 'path'): try: path = image_data.path if path: # Path might be relative (hub reference) or absolute (local cache) if os.path.exists(path): return Image.open(path).convert("RGB") # If path doesn't exist, it might be in HF cache # Try to find it using huggingface_hub try: from huggingface_hub import hf_hub_download # We'd need the repo name, but for now try the path # The dataset library should handle this automatically pass except: pass except Exception: pass # Method 7: Try accessing via __getitem__ or direct attribute access try: if hasattr(image_data, '__getitem__'): pil_img = image_data[0] if len(image_data) > 0 else None if isinstance(pil_img, Image.Image): return pil_img.convert("RGB") if pil_img.mode != "RGB" else pil_img except Exception: pass logging.debug(f"Could not convert DatasetImage object (type: {type(image_data)}, methods tried)") return None except Exception as e: logging.debug(f"Error converting DatasetImage: {e}") return None except (ImportError, AttributeError) as e: pass # Handle dictionary formats if isinstance(image_data, dict): # Check for nested image if 'image' in image_data: return convert_dataset_image_to_pil(image_data['image']) # Check for path if 'path' in image_data: try: path = image_data['path'] # If it's a relative path (just filename), try to find it in dataset cache if not os.path.isabs(path) and not os.path.exists(path): # Try to load from huggingface cache try: from huggingface_hub import hf_hub_download # This is a fallback - we'd need the dataset repo to download # For now, just try the path as-is pass except: pass return Image.open(path).convert("RGB") except Exception as e: logging.warning(f"Could not load image from path {image_data.get('path')}: {e}") return None # Check for bytes if 'bytes' in image_data: try: return Image.open(BytesIO(image_data['bytes'])).convert("RGB") except Exception as e: logging.warning(f"Could not load image from bytes: {e}") return None # If dict has no recognized keys, try to extract image directly # Some datasets store PIL Images wrapped in dicts for key in ['pil', 'PIL', 'img', 'image_data']: if key in image_data: return convert_dataset_image_to_pil(image_data[key]) # Last resort: if dict has a single value that might be an image path if len(image_data) == 1: value = list(image_data.values())[0] if isinstance(value, str): return convert_dataset_image_to_pil(value) # Try to open if it's a string path if isinstance(image_data, str): # First try as direct file path try: if os.path.exists(image_data): return Image.open(image_data).convert("RGB") except: pass # If it's just a filename (not a full path), it might be in the dataset cache # When images are loaded from hub, they're cached locally # Try to find it in the Hugging Face cache if not os.path.isabs(image_data) and not os.path.exists(image_data): try: # Try to get the image from the currently loaded dataset # This is a workaround - ideally the dataset should return Image objects # For now, log a warning logging.debug(f"Image filename '{image_data}' not found locally. It should be loaded as Image object from dataset.") return None except: pass # Last attempt: try opening it anyway try: return Image.open(image_data).convert("RGB") except Exception as e: logging.warning(f"Could not load image from string '{image_data}': {e}") return None # If we get here, we don't know how to convert it logging.warning(f"Unknown image format: {type(image_data)}") return None def create_image_grid_for_entry(input1, input2, result): """Create a grid showing input1, input2, and result side by side.""" if input1 is None and input2 is None and result is None: return None # Create a horizontal grid: input1 | input2 | result images_to_combine = [] if input1: images_to_combine.append(input1) if input2: images_to_combine.append(input2) if result: images_to_combine.append(result) if not images_to_combine: return None # Calculate grid dimensions max_width = max(img.width for img in images_to_combine if img) max_height = max(img.height for img in images_to_combine if img) # Resize all images to same height, maintain aspect ratio resized_images = [] for img in images_to_combine: if img: aspect_ratio = img.width / img.height new_height = max_height new_width = int(new_height * aspect_ratio) resized_images.append(img.resize((new_width, new_height), Image.Resampling.LANCZOS)) else: # Create placeholder placeholder = Image.new("RGB", (max_width, max_height), color="gray") resized_images.append(placeholder) # Combine images horizontally total_width = sum(img.width for img in resized_images) combined = Image.new("RGB", (total_width, max_height)) x_offset = 0 for img in resized_images: combined.paste(img, (x_offset, 0)) x_offset += img.width return combined def pil_image_to_base64(img: Optional[Image.Image], max_size: int = 400) -> str: """Convert PIL Image to base64 data URI for HTML embedding.""" if img is None: return "" # Resize if too large if max(img.width, img.height) > max_size: aspect_ratio = img.width / img.height if img.width > img.height: new_width = max_size new_height = int(max_size / aspect_ratio) else: new_height = max_size new_width = int(max_size * aspect_ratio) img = img.resize((new_width, new_height), Image.Resampling.LANCZOS) # Convert to RGB if needed if img.mode != "RGB": img = img.convert("RGB") # Convert to base64 buffered = BytesIO() img.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() return f"data:image/png;base64,{img_str}" def load_feedback_from_hf_dataset( dataset_repo: Optional[str] = None, token: Optional[str] = None, limit: Optional[int] = None, reverse: bool = False, public_only: bool = True ) -> List[dict]: """ Load feedback entries from a Hugging Face Dataset. Args: dataset_repo: Hugging Face dataset repository (username/dataset-name) token: Hugging Face token (if None, will try to use HF_TOKEN env var) limit: Maximum number of entries to return (None for all) reverse: If True, reverse the order (newest first). Default False (oldest first). public_only: If True, only return public feedback entries. Default True. Old entries without is_public field are treated as public. Returns: List of feedback entries as dictionaries """ if not HF_DATASETS_AVAILABLE: logging.warning("Hugging Face datasets library not available") return [] if dataset_repo is None: dataset_repo = HF_FEEDBACK_DATASET_REPO if dataset_repo is None: logging.warning("HF_FEEDBACK_DATASET_REPO not set") return [] try: # Type guards if load_dataset is None or login is None: logging.error("Hugging Face datasets libraries not properly imported") return [] # Use token from parameter, environment variable, or try to login if token is None: token = HF_TOKEN if token: login(token=token, add_to_git_credential=True) # Load dataset and cast image columns to ImageFeature to ensure proper decoding try: from datasets import Features, Image as ImageFeature, Value, Sequence # Load dataset first dataset = load_dataset(dataset_repo, split="train") # Cast image columns to ImageFeature to ensure they're properly decoded from hub storage # This is crucial - without casting, images stored as file paths won't be accessible # When images are uploaded with ImageFeature, they're stored in hub storage # Casting ensures they're decoded as Image objects when loaded try: # Check current feature types current_features = dataset.features logging.debug(f"Dataset features before casting: {list(current_features.keys())}") # Cast each image column - this will decode file paths from hub storage if "input1" in dataset.column_names: dataset = dataset.cast_column("input1", ImageFeature()) if "input2" in dataset.column_names: dataset = dataset.cast_column("input2", ImageFeature()) if "blending_results" in dataset.column_names: dataset = dataset.cast_column("blending_results", Sequence(ImageFeature())) elif "blending_result" in dataset.column_names: # Backward compatibility for old single-image format dataset = dataset.cast_column("blending_result", ImageFeature()) if "extra_images" in dataset.column_names: dataset = dataset.cast_column("extra_images", Sequence(ImageFeature())) if "negative_images" in dataset.column_names: dataset = dataset.cast_column("negative_images", Sequence(ImageFeature())) logging.debug(f"Successfully cast image columns to ImageFeature") except Exception as e: logging.warning(f"Could not cast image columns (may already be ImageFeature or incompatible format): {e}") # Continue anyway - images might already be ImageFeature objects or need different handling # Convert dataset to list, ensuring images are properly loaded # When ImageFeature is used, images come as Image objects that need to be accessed # Iterate through dataset and explicitly access image fields to trigger decoding data = [] for idx, entry in enumerate(dataset): # Access image fields directly from entry to trigger lazy loading/decoding # This is critical for DatasetImage objects which decode on access try: # Force access to image fields - this triggers decoding from hub storage input1_raw = entry["input1"] if "input1" in entry else None input2_raw = entry["input2"] if "input2" in entry else None # Support both new format (blending_results - list) and old format (blending_result - single) blending_results_raw = entry.get("blending_results", []) blending_result_raw = entry.get("blending_result", None) # Backward compatibility extra_images_raw = entry.get("extra_images", []) negative_images_raw = entry.get("negative_images", []) except Exception as e: logging.debug(f"Error accessing image fields in entry {idx}: {e}") input1_raw = None input2_raw = None blending_results_raw = [] blending_result_raw = None extra_images_raw = [] negative_images_raw = [] # Create a copy of the entry with decoded images entry_dict = dict(entry) # Add UUID if missing (backward compatibility for old entries) if "uuid" not in entry_dict or not entry_dict.get("uuid"): entry_dict["uuid"] = str(uuid.uuid4()) # Add is_public if missing (backward compatibility - old entries are public by default) if "is_public" not in entry_dict: entry_dict["is_public"] = True # Filter by public_only if requested if public_only and not entry_dict.get("is_public", True): continue # Convert Image objects to PIL Images using the raw accessed values # ImageFeature objects need to be converted to PIL Images for display try: # Handle single image fields using the raw accessed values entry_dict["input1"] = convert_dataset_image_to_pil(input1_raw) entry_dict["input2"] = convert_dataset_image_to_pil(input2_raw) # Handle blending results - support both new list format and old single image format if blending_results_raw: converted_results = [] for img_item in blending_results_raw: converted = convert_dataset_image_to_pil(img_item) if converted is not None: converted_results.append(converted) entry_dict["blending_results"] = converted_results elif blending_result_raw: # Backward compatibility: convert single image to list single_img = convert_dataset_image_to_pil(blending_result_raw) entry_dict["blending_results"] = [single_img] if single_img else [] else: entry_dict["blending_results"] = [] # Handle list image fields if extra_images_raw: converted_extra = [] for img_item in extra_images_raw: converted = convert_dataset_image_to_pil(img_item) if converted is not None: converted_extra.append(converted) entry_dict["extra_images"] = converted_extra else: entry_dict["extra_images"] = [] if negative_images_raw: converted_neg = [] for img_item in negative_images_raw: converted = convert_dataset_image_to_pil(img_item) if converted is not None: converted_neg.append(converted) entry_dict["negative_images"] = converted_neg else: entry_dict["negative_images"] = [] except Exception as e: logging.warning(f"Error converting images in entry {idx}: {e}") # Set to None if conversion fails entry_dict["input1"] = None entry_dict["input2"] = None entry_dict["blending_results"] = [] entry_dict["extra_images"] = [] entry_dict["negative_images"] = [] data.append(entry_dict) # Sort by timestamp to ensure proper chronological order # Parse timestamp and sort (oldest first by default) def get_timestamp(entry): timestamp_str = entry.get("timestamp", "") if not timestamp_str: return 0 try: # Parse ISO format timestamp (e.g., "2024-01-01T12:00:00" or "2024-01-01T12:00:00.123456") # Handle Z suffix for UTC if timestamp_str.endswith("Z"): timestamp_str = timestamp_str[:-1] + "+00:00" # Try parsing with timezone info first if "+" in timestamp_str or timestamp_str.count("-") > 2: # Has timezone info dt = datetime.fromisoformat(timestamp_str) else: # No timezone, assume naive datetime dt = datetime.fromisoformat(timestamp_str) return dt.timestamp() except (ValueError, AttributeError) as e: # If timestamp parsing fails, log and use 0 (will appear first in old-to-new) logging.debug(f"Could not parse timestamp '{timestamp_str}': {e}") return 0 # Sort by timestamp (oldest first) data.sort(key=get_timestamp) # Reverse if requested (newest first) if reverse: data.reverse() # Apply limit if specified if limit is not None: data = data[:limit] return data except Exception as e: logging.warning(f"Could not load dataset: {e}") return [] except Exception as e: logging.error(f"Error loading feedback from Hugging Face Dataset: {e}") return [] def create_feedback_viewer_tab(): """Create the feedback viewer tab interface.""" with gr.Tab("Feedback Viewer"): gr.Markdown(""" ## Feedback Viewer View submitted feedback and images from users. """) # Top controls group with gr.Group(): refresh_button = gr.Button("🔄 Refresh Feedback", variant="primary") # Main feedback display feedback_html = gr.HTML( label="Feedback Entries", value="

Click 'Refresh Feedback' to load entries.

" ) # Pagination controls group with gr.Group(): gr.Markdown("### Pagination") with gr.Row(): items_per_page_slider = gr.Slider( minimum=1, maximum=50, step=1, value=10, label="Items per page", scale=2 ) with gr.Column(scale=3): with gr.Row(): prev_page_button = gr.Button("◀ Previous", variant="secondary", scale=1) page_number = gr.Number( value=1, minimum=1, step=1, label="Page", precision=0, scale=2 ) next_page_button = gr.Button("Next ▶", variant="secondary", scale=1) total_pages_display = gr.Markdown("**Total Pages:** -") # Search controls group with gr.Group(): gr.Markdown("### Search & Filter") with gr.Row(): uuid_search_input = gr.Textbox( label="Search by UUID", placeholder="Enter UUID to search (leave empty to show all)", value="", scale=3 ) search_button = gr.Button("🔍 Search", variant="secondary", scale=1) with gr.Row(): timestamp_start_input = gr.Textbox( label="Start Timestamp", placeholder="YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS", value="", scale=1 ) timestamp_end_input = gr.Textbox( label="End Timestamp", placeholder="YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS", value="", scale=1 ) rating_filter = gr.Dropdown( choices=["All", "1", "2", "3", "4", "5"], value="All", label="Rating Filter", scale=1 ) with gr.Row(): filter_extra_images = gr.Checkbox( label="Only entries with extra images", value=False ) filter_negative_images = gr.Checkbox( label="Only entries with negative images", value=False ) sort_order_radio = gr.Radio( choices=["Old to New", "New to Old"], value="Old to New", label="Sort Order", scale=2 ) selected_details = gr.JSON(label="Full Feedback Details", visible=False) # Admin section - hidden behind accordion with gr.Accordion("⋮", open=False): gr.Markdown("### Admin Options") with gr.Row(): admin_password_input = gr.Textbox( label="Admin Password", placeholder="Enter admin password", type="password", value="", scale=2 ) include_private_checkbox = gr.Checkbox( label="Include private feedbacks", value=False, interactive=False, scale=1 ) verify_password_button = gr.Button("🔓 Verify", variant="secondary", scale=1) admin_status = gr.Markdown("") gr.Markdown("### Delete Entry") with gr.Row(): delete_uuid_input = gr.Textbox( label="UUID to Delete", placeholder="Enter UUID or prefix (e.g. e7132a33)", value="", scale=3 ) delete_button = gr.Button("🗑️ Delete", variant="stop", scale=1) delete_status = gr.Markdown("") def verify_admin_password(password: str, current_include_private: bool): """Verify admin password and enable/disable private feedback checkbox.""" if password == "admin": return ( gr.update(value=True, interactive=True), # Enable and check the checkbox "✅ Admin access granted. You can now view private feedbacks." ) else: return ( gr.update(value=False, interactive=False), # Disable and uncheck the checkbox "❌ Invalid password. Private feedbacks hidden." ) verify_password_button.click( verify_admin_password, inputs=[admin_password_input, include_private_checkbox], outputs=[include_private_checkbox, admin_status] ) def delete_entry_by_uuid(uuid_to_delete: str, admin_password: str): """Delete a feedback entry by UUID after password verification.""" # Check password if admin_password != "admin": return "❌ Invalid admin password. Enter password and click Verify first." if not uuid_to_delete or not uuid_to_delete.strip(): return "❌ Please enter a UUID to delete." uuid_to_delete = uuid_to_delete.strip() try: if not HF_DATASETS_AVAILABLE or load_dataset is None or login is None: return "❌ Hugging Face datasets library not available." if HF_TOKEN: login(token=HF_TOKEN, add_to_git_credential=True) if not HF_FEEDBACK_DATASET_REPO: return "❌ HF_FEEDBACK_DATASET_REPO not configured." # Load dataset dataset = load_dataset(HF_FEEDBACK_DATASET_REPO, split="train") original_count = len(dataset) if original_count == 0: return "❌ Dataset is empty. Nothing to delete." # First, find matching entries (exact or prefix match) search_term = uuid_to_delete.lower() matching_entries = [] for entry in dataset: entry_uuid = entry.get("uuid", "") if entry_uuid.lower() == search_term or entry_uuid.lower().startswith(search_term): matching_entries.append(entry_uuid) if len(matching_entries) == 0: return f"❌ No UUID matching '{uuid_to_delete}' found in dataset." elif len(matching_entries) > 1: matches_display = ", ".join([u[:12] + "..." for u in matching_entries[:5]]) if len(matching_entries) > 5: matches_display += f" (+{len(matching_entries) - 5} more)" return f"❌ Multiple UUIDs match '{uuid_to_delete}': {matches_display}. Please be more specific." # Exactly one match - use the full UUID uuid_to_delete_full = matching_entries[0] # Filter entries to keep (exclude the matched UUID) entries_to_keep = [] for entry in dataset: entry_uuid = entry.get("uuid", "") if entry_uuid == uuid_to_delete_full: continue else: entries_to_keep.append(dict(entry)) # Handle empty dataset case if len(entries_to_keep) == 0: # Create placeholder entry to avoid empty dataset issues import uuid as uuid_module placeholder = {} deleted_entry = next(entry for entry in dataset if entry.get("uuid", "") == uuid_to_delete_full) for key in dataset.features.keys(): if key == "timestamp": placeholder[key] = datetime.now().isoformat() elif key == "rating": placeholder[key] = 0 elif key == "feedback": placeholder[key] = "[PLACEHOLDER - This entry can be deleted]" elif key in ["alpha_start", "alpha_end"]: placeholder[key] = 0.0 elif key == "n_steps": placeholder[key] = 0 elif key in ["input1", "input2"]: placeholder[key] = deleted_entry.get(key) elif key in ["extra_images", "negative_images", "blending_results", "blending_result"]: placeholder[key] = [] elif key == "uuid": placeholder[key] = str(uuid_module.uuid4()) else: placeholder[key] = deleted_entry.get(key, "") new_dataset = Dataset.from_list([placeholder], features=dataset.features) new_dataset.push_to_hub( HF_FEEDBACK_DATASET_REPO, private=True, token=HF_TOKEN ) return f"✅ Deleted entry with UUID '{uuid_to_delete_full}'. Dataset now has 1 placeholder entry." # Create new dataset with remaining entries new_dataset = Dataset.from_list(entries_to_keep, features=dataset.features) # Push to hub new_dataset.push_to_hub( HF_FEEDBACK_DATASET_REPO, private=True, token=HF_TOKEN ) return f"✅ Successfully deleted entry with UUID '{uuid_to_delete_full}'. {len(entries_to_keep)} entries remaining." except Exception as e: logging.error(f"Error deleting entry: {e}") import traceback traceback.print_exc() return f"❌ Error deleting entry: {str(e)}" delete_button.click( delete_entry_by_uuid, inputs=[delete_uuid_input, admin_password_input], outputs=[delete_status] ) def parse_timestamp(timestamp_str): """Parse timestamp string to datetime object.""" if not timestamp_str or not timestamp_str.strip(): return None timestamp_str = timestamp_str.strip() # Try different formats formats = [ "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d", ] for fmt in formats: try: return datetime.strptime(timestamp_str, fmt) except ValueError: continue # Try ISO format parsing try: if timestamp_str.endswith("Z"): timestamp_str = timestamp_str[:-1] + "+00:00" return datetime.fromisoformat(timestamp_str) except (ValueError, AttributeError): pass return None def load_and_display_feedback(items_per_page, page, sort_order, uuid_search="", timestamp_start="", timestamp_end="", rating_filter="All", filter_extra=False, filter_negative=False, include_private=False): """Load feedback from dataset and format as HTML table with pagination.""" # Convert radio selection to reverse boolean sort_reverse = (sort_order == "New to Old") # Load all feedback entries (no limit, we'll paginate ourselves) # If include_private is True, set public_only to False all_feedbacks = load_feedback_from_hf_dataset(reverse=sort_reverse, public_only=not include_private) # Filter by UUID if search term provided if uuid_search and uuid_search.strip(): search_term = uuid_search.strip().lower() all_feedbacks = [ fb for fb in all_feedbacks if fb.get("uuid", "").lower().startswith(search_term) ] # Filter by timestamp range if provided start_dt = parse_timestamp(timestamp_start) end_dt = parse_timestamp(timestamp_end) if start_dt or end_dt: filtered_feedbacks = [] for fb in all_feedbacks: fb_timestamp_str = fb.get("timestamp", "") if not fb_timestamp_str: continue fb_dt = parse_timestamp(fb_timestamp_str) if not fb_dt: continue # Check if timestamp is within range if start_dt and fb_dt < start_dt: continue if end_dt and fb_dt > end_dt: continue filtered_feedbacks.append(fb) all_feedbacks = filtered_feedbacks # Filter by rating if specified if rating_filter and rating_filter != "All": try: rating_value = int(rating_filter) all_feedbacks = [ fb for fb in all_feedbacks if fb.get("rating", 0) == rating_value ] except (ValueError, TypeError): pass # Invalid rating filter, ignore it # Filter by extra images if checkbox is checked if filter_extra: all_feedbacks = [ fb for fb in all_feedbacks if fb.get("extra_images") and len(fb.get("extra_images", [])) > 0 ] # Filter by negative images if checkbox is checked if filter_negative: all_feedbacks = [ fb for fb in all_feedbacks if fb.get("negative_images") and len(fb.get("negative_images", [])) > 0 ] if not all_feedbacks: if uuid_search and uuid_search.strip(): gr.Info(f"No feedback entries found matching UUID: {uuid_search}") elif start_dt or end_dt: gr.Info("No feedback entries found in the specified timestamp range.") elif rating_filter and rating_filter != "All": gr.Info(f"No feedback entries found with rating: {rating_filter}") else: gr.Info("No feedback entries found. Make sure HF_FEEDBACK_DATASET_REPO is configured.") return "

No feedback entries found.

", 1, "**Total Pages:** 0" # Calculate pagination total_items = len(all_feedbacks) items_per_page = max(1, int(items_per_page)) total_pages = max(1, (total_items + items_per_page - 1) // items_per_page) page = max(1, min(int(page), total_pages)) # Get the slice of feedbacks for current page start_idx = (page - 1) * items_per_page end_idx = start_idx + items_per_page feedbacks = all_feedbacks[start_idx:end_idx] # Calculate global index offset for display global_start_idx = start_idx # Start building HTML table html_parts = [""" """] for local_idx, feedback in enumerate(feedbacks): # Calculate global index for display global_idx = global_start_idx + local_idx uuid_value = feedback.get("uuid", "N/A") timestamp = feedback.get("timestamp", "Unknown") # Format timestamp for display if len(timestamp) > 19: timestamp = timestamp[:19].replace("T", " ") rating = str(feedback.get("rating", "N/A")) feedback_text = feedback.get("feedback", "") or "" alpha_start = str(feedback.get("alpha_start", "N/A")) alpha_end = str(feedback.get("alpha_end", "N/A")) n_steps = str(feedback.get("n_steps", "N/A")) # Convert images from dataset format to PIL Images input1_img = convert_dataset_image_to_pil(feedback.get("input1")) input2_img = convert_dataset_image_to_pil(feedback.get("input2")) # Get blending results list (new format) or single result (old format) blending_results_raw = feedback.get("blending_results", []) blending_results_list = [] if blending_results_raw: for img in blending_results_raw: converted = convert_dataset_image_to_pil(img) if converted: blending_results_list.append(converted) if not blending_results_list: # Backward compatibility: check for old single image format old_result = convert_dataset_image_to_pil(feedback.get("blending_result")) if old_result: blending_results_list = [old_result] # Convert list of images extra_imgs = [] if feedback.get("extra_images"): for img in feedback.get("extra_images", []): converted = convert_dataset_image_to_pil(img) if converted: extra_imgs.append(converted) negative_imgs = [] if feedback.get("negative_images"): for img in feedback.get("negative_images", []): converted = convert_dataset_image_to_pil(img) if converted: negative_imgs.append(converted) # Column 1: Input Images input_images_html = '
' if input1_img: input1_base64 = pil_image_to_base64(input1_img, max_size=512) input_images_html += f'Input 1' else: input_images_html += '
No Input 1
' if input2_img: input2_base64 = pil_image_to_base64(input2_img, max_size=512) input_images_html += f'Input 2' else: input_images_html += '
No Input 2
' input_images_html += '
' # Column 2: Result Image - create grid from list of images if blending_results_list and len(blending_results_list) > 0: # Create grid from list of blending result images n_images = len(blending_results_list) cols = min(4, n_images) rows = math.ceil(n_images / cols) blending_result_grid = create_image_grid(blending_results_list, rows=rows, cols=cols) result_base64 = pil_image_to_base64(blending_result_grid, max_size=99999) result_html = f'
Result
' else: result_html = '
No Result
' # Column 3: Info uuid_display = uuid_value[:8] + "..." if len(uuid_value) > 12 else uuid_value options_html = f'''
Timestamp: {timestamp}
UUID: {uuid_display}
Rating: {rating}/5
Alpha: {alpha_start} → {alpha_end}
Steps: {n_steps}
Feedback: {feedback_text if feedback_text else "None"}
''' # Column 4: Extra Images if extra_imgs: extra_html = '' else: extra_html = '
None
' # Column 5: Negative Images if negative_imgs: negative_html = '' else: negative_html = '
None
' # Add row html_parts.append(f""" """) html_parts.append("
Input Images Result Image Info Extra Images Negative Images
{input_images_html} {result_html} {options_html} {extra_html} {negative_html}
") # Add pagination info pagination_info = f"""
Showing entries {start_idx + 1}-{min(end_idx, total_items)} of {total_items}
Page {page} of {total_pages}
""" html_parts.append(pagination_info) html_content = "".join(html_parts) gr.Info(f"Loaded {len(feedbacks)} feedback entries (page {page}/{total_pages})") return html_content, page, f"**Total Pages:** {total_pages}" # Refresh button - loads first page def refresh_feedback(items_per_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private): return load_and_display_feedback(items_per_page, 1, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private) refresh_button.click( refresh_feedback, inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], outputs=[feedback_html, page_number, total_pages_display] ) # Sort order radio - reset to page 1 when sort order changes def on_sort_order_change(items_per_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private): # Reset to page 1 when sort order changes return load_and_display_feedback(items_per_page, 1, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private) sort_order_radio.change( on_sort_order_change, inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], outputs=[feedback_html, page_number, total_pages_display] ) # Search button def on_search(items_per_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private): return load_and_display_feedback(items_per_page, 1, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private) search_button.click( on_search, inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], outputs=[feedback_html, page_number, total_pages_display] ) # Rating filter change rating_filter.change( on_search, inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], outputs=[feedback_html, page_number, total_pages_display] ) # Checkbox changes filter_extra_images.change( on_search, inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], outputs=[feedback_html, page_number, total_pages_display] ) filter_negative_images.change( on_search, inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], outputs=[feedback_html, page_number, total_pages_display] ) # Include private checkbox change - refresh when toggled include_private_checkbox.change( on_search, inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], outputs=[feedback_html, page_number, total_pages_display] ) # Pagination controls def on_page_change(items_per_page, page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private): return load_and_display_feedback(items_per_page, page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private) def on_items_per_page_change(items_per_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private): return load_and_display_feedback(items_per_page, 1, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private) page_number.change( on_page_change, inputs=[items_per_page_slider, page_number, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], outputs=[feedback_html, page_number, total_pages_display] ) items_per_page_slider.change( on_items_per_page_change, inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], outputs=[feedback_html, page_number, total_pages_display] ) # Previous/Next page buttons def go_to_previous_page(items_per_page, current_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private): new_page = max(1, int(current_page) - 1) return load_and_display_feedback(items_per_page, new_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private) def go_to_next_page(items_per_page, current_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private): # Convert radio selection to reverse boolean sort_reverse = (sort_order == "New to Old") # Load all feedbacks to calculate total pages (with filters if applicable) all_feedbacks = load_feedback_from_hf_dataset(reverse=sort_reverse, public_only=not include_private) # Apply UUID filter if search term provided if uuid_search and uuid_search.strip(): search_term = uuid_search.strip().lower() all_feedbacks = [ fb for fb in all_feedbacks if fb.get("uuid", "").lower().startswith(search_term) ] # Apply timestamp range filter start_dt = parse_timestamp(timestamp_start) end_dt = parse_timestamp(timestamp_end) if start_dt or end_dt: filtered_feedbacks = [] for fb in all_feedbacks: fb_timestamp_str = fb.get("timestamp", "") if not fb_timestamp_str: continue fb_dt = parse_timestamp(fb_timestamp_str) if not fb_dt: continue if start_dt and fb_dt < start_dt: continue if end_dt and fb_dt > end_dt: continue filtered_feedbacks.append(fb) all_feedbacks = filtered_feedbacks # Apply rating filter if specified if rating_filter and rating_filter != "All": try: rating_value = int(rating_filter) all_feedbacks = [ fb for fb in all_feedbacks if fb.get("rating", 0) == rating_value ] except (ValueError, TypeError): pass # Invalid rating filter, ignore it # Apply extra images filter if checkbox is checked if filter_extra: all_feedbacks = [ fb for fb in all_feedbacks if fb.get("extra_images") and len(fb.get("extra_images", [])) > 0 ] # Apply negative images filter if checkbox is checked if filter_negative: all_feedbacks = [ fb for fb in all_feedbacks if fb.get("negative_images") and len(fb.get("negative_images", [])) > 0 ] if not all_feedbacks: return load_and_display_feedback(items_per_page, 1, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private) total_items = len(all_feedbacks) items_per_page = max(1, int(items_per_page)) total_pages = max(1, (total_items + items_per_page - 1) // items_per_page) new_page = min(total_pages, int(current_page) + 1) return load_and_display_feedback(items_per_page, new_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private) prev_page_button.click( go_to_previous_page, inputs=[items_per_page_slider, page_number, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], outputs=[feedback_html, page_number, total_pages_display] ) next_page_button.click( go_to_next_page, inputs=[items_per_page_slider, page_number, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], outputs=[feedback_html, page_number, total_pages_display] ) # Allow Enter key to trigger search uuid_search_input.submit( on_search, inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], outputs=[feedback_html, page_number, total_pages_display] ) timestamp_start_input.submit( on_search, inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], outputs=[feedback_html, page_number, total_pages_display] ) timestamp_end_input.submit( on_search, inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], outputs=[feedback_html, page_number, total_pages_display] ) if __name__ == "__main__": """Run the feedback viewer as a standalone application.""" logging.basicConfig(level=logging.INFO) # Create a Gradio interface with just the feedback viewer tab demo = gr.Blocks() with demo: create_feedback_viewer_tab() # Launch the demo demo.launch( share=True, show_error=True )