Spaces:
Paused
Paused
| # PyTorch 2.8 (temporary hack) | |
| import os | |
| os.system('pip install --upgrade --pre --extra-index-url https://download.pytorch.org/whl/nightly/cu126 "torch<2.9"') | |
| from huggingface_hub import HfApi, upload_file | |
| import os | |
| import uuid | |
| import subprocess | |
| import tempfile | |
| import logging | |
| import shutil | |
| import os | |
| from huggingface_hub import HfApi, upload_file | |
| from datetime import datetime | |
| import uuid | |
| # Actual demo code | |
| import spaces | |
| import torch | |
| from diffusers.pipelines.wan.pipeline_wan_i2v import WanImageToVideoPipeline | |
| from diffusers.models.transformers.transformer_wan import WanTransformer3DModel | |
| from diffusers.utils.export_utils import export_to_video | |
| import gradio as gr | |
| import tempfile | |
| import numpy as np | |
| from PIL import Image | |
| import random | |
| import gc | |
| from optimization import optimize_pipeline_ | |
| from huggingface_hub import hf_hub_download | |
| MODEL_ID = "Wan-AI/Wan2.2-I2V-A14B-Diffusers" | |
| HF_MODEL = os.environ.get("HF_UPLOAD_REPO", "rahul7star/wan22lora-text-img-video-analysis") | |
| from huggingface_hub import HfApi, upload_file | |
| import os | |
| import uuid | |
| import os | |
| import uuid | |
| import logging | |
| from datetime import datetime | |
| def upscale_and_upload_4k(input_video_path: str, input_image, summary_text: str) -> str: | |
| """ | |
| Upscale a video to 4K and upload it to Hugging Face Hub along with the input image and a text summary. | |
| Args: | |
| input_video_path (str): Path to the original video. | |
| input_image (PIL.Image.Image or path-like): Input image to upload alongside the video. | |
| summary_text (str): Text summary or prompt to upload alongside the video. | |
| Returns: | |
| str: Hugging Face folder path where the video, image, and summary were uploaded. | |
| """ | |
| logging.info(f"Upscaling video to 4K for upload: {input_video_path}") | |
| # --- Upscale video --- | |
| with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_upscaled: | |
| upscaled_path = tmp_upscaled.name | |
| cmd = [ | |
| "ffmpeg", | |
| "-i", input_video_path, | |
| "-vf", "scale=3840:2160:flags=lanczos", | |
| "-c:v", "libx264", | |
| "-crf", "18", | |
| "-preset", "slow", | |
| "-y", | |
| upscaled_path, | |
| ] | |
| try: | |
| subprocess.run(cmd, check=True, capture_output=True) | |
| logging.info(f"✅ Upscaled video created at: {upscaled_path}") | |
| except subprocess.CalledProcessError as e: | |
| logging.error(f"FFmpeg failed:\n{e.stderr.decode()}") | |
| raise | |
| # --- Create HF folder --- | |
| today_str = datetime.now().strftime("%Y-%m-%d") | |
| unique_subfolder = f"upload_{uuid.uuid4().hex[:8]}" | |
| hf_folder = f"{today_str}-WAN-I2V/{unique_subfolder}" | |
| # --- Upload video --- | |
| video_filename = os.path.basename(input_video_path) | |
| video_hf_path = f"{hf_folder}/{video_filename}" | |
| upload_file( | |
| path_or_fileobj=upscaled_path, | |
| path_in_repo=video_hf_path, | |
| repo_id=HF_MODEL, | |
| repo_type="model", | |
| token=os.environ.get("HUGGINGFACE_HUB_TOKEN"), | |
| ) | |
| logging.info(f"✅ Uploaded 4K video to HF: {video_hf_path}") | |
| # --- Upload input image --- | |
| with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_img: | |
| if isinstance(input_image, str): | |
| import shutil | |
| shutil.copy(input_image, tmp_img.name) | |
| else: | |
| input_image.save(tmp_img.name, format="PNG") | |
| tmp_img_path = tmp_img.name | |
| image_hf_path = f"{hf_folder}/input_image.png" | |
| upload_file( | |
| path_or_fileobj=tmp_img_path, | |
| path_in_repo=image_hf_path, | |
| repo_id=HF_MODEL, | |
| repo_type="model", | |
| token=os.environ.get("HUGGINGFACE_HUB_TOKEN"), | |
| ) | |
| logging.info(f"✅ Uploaded input image to HF: {image_hf_path}") | |
| # --- Upload summary text --- | |
| summary_file = tempfile.NamedTemporaryFile(delete=False, suffix=".txt").name | |
| with open(summary_file, "w", encoding="utf-8") as f: | |
| f.write(summary_text) | |
| summary_hf_path = f"{hf_folder}/summary.txt" | |
| upload_file( | |
| path_or_fileobj=summary_file, | |
| path_in_repo=summary_hf_path, | |
| repo_id=HF_MODEL, | |
| repo_type="model", | |
| token=os.environ.get("HUGGINGFACE_HUB_TOKEN"), | |
| ) | |
| logging.info(f"✅ Uploaded summary to HF: {summary_hf_path}") | |
| # --- Cleanup temporary files --- | |
| os.remove(upscaled_path) | |
| os.remove(tmp_img_path) | |
| os.remove(summary_file) | |
| return hf_folder | |
| LORA_REPO_ID = "rahul7star/wan2.2Lora" | |
| LORA_SETS = { | |
| "NF": { | |
| "high_noise": {"file": "DR34ML4Y_I2V_14B_HIGH.safetensors", "adapter_name": "nf_high"}, | |
| "low_noise": {"file": "DR34ML4Y_I2V_14B_LOW.safetensors", "adapter_name": "nf_low"} | |
| }, | |
| "BP": { | |
| "high_noise": {"file": "Wan2.2_BP-v1-HighNoise-I2V_T2V.safetensors", "adapter_name": "bp_high"}, | |
| "low_noise": {"file": "Wan2.2_BP-v1-LowNoise-I2V_T2V.safetensors", "adapter_name": "bp_low"} | |
| }, | |
| "Py-v1": { | |
| "high_noise": {"file": "wan2.2_i2v_highnoise_pov_missionary_v1.0.safetensors", "adapter_name": "py_high"}, | |
| "low_noise": {"file": "wan2.2_i2v_lownoise_pov_missionary_v1.0.safetensors", "adapter_name": "py_low"} | |
| } | |
| } | |
| LANDSCAPE_WIDTH = 832 | |
| LANDSCAPE_HEIGHT = 576 | |
| MAX_SEED = np.iinfo(np.int32).max | |
| FIXED_FPS = 16 | |
| MIN_FRAMES_MODEL = 8 | |
| MAX_FRAMES_MODEL = 81 | |
| MIN_DURATION = round(MIN_FRAMES_MODEL/FIXED_FPS,1) | |
| MAX_DURATION = round(MAX_FRAMES_MODEL/FIXED_FPS,1) | |
| pipe = WanImageToVideoPipeline.from_pretrained(MODEL_ID, | |
| transformer=WanTransformer3DModel.from_pretrained('cbensimon/Wan2.2-I2V-A14B-bf16-Diffusers', | |
| subfolder='transformer', | |
| torch_dtype=torch.bfloat16, | |
| device_map='cuda', | |
| ), | |
| transformer_2=WanTransformer3DModel.from_pretrained('cbensimon/Wan2.2-I2V-A14B-bf16-Diffusers', | |
| subfolder='transformer_2', | |
| torch_dtype=torch.bfloat16, | |
| device_map='cuda', | |
| ), | |
| torch_dtype=torch.bfloat16, | |
| ).to('cuda') | |
| optimize_pipeline_(pipe, | |
| image=Image.new('RGB', (LANDSCAPE_WIDTH, LANDSCAPE_HEIGHT)), | |
| prompt='prompt', | |
| height=LANDSCAPE_HEIGHT, | |
| width=LANDSCAPE_WIDTH, | |
| num_frames=MAX_FRAMES_MODEL, | |
| ) | |
| for name, lora_set in LORA_SETS.items(): | |
| print(f"---LoRA 集合: {name} ---") | |
| # 加载 High Noise | |
| high_noise_config = lora_set["high_noise"] | |
| print(f"High Noise: {high_noise_config['file']}...") | |
| pipe.load_lora_weights(LORA_REPO_ID, weight_name=high_noise_config['file'], adapter_name=high_noise_config['adapter_name']) | |
| print("High Noise LoRA 加载完成。") | |
| # 加载 Low Noise | |
| low_noise_config = lora_set["low_noise"] | |
| print(f" Low Noise: {low_noise_config['file']}...") | |
| pipe.load_lora_weights(LORA_REPO_ID, weight_name=low_noise_config['file'], adapter_name=low_noise_config['adapter_name']) | |
| print("Low Noise LoRA ") | |
| print("。") | |
| for i in range(3): | |
| gc.collect() | |
| torch.cuda.synchronize() | |
| torch.cuda.empty_cache() | |
| default_prompt_i2v = "make this image come alive, cinematic motion, smooth animation" | |
| default_negative_prompt = "色调艳丽, 过曝, 静态, 细节模糊不清, 字幕, 风格, 作品, 画作, 画面, 静止, 整体发灰, 最差质量, 低质量, JPEG压缩残留, 丑陋的, 残缺的, 多余的手指, 画得不好的手部, 画得不好的脸部, 畸形的, 毁容的, 形态畸形的肢体, 手指融合, 静止不动的画面, 杂乱的背景, 三条腿, 背景人很多, 倒着走" | |
| def resize_image(image: Image.Image) -> Image.Image: | |
| if image.height > image.width: | |
| transposed = image.transpose(Image.Transpose.ROTATE_90) | |
| resized = resize_image_landscape(transposed) | |
| return resized.transpose(Image.Transpose.ROTATE_270) | |
| return resize_image_landscape(image) | |
| def resize_image_landscape(image: Image.Image) -> Image.Image: | |
| target_aspect = LANDSCAPE_WIDTH / LANDSCAPE_HEIGHT | |
| width, height = image.size | |
| in_aspect = width / height | |
| if in_aspect > target_aspect: | |
| new_width = round(height * target_aspect) | |
| left = (width - new_width) // 2 | |
| image = image.crop((left, 0, left + new_width, height)) | |
| else: | |
| new_height = round(width / target_aspect) | |
| top = (height - new_height) // 2 | |
| image = image.crop((0, top, width, top + new_height)) | |
| return image.resize((LANDSCAPE_WIDTH, LANDSCAPE_HEIGHT), Image.LANCZOS) | |
| def get_duration( | |
| input_image, | |
| prompt, | |
| steps, | |
| negative_prompt, | |
| duration_seconds, | |
| guidance_scale, | |
| guidance_scale_2, | |
| seed, | |
| randomize_seed, | |
| selected_loras, | |
| progress, | |
| ): | |
| return int(steps) * 15 | |
| def generate_video( | |
| input_image, | |
| prompt, | |
| steps = 4, | |
| negative_prompt=default_negative_prompt, | |
| duration_seconds = MAX_DURATION, | |
| guidance_scale = 1, | |
| guidance_scale_2 = 1, | |
| seed = 42, | |
| randomize_seed = False, | |
| selected_loras = [], | |
| progress=gr.Progress(track_tqdm=True), | |
| ): | |
| if input_image is None: | |
| raise gr.Error("Please upload an input image.") | |
| print("potmpt is ") | |
| print(prompt) | |
| num_frames = np.clip(int(round(duration_seconds * FIXED_FPS)), MIN_FRAMES_MODEL, MAX_FRAMES_MODEL) | |
| current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed) | |
| resized_image = resize_image(input_image) | |
| num_inference_steps = int(steps) | |
| switch_step = num_inference_steps // 2 | |
| class LoraSwitcher: | |
| def __init__(self, selected_lora_names): | |
| self.switched = False | |
| self.high_noise_adapters = [] | |
| self.low_noise_adapters = [] | |
| if selected_lora_names: | |
| for name in selected_lora_names: | |
| if name in LORA_SETS: | |
| self.high_noise_adapters.append(LORA_SETS[name]["high_noise"]["adapter_name"]) | |
| self.low_noise_adapters.append(LORA_SETS[name]["low_noise"]["adapter_name"]) | |
| def __call__(self, pipe, step_index, timestep, callback_kwargs): | |
| # LoRA 状态 | |
| if step_index == 0: | |
| self.switched = False | |
| # LoRA,则激活 High Noise 版本 | |
| if self.high_noise_adapters: | |
| print(f"激活 High Noise LoRA: {self.high_noise_adapters}") | |
| pipe.set_adapters(self.high_noise_adapters, adapter_weights=[1.0] * len(self.high_noise_adapters)) | |
| # 🔥 同时 fuse_lora | |
| try: | |
| print(f"Fuse High Noise LoRA: {self.high_noise_adapters}") | |
| pipe.fuse_lora() | |
| except Exception as e: | |
| print(f"Fuse High Noise LoRA 失败: {e}") | |
| # LoRA,则通过将权重设为0来禁用任何可能残留的 LoRA | |
| elif pipe.get_active_adapters(): | |
| active_adapters = pipe.get_active_adapters() | |
| print(f"未选择 LoRA,通过设置权重为0来禁用残留的 LoRA: {active_adapters}") | |
| pipe.set_adapters(active_adapters, adapter_weights=[0.0] * len(active_adapters)) | |
| #Low Noise LoRA(仅当有 LoRA 被选择时) | |
| if self.low_noise_adapters and step_index >= switch_step and not self.switched: | |
| print(f"在第 {step_index} 步切换到 Low Noise LoRA: {self.low_noise_adapters}") | |
| pipe.set_adapters(self.low_noise_adapters, adapter_weights=[1.0] * len(self.low_noise_adapters)) | |
| try: | |
| print(f"Fuse Low Noise LoRA: {self.low_noise_adapters}") | |
| pipe.fuse_lora() | |
| except Exception as e: | |
| print(f"Fuse Low Noise LoRA 失败: {e}") | |
| self.switched = True | |
| return callback_kwargs | |
| lora_switcher_callback = LoraSwitcher(selected_loras) | |
| output_frames_list = pipe( | |
| image=resized_image, | |
| prompt=prompt, | |
| negative_prompt=negative_prompt, | |
| height=resized_image.height, | |
| width=resized_image.width, | |
| num_frames=num_frames, | |
| guidance_scale=float(guidance_scale), | |
| guidance_scale_2=float(guidance_scale_2), | |
| num_inference_steps=num_inference_steps, | |
| generator=torch.Generator(device="cuda").manual_seed(current_seed), | |
| callback_on_step_end=lora_switcher_callback, | |
| ).frames[0] | |
| with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmpfile: | |
| video_path = tmpfile.name | |
| export_to_video(output_frames_list, video_path, fps=FIXED_FPS) | |
| #upscale_and_upload_4k(video_path, input_image, prompt) | |
| return video_path, current_seed | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# Fast 4 steps Wan 2.2 I2V (14B) with Lightning LoRA") | |
| gr.Markdown("run Wan 2.2 in just 4-8 steps, with [Lightning LoRA](https://huggingface.co/Kijai/WanVideo_comfy/tree/main/Wan22-Lightning), fp8 quantization & AoT compilation - compatible with 🧨 diffusers and ZeroGPU⚡️") | |
| with gr.Row(): # ensures columns align in height | |
| with gr.Column(): | |
| input_image_component = gr.Image( | |
| type="pil", | |
| label="Input Image (auto-resized to target H/W)", | |
| interactive=True, | |
| elem_classes=["flex-image"] | |
| ) | |
| prompt_input = gr.Textbox(label="Prompt", value=default_prompt_i2v) | |
| duration_seconds_input = gr.Slider( | |
| minimum=MIN_DURATION, | |
| maximum=MAX_DURATION, | |
| step=0.1, | |
| value=3.5, | |
| label="Duration (seconds)", | |
| info=f"Clamped to model's {MIN_FRAMES_MODEL}-{MAX_FRAMES_MODEL} frames at {FIXED_FPS}fps." | |
| ) | |
| lora_selection_checkbox = gr.CheckboxGroup( | |
| choices=list(LORA_SETS.keys()), | |
| label="选择要应用的 LoRA (可多选)", | |
| info="选择一个或多个 LoRA 风格进行组合。" | |
| ) | |
| with gr.Accordion("Advanced Settings", open=False): | |
| negative_prompt_input = gr.Textbox(label="Negative Prompt", value=default_negative_prompt, lines=3) | |
| seed_input = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=42, interactive=True) | |
| randomize_seed_checkbox = gr.Checkbox(label="Randomize seed", value=True, interactive=True) | |
| steps_slider = gr.Slider(minimum=1, maximum=30, step=1, value=6, label="Inference Steps") | |
| guidance_scale_input = gr.Slider(minimum=0.0, maximum=10.0, step=0.5, value=1, label="Guidance Scale - high noise stage") | |
| guidance_scale_2_input = gr.Slider(minimum=0.0, maximum=10.0, step=0.5, value=1, label="Guidance Scale 2 - low noise stage") | |
| generate_button = gr.Button("Generate Video", variant="primary") | |
| with gr.Column(): | |
| video_output = gr.Video(label="Generated Video", autoplay=True, interactive=False, elem_classes=["stretch-video"]) | |
| ui_inputs = [ | |
| input_image_component, prompt_input, steps_slider, | |
| negative_prompt_input, duration_seconds_input, | |
| guidance_scale_input, guidance_scale_2_input, seed_input, randomize_seed_checkbox, | |
| lora_selection_checkbox | |
| ] | |
| generate_button.click(fn=generate_video, inputs=ui_inputs, outputs=[video_output, seed_input]) | |
| if __name__ == "__main__": | |
| demo.queue().launch() | |