"""Gradio demo for vibe blending, with a feedback form and a feedback viewer tab."""

import logging
import os
import tempfile
import uuid
from typing import List, Union, Optional
from datetime import datetime
from pathlib import Path

import gradio as gr
from PIL import Image
import numpy as np
import pandas as pd

from vibe_blending import run_vibe_blend_safe, run_vibe_blend_not_safe
from ipadapter_model import create_image_grid
from feedback_viewer import create_feedback_viewer_tab, store_feedback_to_hf_dataset

# Hugging Face Datasets for feedback storage
try:
    from datasets import Dataset, load_dataset  # type: ignore
    from huggingface_hub import login  # type: ignore
    HF_DATASETS_AVAILABLE = True
except ImportError:
    Dataset = None  # type: ignore
    load_dataset = None  # type: ignore
    login = None  # type: ignore
    HF_DATASETS_AVAILABLE = False
    logging.warning("Hugging Face datasets not available. Feedback will not be stored.")


def _get_env_flag(name: str, default: bool) -> bool:
    """Read a boolean flag from the environment.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is not set.

    Returns:
        ``default`` when the variable is unset; otherwise True only when the
        stripped, lower-cased value is one of "1", "true", "yes" or "on".
    """
    raw_value = os.getenv(name)
    if raw_value is None:
        return default
    return raw_value.strip().lower() in {"1", "true", "yes", "on"}


USE_HUGGINGFACE_ZEROGPU = _get_env_flag("USE_HUGGINGFACE_ZEROGPU", default=True)
DEFAULT_CONFIG_PATH = "./config.yaml"

# Hugging Face Dataset repository for storing feedback
# Set this to your Hugging Face username/dataset-name, e.g., "your-username/vibe-blending-feedback"
HF_FEEDBACK_DATASET_REPO = os.getenv("HF_FEEDBACK_DATASET_REPO", None)
HF_TOKEN = os.getenv("HF_TOKEN", None)

# Number of columns used for the grid view of the blending results.
_GRID_COLUMNS = 4

if USE_HUGGINGFACE_ZEROGPU:
    try:
        import spaces
    except ImportError:
        USE_HUGGINGFACE_ZEROGPU = False
        logging.warning("HuggingFace Spaces not available, running without GPU acceleration")

if USE_HUGGINGFACE_ZEROGPU:
    run_vibe_blend_safe = spaces.GPU(duration=90)(run_vibe_blend_safe)
    run_vibe_blend_not_safe = spaces.GPU(duration=90)(run_vibe_blend_not_safe)

# NOTE(review): the original formatting of this file was lost, so it is unclear
# whether this download step was nested under the ZeroGPU branch above or ran
# unconditionally. It is kept unconditional here -- confirm against the repo.
try:
    from download_models import download_ipadapter
    download_ipadapter()
except ImportError:
    logging.warning("Could not import download_models")


def create_gif_from_images(images: List[Image.Image], fps: float = 3.0) -> Optional[str]:
    """
    Create a looping GIF from a list of PIL Images.

    Args:
        images: List of PIL Images to combine into a GIF
        fps: Frames per second for the GIF (default: 3.0); must be positive

    Returns:
        Path to the GIF file written in the system temp directory, or None
        when ``images`` is empty.

    Raises:
        ValueError: If ``fps`` is not positive.
    """
    if not images:
        return None
    if fps <= 0:
        raise ValueError("fps must be positive")

    # Per-frame duration in milliseconds; clamp so a very high fps cannot
    # truncate to a zero-length frame.
    duration_ms = max(1, int(1000 / fps))

    # Unique file name so concurrent requests do not overwrite each other.
    gif_path = os.path.join(tempfile.gettempdir(), f"vibe_blend_{uuid.uuid4().hex}.gif")

    first_frame, remaining_frames = images[0], images[1:]
    first_frame.save(
        gif_path,
        save_all=True,
        append_images=remaining_frames,
        duration=duration_ms,
        loop=0,  # 0 = infinite loop
    )
    return gif_path


