Spaces:
Running
Running
import gradio as gr | |
import torch | |
from diffusers import StableDiffusionPipeline, StableDiffusionXLImg2ImgPipeline, AutoPipelineForText2Image | |
from diffusers.utils import load_image | |
from PIL import Image | |
import time | |
import random | |
import os | |
import gc # Garbage collector | |
import logging | |
# --- Configuration --- | |
# Setup basic logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
# Ensure CPU is used | |
DEVICE = "cpu" | |
TORCH_DTYPE = torch.float32 # float16/bfloat16 not practical on CPU | |
# Model definitions | |
# We need to know the base model for LoRAs and compatible IP-Adapters | |
MODEL_CONFIG = { | |
"BlaireSilver13/youtube-thumbnail": { | |
"repo_id": "BlaireSilver13/youtube-thumbnail", | |
"is_lora": True, | |
"lora_filename": "FLUX-youtube-thumbnails.safetensors", | |
"base_model": "black-forest-labs/FLUX.1-dev", | |
"pipeline_class": AutoPipelineForText2Image, | |
"ip_adapter_repo": "h94/IP-Adapter", | |
"ip_adapter_weights": "ip-adapter_sd15.bin", | |
"ip_adapter_image_encoder": "laion/CLIP-ViT-H-14-laion2B-s32B-b79K" | |
}, | |
"itzzdeep/youtube-thumbnails-sdxl-lora": { | |
"repo_id": "itzzdeep/youtube-thumbnails-sdxl-lora", | |
"is_lora": True, | |
"lora_filename": "youtube-thumbnails-sdxl-lora.safetensors", | |
"base_model": "stabilityai/stable-diffusion-xl-base-1.0", | |
"pipeline_class": AutoPipelineForText2Image, # Handles SDXL loading better | |
"ip_adapter_repo": "h94/IP-Adapter", # SDXL IP-Adapter repo | |
"ip_adapter_weights": "ip-adapter-plus_sdxl_vit-h.bin", # SDXL weights | |
"ip_adapter_image_encoder": "laion/CLIP-ViT-H-14-laion2B-s32B-b79K" # Usually the same encoder repo | |
}, | |
"justmalhar/flux-thumbnails-v3": { | |
"repo_id": "justmalhar/flux-thumbnails-v3", | |
"is_lora": False, # Assuming this is a full SD 1.5 fine-tune based on common practice | |
"base_model": None, | |
"pipeline_class": StableDiffusionPipeline, | |
"ip_adapter_repo": "h94/IP-Adapter", | |
"ip_adapter_weights": "ip-adapter_sd15.bin", | |
"ip_adapter_image_encoder": "laion/CLIP-ViT-H-14-laion2B-s32B-b79K" | |
}, | |
"saq1b/mrbeast-thumbnail-style": { | |
"repo_id": "saq1b/mrbeast-thumbnail-style", | |
"is_lora": True, # This is typically a LoRA | |
"lora_filename": None, # Auto-detect or specify e.g., "pytorch_lora_weights.safetensors" | |
"base_model": "runwayml/stable-diffusion-v1-5", # Common base for SD 1.5 LoRAs | |
"pipeline_class": StableDiffusionPipeline, | |
"ip_adapter_repo": "h94/IP-Adapter", | |
"ip_adapter_weights": "ip-adapter_sd15.bin", | |
"ip_adapter_image_encoder": "laion/CLIP-ViT-H-14-laion2B-s32B-b79K" | |
} | |
} | |
AVAILABLE_MODELS = list(MODEL_CONFIG.keys()) | |
# Global variable to potentially hold the pipeline to avoid reloading *if memory allows* | |
# NOTE: On restricted CPU environments, it's SAFER to load inside the function. | |
# Set to None initially. Let's load dynamically inside the function for safety. | |
# current_pipeline = None | |
# current_model_key = None | |
# --- Helper Functions --- | |
def cleanup_memory(): | |
"""Attempts to free GPU memory (less relevant for CPU but good practice).""" | |
logger.info("Attempting to clean up memory...") | |
try: | |
# If a pipeline exists globally (if we change strategy), unload it | |
# global current_pipeline, current_model_key | |
# if current_pipeline is not None: | |
# logger.info(f"Unloading model {current_model_key} from memory.") | |
# del current_pipeline | |
# current_pipeline = None | |
# current_model_key = None | |
gc.collect() | |
if torch.cuda.is_available(): # Only run cuda cache empty if cuda is present | |
torch.cuda.empty_cache() | |
logger.info("Memory cleanup potentially done.") | |
except Exception as e: | |
logger.error(f"Error during memory cleanup: {e}") | |
# --- Main Generation Function --- | |
def generate_thumbnail( | |
model_key: str, | |
prompt: str, | |
negative_prompt: str, | |
reference_image_pil: Image.Image | None, # Gradio provides PIL image | |
num_inference_steps: int, | |
guidance_scale: float, | |
seed: int, | |
ip_adapter_scale: float, | |
progress=gr.Progress() | |
): | |
"""Generates an image using the selected model, IP-Adapter, and settings.""" | |
start_time = time.time() | |
debug_log = f"--- Generation Log ({time.strftime('%Y-%m-%d %H:%M:%S')}) ---\n" | |
debug_log += f"Selected Model Key: {model_key}\n" | |
debug_log += f"Prompt: {prompt}\n" | |
debug_log += f"Negative Prompt: {negative_prompt}\n" | |
debug_log += f"Steps: {num_inference_steps}, CFG Scale: {guidance_scale}\n" | |
debug_log += f"Seed: {seed}\n" | |
debug_log += f"Reference Image Provided: {'Yes' if reference_image_pil else 'No'}\n" | |
debug_log += f"IP Adapter Scale: {ip_adapter_scale}\n" | |
debug_log += f"Device: {DEVICE}, Dtype: {TORCH_DTYPE}\n\n" | |
pipeline = None # Ensure pipeline is defined in this scope | |
try: | |
if not model_key: | |
raise ValueError("No model selected.") | |
config = MODEL_CONFIG[model_key] | |
repo_id = config["repo_id"] | |
is_lora = config["is_lora"] | |
base_model = config["base_model"] | |
pipeline_class = config["pipeline_class"] | |
ip_adapter_repo = config["ip_adapter_repo"] | |
ip_adapter_weights = config["ip_adapter_weights"] | |
# ip_adapter_image_encoder = config["ip_adapter_image_encoder"] # Encoder loaded via IP-Adapter itself usually | |
# --- Model Loading --- | |
load_start_time = time.time() | |
debug_log += f"[{time.time() - start_time:.2f}s] Cleaning up memory before loading...\n" | |
progress(0.1, desc="Cleaning up memory...") | |
cleanup_memory() # Attempt cleanup before loading new model | |
debug_log += f"[{time.time() - start_time:.2f}s] Loading model: {'LoRA ' + repo_id if is_lora else repo_id}...\n" | |
progress(0.2, desc=f"Loading {'LoRA ' + repo_id if is_lora else repo_id}...") | |
model_load_id = base_model if is_lora else repo_id | |
debug_log += f"[{time.time() - start_time:.2f}s] Base/Model ID for pipeline: {model_load_id}\n" | |
pipeline = pipeline_class.from_pretrained( | |
model_load_id, | |
torch_dtype=TORCH_DTYPE, | |
# Add any specific args needed for the pipeline class if necessary | |
# safety_checker=None, # Disable safety checker if needed/causes issues on CPU | |
# requires_safety_checker=False, | |
) | |
pipeline.to(DEVICE) | |
debug_log += f"[{time.time() - start_time:.2f}s] Base pipeline loaded onto {DEVICE}.\n" | |
if is_lora: | |
lora_load_start = time.time() | |
debug_log += f"[{time.time() - start_time:.2f}s] Loading LoRA weights from {repo_id}...\n" | |
progress(0.4, desc=f"Loading LoRA {repo_id}...") | |
try: | |
lora_filename = config.get("lora_filename") # Get specific filename if provided | |
if lora_filename: | |
debug_log += f"[{time.time() - start_time:.2f}s] Using specified LoRA filename: {lora_filename}\n" | |
pipeline.load_lora_weights(repo_id, weight_name=lora_filename, torch_dtype=TORCH_DTYPE) | |
else: | |
# Let diffusers try to auto-detect standard names like .safetensors or .bin | |
debug_log += f"[{time.time() - start_time:.2f}s] Attempting auto-detection of LoRA filename.\n" | |
pipeline.load_lora_weights(repo_id, torch_dtype=TORCH_DTYPE) | |
# When using LoRA with diffusers >= 0.22, explicitly fuse *or* set adapters | |
# pipeline.fuse_lora() # Fuse creates a new pipeline state (might use more memory) | |
pipeline.set_adapters(pipeline.get_active_adapters(), adapter_weights=1.0) # Recommended for flexibility | |
debug_log += f"[{time.time() - start_time:.2f}s] LoRA weights loaded and adapters set in {time.time() - lora_load_start:.2f}s.\n" | |
except Exception as e: | |
debug_log += f"[{time.time() - start_time:.2f}s] ERROR loading LoRA: {e}. Check LoRA repo structure/filename.\n" | |
# Decide whether to continue without LoRA or raise error | |
raise ValueError(f"Failed to load LoRA weights for {repo_id}: {e}") | |
# --- IP Adapter Loading --- | |
if reference_image_pil and ip_adapter_scale > 0: | |
ip_load_start = time.time() | |
debug_log += f"[{time.time() - start_time:.2f}s] Loading IP-Adapter: {ip_adapter_repo} ({ip_adapter_weights})...\n" | |
progress(0.6, desc="Loading IP-Adapter...") | |
try: | |
# Ensure the pipeline has the load_ip_adapter method | |
if not hasattr(pipeline, "load_ip_adapter"): | |
raise AttributeError("The current pipeline class does not support load_ip_adapter. Check diffusers version or pipeline type.") | |
pipeline.load_ip_adapter( | |
ip_adapter_repo, | |
subfolder="models", # Common subfolder, adjust if needed | |
weight_name=ip_adapter_weights, | |
# image_encoder_folder=ip_adapter_image_encoder # Let diffusers handle encoder loading usually | |
) | |
pipeline.set_ip_adapter_scale(ip_adapter_scale) | |
debug_log += f"[{time.time() - start_time:.2f}s] IP-Adapter loaded and scale set ({ip_adapter_scale}) in {time.time() - ip_load_start:.2f}s.\n" | |
# Prepare the image for IP-Adapter (often just needs to be a PIL image) | |
ip_image = reference_image_pil.convert("RGB") | |
debug_log += f"[{time.time() - start_time:.2f}s] Reference image prepared for IP-Adapter.\n" | |
except Exception as e: | |
debug_log += f"[{time.time() - start_time:.2f}s] WARNING: Failed to load IP-Adapter: {e}. Proceeding without image guidance.\n" | |
ip_image = None | |
ip_adapter_scale = 0 # Effectively disable it if loading failed | |
pipeline.set_ip_adapter_scale(0) # Ensure scale is 0 | |
else: | |
ip_image = None | |
if hasattr(pipeline, "set_ip_adapter_scale"): | |
pipeline.set_ip_adapter_scale(0) # Ensure scale is 0 if no image/scale=0 | |
debug_log += f"[{time.time() - start_time:.2f}s] No reference image provided or IP Adapter scale is 0. Skipping IP-Adapter loading.\n" | |
debug_log += f"[{time.time() - start_time:.2f}s] Total Model & IP-Adapter Loading time: {time.time() - load_start_time:.2f}s\n" | |
# --- Generation --- | |
gen_start_time = time.time() | |
debug_log += f"[{time.time() - start_time:.2f}s] Starting generation...\n" | |
progress(0.7, desc="Generating image...") | |
# Handle seed | |
if seed == -1: | |
seed = random.randint(0, 2**32 - 1) | |
debug_log += f"[{time.time() - start_time:.2f}s] Using random seed: {seed}\n" | |
generator = torch.Generator(device=DEVICE).manual_seed(seed) | |
# Prepare arguments for pipeline call | |
pipeline_args = { | |
"prompt": prompt, | |
"negative_prompt": negative_prompt, | |
"num_inference_steps": num_inference_steps, | |
"guidance_scale": guidance_scale, | |
"generator": generator, | |
} | |
# Add IP-Adapter image if it's loaded and ready | |
if ip_image is not None and hasattr(pipeline, "set_ip_adapter_scale") and ip_adapter_scale > 0: | |
pipeline_args["ip_adapter_image"] = ip_image | |
# Scale was set earlier with set_ip_adapter_scale | |
debug_log += f"[{time.time() - start_time:.2f}s] Passing reference image to pipeline with IP scale {ip_adapter_scale}.\n" | |
else: | |
debug_log += f"[{time.time() - start_time:.2f}s] Not passing reference image to pipeline.\n" | |
# Run inference | |
with torch.inference_mode(): # More modern than no_grad for inference | |
output_image = pipeline(**pipeline_args).images[0] | |
gen_end_time = time.time() | |
debug_log += f"[{time.time() - start_time:.2f}s] Generation finished in {gen_end_time - gen_start_time:.2f}s.\n" | |
# --- Cleanup --- | |
debug_log += f"[{time.time() - start_time:.2f}s] Unloading model from memory (CPU strategy)...\n" | |
progress(0.95, desc="Cleaning up...") | |
del pipeline # Explicitly delete pipeline | |
cleanup_memory() # Call garbage collection | |
total_time = time.time() - start_time | |
debug_log += f"\n--- Total time: {total_time:.2f} seconds ---\n" | |
return output_image, debug_log | |
except Exception as e: | |
logger.exception(f"Error during generation for model {model_key}") # Log full traceback | |
error_time = time.time() - start_time | |
debug_log += f"\n\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" | |
debug_log += f"ERROR occurred after {error_time:.2f}s:\n{e}\n" | |
debug_log += f"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" | |
# Try cleanup even on error | |
if 'pipeline' in locals() and pipeline is not None: | |
del pipeline | |
cleanup_memory() | |
# Return None for image, and the log containing the error | |
return None, debug_log | |
# --- Gradio Interface --- | |
css = """ | |
#warning { | |
background-color: #FFCCCB; /* Light red */ | |
padding: 10px; | |
border-radius: 5px; | |
text-align: center; | |
font-weight: bold; | |
} | |
#debug_log_area textarea { | |
font-family: monospace; | |
font-size: 10px; /* Smaller font for logs */ | |
white-space: pre-wrap; /* Wrap long lines */ | |
word-wrap: break-word; /* Break words if necessary */ | |
} | |
""" | |
with gr.Blocks(css=css) as demo: | |
gr.Markdown("# YouTube Thumbnail Generator with IP-Adapter") | |
gr.Markdown( | |
"Select a thumbnail model, provide a text prompt, and optionally upload a reference image " | |
"to guide the generation using IP-Adapter." | |
) | |
gr.HTML("<div id='warning'>⚠️ Warning: Inference on CPU is VERY SLOW (minutes per image, especially SDXL models). Please be patient.</div>") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
model_dropdown = gr.Dropdown( | |
label="Select Thumbnail Model", | |
choices=AVAILABLE_MODELS, | |
value=AVAILABLE_MODELS[0] if AVAILABLE_MODELS else None, | |
) | |
prompt_input = gr.Textbox(label="Prompt", lines=3, placeholder="e.g., Epic landscape, dramatic lighting, YouTube thumbnail style") | |
negative_prompt_input = gr.Textbox(label="Negative Prompt", lines=2, placeholder="e.g., blurry, low quality, text, signature, watermark") | |
reference_image_input = gr.Image(label="Reference Image (for IP-Adapter)", type="pil", sources=["upload"]) | |
with gr.Accordion("Advanced Settings", open=False): | |
steps_slider = gr.Slider(label="Inference Steps", minimum=10, maximum=100, value=30, step=1) | |
cfg_slider = gr.Slider(label="Guidance Scale (CFG)", minimum=1.0, maximum=20.0, value=7.0, step=0.5) | |
ip_adapter_scale_slider = gr.Slider(label="IP-Adapter Scale", minimum=0.0, maximum=1.5, value=0.6, step=0.05, | |
info="Strength of the reference image influence (0 = disabled).") | |
seed_input = gr.Number(label="Seed", value=-1, precision=0, info="-1 for random seed") | |
generate_button = gr.Button("Generate Thumbnail", variant="primary") | |
with gr.Column(scale=1): | |
output_image = gr.Image(label="Generated Thumbnail", type="pil") | |
debug_output = gr.Textbox(label="Debug Log", lines=20, interactive=False, elem_id="debug_log_area") | |
generate_button.click( | |
fn=generate_thumbnail, | |
inputs=[ | |
model_dropdown, | |
prompt_input, | |
negative_prompt_input, | |
reference_image_input, | |
steps_slider, | |
cfg_slider, | |
seed_input, | |
ip_adapter_scale_slider | |
], | |
outputs=[output_image, debug_output] | |
) | |
# --- Launch --- | |
if __name__ == "__main__": | |
logger.info("Starting Gradio App...") | |
# Queueing is important for handling multiple users on Spaces, even if slow | |
demo.queue().launch(debug=True) # debug=True provides Gradio debug info in console |