# vta-ldm / inference_from_video.py
# Hugging Face file-viewer residue (not part of the script):
#   fffiloni's picture
#   Update inference_from_video.py
#   6800bce verified
#   raw
#   history blame contribute delete
#   No virus
#   8.47 kB
import os
import copy
import json
import time
import torch
import argparse
from PIL import Image
import numpy as np
import soundfile as sf
from tqdm import tqdm
from diffusers import DDPMScheduler
from models import build_pretrained_models, AudioDiffusion
from transformers import AutoProcessor, ClapModel
import torchaudio
import tools.torch_tools as torch_tools
from datasets import load_dataset
# Pick the compute device once at import time: the GPU when CUDA is usable,
# otherwise the CPU. Every model and tensor below is moved to this device.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)
class dotdict(dict):
    """Dictionary whose entries can also be accessed as attributes.

    Reading an attribute that is not a key returns ``None`` (the lookup is
    ``dict.get``) instead of raising ``AttributeError``. Assigning an
    attribute stores a key, and deleting an attribute removes the key
    (``KeyError`` if it is absent).
    """

    def __getattr__(self, key, default=None):
        # Only reached when normal attribute lookup fails.
        return dict.get(self, key, default)

    def __setattr__(self, key, value):
        dict.__setitem__(self, key, value)

    def __delattr__(self, key):
        dict.__delitem__(self, key)
def chunks(lst, n):
    """Lazily split ``lst`` into consecutive slices of length ``n``.

    The final slice is shorter when ``len(lst)`` is not a multiple of ``n``.
    """
    yield from (lst[offset:offset + n] for offset in range(0, len(lst), n))
def parse_args():
    """Parse the command-line options of the video-to-audio inference script.

    ``--original_args`` and ``--model`` are mandatory: ``main`` opens the
    former and calls ``.endswith`` on the latter unconditionally, so a missing
    value is reported here as a usage error instead of failing later with an
    opaque ``TypeError``/``AttributeError``.

    Returns:
        argparse.Namespace: the parsed options.
    """
    parser = argparse.ArgumentParser(description="Inference for video to audio generation task.")
    parser.add_argument(
        "--original_args", type=str, default=None, required=True,
        help="Path for summary jsonl file saved during training."
    )
    parser.add_argument(
        "--model", type=str, default=None, required=True,
        help="Path for saved model bin file."
    )
    # NOTE: main() in this file takes the VAE name from the training summary
    # (train_args.vae_model), not from this flag.
    parser.add_argument(
        "--vae_model", type=str, default="audioldm-s-full",
        help="Name of the pretrained VAE model."
    )
    parser.add_argument(
        "--num_steps", type=int, default=200,
        help="How many denoising steps for generation.",
    )
    parser.add_argument(
        "--guidance", type=float, default=3,
        help="Guidance scale for classifier free guidance."
    )
    parser.add_argument(
        "--batch_size", type=int, default=1,
        help="Batch size for generation.",
    )
    parser.add_argument(
        "--num_samples", type=int, default=1,
        help="How many samples per prompt.",
    )
    parser.add_argument(
        "--num_test_instances", type=int, default=-1,
        help="How many test instances to evaluate.",
    )
    parser.add_argument(
        "--sample_rate", type=int, default=16000,
        help="Sample rate for audio output.",
    )
    parser.add_argument(
        "--max_duration", type=int, default=10,
        help="Maximum length duration for generated audio."
    )
    parser.add_argument(
        "--save_dir", type=str, default="./outputs/tmp",
        help="output save dir"
    )
    parser.add_argument(
        "--data_path", type=str, default="data/video_processed/video_gt_augment",
        help="inference data path"
    )
    return parser.parse_args()
def _select_model_class(train_args):
    """Return the diffusion model class selected by the training flags.

    ``ib``, ``lb`` and ``jepa`` are checked in that order; the first truthy
    flag wins and its module is imported lazily. With no flag set the default
    ``AudioDiffusion`` is used.
    """
    if train_args.ib:
        print("*****USING MODEL IMAGEBIND*****")
        from models_imagebind import AudioDiffusion_IB
        return AudioDiffusion_IB
    if train_args.lb:
        print("*****USING MODEL LANGUAGEBIND*****")
        from models_languagebind import AudioDiffusion_LB
        return AudioDiffusion_LB
    if train_args.jepa:
        print("*****USING MODEL JEPA*****")
        from models_vjepa import AudioDiffusion_JEPA
        return AudioDiffusion_JEPA
    return AudioDiffusion


def _load_weights(model, checkpoint_path):
    """Load trained weights from ``checkpoint_path`` into ``model`` (non-strict).

    ``.pt``/``.bin`` files go through ``torch.load``; anything else is read
    with safetensors, falling back to ``torch.load`` on ``OSError``.
    """
    # NOTE: torch.load unpickles the file -- only load checkpoints you trust.
    if checkpoint_path.endswith((".pt", ".bin")):
        model.load_state_dict(torch.load(checkpoint_path, map_location=device), strict=False)
        return
    try:
        from safetensors.torch import load_model
        load_model(model, checkpoint_path, strict=False)
    except OSError as e:
        print(f"Error loading model with safetensors: {e}")
        print("Falling back to torch.load")
        model.load_state_dict(torch.load(checkpoint_path, map_location=device), strict=False)