def _to_rgb_image(item) -> Optional[Image.Image]:
    """Convert one array, PIL image or file path to an RGB PIL image.

    Returns None when ``item`` is of an unsupported type.
    """
    if isinstance(item, np.ndarray):
        return Image.fromarray(item).convert("RGB")
    if isinstance(item, Image.Image):
        return item.convert("RGB")
    if isinstance(item, (str, os.PathLike)):
        # convert() returns an independent in-memory copy, so the file handle
        # can be released as soon as the conversion is done.
        with Image.open(item) as opened:
            return opened.convert("RGB")
    return None


def load_gradio_images_helper(
    pil_images: Union[List, Image.Image, np.ndarray, str, None],
) -> Union[List[Image.Image], Image.Image]:
    """
    Convert the image formats handed over by Gradio to RGB PIL Images.

    Args:
        pil_images: None, a single image (numpy array, PIL Image or file
            path), or an iterable of images whose items may be wrapped in a
            tuple with the image first (Gradio gallery format).

    Returns:
        An empty list for None, a single RGB image for a single input, or a
        list of RGB images for an iterable input. Items of an unsupported
        type are skipped.
    """
    if pil_images is None:
        return []

    # A single image is returned as a single image, not a one-element list.
    if isinstance(pil_images, (np.ndarray, Image.Image, str)):
        return _to_rgb_image(pil_images)

    processed_images = []
    for entry in pil_images:
        if isinstance(entry, tuple):  # Gradio gallery format
            entry = entry[0]
        converted = _to_rgb_image(entry)
        if converted is not None:
            processed_images.append(converted)
    return processed_images


def _as_image_list(images):
    """Normalise optional image input (None, one image, or a list) to a list."""
    if images is None:
        return []
    if isinstance(images, Image.Image):
        return [images]
    return images


def _process_input_images(input1, input2, extra_images, negative_images):
    """Convert the raw component values of the blending tab to PIL images.

    Returns:
        ``(input1, input2, extra_images, negative_images)`` where the two
        inputs are whatever ``load_gradio_images_helper`` yields for them and
        the extra/negative images are always lists.
    """
    return (
        load_gradio_images_helper(input1),
        load_gradio_images_helper(input2),
        _as_image_list(load_gradio_images_helper(extra_images)),
        _as_image_list(load_gradio_images_helper(negative_images)),
    )


