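# NOTE: the next three lines appear to be web-page residue captured with the
# file (uploader, commit title, commit hash); kept as comments so the module parses.
#   sdafd's picture
#   Update app.py
#   db62cdf verified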
import gradio as gr
import torch
from diffusers import StableDiffusionPipeline, StableDiffusionXLImg2ImgPipeline, AutoPipelineForText2Image
from diffusers.utils import load_image
from PIL import Image
import time
import random
import os
import gc # Garbage collector
import logging
# --- Configuration ---
# Root logging: INFO and above, each record prefixed with time and level.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)

# All pipelines in this app are placed on the CPU.
DEVICE = "cpu"

# Full precision; float16/bfloat16 are not practical on CPU.
TORCH_DTYPE = torch.float32
# Model definitions
# Every entry is keyed by the Hub repo id shown in the UI dropdown and records:
#   repo_id            - repo that holds the fine-tune (LoRA or full checkpoint)
#   is_lora            - True when `repo_id` is a LoRA applied on top of `base_model`
#   lora_filename      - explicit LoRA weight file, or None to let diffusers auto-detect
#   base_model         - pipeline checkpoint to load first when `is_lora` is True
#   pipeline_class     - diffusers class whose `from_pretrained` builds the pipeline
#   ip_adapter_*       - IP-Adapter repo / weight file / image-encoder repo
# The IP-Adapter has to match the base architecture, hence the per-family presets.
_IP_ADAPTER_REPO = "h94/IP-Adapter"
_CLIP_VIT_H_ENCODER = "laion/CLIP-ViT-H-14-laion2B-s32B-b79K"

# IP-Adapter preset used by the SD 1.5-style entries.
_SD15_IP_ADAPTER = {
    "ip_adapter_repo": _IP_ADAPTER_REPO,
    "ip_adapter_weights": "ip-adapter_sd15.bin",
    "ip_adapter_image_encoder": _CLIP_VIT_H_ENCODER,
}

# IP-Adapter preset used by the SDXL entry (same encoder repo, SDXL weight file).
_SDXL_IP_ADAPTER = {
    "ip_adapter_repo": _IP_ADAPTER_REPO,
    "ip_adapter_weights": "ip-adapter-plus_sdxl_vit-h.bin",
    "ip_adapter_image_encoder": _CLIP_VIT_H_ENCODER,
}

MODEL_CONFIG = {
    # NOTE(review): the base model is FLUX.1-dev but the IP-Adapter preset is
    # the SD 1.5 one - confirm these are actually compatible.
    "BlaireSilver13/youtube-thumbnail": {
        "repo_id": "BlaireSilver13/youtube-thumbnail",
        "is_lora": True,
        "lora_filename": "FLUX-youtube-thumbnails.safetensors",
        "base_model": "black-forest-labs/FLUX.1-dev",
        "pipeline_class": AutoPipelineForText2Image,
        **_SD15_IP_ADAPTER,
    },
    # SDXL LoRA; the auto pipeline is used because it handles SDXL loading better.
    "itzzdeep/youtube-thumbnails-sdxl-lora": {
        "repo_id": "itzzdeep/youtube-thumbnails-sdxl-lora",
        "is_lora": True,
        "lora_filename": "youtube-thumbnails-sdxl-lora.safetensors",
        "base_model": "stabilityai/stable-diffusion-xl-base-1.0",
        "pipeline_class": AutoPipelineForText2Image,
        **_SDXL_IP_ADAPTER,
    },
    # NOTE(review): treated as a full SD 1.5 fine-tune by assumption, although
    # the repo name says "flux" - verify against the model card.
    "justmalhar/flux-thumbnails-v3": {
        "repo_id": "justmalhar/flux-thumbnails-v3",
        "is_lora": False,
        "base_model": None,
        "pipeline_class": StableDiffusionPipeline,
        **_SD15_IP_ADAPTER,
    },
    # Treated as a LoRA on the common SD 1.5 base; no filename is given, so the
    # weight file is auto-detected (e.g. "pytorch_lora_weights.safetensors").
    "saq1b/mrbeast-thumbnail-style": {
        "repo_id": "saq1b/mrbeast-thumbnail-style",
        "is_lora": True,
        "lora_filename": None,
        "base_model": "runwayml/stable-diffusion-v1-5",
        "pipeline_class": StableDiffusionPipeline,
        **_SD15_IP_ADAPTER,
    },
}

# Dropdown choices, in declaration order.
AVAILABLE_MODELS = list(MODEL_CONFIG)
# Global variable to potentially hold the pipeline to avoid reloading *if memory allows*
# NOTE: On restricted CPU environments, it's SAFER to load inside the function.
# Set to None initially. Let's load dynamically inside the function for safety.
# current_pipeline = None
# current_model_key = None
# --- Helper Functions ---
def cleanup_memory():
    """Run a garbage-collection pass and, if CUDA is present, empty its cache.

    This is best-effort housekeeping: any exception raised along the way is
    logged as an error and swallowed, so the function never raises.
    """
    logger.info("Attempting to clean up memory...")
    try:
        gc.collect()
        cuda_present = torch.cuda.is_available()
        if cuda_present:
            # Only meaningful on CUDA hosts; skipped entirely on CPU-only ones.
            torch.cuda.empty_cache()
        logger.info("Memory cleanup potentially done.")
    except Exception as exc:
        logger.error(f"Error during memory cleanup: {exc}")
# --- Main Generation Function ---
def _resolve_ip_adapter_paths(config: dict) -> tuple[str, str | None]:
    """Return ``(weights_subfolder, image_encoder_folder)`` for a model config.

    Optional ``ip_adapter_subfolder`` / ``ip_adapter_image_encoder_folder``
    keys in *config* take precedence.  Otherwise the location is derived from
    the weight file name.

    NOTE(review): the derived values follow the h94/IP-Adapter repository
    layout as shown in the diffusers IP-Adapter guide (SDXL weights under
    ``sdxl_models``; ViT-H SDXL variants reuse ``models/image_encoder``).
    Confirm before pointing a config at a differently organised repo.
    """
    weights_name = (config.get("ip_adapter_weights") or "").lower()

    subfolder = config.get("ip_adapter_subfolder")
    if subfolder is None:
        subfolder = "sdxl_models" if "sdxl" in weights_name else "models"

    encoder_folder = config.get("ip_adapter_image_encoder_folder")
    if encoder_folder is None and subfolder == "sdxl_models" and "vit-h" in weights_name:
        encoder_folder = "models/image_encoder"

    return subfolder, encoder_folder


def _zero_ip_adapter_scale(pipeline) -> str | None:
    """Best-effort: set the pipeline's IP-Adapter scale to 0.

    Returns ``None`` on success (or when the pipeline has no such setter) and
    the error text when the setter itself fails; never raises.
    """
    setter = getattr(pipeline, "set_ip_adapter_scale", None)
    if setter is None:
        return None
    try:
        setter(0)
    except Exception as exc:  # deliberately broad: this must not abort generation
        return str(exc)
    return None


def generate_thumbnail(
    model_key: str,
    prompt: str,
    negative_prompt: str,
    reference_image_pil: Image.Image | None,  # Gradio provides PIL image
    num_inference_steps: int,
    guidance_scale: float,
    seed: int,
    ip_adapter_scale: float,
    progress=gr.Progress()
):
    """Generate one image with the selected model, optional LoRA and IP-Adapter.

    The pipeline is loaded from scratch on every call and released before
    returning, so only one model is held in memory at a time.

    Args:
        model_key: Key into ``MODEL_CONFIG``.
        prompt: Text prompt passed to the pipeline.
        negative_prompt: Negative text prompt passed to the pipeline.
        reference_image_pil: Optional reference image for the IP-Adapter.
        num_inference_steps: Number of denoising steps (coerced to ``int``).
        guidance_scale: Classifier-free guidance scale.
        seed: Generator seed; ``-1`` or ``None`` picks a random one.
        ip_adapter_scale: IP-Adapter strength; ``0``/``None`` disables it.
        progress: Progress callback, called as ``progress(fraction, desc=...)``.

    Returns:
        ``(image, debug_log)``.  ``image`` is ``None`` when anything fails; the
        failure is logged and described in ``debug_log`` rather than raised.
    """
    start_time = time.time()
    # Collect log chunks in a list and join once instead of repeated `+=`.
    log_parts = [
        f"--- Generation Log ({time.strftime('%Y-%m-%d %H:%M:%S')}) ---\n",
        f"Selected Model Key: {model_key}\n",
        f"Prompt: {prompt}\n",
        f"Negative Prompt: {negative_prompt}\n",
        f"Steps: {num_inference_steps}, CFG Scale: {guidance_scale}\n",
        f"Seed: {seed}\n",
        f"Reference Image Provided: {'Yes' if reference_image_pil is not None else 'No'}\n",
        f"IP Adapter Scale: {ip_adapter_scale}\n",
        f"Device: {DEVICE}, Dtype: {TORCH_DTYPE}\n\n",
    ]

    def note(message: str) -> None:
        """Append one line prefixed with the seconds elapsed since the call began."""
        log_parts.append(f"[{time.time() - start_time:.2f}s] {message}\n")

    pipeline = None  # Defined up front so the failure path can always release it
    try:
        # --- Input validation / normalisation ---
        if not model_key:
            raise ValueError("No model selected.")
        if model_key not in MODEL_CONFIG:
            raise ValueError(f"Unknown model key: {model_key!r}")
        config = MODEL_CONFIG[model_key]
        repo_id = config["repo_id"]
        is_lora = config["is_lora"]
        base_model = config["base_model"]
        pipeline_class = config["pipeline_class"]
        ip_adapter_repo = config["ip_adapter_repo"]
        ip_adapter_weights = config["ip_adapter_weights"]
        if is_lora and not base_model:
            raise ValueError(f"LoRA model {repo_id} has no base_model configured.")

        # UI number widgets may hand over floats or None (e.g. a cleared field).
        num_inference_steps = int(num_inference_steps)
        if ip_adapter_scale is None:
            ip_adapter_scale = 0
        seed = -1 if seed is None else int(seed)

        # --- Model Loading ---
        load_start_time = time.time()
        note("Cleaning up memory before loading...")
        progress(0.1, desc="Cleaning up memory...")
        cleanup_memory()

        display_name = "LoRA " + repo_id if is_lora else repo_id
        note(f"Loading model: {display_name}...")
        progress(0.2, desc=f"Loading {display_name}...")
        # A LoRA is applied on top of its base checkpoint; a full model loads directly.
        model_load_id = base_model if is_lora else repo_id
        note(f"Base/Model ID for pipeline: {model_load_id}")
        pipeline = pipeline_class.from_pretrained(
            model_load_id,
            torch_dtype=TORCH_DTYPE,
        )
        pipeline.to(DEVICE)
        note(f"Base pipeline loaded onto {DEVICE}.")

        if is_lora:
            lora_load_start = time.time()
            note(f"Loading LoRA weights from {repo_id}...")
            progress(0.4, desc=f"Loading LoRA {repo_id}...")
            try:
                lora_filename = config.get("lora_filename")
                if lora_filename:
                    note(f"Using specified LoRA filename: {lora_filename}")
                    pipeline.load_lora_weights(repo_id, weight_name=lora_filename, torch_dtype=TORCH_DTYPE)
                else:
                    # Let diffusers auto-detect a standard weight file name.
                    note("Attempting auto-detection of LoRA filename.")
                    pipeline.load_lora_weights(repo_id, torch_dtype=TORCH_DTYPE)
                # Activate the loaded adapter(s) at full weight instead of fusing.
                pipeline.set_adapters(pipeline.get_active_adapters(), adapter_weights=1.0)
                note(f"LoRA weights loaded and adapters set in {time.time() - lora_load_start:.2f}s.")
            except Exception as e:
                note(f"ERROR loading LoRA: {e}. Check LoRA repo structure/filename.")
                # A thumbnail without its LoRA is not what was asked for: abort.
                raise ValueError(f"Failed to load LoRA weights for {repo_id}: {e}") from e

        # --- IP Adapter Loading ---
        ip_image = None
        if reference_image_pil is not None and ip_adapter_scale > 0:
            ip_load_start = time.time()
            note(f"Loading IP-Adapter: {ip_adapter_repo} ({ip_adapter_weights})...")
            progress(0.6, desc="Loading IP-Adapter...")
            try:
                if not hasattr(pipeline, "load_ip_adapter"):
                    raise AttributeError("The current pipeline class does not support load_ip_adapter. Check diffusers version or pipeline type.")
                subfolder, encoder_folder = _resolve_ip_adapter_paths(config)
                load_kwargs = {"subfolder": subfolder, "weight_name": ip_adapter_weights}
                if encoder_folder is not None:
                    # Only passed when needed so the library default applies otherwise.
                    load_kwargs["image_encoder_folder"] = encoder_folder
                pipeline.load_ip_adapter(ip_adapter_repo, **load_kwargs)
                pipeline.set_ip_adapter_scale(ip_adapter_scale)
                note(f"IP-Adapter loaded and scale set ({ip_adapter_scale}) in {time.time() - ip_load_start:.2f}s.")
                ip_image = reference_image_pil.convert("RGB")
                note("Reference image prepared for IP-Adapter.")
            except Exception as e:
                # Image guidance is optional: fall back to plain text-to-image.
                note(f"WARNING: Failed to load IP-Adapter: {e}. Proceeding without image guidance.")
                ip_image = None
                ip_adapter_scale = 0
                reset_error = _zero_ip_adapter_scale(pipeline)
                if reset_error is not None:
                    note(f"WARNING: Could not reset IP-Adapter scale: {reset_error}")
        else:
            reset_error = _zero_ip_adapter_scale(pipeline)
            if reset_error is not None:
                note(f"WARNING: Could not reset IP-Adapter scale: {reset_error}")
            note("No reference image provided or IP Adapter scale is 0. Skipping IP-Adapter loading.")
        note(f"Total Model & IP-Adapter Loading time: {time.time() - load_start_time:.2f}s")

        # --- Generation ---
        gen_start_time = time.time()
        note("Starting generation...")
        progress(0.7, desc="Generating image...")
        if seed == -1:
            seed = random.randint(0, 2**32 - 1)
            note(f"Using random seed: {seed}")
        generator = torch.Generator(device=DEVICE).manual_seed(seed)

        pipeline_args = {
            "prompt": prompt,
            "negative_prompt": negative_prompt,
            "num_inference_steps": num_inference_steps,
            "guidance_scale": guidance_scale,
            "generator": generator,
        }
        if ip_image is not None and hasattr(pipeline, "set_ip_adapter_scale") and ip_adapter_scale > 0:
            # The scale itself was already applied via set_ip_adapter_scale.
            pipeline_args["ip_adapter_image"] = ip_image
            note(f"Passing reference image to pipeline with IP scale {ip_adapter_scale}.")
        else:
            note("Not passing reference image to pipeline.")

        with torch.inference_mode():
            output_image = pipeline(**pipeline_args).images[0]
        gen_end_time = time.time()
        note(f"Generation finished in {gen_end_time - gen_start_time:.2f}s.")

        # --- Cleanup ---
        note("Unloading model from memory (CPU strategy)...")
        progress(0.95, desc="Cleaning up...")
        pipeline = None  # Drop the only reference before collecting
        cleanup_memory()
        total_time = time.time() - start_time
        log_parts.append(f"\n--- Total time: {total_time:.2f} seconds ---\n")
        return output_image, "".join(log_parts)
    except Exception as e:
        logger.exception("Error during generation for model %s", model_key)
        error_time = time.time() - start_time
        banner = "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
        log_parts.append(f"\n\n{banner}\n")
        log_parts.append(f"ERROR occurred after {error_time:.2f}s:\n{e}\n")
        log_parts.append(f"{banner}\n")

    # Failure path only (success returned above).  Cleanup happens after the
    # handler has exited: while the exception is alive its traceback keeps the
    # failing frames, and whatever they reference, reachable.
    pipeline = None
    cleanup_memory()
    return None, "".join(log_parts)
# --- Gradio Interface ---
# Page styling: a highlighted warning banner and a compact monospace log box.
css = """
#warning {
background-color: #FFCCCB; /* Light red */
padding: 10px;
border-radius: 5px;
text-align: center;
font-weight: bold;
}
#debug_log_area textarea {
font-family: monospace;
font-size: 10px; /* Smaller font for logs */
white-space: pre-wrap; /* Wrap long lines */
word-wrap: break-word; /* Break words if necessary */
}
"""

with gr.Blocks(css=css) as demo:
    # Header: title, short instructions and the CPU-speed warning banner.
    gr.Markdown("# YouTube Thumbnail Generator with IP-Adapter")
    gr.Markdown(
        "Select a thumbnail model, provide a text prompt, and optionally upload a reference image "
        "to guide the generation using IP-Adapter."
    )
    gr.HTML("<div id='warning'>⚠️ Warning: Inference on CPU is VERY SLOW (minutes per image, especially SDXL models). Please be patient.</div>")

    with gr.Row():
        # Left column: every generation input plus the trigger button.
        with gr.Column(scale=1):
            default_model = AVAILABLE_MODELS[0] if AVAILABLE_MODELS else None
            model_dropdown = gr.Dropdown(
                label="Select Thumbnail Model",
                choices=AVAILABLE_MODELS,
                value=default_model,
            )
            prompt_input = gr.Textbox(
                label="Prompt",
                lines=3,
                placeholder="e.g., Epic landscape, dramatic lighting, YouTube thumbnail style",
            )
            negative_prompt_input = gr.Textbox(
                label="Negative Prompt",
                lines=2,
                placeholder="e.g., blurry, low quality, text, signature, watermark",
            )
            reference_image_input = gr.Image(
                label="Reference Image (for IP-Adapter)",
                type="pil",
                sources=["upload"],
            )
            with gr.Accordion("Advanced Settings", open=False):
                steps_slider = gr.Slider(
                    label="Inference Steps",
                    minimum=10,
                    maximum=100,
                    value=30,
                    step=1,
                )
                cfg_slider = gr.Slider(
                    label="Guidance Scale (CFG)",
                    minimum=1.0,
                    maximum=20.0,
                    value=7.0,
                    step=0.5,
                )
                ip_adapter_scale_slider = gr.Slider(
                    label="IP-Adapter Scale",
                    minimum=0.0,
                    maximum=1.5,
                    value=0.6,
                    step=0.05,
                    info="Strength of the reference image influence (0 = disabled).",
                )
                seed_input = gr.Number(
                    label="Seed",
                    value=-1,
                    precision=0,
                    info="-1 for random seed",
                )
            generate_button = gr.Button("Generate Thumbnail", variant="primary")

        # Right column: the generated image and the textual debug log.
        with gr.Column(scale=1):
            output_image = gr.Image(label="Generated Thumbnail", type="pil")
            debug_output = gr.Textbox(
                label="Debug Log",
                lines=20,
                interactive=False,
                elem_id="debug_log_area",
            )

    # Order must match generate_thumbnail's positional parameters.
    generation_inputs = [
        model_dropdown,
        prompt_input,
        negative_prompt_input,
        reference_image_input,
        steps_slider,
        cfg_slider,
        seed_input,
        ip_adapter_scale_slider,
    ]
    generation_outputs = [output_image, debug_output]
    generate_button.click(
        fn=generate_thumbnail,
        inputs=generation_inputs,
        outputs=generation_outputs,
    )
# --- Launch ---
if __name__ == "__main__":
    logger.info("Starting Gradio App...")
    # Enable the request queue first (intended for multiple users on Spaces,
    # even if slow), then start the server with Gradio's debug mode switched on.
    queued_demo = demo.queue()
    queued_demo.launch(debug=True)