'''
sudo apt-get update && sudo apt-get install git-lfs ffmpeg cbm
conda create --name py310 python=3.10
conda activate py310
pip install ipykernel
python -m ipykernel install --user --name py310 --display-name "py310"
git clone https://huggingface.co/spaces/svjack/LatentSync && cd LatentSync
pip install -r requirements.txt
pip install spaces gradio huggingface_hub
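# The mkdir/ln -s steps below point torch.hub's checkpoint cache at the auxiliary weights
# (2DFAN4 face alignment, S3FD face detector, VGG16) that land in checkpoints/auxiliary,
# so they are not downloaded a second time at runtime.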
mkdir -p ~/.cache/torch/hub/checkpoints
ln -s $(pwd)/checkpoints/auxiliary/2DFAN4-cd938726ad.zip ~/.cache/torch/hub/checkpoints/2DFAN4-cd938726ad.zip
ln -s $(pwd)/checkpoints/auxiliary/s3fd-619a316812.pth ~/.cache/torch/hub/checkpoints/s3fd-619a316812.pth
ln -s $(pwd)/checkpoints/auxiliary/vgg16-397923af.pth ~/.cache/torch/hub/checkpoints/vgg16-397923af.pth
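# Note: a CUDA GPU is required; the audio encoder and pipeline in app.py are moved to "cuda".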
python app.py
'''
import gradio as gr
import os
import sys
import shutil
import uuid
import subprocess
from glob import glob
from huggingface_hub import snapshot_download
# Download models
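# snapshot_download mirrors the full chunyu-li/LatentSync model repo into ./checkpoints,
# including latentsync_unet.pt, the Whisper encoders and the auxiliary weights referenced above.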
os.makedirs("checkpoints", exist_ok=True)
snapshot_download(
repo_id = "chunyu-li/LatentSync",
local_dir = "./checkpoints"
)
import tempfile
from moviepy.editor import VideoFileClip
from pydub import AudioSegment
def process_video(input_video_path, temp_dir="temp_dir"):
"""
    Trim a given MP4 video to a maximum duration of 10 seconds if it is longer.
Save the new video in the specified folder (default is temp_dir).
Args:
input_video_path (str): Path to the input video file.
temp_dir (str): Directory where the processed video will be saved.
Returns:
str: Path to the cropped video file.
"""
# Ensure the temp_dir exists
os.makedirs(temp_dir, exist_ok=True)
# Load the video
video = VideoFileClip(input_video_path)
# Determine the output path
input_file_name = os.path.basename(input_video_path)
output_video_path = os.path.join(temp_dir, f"cropped_{input_file_name}")
# Crop the video to 10 seconds if necessary
if video.duration > 10:
video = video.subclip(0, 10)
# Write the cropped video to the output path
video.write_videofile(output_video_path, codec="libx264", audio_codec="aac")
# Return the path to the cropped video
return output_video_path
def process_audio(file_path, temp_dir):
# Load the audio file
audio = AudioSegment.from_file(file_path)
    # Trim the audio to at most 8 seconds
    max_duration = 8 * 1000  # 8 seconds in milliseconds
if len(audio) > max_duration:
audio = audio[:max_duration]
# Save the processed audio in the temporary directory
output_path = os.path.join(temp_dir, "trimmed_audio.wav")
audio.export(output_path, format="wav")
# Return the path to the trimmed file
print(f"Processed audio saved at: {output_path}")
return output_path
import argparse
from omegaconf import OmegaConf
import torch
from diffusers import AutoencoderKL, DDIMScheduler
from latentsync.models.unet import UNet3DConditionModel
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline
from diffusers.utils.import_utils import is_xformers_available
from accelerate.utils import set_seed
from latentsync.whisper.audio2feature import Audio2Feature
def main(video_path, audio_path, progress=gr.Progress(track_tqdm=True)):
inference_ckpt_path = "checkpoints/latentsync_unet.pt"
unet_config_path = "configs/unet/second_stage.yaml"
config = OmegaConf.load(unet_config_path)
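    # second_stage.yaml supplies the model, data and run settings used below
    # (cross_attention_dim, num_frames, resolution, inference_steps).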
print(f"Input video path: {video_path}")
print(f"Input audio path: {audio_path}")
print(f"Loaded checkpoint path: {inference_ckpt_path}")
#is_shared_ui = True if "fffiloni/LatentSync" in os.environ['SPACE_ID'] else False
is_shared_ui = True
temp_dir = None
if is_shared_ui:
temp_dir = tempfile.mkdtemp()
cropped_video_path = process_video(video_path)
print(f"Cropped video saved to: {cropped_video_path}")
video_path=cropped_video_path
trimmed_audio_path = process_audio(audio_path, temp_dir)
print(f"Processed file was stored temporarily at: {trimmed_audio_path}")
audio_path=trimmed_audio_path
scheduler = DDIMScheduler.from_pretrained("configs")
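    # The UNet's cross-attention dimension indicates which Whisper encoder produced its
    # audio features: 768-dim features come from whisper-small, 384-dim from whisper-tiny.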
if config.model.cross_attention_dim == 768:
whisper_model_path = "checkpoints/whisper/small.pt"
elif config.model.cross_attention_dim == 384:
whisper_model_path = "checkpoints/whisper/tiny.pt"
else:
raise NotImplementedError("cross_attention_dim must be 768 or 384")
audio_encoder = Audio2Feature(model_path=whisper_model_path, device="cuda", num_frames=config.data.num_frames)
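    # Load the fine-tuned Stable Diffusion VAE in fp16; 0.18215 is the standard SD latent
    # scaling factor, and the shift factor is disabled.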
vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse", torch_dtype=torch.float16)
vae.config.scaling_factor = 0.18215
vae.config.shift_factor = 0
unet, _ = UNet3DConditionModel.from_pretrained(
OmegaConf.to_container(config.model),
inference_ckpt_path, # load checkpoint
device="cpu",
)
unet = unet.to(dtype=torch.float16)
# set xformers
if is_xformers_available():
unet.enable_xformers_memory_efficient_attention()
pipeline = LipsyncPipeline(
vae=vae,
audio_encoder=audio_encoder,
unet=unet,
scheduler=scheduler,
).to("cuda")
seed = -1
if seed != -1:
set_seed(seed)
else:
torch.seed()
print(f"Initial seed: {torch.initial_seed()}")
unique_id = str(uuid.uuid4())
video_out_path = f"video_out{unique_id}.mp4"
pipeline(
video_path=video_path,
audio_path=audio_path,
video_out_path=video_out_path,
video_mask_path=video_out_path.replace(".mp4", "_mask.mp4"),
num_frames=config.data.num_frames,
num_inference_steps=config.run.inference_steps,
guidance_scale=1.0,
weight_dtype=torch.float16,
width=config.data.resolution,
height=config.data.resolution,
)
if is_shared_ui:
# Clean up the temporary directory
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
print(f"Temporary directory {temp_dir} deleted.")
return video_out_path
css="""
div#col-container{
margin: 0 auto;
max-width: 982px;
}
"""
with gr.Blocks(css=css) as demo:
with gr.Column(elem_id="col-container"):
gr.Markdown("# LatentSync: Audio Conditioned Latent Diffusion Models for Lip Sync")
gr.Markdown("LatentSync, an end-to-end lip sync framework based on audio conditioned latent diffusion models without any intermediate motion representation, diverging from previous diffusion-based lip sync methods based on pixel space diffusion or two-stage generation.")
gr.HTML("""
<div style="display:flex;column-gap:4px;">
<a href="https://github.com/bytedance/LatentSync">
<img src='https://img.shields.io/badge/GitHub-Repo-blue'>
</a>
<a href="https://arxiv.org/abs/2412.09262">
<img src='https://img.shields.io/badge/ArXiv-Paper-red'>
</a>
<a href="https://huggingface.co/spaces/fffiloni/LatentSync?duplicate=true">
<img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/duplicate-this-space-sm.svg" alt="Duplicate this Space">
</a>
<a href="https://huggingface.co/fffiloni">
<img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/follow-me-on-HF-sm-dark.svg" alt="Follow me on HF">
</a>
</div>
""")
with gr.Row():
with gr.Column():
video_input = gr.Video(label="Video Control", format="mp4")
audio_input = gr.Audio(label="Audio Input", type="filepath")
submit_btn = gr.Button("Submit")
with gr.Column():
video_result = gr.Video(label="Result")
gr.Examples(
examples = [
["assets/demo1_video.mp4", "assets/demo1_audio.wav"],
["assets/demo2_video.mp4", "assets/demo2_audio.wav"],
["assets/demo3_video.mp4", "assets/demo3_audio.wav"],
],
inputs = [video_input, audio_input]
)
submit_btn.click(
fn = main,
inputs = [video_input, audio_input],
outputs = [video_result]
)
demo.queue().launch(show_api=True, show_error=True, share = True)