def create_vibe_blending_tab():
    """Create the vibe blending tab interface."""
    # NOTE(review): the container nesting below (which Group/Row/Accordion
    # each widget lives in) is a best-effort reconstruction -- confirm the
    # intended layout against the deployed app.
    with gr.Tab("Vibe Blending"):
        gr.Markdown("""
        ## Vibe Blending Demo
        This is the demo for the paper "*Vibe Spaces for Creatively Connecting and Expressing Visual Concepts*".
        [Paper]() | [Code]() | [Website]()

        Given a pair of images, vibe blending will generate a set of images that creatively connect the input images.
        """)

        with gr.Row():
            with gr.Column():
                with gr.Group():
                    with gr.Row():
                        gr.Markdown("**Step 1:** Upload 2 images")
                    with gr.Row():
                        input1 = gr.Image(label="Input 1", show_label=True, format="png")
                        input2 = gr.Image(label="Input 2", show_label=True, format="png")
                    with gr.Accordion("Options", open=False):
                        with gr.Group():
                            with gr.Row():
                                alpha_start = gr.Slider(minimum=0, maximum=2, step=0.1, value=0.0, label="Start α", info="interpolation weight")
                                alpha_end = gr.Slider(minimum=0, maximum=2, step=0.1, value=1.0, label="End α", info="use α>1 for extrapolation")
                                # n_steps = gr.Slider(minimum=1, maximum=40, step=1, value=10, label="Number of Output Images")
                                n_steps = gr.Number(value=12, label="Number of Output Images", interactive=True)
                            with gr.Row():
                                extra_images = gr.Gallery(label="Extra Images (optional)", show_label=True, columns=3, rows=2, height=150)
                                negative_images = gr.Gallery(label="Negative Images (optional)", show_label=True, columns=3, rows=2, height=150)
                with gr.Group():
                    gr.Markdown("**Step 3:** Submit your feedback")
                    rating = gr.Radio(label="How do you like the results?", choices=["1", "2", "3", "4", "5"])
                    feedback_form = gr.TextArea(label="Feedback (optional)", show_label=True, lines=1, placeholder="Enter your feedback here...")
                    make_public = gr.Checkbox(label="Make feedback public", value=True, info="Allow this feedback to be visible in the public feedback viewer")
                    feedback_button = gr.Button("Submit Feedback", variant="secondary", size="sm")
            with gr.Column():
                with gr.Group():
                    gr.Markdown("**Step 2:** Run Vibe Blending")
                    blending_results = gr.Gallery(label="Gallery View", show_label=False, columns=4, rows=3, interactive=False)
                    with gr.Accordion("Vibe Blending Results (grid view)", open=False):
                        blending_results_grid = gr.Image(label="Grid View", show_label=True, format="png", interactive=False)
                    with gr.Accordion("Vibe Blending Results (GIF view)", open=False):
                        blending_results_gif = gr.Image(label="GIF View", show_label=True, format="gif", interactive=False)
                    blend_button = gr.Button("🔴 Run Vibe Blending", variant="primary")

        def blend_button_click(input1, input2, extra_images, negative_images, alpha_start, alpha_end, n_steps):
            """Run vibe blending and return ``(grid image, image list, GIF path)``.

            Raises:
                gr.Error: If an input image is missing or ``n_steps`` is not
                    a number of at least 1.
            """
            if input1 is None or input2 is None:
                raise gr.Error("Please upload both input images before running vibe blending.")

            # gr.Number may deliver a float or None; np.linspace needs an int.
            try:
                step_count = int(n_steps)
            except (TypeError, ValueError, OverflowError):
                step_count = 0
            if step_count < 1:
                raise gr.Error("Number of Output Images must be a whole number of at least 1.")

            input1, input2, extra_images, negative_images = _process_input_images(
                input1, input2, extra_images, negative_images
            )

            # step_count + 2 evenly spaced points with both endpoints dropped
            # leaves step_count weights strictly between start and end.
            alpha_weights = np.linspace(alpha_start, alpha_end, step_count + 2)[1:-1].tolist()
            blended_images_list = run_vibe_blend_not_safe(
                input1, input2, extra_images, negative_images, DEFAULT_CONFIG_PATH, alpha_weights
            )

            # Ceiling division: enough rows to hold every image.
            grid_rows = (len(blended_images_list) + _GRID_COLUMNS - 1) // _GRID_COLUMNS
            blended_images_grid = create_image_grid(blended_images_list, rows=grid_rows, cols=_GRID_COLUMNS)

            # Create GIF at 3 frames per second
            gif_path = create_gif_from_images(blended_images_list, fps=3.0)
            return blended_images_grid, blended_images_list, gif_path

        blend_button.click(
            blend_button_click,
            inputs=[input1, input2, extra_images, negative_images, alpha_start, alpha_end, n_steps],
            outputs=[blending_results_grid, blending_results, blending_results_gif],
        )

        def feedback_button_click(rating, feedback_form, make_public, input1, input2, extra_images, negative_images, alpha_start, alpha_end, n_steps, blending_results):
            """Handle feedback submission and store to Hugging Face Dataset.

            Returns:
                Updates for ``(rating, feedback_form, make_public)``: the form
                is reset after a successful submission and left untouched
                otherwise, so the user never loses typed feedback.
            """
            keep_form = (
                gr.update(value=rating),
                gr.update(value=feedback_form),
                gr.update(value=make_public),
            )
            reset_form = (gr.update(value=None), gr.update(value=""), gr.update(value=True))

            if not rating:
                gr.Warning("Please select a rating before submitting feedback.")
                return keep_form

            # Validate that required images exist
            if input1 is None:
                gr.Warning("Please upload Input 1 image before submitting feedback.")
                return keep_form
            if input2 is None:
                gr.Warning("Please upload Input 2 image before submitting feedback.")
                return keep_form

            # Gallery items arrive as image paths or (image_path, caption)
            # tuples; the shared helper handles both and closes the files.
            blending_result_images = load_gradio_images_helper(blending_results)
            if not blending_result_images:
                gr.Warning("Please run vibe blending first to generate results before submitting feedback.")
                return keep_form

            input1_img, input2_img, extra_images_processed, negative_images_processed = _process_input_images(
                input1, input2, extra_images, negative_images
            )

            # Store feedback and images to Hugging Face Dataset (upload list of images)
            success = store_feedback_to_hf_dataset(
                rating=rating,
                feedback_text=feedback_form or "",
                alpha_start=alpha_start,
                alpha_end=alpha_end,
                n_steps=n_steps,
                input1_image=input1_img,
                input2_image=input2_img,
                extra_images=extra_images_processed if extra_images_processed else None,
                negative_images=negative_images_processed if negative_images_processed else None,
                blending_result_images=blending_result_images,  # Upload list of images
                is_public=make_public,  # Pass the public flag
            )

            if success:
                gr.Info("Thank you! Your feedback has been submitted successfully.")
                return reset_form

            error_msg = "Feedback could not be stored. "
            if not HF_FEEDBACK_DATASET_REPO:
                error_msg += "Please configure `HF_FEEDBACK_DATASET_REPO` environment variable (e.g., 'your-username/vibe-blending-feedback')."
            else:
                error_msg += "Please check the logs for details."
            gr.Warning(error_msg)
            return keep_form

        feedback_button.click(
            feedback_button_click,
            inputs=[rating, feedback_form, make_public, input1, input2, extra_images, negative_images, alpha_start, alpha_end, n_steps, blending_results],
            outputs=[rating, feedback_form, make_public],  # Reset rating, feedback form, and checkbox
        )

        example_cases = [
            [Image.open("./images/playviolin_hr.png"), Image.open("./images/playguitar_hr.png")],
            [Image.open("./images/input_cat.png"), Image.open("./images/input_bread.png")],
            [Image.open("./images/02140_left.jpg"), Image.open("./images/02140_right.jpg")],
            #[Image.open("./images/02718_l.jpg"), Image.open("./images/02718_r.jpg")],
            [Image.open("./images/03969_l.jpg"), Image.open("./images/03969_r.jpg")],
            [Image.open("./images/04963_l.jpg"), Image.open("./images/04963_r.jpg")],
            #[Image.open("./images/05358_l.jpg"), Image.open("./images/05358_r.jpg")],
            [Image.open("./images/00436_l.jpg"), Image.open("./images/00436_r.jpg")],
            [Image.open("./images/archi/input_A.jpg"), Image.open("./images/archi/input_B.jpg")],
        ]
        gr.Examples(examples=example_cases, label="Example Cases", inputs=[input1, input2], outputs=[blending_results_grid, blending_results])

        extra_image_examples = [
            [
                Image.open("./images/archi/input_A.jpg"),
                Image.open("./images/archi/input_B.jpg"),
                [
                    Image.open("./images/archi/extra1.jpg"),
                    Image.open("./images/archi/extra2.jpg"),
                    Image.open("./images/archi/extra3.jpg"),
                ],
            ],
        ]
        gr.Examples(examples=extra_image_examples, label="Extra Image Examples", inputs=[input1, input2, extra_images], outputs=[blending_results_grid, blending_results])

        negative_image_examples = [
            [
                Image.open("./images/pink_bear1.jpg"),
                Image.open("./images/black_bear2.jpg"),
                [Image.open("./images/pink_bear1.jpg"), Image.open("./images/black_bear1.jpg")],
            ],
        ]
        gr.Examples(examples=negative_image_examples, label="Negative Image Examples", inputs=[input1, input2, negative_images], outputs=[blending_results_grid, blending_results])


# Feedback viewer functions moved to feedback_viewer.py


def create_merged_interface():
    """Create the merged interface holding the blending and feedback viewer tabs.

    Returns:
        The ``gr.Blocks`` app, not yet launched.
    """
    theme = gr.themes.Base(
        spacing_size='md',
        text_size='lg',
        primary_hue='blue',
        neutral_hue='slate',
        secondary_hue='pink',
    )
    demo = gr.Blocks(theme=theme)
    with demo:
        # Create both tabs
        create_vibe_blending_tab()
        create_feedback_viewer_tab()
    return demo


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # basicConfig() does nothing when an import-time logging.warning() above
    # has already configured the root logger, so set the level explicitly.
    logging.getLogger().setLevel(logging.INFO)
    demo = create_merged_interface()
    demo.launch(
        share=True,
        server_name="0.0.0.0" if USE_HUGGINGFACE_ZEROGPU else None,
        show_error=True,
    )