import gradio as gr
import subprocess
import tempfile
import os
import sys
import shutil
from pathlib import Path
import time

# Print startup and environment information
print(f"===== Application Startup at {time.strftime('%Y-%m-%d %H:%M:%S')} =====")
print(f"Gradio version: {gr.__version__}")
print(f"Python version: {sys.version}")
print(f"Python executable: {sys.executable}")
print("=" * 60)

class Wan2S2VPipeline:
    def __init__(self):
        self.model_loaded = False
        self.model_path = None
        self.script_path = None
        self.ckpt_dir = None
        self.model_repo = "Wan-AI/Wan2.2-S2V-14B"
    def load_model(self):
        """Download the Wan2.2-S2V-14B model weights and inference code."""
        try:
            if self.model_loaded:
                return True, "Model already loaded"

            # Set up the working directory (a persistent location)
            work_dir = "/tmp/wan2.2"
            os.makedirs(work_dir, exist_ok=True)

            # Step 1: clone the official code repository
            print("Step 1: cloning the official code repository...")
            repo_path = os.path.join(work_dir, "Wan2.2")
            if not os.path.exists(os.path.join(repo_path, ".git")):
                # Clone if the directory does not exist or is not a git repository
                if os.path.exists(repo_path):
                    shutil.rmtree(repo_path)
                result = subprocess.run(
                    ["git", "clone", "https://github.com/Wan-Video/Wan2.2.git", repo_path],
                    capture_output=True,
                    text=True,
                    timeout=300
                )
                if result.returncode != 0:
                    return False, f"❌ Failed to clone the code repository: {result.stderr}"
                print("✅ Code repository cloned successfully")
            else:
                print("✅ Code repository already exists, skipping clone")

            # Step 2: download the model weights
            print("Step 2: downloading model weights...")
            model_dir = os.path.join(work_dir, "Wan2.2-S2V-14B")
            if not os.path.exists(model_dir):
                from huggingface_hub import snapshot_download
                print(f"Downloading model {self.model_repo}...")
                model_path = snapshot_download(
                    repo_id=self.model_repo,
                    cache_dir="/tmp/hf_cache",
                    local_dir=model_dir,
                    local_dir_use_symlinks=False
                )
                print(f"✅ Model weights downloaded to: {model_path}")
            else:
                print("✅ Model weights already exist, skipping download")

            # Step 3: install dependencies
            print("Step 3: installing dependencies...")
            requirements_file = os.path.join(repo_path, "requirements.txt")
            if os.path.exists(requirements_file):
                try:
                    result = subprocess.run(
                        [sys.executable, "-m", "pip", "install", "-r", requirements_file],
                        capture_output=True,
                        text=True,
                        timeout=600,
                        cwd=repo_path
                    )
                    if result.returncode == 0:
                        print("✅ Dependencies installed successfully")
                    else:
                        print(f"⚠️ Dependency installation warning: {result.stderr}")
                except Exception as e:
                    print(f"⚠️ Dependency installation skipped: {e}")
            else:
                print("⚠️ requirements.txt not found, skipping dependency installation")

            # Step 4: record the paths used later by generate()
            self.model_path = repo_path
            self.script_path = os.path.join(repo_path, "generate.py")
            self.ckpt_dir = model_dir

            # Validate that the expected files are present
            if not os.path.exists(self.script_path):
                return False, "❌ generate.py script not found"
            if not os.path.exists(self.ckpt_dir):
                return False, "❌ Model weights directory not found"

            self.model_loaded = True
            print("🎉 Wan2.2-S2V-14B model is ready!")
            return True, "✅ Model loaded successfully!"
        except Exception as e:
            error_msg = f"Model loading failed: {str(e)}"
            print(error_msg)
            return False, f"❌ {error_msg}"
    def generate(self, task, size, prompt, image_file, audio_file,
                 num_frames=16, guidance_scale=7.5,
                 num_inference_steps=20, seed=-1, offload_model=True,
                 convert_model_dtype=True):
        """Run the Wan2.2-S2V-14B generate.py script as a subprocess."""
        try:
            if not self.model_loaded:
                success, message = self.load_model()
                if not success:
                    return None, message

            # Set environment variables to avoid OMP_NUM_THREADS / tokenizer issues
            env = os.environ.copy()
            env["OMP_NUM_THREADS"] = "1"
            env["TOKENIZERS_PARALLELISM"] = "false"

            # Validate required inputs
            if not prompt or not prompt.strip():
                return None, "❌ The prompt must not be empty"
            if not image_file:
                return None, "❌ Please upload an input image"
            if not audio_file:
                return None, "❌ Please upload an input audio file"

            # Build the command line
            cmd = [sys.executable, self.script_path]

            # Required arguments
            cmd.extend(["--task", task])
            cmd.extend(["--size", size])
            cmd.extend(["--ckpt_dir", self.ckpt_dir])
            cmd.extend(["--prompt", prompt])
            cmd.extend(["--image", image_file])
            cmd.extend(["--audio", audio_file])

            # Optional arguments
            if num_frames is not None:
                cmd.extend(["--frame_num", str(num_frames)])
                # Pass infer_frames as well, instead of an fps argument
                cmd.extend(["--infer_frames", str(num_frames)])
            if guidance_scale is not None:
                cmd.extend(["--sample_guide_scale", str(guidance_scale)])
            if num_inference_steps is not None:
                cmd.extend(["--sample_steps", str(num_inference_steps)])
            if seed is not None and seed != -1:
                cmd.extend(["--base_seed", str(seed)])

            # Model optimization flags
            if offload_model:
                cmd.extend(["--offload_model", "True"])
            else:
                cmd.extend(["--offload_model", "False"])
            if convert_model_dtype:
                cmd.append("--convert_model_dtype")

            print(f"Running command: {' '.join(cmd)}")

            # Create the output directory
            output_dir = os.path.join(self.model_path, "outputs")
            os.makedirs(output_dir, exist_ok=True)

            # Run the command and stream its log output
            start_time = time.time()
            print("🚀 Starting the generate.py script...")
            print("=" * 50)

            # Use Popen so the script's output can be streamed line by line
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # redirect stderr into stdout
                text=True,
                bufsize=1,  # line buffered
                cwd=self.model_path,
                env=env
            )

            # Read output with an inactivity timeout: the timer resets whenever the script
            # prints a line, so only a silent (hung) process gets terminated.
            all_output = []
            start_read_time = time.time()
            timeout_seconds = 3600  # abort after 60 minutes without any output
            while True:
                # Check for timeout
                if time.time() - start_read_time > timeout_seconds:
                    process.terminate()  # try a graceful shutdown first
                    try:
                        process.wait(timeout=10)  # give it 10 seconds
                    except subprocess.TimeoutExpired:
                        process.kill()  # force kill
                    raise subprocess.TimeoutExpired(cmd, timeout_seconds)

                # Read the next line (blocks until a line arrives or the pipe closes)
                output_line = process.stdout.readline()
                if output_line == '' and process.poll() is not None:
                    break
                if output_line:
                    output_line = output_line.strip()
                    if output_line:  # skip empty lines
                        print(f"[generate.py] {output_line}")
                        all_output.append(output_line)
                        # Reset the timeout timer: output means the script is still running
                        start_read_time = time.time()

            # Wait for the process to finish
            return_code = process.wait()
            execution_time = time.time() - start_time
            print("=" * 50)
            print(f"Script finished with return code: {return_code}")
            print(f"Total time: {execution_time:.1f}s")

            if return_code == 0:
                print("✅ Command completed successfully")
                # Build a detailed success message
                success_msg = f"✅ Generation succeeded! Time taken: {execution_time:.1f}s\n\n"
                if all_output:
                    success_msg += "Script output:\n" + "\n".join(all_output) + "\n"
                # Look for the generated output file
                output_files = self._find_output_files()
                if output_files:
                    # Return the original output file path directly
                    output_file = output_files[0]
                    print(f"Found output file: {output_file}")
                    return output_file, success_msg
                else:
                    return None, "⚠️ Generation succeeded but no output file was found\n\nScript output:\n" + "\n".join(all_output)
            else:
                # Build a detailed error message
                error_msg = f"Script failed with return code: {return_code}\n\n"
                if all_output:
                    error_msg += "Script output:\n" + "\n".join(all_output)
                else:
                    error_msg += "No output captured"
                print(f"❌ Command failed: {error_msg}")
                return None, f"❌ Generation failed:\n{error_msg}"
        except subprocess.TimeoutExpired:
            return None, "⏰ Generation timed out (60 minutes without output); try smaller settings or check the model status"
        except Exception as e:
            error_msg = f"Execution failed: {str(e)}"
            print(error_msg)
            return None, f"❌ {error_msg}"
    def _find_output_files(self):
        """Search for generated output files."""
        output_extensions = ['.mp4', '.gif', '.avi', '.mov', '.png', '.jpg', '.jpeg']
        output_files = []

        # Search the outputs directory first
        outputs_dir = os.path.join(self.model_path, "outputs")
        if os.path.exists(outputs_dir):
            for ext in output_extensions:
                for file_path in Path(outputs_dir).rglob(f"*{ext}"):
                    if file_path.is_file():
                        output_files.append(str(file_path))
                        print(f"Found file in the outputs directory: {file_path}")

        # If nothing was found, fall back to searching the whole model directory
        if not output_files:
            print("No files in the outputs directory, searching the whole model directory...")
            for ext in output_extensions:
                for file_path in Path(self.model_path).rglob(f"*{ext}"):
                    if file_path.is_file():
                        # Skip files that are clearly not generation results
                        file_path_str = str(file_path)
                        if not any(exclude in file_path_str.lower() for exclude in ['.git', '__pycache__', 'node_modules']):
                            output_files.append(file_path_str)
                            print(f"Found file in the model directory: {file_path_str}")

        # Sort by modification time, newest first
        if output_files:
            output_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
            print(f"Found {len(output_files)} output files, sorted by modification time")
        return output_files

    def _copy_output_for_display(self, output_file):
        """Copy the output file to a temporary directory for Gradio display (deprecated)."""
        # This method is no longer used; the original file path is returned directly
        print(f"Using the original file directly: {output_file}")
        return output_file

# Create a global pipeline instance
pipeline = Wan2S2VPipeline()


def generate_interface(task, size, prompt, image_file, audio_file,
                       num_frames, guidance_scale, num_inference_steps,
                       seed, offload_model, convert_model_dtype):
    """Gradio callback for the generate button."""
    # Run generation
    result, message = pipeline.generate(
        task=task,
        size=size,
        prompt=prompt,
        image_file=image_file,
        audio_file=audio_file,
        num_frames=num_frames,
        guidance_scale=guidance_scale,
        num_inference_steps=num_inference_steps,
        seed=seed,
        offload_model=offload_model,
        convert_model_dtype=convert_model_dtype
    )
    return result, message


def load_model_interface():
    """Gradio callback for the load-model button."""
    success, message = pipeline.load_model()
    return message
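

# A minimal sketch (never called by the app) of driving the pipeline without the Gradio UI.
# The two file paths are hypothetical placeholders, not files shipped with this Space.
def _example_programmatic_generate():
    video_path, status = pipeline.generate(
        task="s2v-14B",
        size="1024*704",
        prompt="Summer beach vacation style, a white cat wearing sunglasses sits on a surfboard.",
        image_file="/tmp/example_input.png",   # hypothetical input image
        audio_file="/tmp/example_audio.wav",   # hypothetical input audio
        num_frames=16,
        guidance_scale=7.5,
        num_inference_steps=20,
        seed=42,
        offload_model=True,
        convert_model_dtype=True,
    )
    print(status)
    return video_path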

# Build the Gradio interface
with gr.Blocks(title="Wan2.2-S2V-14B Video Generator") as demo:
    gr.Markdown("""
    # Note before use: this Space cannot run as-is because it is not deployed with a GPU
    # For a full run, see the project Files or duplicate this Space and deploy it on at least an Nvidia 1x L40S with 48 GB VRAM

    # 🎬 Wan2.2-S2V-14B Video Generator

    **About the model**: Wan2.2-S2V-14B is a powerful image-to-video generation model with audio guidance.

    **How to use**:
    1. Click "🚀 Load Model" to download the model
    2. Enter a prompt and upload an image and an audio file
    3. Adjust the parameters and click "🎬 Generate"

    **Note**: The first run downloads roughly 14 GB of model files; please be patient.
    """)

    with gr.Row():
        with gr.Column(scale=1):
            # Model management
            gr.Markdown("### 📥 Model Management")
            load_btn = gr.Button("🚀 Load Model", variant="primary", size="lg")
            load_status = gr.Textbox(label="Model status", interactive=False, value="Waiting for the model to be loaded...")

            # Required parameters
            gr.Markdown("### 📝 Required Parameters")
            task = gr.Textbox(
                label="Task",
                value="s2v-14B",
                interactive=False
            )
            size = gr.Dropdown(
                label="Resolution",
                choices=["1024*704", "1024*1024", "704*1024", "512*512"],
                value="1024*704"
            )
            prompt = gr.Textbox(
                label="Prompt *",
                lines=3,
                placeholder="e.g. Summer beach vacation style, a white cat wearing sunglasses sits on a surfboard."
            )
            image = gr.Image(
                label="Input image *",
                type="filepath"
            )
            audio = gr.Audio(
                label="Input audio *",
                type="filepath"
            )

            # Advanced parameters
            with gr.Accordion("🔧 Advanced Parameters", open=False):
                num_frames = gr.Slider(
                    8, 32, 16,
                    step=1,
                    label="Frames (frame_num/infer_frames)"
                )
                guidance_scale = gr.Slider(
                    1.0, 20.0, 7.5,
                    step=0.1,
                    label="Guidance scale (sample_guide_scale)"
                )
                num_inference_steps = gr.Slider(
                    10, 100, 20,
                    step=1,
                    label="Inference steps (sample_steps)"
                )
                seed = gr.Number(
                    label="Random seed (base_seed)",
                    value=-1
                )
                with gr.Row():
                    offload_model = gr.Checkbox(
                        label="Offload model",
                        value=True
                    )
                    convert_model_dtype = gr.Checkbox(
                        label="Convert model dtype",
                        value=True
                    )

            # Generate button
            generate_btn = gr.Button("🎬 Generate", variant="primary", size="lg")

        with gr.Column(scale=1):
            # Output
            gr.Markdown("### 🎥 Result")
            output = gr.File(label="Output video")
            status = gr.Textbox(label="Generation status", interactive=False, lines=3)

            # Usage notes
            gr.Markdown("""
            ### 📋 Usage Notes

            **Parameters**:
            - **Resolution**: choose the video size that fits your needs
            - **Prompt**: describe the desired video in English; more detail is better
            - **Image**: upload a reference image; the video is generated from it
            - **Audio**: upload an audio file; the model generates the video guided by it

            **Advanced parameters**:
            - **Frames (frame_num/infer_frames)**: controls the video length, 8-32 frames
            - **Guidance scale (sample_guide_scale)**: controls generation quality, 1.0-20.0
            - **Inference steps (sample_steps)**: controls generation fidelity, 10-100 steps
            - **Random seed (base_seed)**: for reproducible results; -1 means random

            **Tips**:
            - Keep the default parameters for your first run
            - If VRAM runs out, lower the resolution and frame count
            - Prompts work best in English
            - Use clear speech or music for the audio file

            **Caveats**:
            - Generation time depends on the settings and usually takes 5-10 minutes
            - Make sure the uploaded image and audio files are in valid formats
            - If an error occurs, check the parameter settings and file formats
            """)

    # Event bindings
    load_btn.click(load_model_interface, outputs=load_status)
    generate_btn.click(
        generate_interface,
        inputs=[
            task, size, prompt, image, audio,
            num_frames, guidance_scale, num_inference_steps,
            seed, offload_model, convert_model_dtype
        ],
        outputs=[output, status]
    )

# Launch the app
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)
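
# Optional variation (a sketch, not enabled here): on a GPU Space one might enable Gradio's
# request queue so long-running generations do not block concurrent users, e.g.
#
#     demo.queue(max_size=4)
#     demo.launch(server_name="0.0.0.0", server_port=7860)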