def _load_video_features(data_path):
    """Load every file in ``data_path`` as a video feature tensor on ``device``.

    Returns:
        tuple: ``(wav_names, features)`` -- two lists of equal length in the
        same order; ``wav_names[i]`` is the output file name for
        ``features[i]``.
    """
    # List the directory once (sorted) so names and features cannot get out
    # of step and the processing order is deterministic.
    video_files = sorted(os.listdir(data_path))
    # Output name = text before the first '.' of the input name, plus ".wav".
    wav_names = [f"{video_file.split('.')[0]}.wav" for video_file in video_files]
    features = []
    for video_file in video_files:
        video_path = os.path.join(data_path, video_file)
        video_feature = torch_tools.load_video(video_path, frame_rate=2, size=224)
        print(video_feature.shape)
        features.append(video_feature.to(device))  # Move to device
    return wav_names, features


def main():
    """Generate audio for every video in ``--data_path`` and write WAV files.

    Steps: read the training summary (first line of ``--original_args``),
    build the VAE and the diffusion model it describes, load the trained
    weights, run inference batch by batch, truncate each waveform to at most
    ``--max_duration`` seconds and save the results under ``--save_dir``.

    With ``--num_samples 1`` the files go to one ``..._augment`` directory.
    With more samples, the N-th sample generated for each video is written to
    a ``rank_N`` sub-directory. No text prompt exists for video-conditioned
    generation, so the samples are kept in generation order and are not
    re-ranked.
    """
    args = parse_args()
    # Only the first line of the summary file holds the training arguments.
    with open(args.original_args, encoding="utf-8") as summary_file:
        train_args = dotdict(json.loads(summary_file.readline()))
    train_args.setdefault("hf_model", None)

    # Load Models #
    vae, stft = build_pretrained_models(train_args.vae_model)
    vae, stft = vae.to(device), stft.to(device)  # Ensure models are on the correct device

    model_class = _select_model_class(train_args)
    model = model_class(
        train_args.fea_encoder_name,
        train_args.scheduler_name,
        train_args.unet_model_name,
        train_args.unet_model_config,
        train_args.snr_gamma,
        train_args.freeze_text_encoder,
        train_args.uncondition,
        train_args.img_pretrained_model_path,
        train_args.task,
        train_args.embedding_dim,
        train_args.pe
    )
    model.eval()

    # Load Trained Weight #
    _load_weights(model, args.model)
    model.to(device)

    scheduler = DDPMScheduler.from_pretrained(train_args.scheduler_name, subfolder="scheduler")

    sample_rate = args.sample_rate
    max_len_in_seconds = args.max_duration

    # Load Data #
    wavname, video_features = _load_video_features(args.data_path)

    # Generate #
    num_steps, guidance, batch_size, num_samples = args.num_steps, args.guidance, args.batch_size, args.num_samples
    # Upper bound on waveform length; longer outputs are cut, shorter ones are
    # left as they are (no padding).
    num_samples_n_seconds = sample_rate * max_len_in_seconds
    all_outputs = []
    with torch.no_grad():
        for k in tqdm(range(0, len(wavname), batch_size)):
            prompt = video_features[k: k + batch_size]
            latents = model.inference(scheduler, None, prompt, None, num_steps, guidance, num_samples, disable_progress=True, device=device)
            mel = vae.decode_first_stage(latents)
            wave = vae.decode_to_waveform(mel)
            all_outputs.extend(wav[:num_samples_n_seconds] for wav in wave)

    # Save #
    exp_id = str(int(time.time()))
    model_tag = "_".join(args.model.split("/")[1:-1])
    run_name = f"{exp_id}_{model_tag}_steps_{num_steps}_guidance_{guidance}_sampleRate_{sample_rate}"

    if num_samples == 1:
        output_dir = f"{args.save_dir}/{run_name}_augment"
        os.makedirs(output_dir, exist_ok=True)
        for wav_file, wav in zip(wavname, all_outputs):
            sf.write(f"{output_dir}/{wav_file}", wav, samplerate=sample_rate)
    else:
        rank_dirs = [f"{args.save_dir}/{run_name}/rank_{i + 1}" for i in range(num_samples)]
        for rank_dir in rank_dirs:
            os.makedirs(rank_dir, exist_ok=True)
        # Assumes model.inference returns the num_samples outputs of one
        # input consecutively, so each chunk belongs to one video -- TODO
        # confirm against the model implementation.
        for wav_file, wavs_for_video in zip(wavname, chunks(all_outputs, num_samples)):
            for rank_dir, wav in zip(rank_dirs, wavs_for_video):
                sf.write(f"{rank_dir}/{wav_file}", wav, samplerate=sample_rate)
# Run the inference pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()