Spaces:
Running on Zero
Running on Zero
| import os | |
| import subprocess | |
| import sys | |
| # Disable torch.compile / dynamo before any torch import | |
| os.environ["TORCH_COMPILE_DISABLE"] = "1" | |
| os.environ["TORCHDYNAMO_DISABLE"] = "1" | |
| # Install xformers for memory-efficient attention | |
| subprocess.run([sys.executable, "-m", "pip", "install", "xformers==0.0.32.post2", "--no-build-isolation"], check=False) | |
| # Clone LTX-2 repo and install packages | |
| LTX_REPO_URL = "https://github.com/Lightricks/LTX-2.git" | |
| LTX_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LTX-2") | |
| LTX_COMMIT = "ae855f8538843825f9015a419cf4ba5edaf5eec2" # known working commit with decode_video | |
| if not os.path.exists(LTX_REPO_DIR): | |
| print(f"Cloning {LTX_REPO_URL}...") | |
| subprocess.run(["git", "clone", LTX_REPO_URL, LTX_REPO_DIR], check=True) | |
| subprocess.run(["git", "checkout", LTX_COMMIT], cwd=LTX_REPO_DIR, check=True) | |
| print("Installing ltx-core and ltx-pipelines from cloned repo...") | |
| subprocess.run( | |
| [sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps", "-e", | |
| os.path.join(LTX_REPO_DIR, "packages", "ltx-core"), | |
| "-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines")], | |
| check=True, | |
| ) | |
| sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines", "src")) | |
| sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-core", "src")) | |
| import logging | |
| import random | |
| import tempfile | |
| from pathlib import Path | |
| import gc | |
| import hashlib | |
| import torch | |
| torch._dynamo.config.suppress_errors = True | |
| torch._dynamo.config.disable = True | |
| import spaces | |
| import gradio as gr | |
| import numpy as np | |
| from huggingface_hub import hf_hub_download, snapshot_download | |
| from safetensors.torch import load_file, save_file | |
| from safetensors import safe_open | |
| import json | |
| import requests | |
| from ltx_core.components.diffusion_steps import EulerDiffusionStep | |
| from ltx_core.components.guiders import MultiModalGuider, MultiModalGuiderParams | |
| from ltx_core.components.noisers import GaussianNoiser | |
| from ltx_core.model.audio_vae import encode_audio as vae_encode_audio | |
| from ltx_core.model.audio_vae import decode_audio as vae_decode_audio | |
| from ltx_core.model.upsampler import upsample_video | |
| from ltx_core.model.video_vae import TilingConfig, get_video_chunks_number, decode_video as vae_decode_video | |
| from ltx_core.quantization import QuantizationPolicy | |
| from ltx_core.types import Audio, AudioLatentShape, VideoPixelShape | |
| from ltx_pipelines.distilled import DistilledPipeline | |
| from ltx_pipelines.utils import euler_denoising_loop | |
| from ltx_pipelines.utils.args import ImageConditioningInput | |
| from ltx_pipelines.utils.constants import DISTILLED_SIGMA_VALUES, STAGE_2_DISTILLED_SIGMA_VALUES | |
| from ltx_pipelines.utils.helpers import ( | |
| cleanup_memory, | |
| combined_image_conditionings, | |
| denoise_video_only, | |
| encode_prompts, | |
| simple_denoising_func, | |
| multi_modal_guider_denoising_func, | |
| ) | |
| from ltx_pipelines.utils.media_io import decode_audio_from_file, encode_video | |
| from ltx_core.loader.primitives import LoraPathStrengthAndSDOps, StateDict, LoraStateDictWithStrength | |
| from ltx_core.loader.sd_ops import LTXV_LORA_COMFY_RENAMING_MAP | |
| from ltx_core.loader.fuse_loras import apply_loras | |
| from safetensors import safe_open | |
| # Force-patch xformers attention into the LTX attention module. | |
| from ltx_core.model.transformer import attention as _attn_mod | |
| print(f"[ATTN] Before patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}") | |
| try: | |
| from xformers.ops import memory_efficient_attention as _mea | |
| _attn_mod.memory_efficient_attention = _mea | |
| print(f"[ATTN] After patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}") | |
| except Exception as e: | |
| print(f"[ATTN] xformers patch FAILED: {type(e).__name__}: {e}") | |
| logging.getLogger().setLevel(logging.INFO) | |
| MAX_SEED = np.iinfo(np.int32).max | |
| DEFAULT_PROMPT = ( | |
| "An astronaut hatches from a fragile egg on the surface of the Moon, " | |
| "the shell cracking and peeling apart in gentle low-gravity motion. " | |
| "Fine lunar dust lifts and drifts outward with each movement, floating " | |
| "in slow arcs before settling back onto the ground." | |
| ) | |
| DEFAULT_NEGATIVE_PROMPT = ( | |
| "worst quality, inconsistent motion, blurry, jittery, distorted, " | |
| "deformed, artifacts, text, watermark, logo, frame, border, " | |
| "low resolution, pixelated, unnatural, fake, CGI, cartoon" | |
| ) | |
| DEFAULT_FRAME_RATE = 24.0 | |
| # Resolution presets: (width, height) | |
| RESOLUTIONS = { | |
| "high": {"16:9": (1536, 1024), "9:16": (1024, 1536), "1:1": (1024, 1024)}, | |
| "low": {"16:9": (768, 512), "9:16": (512, 768), "1:1": (768, 768)}, | |
| } | |
| class LTX23DistilledA2VPipeline: | |
| """Standalone pipeline with optional audio conditioning — no parent class.""" | |
| def __init__( | |
| self, | |
| distilled_checkpoint_path: str, | |
| spatial_upsampler_path: str, | |
| gemma_root: str, | |
| loras: tuple, | |
| quantization: QuantizationPolicy | None = None, | |
| ): | |
| from ltx_pipelines.utils import ModelLedger, denoise_audio_video | |
| from ltx_pipelines.utils.types import PipelineComponents | |
| self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| self.dtype = torch.bfloat16 | |
| self.model_ledger = ModelLedger( | |
| dtype=self.dtype, | |
| device=self.device, | |
| checkpoint_path=distilled_checkpoint_path, | |
| gemma_root_path=gemma_root, | |
| spatial_upsampler_path=spatial_upsampler_path, | |
| loras=loras, | |
| quantization=quantization, | |
| ) | |
| self.pipeline_components = PipelineComponents( | |
| dtype=self.dtype, | |
| device=self.device, | |
| ) | |
| def __call__( | |
| self, | |
| prompt: str, | |
| negative_prompt: str, | |
| seed: int, | |
| height: int, | |
| width: int, | |
| num_frames: int, | |
| frame_rate: float, | |
| video_guider_params: MultiModalGuiderParams, | |
| audio_guider_params: MultiModalGuiderParams, | |
| images: list[ImageConditioningInput], | |
| audio_path: str | None = None, | |
| tiling_config: TilingConfig | None = None, | |
| enhance_prompt: bool = False, | |
| ): | |
| print(prompt) | |
| generator = torch.Generator(device=self.device).manual_seed(seed) | |
| noiser = GaussianNoiser(generator=generator) | |
| stepper = EulerDiffusionStep() | |
| dtype = torch.bfloat16 | |
| ctx_p, ctx_n = encode_prompts( | |
| [prompt, negative_prompt], | |
| self.model_ledger, | |
| enhance_first_prompt=enhance_prompt, | |
| enhance_prompt_image=images[0].path if len(images) > 0 else None, | |
| ) | |
| v_context_p, a_context_p = ctx_p.video_encoding, ctx_p.audio_encoding | |
| v_context_n, a_context_n = ctx_n.video_encoding, ctx_n.audio_encoding | |
| # ── Audio encoding (only for conditioning, not output generation) ── | |
| encoded_audio_latent = None | |
| decoded_audio = None | |
| if audio_path is not None: | |
| video_duration = num_frames / frame_rate | |
| decoded_audio = decode_audio_from_file(audio_path, self.device, 0.0, video_duration) | |
| if decoded_audio is None: | |
| raise ValueError(f"Could not extract audio stream from {audio_path}") | |
| encoded_audio_latent = vae_encode_audio(decoded_audio, self.model_ledger.audio_encoder()) | |
| audio_shape = AudioLatentShape.from_duration(batch=1, duration=video_duration, channels=8, mel_bins=16) | |
| expected_frames = audio_shape.frames | |
| actual_frames = encoded_audio_latent.shape[2] | |
| if actual_frames > expected_frames: | |
| encoded_audio_latent = encoded_audio_latent[:, :, :expected_frames, :] | |
| elif actual_frames < expected_frames: | |
| pad = torch.zeros( | |
| encoded_audio_latent.shape[0], | |
| encoded_audio_latent.shape[1], | |
| expected_frames - actual_frames, | |
| encoded_audio_latent.shape[3], | |
| device=encoded_audio_latent.device, | |
| dtype=encoded_audio_latent.dtype, | |
| ) | |
| encoded_audio_latent = torch.cat([encoded_audio_latent, pad], dim=2) | |
| video_encoder = self.model_ledger.video_encoder() | |
| transformer = self.model_ledger.transformer() | |
| stage_1_sigmas = torch.tensor(DISTILLED_SIGMA_VALUES, device=self.device) | |
| def stage1_denoising_loop(sigmas, video_state, audio_state, stepper): | |
| return euler_denoising_loop( | |
| sigmas=sigmas, | |
| video_state=video_state, | |
| audio_state=audio_state, | |
| stepper=stepper, | |
| denoise_fn=multi_modal_guider_denoising_func( | |
| video_guider=MultiModalGuider( | |
| params=video_guider_params, | |
| negative_context=v_context_n, | |
| ), | |
| audio_guider=MultiModalGuider( | |
| params=audio_guider_params, | |
| negative_context=a_context_n, | |
| ), | |
| v_context=v_context_p, | |
| a_context=a_context_p, | |
| transformer=transformer, | |
| ), | |
| ) | |
| def stage2_denoising_loop(sigmas, video_state, audio_state, stepper): | |
| return euler_denoising_loop( | |
| sigmas=sigmas, | |
| video_state=video_state, | |
| audio_state=audio_state, | |
| stepper=stepper, | |
| denoise_fn=simple_denoising_func( | |
| video_context=v_context_p, | |
| audio_context=a_context_p, | |
| transformer=transformer, | |
| ), | |
| ) | |
| # ── Stage 1: Half resolution ── | |
| stage_1_output_shape = VideoPixelShape( | |
| batch=1, | |
| frames=num_frames, | |
| width=width // 2, | |
| height=height // 2, | |
| fps=frame_rate, | |
| ) | |
| stage_1_conditionings = combined_image_conditionings( | |
| images=images, | |
| height=stage_1_output_shape.height, | |
| width=stage_1_output_shape.width, | |
| video_encoder=video_encoder, | |
| dtype=dtype, | |
| device=self.device, | |
| ) | |
| # Use denoise_audio_video so audio is ALWAYS generated | |
| from ltx_pipelines.utils import denoise_audio_video | |
| video_state, audio_state = denoise_audio_video( | |
| output_shape=stage_1_output_shape, | |
| conditionings=stage_1_conditionings, | |
| noiser=noiser, | |
| sigmas=stage_1_sigmas, | |
| stepper=stepper, | |
| denoising_loop_fn=stage1_denoising_loop, | |
| components=self.pipeline_components, | |
| dtype=dtype, | |
| device=self.device, | |
| initial_audio_latent=encoded_audio_latent, | |
| ) | |
| torch.cuda.synchronize() | |
| cleanup_memory() | |
| # ── Upscaling ── | |
| upscaled_video_latent = upsample_video( | |
| latent=video_state.latent[:1], | |
| video_encoder=video_encoder, | |
| upsampler=self.model_ledger.spatial_upsampler(), | |
| ) | |
| # ── Stage 2: Full resolution ── | |
| stage_2_sigmas = torch.tensor(STAGE_2_DISTILLED_SIGMA_VALUES, device=self.device) | |
| stage_2_output_shape = VideoPixelShape(batch=1, frames=num_frames, width=width, height=height, fps=frame_rate) | |
| stage_2_conditionings = combined_image_conditionings( | |
| images=images, | |
| height=stage_2_output_shape.height, | |
| width=stage_2_output_shape.width, | |
| video_encoder=video_encoder, | |
| dtype=dtype, | |
| device=self.device, | |
| ) | |
| video_state, audio_state = denoise_audio_video( | |
| output_shape=stage_2_output_shape, | |
| conditionings=stage_2_conditionings, | |
| noiser=noiser, | |
| sigmas=stage_2_sigmas, | |
| stepper=stepper, | |
| denoising_loop_fn=stage2_denoising_loop, | |
| components=self.pipeline_components, | |
| dtype=dtype, | |
| device=self.device, | |
| noise_scale=stage_2_sigmas[0], | |
| initial_video_latent=upscaled_video_latent, | |
| initial_audio_latent=audio_state.latent, | |
| ) | |
| torch.cuda.synchronize() | |
| del transformer | |
| del video_encoder | |
| cleanup_memory() | |
| # ── Decode both video and audio ── | |
| decoded_video = vae_decode_video( | |
| video_state.latent, | |
| self.model_ledger.video_decoder(), | |
| tiling_config, | |
| generator, | |
| ) | |
| decoded_audio_output = vae_decode_audio( | |
| audio_state.latent, | |
| self.model_ledger.audio_decoder(), | |
| self.model_ledger.vocoder(), | |
| ) | |
| return decoded_video, decoded_audio_output | |
| # Model repos | |
| LTX_MODEL_REPO = "Lightricks/LTX-2.3" | |
| GEMMA_REPO ="Lightricks/gemma-3-12b-it-qat-q4_0-unquantized" | |
| # Download model checkpoints | |
| print("=" * 80) | |
| print("Downloading LTX-2.3 distilled model + Gemma...") | |
| print("=" * 80) | |
| # LoRA cache directory and currently-applied key | |
| LORA_CACHE_DIR = Path("lora_cache") | |
| LORA_CACHE_DIR.mkdir(exist_ok=True) | |
| current_lora_key: str | None = None | |
| weights_dir = Path("weights") | |
| weights_dir.mkdir(exist_ok=True) | |
| checkpoint_path = hf_hub_download( | |
| repo_id=LTX_MODEL_REPO, | |
| filename="ltx-2.3-22b-distilled-1.1.safetensors", | |
| local_dir=str(weights_dir), | |
| local_dir_use_symlinks=False, | |
| ) | |
| spatial_upsampler_path = hf_hub_download(repo_id=LTX_MODEL_REPO, filename="ltx-2.3-spatial-upscaler-x2-1.1.safetensors") | |
| gemma_root = snapshot_download(repo_id=GEMMA_REPO) | |
| # ---- Insert block (LoRA downloads) between lines 268 and 269 ---- | |
| # LoRA repo + download the requested LoRA adapters | |
| LORA_REPO = "dagloop5/LoRA" | |
| print("=" * 80) | |
| print("Downloading LoRA adapters from dagloop5/LoRA...") | |
| print("=" * 80) | |
| pose_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="LTX2_3_NSFW_furry_concat_v2.safetensors") | |
| general_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="LTX2.3_reasoning_I2V_V3.safetensors") | |
| motion_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="motion_helper.safetensors") | |
| dreamlay_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="DR34ML4Y_LTXXX_PREVIEW_RC1.safetensors") # m15510n4ry, bl0wj0b, d0ubl3_bj, d0gg1e, c0wg1rl | |
| mself_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="Furry Hyper Masturbation - LTX-2 I2V v1.safetensors") # Hyperfap | |
| dramatic_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="LTX-2.3 - Orgasm.safetensors") # "[He | She] is having am orgasm." (am or an?) | |
| fluid_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="cr3ampi3_animation_i2v_ltx2_v1.0.safetensors") # cr3ampi3 animation., missionary animation, doggystyle bouncy animation, double penetration animation | |
| liquid_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="liquid_wet_dr1pp_ltx2_v1.0_scaled.safetensors") # wet dr1pp | |
| demopose_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="clapping-cheeks-audio-v001-alpha.safetensors") | |
| voice_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="hentai_voice_ltx23.safetensors") | |
| realism_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="FurryenhancerLTX2.3V1.215.safetensors") | |
| transition_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="LTX-2_takerpov_lora_v1.2.safetensors") # takerpov1, taker pov | |
| print(f"Pose LoRA: {pose_lora_path}") | |
| print(f"General LoRA: {general_lora_path}") | |
| print(f"Motion LoRA: {motion_lora_path}") | |
| print(f"Dreamlay LoRA: {dreamlay_lora_path}") | |
| print(f"Mself LoRA: {mself_lora_path}") | |
| print(f"Dramatic LoRA: {dramatic_lora_path}") | |
| print(f"Fluid LoRA: {fluid_lora_path}") | |
| print(f"Liquid LoRA: {liquid_lora_path}") | |
| print(f"Demopose LoRA: {demopose_lora_path}") | |
| print(f"Voice LoRA: {voice_lora_path}") | |
| print(f"Realism LoRA: {realism_lora_path}") | |
| print(f"Transition LoRA: {transition_lora_path}") | |
| # ---------------------------------------------------------------- | |
| print(f"Checkpoint: {checkpoint_path}") | |
| print(f"Spatial upsampler: {spatial_upsampler_path}") | |
| print(f"[Gemma] Root ready: {gemma_root}") | |
| pipeline = LTX23DistilledA2VPipeline( | |
| distilled_checkpoint_path=checkpoint_path, | |
| spatial_upsampler_path=spatial_upsampler_path, | |
| gemma_root=gemma_root, | |
| loras=[], | |
| quantization=QuantizationPolicy.fp8_cast(), | |
| ) | |
| def _make_lora_key(pose_strength: float, general_strength: float, motion_strength: float, dreamlay_strength: float, mself_strength: float, dramatic_strength: float, fluid_strength: float, liquid_strength: float, demopose_strength: float, voice_strength: float, realism_strength: float, transition_strength: float) -> tuple[str, str]: | |
| rp = round(float(pose_strength), 2) | |
| rg = round(float(general_strength), 2) | |
| rm = round(float(motion_strength), 2) | |
| rd = round(float(dreamlay_strength), 2) | |
| rs = round(float(mself_strength), 2) | |
| rr = round(float(dramatic_strength), 2) | |
| rf = round(float(fluid_strength), 2) | |
| rl = round(float(liquid_strength), 2) | |
| ro = round(float(demopose_strength), 2) | |
| rv = round(float(voice_strength), 2) | |
| re = round(float(realism_strength), 2) | |
| rt = round(float(transition_strength), 2) | |
| key_str = f"{pose_lora_path}:{rp}|{general_lora_path}:{rg}|{motion_lora_path}:{rm}|{dreamlay_lora_path}:{rd}|{mself_lora_path}:{rs}|{dramatic_lora_path}:{rr}|{fluid_lora_path}:{rf}|{liquid_lora_path}:{rl}|{demopose_lora_path}:{ro}|{voice_lora_path}:{rv}|{realism_lora_path}:{re}|{transition_lora_path}:{rt}" | |
| key = hashlib.sha256(key_str.encode("utf-8")).hexdigest() | |
| return key, key_str | |
| # ============================================================================= | |
| # LoRA Cache (In-Memory) - Ultra-Fast In-Place Application | |
| # ============================================================================= | |
| # In-memory caches to avoid redundant disk I/O | |
| LORA_SD_CACHE: dict[str, StateDict] = {} # lora_path -> loaded StateDict | |
| FUSED_CACHE: dict[str, dict] = {} # cache key -> fused state dict (CPU) | |
| current_lora_key: str | None = None | |
| def load_lora_into_cache(lora_path: str) -> StateDict: | |
| """ | |
| Load a LoRA safetensor file into a cached StateDict. | |
| Subsequent calls return the cached version instantly. | |
| This replaces repeated disk reads with a one-time load + memory cache. | |
| """ | |
| if lora_path in LORA_SD_CACHE: | |
| return LORA_SD_CACHE[lora_path] | |
| print(f"[LoRA] Loading {os.path.basename(lora_path)} into memory cache...") | |
| # Use safe_open for memory-efficient streaming reads of large files | |
| tensors = {} | |
| with safe_open(lora_path, framework="pt", device="cpu") as f: | |
| for key in f.keys(): | |
| tensors[key] = f.get_tensor(key) | |
| state_dict = StateDict( | |
| sd=tensors, | |
| device=torch.device("cpu"), | |
| size=sum(t.nbytes for t in tensors.values()), | |
| dtype=set(t.dtype for t in tensors.values()) | |
| ) | |
| LORA_SD_CACHE[lora_path] = state_dict | |
| print(f"[LoRA] Cached {len(tensors)} tensors from {os.path.basename(lora_path)}") | |
| return state_dict | |
| def _rename_lora_keys_for_base_model(lora_sd: StateDict, base_keys: set[str]) -> StateDict: | |
| """ | |
| Rename LoRA state dict keys to match the base model's key format. | |
| LoRA keys: transformer_blocks.0.attn1.to_k.lora_A.weight | |
| Base keys: velocity_model.transformer_blocks.4.audio_attn1.to_out.0.weight | |
| We need to: | |
| 1. Add 'velocity_model.' prefix | |
| 2. Match block indices (LoRA block 0 might correspond to base block 4, etc.) | |
| Actually, LTX LoRA files typically have entries for ALL blocks. | |
| The LoRA files from ComfyUI exports may have different structures. | |
| We need to normalize to: velocity_model.transformer_blocks.X.attn.to_weight | |
| """ | |
| renamed_sd = {} | |
| # First pass: identify the pattern | |
| # LoRA keys like: transformer_blocks.0.attn1.to_k.lora_A.weight | |
| # Need to become: velocity_model.transformer_blocks.0.attn1.to_k.weight | |
| for key, tensor in lora_sd.sd.items(): | |
| new_key = key | |
| # Strip "diffusion_model." prefix if present | |
| if new_key.startswith("diffusion_model."): | |
| new_key = new_key[len("diffusion_model."):] | |
| # Now add "velocity_model." prefix | |
| if not new_key.startswith("velocity_model."): | |
| new_key = "velocity_model." + new_key | |
| # Convert LoRA key suffix to base weight suffix | |
| # .lora_A.weight -> .weight | |
| # .lora_B.weight -> .weight | |
| if new_key.endswith(".lora_A.weight"): | |
| new_key = new_key[:-len(".lora_A.weight")] + ".weight" | |
| elif new_key.endswith(".lora_B.weight"): | |
| new_key = new_key[:-len(".lora_B.weight")] + ".weight" | |
| renamed_sd[new_key] = tensor | |
| return StateDict( | |
| sd=renamed_sd, | |
| device=lora_sd.device, | |
| size=lora_sd.size, | |
| dtype=lora_sd.dtype | |
| ) | |
| def build_fused_state_dict( | |
| base_transformer, | |
| lora_configs: list[tuple[str, float]], | |
| progress_callback=None | |
| ) -> dict[str, torch.Tensor]: | |
| """ | |
| Fuse multiple LoRAs into a single state dict ready for load_state_dict(). | |
| Uses LTX's apply_loras function which handles FP8 quantization correctly. | |
| Args: | |
| base_transformer: The preloaded transformer model | |
| lora_configs: List of (lora_path, strength) tuples for non-zero LoRAs | |
| progress_callback: Optional callback(step, desc) for progress updates | |
| Returns: | |
| Dictionary of fused weights ready for load_state_dict() | |
| """ | |
| if not lora_configs: | |
| # No LoRAs - return base transformer state dict | |
| return {k: v.clone() for k, v in base_transformer.state_dict().items()} | |
| if progress_callback: | |
| progress_callback(0.1, "Loading LoRA state dicts into memory") | |
| # Step 1: Load all LoRA state dicts and rename keys to match base model | |
| lora_sd_with_strengths = [] | |
| # Get base key set for matching | |
| base_dict = base_transformer.state_dict() | |
| base_key_set = set(base_dict.keys()) | |
| print(f"[LoRA DEBUG] Total base model keys: {len(base_key_set)}") | |
| for lora_path, strength in lora_configs: | |
| sd = load_lora_into_cache(lora_path) | |
| sd_renamed = _rename_lora_keys_for_base_model(sd, base_key_set) | |
| # Show before/after for first few keys | |
| original_keys = list(sd.sd.keys())[:3] | |
| renamed_keys = list(sd_renamed.sd.keys())[:3] | |
| print(f"[LoRA DEBUG] Before: {original_keys}") | |
| print(f"[LoRA DEBUG] After: {renamed_keys}") | |
| # Check if renamed keys exist in base model | |
| sample_renamed = list(sd_renamed.sd.keys())[0] | |
| exists_in_base = sample_renamed in base_key_set | |
| print(f"[LoRA DEBUG] Sample renamed key exists in base? {exists_in_base}") | |
| if not exists_in_base: | |
| print(f"[LoRA DEBUG] Checking for similar base keys...") | |
| prefix = sample_renamed.rsplit(".", 1)[0] # Remove .weight suffix | |
| similar = [k for k in base_key_set if k.startswith(prefix[:30])] | |
| print(f"[LoRA DEBUG] Similar base keys: {similar[:3]}") | |
| if progress_callback: | |
| progress_callback(0.3, "Extracting base transformer state dict") | |
| # Step 2: Get base transformer state dict (already in memory from preloading!) | |
| base_dict = base_transformer.state_dict() | |
| base_sd = StateDict( | |
| sd={k: v.detach().cpu().contiguous() for k, v in base_dict.items()}, | |
| device=torch.device("cpu"), | |
| size=sum(v.nbytes for v in base_dict.values()), | |
| dtype=set(v.dtype for v in base_dict.values()) | |
| ) | |
| if progress_callback: | |
| progress_callback(0.5, "Fusing LoRAs with base weights (CPU)") | |
| # Step 3: Fuse using LTX's apply_loras function | |
| # This function handles: | |
| # - FP8 quantized weights (_fuse_delta_with_scaled_fp8 / _fuse_delta_with_cast_fp8) | |
| # - BFloat16 weights (_fuse_delta_with_bfloat16) | |
| # - Proper delta accumulation for multiple LoRAs | |
| fused_sd = apply_loras( | |
| model_sd=base_sd, | |
| lora_sd_and_strengths=lora_sd_with_strengths, | |
| dtype=torch.bfloat16 | |
| ) | |
| if progress_callback: | |
| progress_callback(0.9, "Extracting fused state dict") | |
| # Step 4: Return the fused state dict as a plain dict (for load_state_dict) | |
| return fused_sd.sd | |
| def on_prepare_loras_click( | |
| pose_strength: float, | |
| general_strength: float, | |
| motion_strength: float, | |
| dreamlay_strength: float, | |
| mself_strength: float, | |
| dramatic_strength: float, | |
| fluid_strength: float, | |
| liquid_strength: float, | |
| demopose_strength: float, | |
| voice_strength: float, | |
| realism_strength: float, | |
| transition_strength: float, | |
| progress=gr.Progress(track_tqdm=True), | |
| ): | |
| """ | |
| Called when user clicks the 'Prepare LoRA Cache' button. | |
| This function: | |
| 1. Checks if LoRA combination is already applied (skip if so) | |
| 2. Checks in-memory FUSED_CACHE (skip building if cached) | |
| 3. Loads LoRA files into cache (reuses LORA_SD_CACHE on subsequent calls) | |
| 4. Builds fused state dict if needed (only new combinations) | |
| 5. Applies to the preloaded transformer | |
| Only runs on button click, NOT on slider change. | |
| """ | |
| global current_lora_key, FUSED_CACHE | |
| # Debug: Verify transformer consistency | |
| ledger_transformer = ledger.transformer() | |
| pipeline_transformer = pipeline.model_ledger.transformer() | |
| print(f"[LoRA DEBUG] ledger.transformer() id: {id(ledger_transformer)}") | |
| print(f"[LoRA DEBUG] pipeline.model_ledger.transformer() id: {id(pipeline_transformer)}") | |
| print(f"[LoRA DEBUG] Same object? {ledger_transformer is pipeline_transformer}") | |
| print(f"[LoRA DEBUG] _transformer id: {id(_transformer)}") | |
| # Compute the cache key for this combination of strengths | |
| key, _ = _make_lora_key( | |
| pose_strength, general_strength, motion_strength, dreamlay_strength, | |
| mself_strength, dramatic_strength, fluid_strength, liquid_strength, | |
| demopose_strength, voice_strength, realism_strength, transition_strength | |
| ) | |
| # Already applied with these exact strengths? Nothing to do. | |
| if current_lora_key == key: | |
| return f"✓ LoRAs already applied with current strengths" | |
| progress(0.0, desc="Starting LoRA preparation") | |
| # Build the list of active (non-zero) LoRAs | |
| active_loras = [] | |
| lora_entries = [ | |
| (pose_lora_path, pose_strength, "Anthro Enhancer"), | |
| (general_lora_path, general_strength, "Reasoning Enhancer"), | |
| (motion_lora_path, motion_strength, "Anthro Posing"), | |
| (dreamlay_lora_path, dreamlay_strength, "Dreamlay"), | |
| (mself_lora_path, mself_strength, "Mself"), | |
| (dramatic_lora_path, dramatic_strength, "Dramatic"), | |
| (fluid_lora_path, fluid_strength, "Fluid Helper"), | |
| (liquid_lora_path, liquid_strength, "Liquid Helper"), | |
| (demopose_lora_path, demopose_strength, "Audio Helper"), | |
| (voice_lora_path, voice_strength, "Voice Helper"), | |
| (realism_lora_path, realism_strength, "Anthro Realism"), | |
| (transition_lora_path, transition_strength, "POV"), | |
| ] | |
| for path, strength, name in lora_entries: | |
| if float(strength) != 0.0: | |
| active_loras.append((path, float(strength))) | |
| print(f"[LoRA] Active: {name} = {strength}") | |
| if not active_loras: | |
| # No LoRAs selected - apply base model weights (reset from any previous LoRAs) | |
| print("[LoRA] No LoRAs selected, resetting to base model weights") | |
| try: | |
| transformer = ledger.transformer() | |
| target_device = next(transformer.parameters()).device | |
| # Get base weights and keep them on the same device as transformer | |
| base_weights = {k: v.to(target_device) for k, v in transformer.state_dict().items()} | |
| transformer.load_state_dict(base_weights, strict=False) | |
| current_lora_key = key | |
| progress(1.0, desc="Done") | |
| return "✓ Reset to base model (no LoRAs active)" | |
| except Exception as e: | |
| return f"✗ Reset failed: {e}" | |
| # Check in-memory cache for this strength combination | |
| if key in FUSED_CACHE: | |
| print(f"[LoRA] Using cached fused state for: {key[:16]}...") | |
| fused_state = FUSED_CACHE[key] | |
| progress(0.85, desc="Using cached fused state") | |
| else: | |
| # Need to build the fused state dict (the expensive part) | |
| print(f"[LoRA] Building new fused state dict for {len(active_loras)} LoRA(s)...") | |
| # Progress callback that maps to Gradio's progress tracker | |
| def progress_cb(step, desc): | |
| progress(0.1 + step * 0.8, desc=desc) | |
| transformer = ledger.transformer() | |
| fused_state = build_fused_state_dict(transformer, active_loras, progress_cb) | |
| # Cache the fused state for future reuse (keyed by strength combination) | |
| FUSED_CACHE[key] = fused_state | |
| print(f"[LoRA] Cached fused state for: {key[:16]}...") | |
| # Apply fused state to transformer | |
| progress(0.92, desc="Applying fused weights to transformer") | |
| try: | |
| transformer = ledger.transformer() | |
| # Determine target device - the transformer should already be on GPU | |
| target_device = next(transformer.parameters()).device | |
| # Load fused state dict directly into GPU transformer | |
| # Convert CPU tensors to GPU tensors inline, then load | |
| fused_state_gpu = {k: v.to(target_device) for k, v in fused_state.items()} | |
| missing, unexpected = transformer.load_state_dict(fused_state_gpu, strict=False) | |
| if missing: | |
| print(f"[LoRA] Warning: {len(missing)} keys not found in fused state") | |
| if unexpected: | |
| print(f"[LoRA] Warning: {len(unexpected)} unexpected keys in fused state") | |
| current_lora_key = key | |
| progress(1.0, desc="Done") | |
| return f"✓ Applied {len(active_loras)} LoRA(s) successfully" | |
| except Exception as e: | |
| import traceback | |
| print(f"[LoRA] Apply failed: {e}") | |
| print(traceback.format_exc()) | |
| # Try to restore transformer to GPU on error | |
| try: | |
| transformer = ledger.transformer() | |
| if next(transformer.parameters()).device.type == "cpu": | |
| if torch.cuda.is_available(): | |
| transformer = transformer.to("cuda") | |
| except Exception: | |
| pass | |
| return f"✗ LoRA application failed: {e}" | |
| # Preload all models for ZeroGPU tensor packing. | |
| print("Preloading all models (including Gemma and audio components)...") | |
| ledger = pipeline.model_ledger | |
| # Save the original factory methods so we can rebuild individual components later. | |
| # These are bound callables on ledger that will call the builder when invoked. | |
| _orig_transformer_factory = ledger.transformer | |
| _orig_video_encoder_factory = ledger.video_encoder | |
| _orig_video_decoder_factory = ledger.video_decoder | |
| _orig_audio_encoder_factory = ledger.audio_encoder | |
| _orig_audio_decoder_factory = ledger.audio_decoder | |
| _orig_vocoder_factory = ledger.vocoder | |
| _orig_spatial_upsampler_factory = ledger.spatial_upsampler | |
| _orig_text_encoder_factory = ledger.text_encoder | |
| _orig_gemma_embeddings_factory = ledger.gemma_embeddings_processor | |
| # Call the original factories once to create the cached instances we will serve by default. | |
| _transformer = _orig_transformer_factory() | |
| _video_encoder = _orig_video_encoder_factory() | |
| _video_decoder = _orig_video_decoder_factory() | |
| _audio_encoder = _orig_audio_encoder_factory() | |
| _audio_decoder = _orig_audio_decoder_factory() | |
| _vocoder = _orig_vocoder_factory() | |
| _spatial_upsampler = _orig_spatial_upsampler_factory() | |
| _text_encoder = _orig_text_encoder_factory() | |
| _embeddings_processor = _orig_gemma_embeddings_factory() | |
| # Replace ledger methods with lightweight lambdas that return the cached instances. | |
| # We keep the original factories above so we can call them later to rebuild components. | |
| ledger.transformer = lambda: _transformer | |
| ledger.video_encoder = lambda: _video_encoder | |
| ledger.video_decoder = lambda: _video_decoder | |
| ledger.audio_encoder = lambda: _audio_encoder | |
| ledger.audio_decoder = lambda: _audio_decoder | |
| ledger.vocoder = lambda: _vocoder | |
| ledger.spatial_upsampler = lambda: _spatial_upsampler | |
| ledger.text_encoder = lambda: _text_encoder | |
| ledger.gemma_embeddings_processor = lambda: _embeddings_processor | |
| print("All models preloaded (including Gemma text encoder and audio encoder)!") | |
| # ---- REPLACE PRELOAD BLOCK END ---- | |
| print("=" * 80) | |
| print("Pipeline ready!") | |
| print("=" * 80) | |
| def log_memory(tag: str): | |
| if torch.cuda.is_available(): | |
| allocated = torch.cuda.memory_allocated() / 1024**3 | |
| peak = torch.cuda.max_memory_allocated() / 1024**3 | |
| free, total = torch.cuda.mem_get_info() | |
| print(f"[VRAM {tag}] allocated={allocated:.2f}GB peak={peak:.2f}GB free={free / 1024**3:.2f}GB total={total / 1024**3:.2f}GB") | |
| def detect_aspect_ratio(image) -> str: | |
| if image is None: | |
| return "16:9" | |
| if hasattr(image, "size"): | |
| w, h = image.size | |
| elif hasattr(image, "shape"): | |
| h, w = image.shape[:2] | |
| else: | |
| return "16:9" | |
| ratio = w / h | |
| candidates = {"16:9": 16 / 9, "9:16": 9 / 16, "1:1": 1.0} | |
| return min(candidates, key=lambda k: abs(ratio - candidates[k])) | |
| def on_image_upload(first_image, last_image, high_res): | |
| ref_image = first_image if first_image is not None else last_image | |
| aspect = detect_aspect_ratio(ref_image) | |
| tier = "high" if high_res else "low" | |
| w, h = RESOLUTIONS[tier][aspect] | |
| return gr.update(value=w), gr.update(value=h) | |
| def on_highres_toggle(first_image, last_image, high_res): | |
| ref_image = first_image if first_image is not None else last_image | |
| aspect = detect_aspect_ratio(ref_image) | |
| tier = "high" if high_res else "low" | |
| w, h = RESOLUTIONS[tier][aspect] | |
| return gr.update(value=w), gr.update(value=h) | |
| def get_gpu_duration( | |
| first_image, | |
| last_image, | |
| input_audio, | |
| prompt: str, | |
| negative_prompt: str, | |
| duration: float, | |
| gpu_duration: float, | |
| enhance_prompt: bool = True, | |
| seed: int = 42, | |
| randomize_seed: bool = True, | |
| height: int = 1024, | |
| width: int = 1536, | |
| video_cfg_scale: float = 1.0, | |
| video_stg_scale: float = 0.0, | |
| video_rescale_scale: float = 0.45, | |
| video_a2v_scale: float = 3.0, | |
| audio_cfg_scale: float = 1.0, | |
| audio_stg_scale: float = 0.0, | |
| audio_rescale_scale: float = 1.0, | |
| audio_v2a_scale: float = 3.0, | |
| pose_strength: float = 0.0, | |
| general_strength: float = 0.0, | |
| motion_strength: float = 0.0, | |
| dreamlay_strength: float = 0.0, | |
| mself_strength: float = 0.0, | |
| dramatic_strength: float = 0.0, | |
| fluid_strength: float = 0.0, | |
| liquid_strength: float = 0.0, | |
| demopose_strength: float = 0.0, | |
| voice_strength: float = 0.0, | |
| realism_strength: float = 0.0, | |
| transition_strength: float = 0.0, | |
| progress=None, | |
| ): | |
| return int(gpu_duration) | |
| def generate_video( | |
| first_image, | |
| last_image, | |
| input_audio, | |
| prompt: str, | |
| negative_prompt: str, | |
| duration: float, | |
| gpu_duration: float, | |
| enhance_prompt: bool = True, | |
| seed: int = 42, | |
| randomize_seed: bool = True, | |
| height: int = 1024, | |
| width: int = 1536, | |
| video_cfg_scale: float = 1.0, | |
| video_stg_scale: float = 0.0, | |
| video_rescale_scale: float = 0.45, | |
| video_a2v_scale: float = 3.0, | |
| audio_cfg_scale: float = 1.0, | |
| audio_stg_scale: float = 0.0, | |
| audio_rescale_scale: float = 1.0, | |
| audio_v2a_scale: float = 3.0, | |
| pose_strength: float = 0.0, | |
| general_strength: float = 0.0, | |
| motion_strength: float = 0.0, | |
| dreamlay_strength: float = 0.0, | |
| mself_strength: float = 0.0, | |
| dramatic_strength: float = 0.0, | |
| fluid_strength: float = 0.0, | |
| liquid_strength: float = 0.0, | |
| demopose_strength: float = 0.0, | |
| voice_strength: float = 0.0, | |
| realism_strength: float = 0.0, | |
| transition_strength: float = 0.0, | |
| progress=gr.Progress(track_tqdm=True), | |
| ): | |
| try: | |
| torch.cuda.reset_peak_memory_stats() | |
| log_memory("start") | |
| current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed) | |
| frame_rate = DEFAULT_FRAME_RATE | |
| num_frames = int(duration * frame_rate) + 1 | |
| num_frames = ((num_frames - 1 + 7) // 8) * 8 + 1 | |
| print(f"Generating: {height}x{width}, {num_frames} frames ({duration}s), seed={current_seed}") | |
| images = [] | |
| output_dir = Path("outputs") | |
| output_dir.mkdir(exist_ok=True) | |
| if first_image is not None: | |
| temp_first_path = output_dir / f"temp_first_{current_seed}.jpg" | |
| if hasattr(first_image, "save"): | |
| first_image.save(temp_first_path) | |
| else: | |
| temp_first_path = Path(first_image) | |
| images.append(ImageConditioningInput(path=str(temp_first_path), frame_idx=0, strength=1.0)) | |
| if last_image is not None: | |
| temp_last_path = output_dir / f"temp_last_{current_seed}.jpg" | |
| if hasattr(last_image, "save"): | |
| last_image.save(temp_last_path) | |
| else: | |
| temp_last_path = Path(last_image) | |
| images.append(ImageConditioningInput(path=str(temp_last_path), frame_idx=num_frames - 1, strength=1.0)) | |
| tiling_config = TilingConfig.default() | |
| video_chunks_number = get_video_chunks_number(num_frames, tiling_config) | |
| video_guider_params = MultiModalGuiderParams( | |
| cfg_scale=video_cfg_scale, | |
| stg_scale=video_stg_scale, | |
| rescale_scale=video_rescale_scale, | |
| modality_scale=video_a2v_scale, | |
| skip_step=0, | |
| stg_blocks=[], | |
| ) | |
| audio_guider_params = MultiModalGuiderParams( | |
| cfg_scale=audio_cfg_scale, | |
| stg_scale=audio_stg_scale, | |
| rescale_scale=audio_rescale_scale, | |
| modality_scale=audio_v2a_scale, | |
| skip_step=0, | |
| stg_blocks=[], | |
| ) | |
| log_memory("before pipeline call") | |
| video, audio = pipeline( | |
| prompt=prompt, | |
| negative_prompt=negative_prompt, | |
| seed=current_seed, | |
| height=int(height), | |
| width=int(width), | |
| num_frames=num_frames, | |
| frame_rate=frame_rate, | |
| video_guider_params=video_guider_params, | |
| audio_guider_params=audio_guider_params, | |
| images=images, | |
| audio_path=input_audio, | |
| tiling_config=tiling_config, | |
| enhance_prompt=enhance_prompt, | |
| ) | |
| log_memory("after pipeline call") | |
| output_path = tempfile.mktemp(suffix=".mp4") | |
| encode_video( | |
| video=video, | |
| fps=frame_rate, | |
| audio=audio, | |
| output_path=output_path, | |
| video_chunks_number=video_chunks_number, | |
| ) | |
| log_memory("after encode_video") | |
| return str(output_path), current_seed | |
| except Exception as e: | |
| import traceback | |
| log_memory("on error") | |
| print(f"Error: {str(e)}\n{traceback.format_exc()}") | |
| return None, current_seed | |
| # ============================================================================= | |
| # Gradio UI | |
| # ============================================================================= | |
| css = """ | |
| .fillable {max-width: 1200px !important} | |
| .progress-text {color: black} | |
| """ | |
| with gr.Blocks(title="LTX-2.3 Distilled with LoRAs, Negative Prompting, and Advanced Settings") as demo: | |
| gr.Markdown("# LTX-2.3 Two-Stage HQ Video Generation") | |
| gr.Markdown( | |
| "High-quality text/image-to-video with cached LoRA state + CFG guidance. " | |
| "[[Model]](https://huggingface.co/Lightricks/LTX-2.3)" | |
| ) | |
| with gr.Row(): | |
| # LEFT SIDE: Input Controls | |
| with gr.Column(): | |
| with gr.Row(): | |
| first_image = gr.Image(label="First Frame (Optional)", type="pil") | |
| last_image = gr.Image(label="Last Frame (Optional)", type="pil") | |
| prompt = gr.Textbox( | |
| label="Prompt", | |
| value="Make this image come alive with cinematic motion, smooth animation", | |
| lines=3, | |
| placeholder="Describe the motion and animation you want...", | |
| ) | |
| negative_prompt = gr.Textbox( | |
| label="Negative Prompt", | |
| value="worst quality, inconsistent motion, blurry, jittery, distorted, deformed, artifacts, text, watermark, logo, frame, border, low resolution, pixelated, unnatural, fake, CGI, cartoon", | |
| lines=2, | |
| ) | |
| duration = gr.Slider( | |
| label="Duration (seconds)", | |
| minimum=1.0, maximum=30.0, value=10.0, step=0.1, | |
| ) | |
| with gr.Row(): | |
| seed = gr.Number(label="Seed", value=42, precision=0, minimum=0, maximum=MAX_SEED) | |
| randomize_seed = gr.Checkbox(label="Randomize Seed", value=True) | |
| with gr.Row(): | |
| high_res = gr.Checkbox(label="High Resolution", value=True) | |
| enhance_prompt = gr.Checkbox(label="Enhance Prompt", value=False) | |
| with gr.Row(): | |
| width = gr.Number(label="Width", value=1536, precision=0) | |
| height = gr.Number(label="Height", value=1024, precision=0) | |
| generate_btn = gr.Button("Generate Video", variant="primary", size="lg") | |
| with gr.Accordion("Advanced Settings", open=False): | |
| gr.Markdown("### Video Guidance Parameters") | |
| with gr.Row(): | |
| video_cfg_scale = gr.Slider( | |
| label="Video CFG Scale", minimum=1.0, maximum=10.0, value=1.0, step=0.1 | |
| ) | |
| video_stg_scale = gr.Slider( | |
| label="Video STG Scale", minimum=0.0, maximum=2.0, value=0.0, step=0.1 | |
| ) | |
| with gr.Row(): | |
| video_rescale_scale = gr.Slider( | |
| label="Video Rescale", minimum=0.0, maximum=2.0, value=0.45, step=0.1 | |
| ) | |
| video_a2v_scale = gr.Slider( | |
| label="A2V Scale", minimum=0.0, maximum=5.0, value=3.0, step=0.1 | |
| ) | |
| gr.Markdown("### Audio Guidance Parameters") | |
| with gr.Row(): | |
| audio_cfg_scale = gr.Slider( | |
| label="Audio CFG Scale", minimum=1.0, maximum=15.0, value=1.0, step=0.1 | |
| ) | |
| audio_stg_scale = gr.Slider( | |
| label="Audio STG Scale", minimum=0.0, maximum=2.0, value=0.0, step=0.1 | |
| ) | |
| with gr.Row(): | |
| audio_rescale_scale = gr.Slider( | |
| label="Audio Rescale", minimum=0.0, maximum=2.0, value=1.0, step=0.1 | |
| ) | |
| audio_v2a_scale = gr.Slider( | |
| label="V2A Scale", minimum=0.0, maximum=5.0, value=3.0, step=0.1 | |
| ) | |
| with gr.Row(): | |
| input_audio = gr.Audio(label="Audio Input (Optional)", type="filepath") | |
| # RIGHT SIDE: Output and LoRA | |
| with gr.Column(): | |
| output_video = gr.Video(label="Generated Video", autoplay=False) | |
| gpu_duration = gr.Slider( | |
| label="ZeroGPU duration (seconds)", | |
| minimum=30.0, maximum=240.0, value=90.0, step=1.0, | |
| info="Increase for longer videos, higher resolution, or LoRA usage" | |
| ) | |
| gr.Markdown("### LoRA Adapter Strengths") | |
| gr.Markdown("Set to 0 to disable, then click 'Prepare LoRA Cache'") | |
| with gr.Row(): | |
| pose_strength = gr.Slider(label="Anthro Enhancer", minimum=0.0, maximum=2.0, value=0.0, step=0.01) | |
| gr.Markdown("") # Spacer for alignment | |
| with gr.Row(): | |
| general_strength = gr.Slider(label="Reasoning Enhancer", minimum=0.0, maximum=2.0, value=0.0, step=0.01) | |
| motion_strength = gr.Slider(label="Anthro Posing", minimum=0.0, maximum=2.0, value=0.0, step=0.01) | |
| with gr.Row(): | |
| dreamlay_strength = gr.Slider(label="Dreamlay", minimum=0.0, maximum=2.0, value=0.0, step=0.01) | |
| mself_strength = gr.Slider(label="Mself", minimum=0.0, maximum=2.0, value=0.0, step=0.01) | |
| with gr.Row(): | |
| dramatic_strength = gr.Slider(label="Dramatic", minimum=0.0, maximum=2.0, value=0.0, step=0.01) | |
| fluid_strength = gr.Slider(label="Fluid Helper", minimum=0.0, maximum=2.0, value=0.0, step=0.01) | |
| with gr.Row(): | |
| liquid_strength = gr.Slider(label="Liquid Helper", minimum=0.0, maximum=2.0, value=0.0, step=0.01) | |
| demopose_strength = gr.Slider(label="Audio Helper", minimum=0.0, maximum=2.0, value=0.0, step=0.01) | |
| with gr.Row(): | |
| voice_strength = gr.Slider(label="Voice Helper", minimum=0.0, maximum=2.0, value=0.0, step=0.01) | |
| realism_strength = gr.Slider(label="Anthro Realism", minimum=0.0, maximum=2.0, value=0.0, step=0.01) | |
| with gr.Row(): | |
| transition_strength = gr.Slider(label="POV", minimum=0.0, maximum=2.0, value=0.0, step=0.01) | |
| gr.Markdown("") # Spacer for alignment | |
| prepare_lora_btn = gr.Button("Prepare / Load LoRA Cache", variant="secondary") | |
| lora_status = gr.Textbox( | |
| label="LoRA Cache Status", | |
| value="No LoRA state prepared yet.", | |
| interactive=False, | |
| ) | |
| # Event handlers | |
| first_image.change(fn=on_image_upload, inputs=[first_image, last_image, high_res], outputs=[width, height]) | |
| last_image.change(fn=on_image_upload, inputs=[first_image, last_image, high_res], outputs=[width, height]) | |
| high_res.change(fn=on_highres_toggle, inputs=[first_image, last_image, high_res], outputs=[width, height]) | |
| prepare_lora_btn.click( | |
| fn=on_prepare_loras_click, | |
| inputs=[pose_strength, general_strength, motion_strength, dreamlay_strength, | |
| mself_strength, dramatic_strength, fluid_strength, liquid_strength, | |
| demopose_strength, voice_strength, realism_strength, transition_strength], | |
| outputs=[lora_status], | |
| ) | |
| generate_btn.click( | |
| fn=generate_video, | |
| inputs=[ | |
| first_image, last_image, input_audio, prompt, negative_prompt, duration, gpu_duration, | |
| enhance_prompt, seed, randomize_seed, height, width, | |
| video_cfg_scale, video_stg_scale, video_rescale_scale, video_a2v_scale, | |
| audio_cfg_scale, audio_stg_scale, audio_rescale_scale, audio_v2a_scale, | |
| pose_strength, general_strength, motion_strength, | |
| dreamlay_strength, mself_strength, dramatic_strength, fluid_strength, | |
| liquid_strength, demopose_strength, voice_strength, realism_strength, | |
| transition_strength, | |
| ], | |
| outputs=[output_video, seed], | |
| ) | |
| if __name__ == "__main__": | |
| demo.queue().launch(theme=gr.themes.Citrus(), css=css, mcp_server=False) |