import gradio as gr import torch from huggingface_hub import snapshot_download from xora.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder from xora.models.transformers.transformer3d import Transformer3DModel from xora.models.transformers.symmetric_patchifier import SymmetricPatchifier from xora.schedulers.rf import RectifiedFlowScheduler from xora.pipelines.pipeline_xora_video import XoraVideoPipeline from transformers import T5EncoderModel, T5Tokenizer from xora.utils.conditioning_method import ConditioningMethod from pathlib import Path import safetensors.torch import json import numpy as np import cv2 from PIL import Image import tempfile import os import gc # Load Hugging Face token if needed hf_token = os.getenv("HF_TOKEN") # Set model download directory within Hugging Face Spaces model_path = "asset" if not os.path.exists(model_path): snapshot_download( "Lightricks/LTX-Video", local_dir=model_path, repo_type="model", token=hf_token ) # Global variables to load components vae_dir = Path(model_path) / "vae" unet_dir = Path(model_path) / "unet" scheduler_dir = Path(model_path) / "scheduler" device = torch.device("cuda" if torch.cuda.is_available() else "cpu") def load_vae(vae_dir): vae_ckpt_path = vae_dir / "vae_diffusion_pytorch_model.safetensors" vae_config_path = vae_dir / "config.json" with open(vae_config_path, "r") as f: vae_config = json.load(f) vae = CausalVideoAutoencoder.from_config(vae_config) vae_state_dict = safetensors.torch.load_file(vae_ckpt_path) vae.load_state_dict(vae_state_dict) return vae.to(device=device, dtype=torch.bfloat16) def load_unet(unet_dir): unet_ckpt_path = unet_dir / "unet_diffusion_pytorch_model.safetensors" unet_config_path = unet_dir / "config.json" transformer_config = Transformer3DModel.load_config(unet_config_path) transformer = Transformer3DModel.from_config(transformer_config) unet_state_dict = safetensors.torch.load_file(unet_ckpt_path) transformer.load_state_dict(unet_state_dict, strict=True) return transformer.to(device=device, dtype=torch.bfloat16) def load_scheduler(scheduler_dir): scheduler_config_path = scheduler_dir / "scheduler_config.json" scheduler_config = RectifiedFlowScheduler.load_config(scheduler_config_path) return RectifiedFlowScheduler.from_config(scheduler_config) # Helper function for image processing def center_crop_and_resize(frame, target_height, target_width): h, w, _ = frame.shape aspect_ratio_target = target_width / target_height aspect_ratio_frame = w / h if aspect_ratio_frame > aspect_ratio_target: new_width = int(h * aspect_ratio_target) x_start = (w - new_width) // 2 frame_cropped = frame[:, x_start : x_start + new_width] else: new_height = int(w / aspect_ratio_target) y_start = (h - new_height) // 2 frame_cropped = frame[y_start : y_start + new_height, :] frame_resized = cv2.resize(frame_cropped, (target_width, target_height)) return frame_resized def load_image_to_tensor_with_resize(image_path, target_height=512, target_width=768): image = Image.open(image_path).convert("RGB") image_np = np.array(image) frame_resized = center_crop_and_resize(image_np, target_height, target_width) frame_tensor = torch.tensor(frame_resized).permute(2, 0, 1).float() frame_tensor = (frame_tensor / 127.5) - 1.0 return frame_tensor.unsqueeze(0).unsqueeze(2) # Preset options for resolution and frame configuration preset_options = [ {"label": "1216x704, 41 frames", "width": 1216, "height": 704, "num_frames": 41}, {"label": "1088x704, 49 frames", "width": 1088, "height": 704, "num_frames": 49}, {"label": "1056x640, 57 frames", "width": 1056, "height": 640, "num_frames": 57}, {"label": "992x608, 65 frames", "width": 992, "height": 608, "num_frames": 65}, {"label": "896x608, 73 frames", "width": 896, "height": 608, "num_frames": 73}, {"label": "896x544, 81 frames", "width": 896, "height": 544, "num_frames": 81}, {"label": "832x544, 89 frames", "width": 832, "height": 544, "num_frames": 89}, {"label": "800x512, 97 frames", "width": 800, "height": 512, "num_frames": 97}, {"label": "768x512, 97 frames", "width": 768, "height": 512, "num_frames": 97}, {"label": "800x480, 105 frames", "width": 800, "height": 480, "num_frames": 105}, {"label": "736x480, 113 frames", "width": 736, "height": 480, "num_frames": 113}, {"label": "704x480, 121 frames", "width": 704, "height": 480, "num_frames": 121}, {"label": "704x448, 129 frames", "width": 704, "height": 448, "num_frames": 129}, {"label": "672x448, 137 frames", "width": 672, "height": 448, "num_frames": 137}, {"label": "640x416, 153 frames", "width": 640, "height": 416, "num_frames": 153}, {"label": "672x384, 161 frames", "width": 672, "height": 384, "num_frames": 161}, {"label": "640x384, 169 frames", "width": 640, "height": 384, "num_frames": 169}, {"label": "608x384, 177 frames", "width": 608, "height": 384, "num_frames": 177}, {"label": "576x384, 185 frames", "width": 576, "height": 384, "num_frames": 185}, {"label": "608x352, 193 frames", "width": 608, "height": 352, "num_frames": 193}, {"label": "576x352, 201 frames", "width": 576, "height": 352, "num_frames": 201}, {"label": "544x352, 209 frames", "width": 544, "height": 352, "num_frames": 209}, {"label": "512x352, 225 frames", "width": 512, "height": 352, "num_frames": 225}, {"label": "512x352, 233 frames", "width": 512, "height": 352, "num_frames": 233}, {"label": "544x320, 241 frames", "width": 544, "height": 320, "num_frames": 241}, {"label": "512x320, 249 frames", "width": 512, "height": 320, "num_frames": 249}, {"label": "512x320, 257 frames", "width": 512, "height": 320, "num_frames": 257}, ] # Function to toggle visibility of sliders based on preset selection def preset_changed(preset): if preset != "Custom": selected = next(item for item in preset_options if item["label"] == preset) return ( selected["height"], selected["width"], selected["num_frames"], gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), ) else: return ( None, None, None, gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), ) # Load models vae = load_vae(vae_dir) unet = load_unet(unet_dir) scheduler = load_scheduler(scheduler_dir) patchifier = SymmetricPatchifier(patch_size=1) text_encoder = T5EncoderModel.from_pretrained( "PixArt-alpha/PixArt-XL-2-1024-MS", subfolder="text_encoder" ).to(device) tokenizer = T5Tokenizer.from_pretrained( "PixArt-alpha/PixArt-XL-2-1024-MS", subfolder="tokenizer" ) pipeline = XoraVideoPipeline( transformer=unet, patchifier=patchifier, text_encoder=text_encoder, tokenizer=tokenizer, scheduler=scheduler, vae=vae, ).to(device) def generate_video_from_text( prompt="", negative_prompt="", frame_rate=25, seed=171198, num_inference_steps=40, guidance_scale=3, height=512, width=768, num_frames=121, progress=gr.Progress(), ): if len(prompt.strip()) < 50: raise gr.Error( "Prompt must be at least 50 characters long. Please provide more details for the best results.", duration=5, ) sample = { "prompt": prompt, "prompt_attention_mask": None, "negative_prompt": negative_prompt, "negative_prompt_attention_mask": None, "media_items": None, } generator = torch.Generator(device="cpu").manual_seed(seed) def gradio_progress_callback(self, step, timestep, kwargs): progress((step + 1) / num_inference_steps) try: with torch.no_grad(): images = pipeline( num_inference_steps=num_inference_steps, num_images_per_prompt=1, guidance_scale=guidance_scale, generator=generator, output_type="pt", height=height, width=width, num_frames=num_frames, frame_rate=frame_rate, **sample, is_video=True, vae_per_channel_normalize=True, conditioning_method=ConditioningMethod.UNCONDITIONAL, mixed_precision=True, callback_on_step_end=gradio_progress_callback, ).images except Exception as e: raise gr.Error( f"An error occurred while generating the video. Please try again. Error: {e}", duration=5, ) finally: torch.cuda.empty_cache() gc.collect() output_path = tempfile.mktemp(suffix=".mp4") print(images.shape) video_np = images.squeeze(0).permute(1, 2, 3, 0).cpu().float().numpy() video_np = (video_np * 255).astype(np.uint8) height, width = video_np.shape[1:3] out = cv2.VideoWriter( output_path, cv2.VideoWriter_fourcc(*"mp4v"), frame_rate, (width, height) ) for frame in video_np[..., ::-1]: out.write(frame) out.release() # Explicitly delete tensors and clear cache del images del video_np torch.cuda.empty_cache() return output_path def generate_video_from_image( image_path, prompt="", negative_prompt="", frame_rate=25, seed=171198, num_inference_steps=40, guidance_scale=3, height=512, width=768, num_frames=121, progress=gr.Progress(), ): print("Height: ", height) print("Width: ", width) print("Num Frames: ", num_frames) if len(prompt.strip()) < 50: raise gr.Error( "Prompt must be at least 50 characters long. Please provide more details for the best results.", duration=5, ) if not image_path: raise gr.Error("Please provide an input image.", duration=5) media_items = ( load_image_to_tensor_with_resize(image_path, height, width).to(device).detach() ) sample = { "prompt": prompt, "prompt_attention_mask": None, "negative_prompt": negative_prompt, "negative_prompt_attention_mask": None, "media_items": media_items, } generator = torch.Generator(device="cpu").manual_seed(seed) def gradio_progress_callback(self, step, timestep, kwargs): progress((step + 1) / num_inference_steps) try: with torch.no_grad(): images = pipeline( num_inference_steps=num_inference_steps, num_images_per_prompt=1, guidance_scale=guidance_scale, generator=generator, output_type="pt", height=height, width=width, num_frames=num_frames, frame_rate=frame_rate, **sample, is_video=True, vae_per_channel_normalize=True, conditioning_method=ConditioningMethod.FIRST_FRAME, mixed_precision=True, callback_on_step_end=gradio_progress_callback, ).images output_path = tempfile.mktemp(suffix=".mp4") video_np = images.squeeze(0).permute(1, 2, 3, 0).cpu().float().numpy() video_np = (video_np * 255).astype(np.uint8) height, width = video_np.shape[1:3] out = cv2.VideoWriter( output_path, cv2.VideoWriter_fourcc(*"mp4v"), frame_rate, (width, height) ) for frame in video_np[..., ::-1]: out.write(frame) out.release() except Exception as e: raise gr.Error( f"An error occurred while generating the video. Please try again. Error: {e}", duration=5, ) finally: torch.cuda.empty_cache() gc.collect() return output_path def create_advanced_options(): with gr.Accordion("Step 4: Advanced Options (Optional)", open=False): seed = gr.Slider( label="4.1 Seed", minimum=0, maximum=1000000, step=1, value=171198 ) inference_steps = gr.Slider( label="4.2 Inference Steps", minimum=1, maximum=100, step=1, value=40 ) guidance_scale = gr.Slider( label="4.3 Guidance Scale", minimum=1.0, maximum=20.0, step=0.1, value=3.0 ) height_slider = gr.Slider( label="4.4 Height", minimum=256, maximum=1024, step=64, value=512, visible=False, ) width_slider = gr.Slider( label="4.5 Width", minimum=256, maximum=1024, step=64, value=768, visible=False, ) num_frames_slider = gr.Slider( label="4.5 Number of Frames", minimum=1, maximum=200, step=1, value=121, visible=False, ) return [ seed, inference_steps, guidance_scale, height_slider, width_slider, num_frames_slider, ] # Define the Gradio interface with tabs with gr.Blocks(theme=gr.themes.Soft()) as iface: with gr.Row(elem_id="title-row"): gr.Markdown( """