|
import copy
import json
import math
import os
import pickle
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from multiprocessing.dummy import Pool as ThreadPool
from typing import Any, Dict, List

import laion_clap
import librosa
import numpy as np
import resampy
import soundfile as sf
import torch
import tqdm
from encodec import EncodecModel
from scipy import linalg
from torch import nn
|
|
|
|
|
|
|
|
def read_json(file_path: str) -> Dict[str, Any]:
|
|
with open(file_path, "r") as f:
|
|
data = json.load(f)
|
|
return data
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_audio_task(fname, sample_rate, channels, dtype="float32"):
|
|
if dtype not in ['float64', 'float32', 'int32', 'int16']:
|
|
raise ValueError(f"dtype not supported: {dtype}")
|
|
|
|
wav_data, sr = sf.read(fname, dtype=dtype)
|
|
|
|
if dtype == 'int16':
|
|
wav_data = wav_data / 32768.0
|
|
elif dtype == 'int32':
|
|
wav_data = wav_data / float(2**31)
|
|
|
|
|
|
assert channels in [1, 2], "channels must be 1 or 2"
|
|
if len(wav_data.shape) > channels:
|
|
wav_data = np.mean(wav_data, axis=1)
|
|
|
|
if sr != sample_rate:
|
|
wav_data = resampy.resample(wav_data, sr, sample_rate)
|
|
|
|
return wav_data
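
# A minimal usage sketch for load_audio_task (hedged): "example.wav" is a
# hypothetical file path used purely for illustration.
def _example_load_audio_task():
    # Load a mono clip at 16 kHz, the rate the VGGish-based FAD setup expects.
    wav = load_audio_task("example.wav", sample_rate=16000, channels=1, dtype="float32")
    print(wav.shape, wav.dtype)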
|
|
|
|
|
|
class FrechetAudioDistance:
|
|
def __init__(
|
|
self,
|
|
ckpt_dir=None,
|
|
model_name="clap",
|
|
submodel_name="630k-audioset",
|
|
sample_rate=16000,
|
|
channels=1,
|
|
use_pca=False,
|
|
use_activation=False,
|
|
verbose=False,
|
|
audio_load_worker=8,
|
|
enable_fusion=False,
|
|
):
|
|
"""
|
|
Initialize FAD
|
|
|
|
-- ckpt_dir: folder where the downloaded checkpoints are stored
|
|
        -- model_name: one of "vggish", "clap" or "encodec"
|
|
-- submodel_name: only for clap models - determines which checkpoint to use.
|
|
options: ["630k-audioset", "630k", "music_audioset", "music_speech", "music_speech_audioset"]
|
|
        -- sample_rate: sampling rate expected by the chosen model (16000 for vggish, 48000 for clap, 24000 or 48000 for encodec)
|
|
-- channels: number of channels in an audio track
|
|
-- use_pca: whether to apply PCA to the vggish embeddings
|
|
-- use_activation: whether to use the output activation in vggish
|
|
-- enable_fusion: whether to use fusion for clap models (valid depending on the specific submodel used)
|
|
"""
|
|
        assert model_name in ["vggish", "clap", "encodec"], "model_name must be either 'vggish', 'clap' or 'encodec'"
|
|
if model_name == "vggish":
|
|
assert sample_rate == 16000, "sample_rate must be 16000"
|
|
elif model_name == "clap":
|
|
assert sample_rate == 48000, "sample_rate must be 48000"
|
|
assert submodel_name in ["630k-audioset", "630k", "music_audioset", "music_speech", "music_speech_audioset"]
|
|
elif model_name == "encodec":
|
|
assert sample_rate in [24000, 48000], "sample_rate must be 24000 or 48000"
|
|
if sample_rate == 48000:
|
|
assert channels == 2, "channels must be 2 for 48khz encodec model"
|
|
self.model_name = model_name
|
|
self.submodel_name = submodel_name
|
|
self.sample_rate = sample_rate
|
|
self.channels = channels
|
|
self.verbose = verbose
|
|
self.device = torch.device(
|
|
'cuda') if torch.cuda.is_available() else torch.device('mps') if torch.backends.mps.is_available() else torch.device('cpu')
|
|
if self.device == torch.device('mps') and self.model_name == "clap":
|
|
if self.verbose:
|
|
print("[Frechet Audio Distance] CLAP does not support MPS device yet, because:")
|
|
print("[Frechet Audio Distance] The operator 'aten::upsample_bicubic2d.out' is not currently implemented for the MPS device.")
|
|
print("[Frechet Audio Distance] Using CPU device instead.")
|
|
self.device = torch.device('cpu')
|
|
if self.verbose:
|
|
print("[Frechet Audio Distance] Using device: {}".format(self.device))
|
|
self.audio_load_worker = audio_load_worker
|
|
self.enable_fusion = enable_fusion
|
|
if ckpt_dir is not None:
|
|
os.makedirs(ckpt_dir, exist_ok=True)
|
|
torch.hub.set_dir(ckpt_dir)
|
|
self.ckpt_dir = ckpt_dir
|
|
else:
|
|
|
|
self.ckpt_dir = torch.hub.get_dir()
|
|
self.__get_model(model_name=model_name, use_pca=use_pca, use_activation=use_activation)
|
|
|
|
def __get_model(self, model_name="vggish", use_pca=False, use_activation=False):
|
|
"""
|
|
Get ckpt and set model for the specified model_name
|
|
|
|
Params:
|
|
        -- model_name: one of vggish, clap or encodec
|
|
-- use_pca: whether to apply PCA to the vggish embeddings
|
|
-- use_activation: whether to use the output activation in vggish
|
|
"""
|
|
|
|
if model_name == "vggish":
|
|
|
|
self.model = torch.hub.load(repo_or_dir='harritaylor/torchvggish', model='vggish')
|
|
if not use_pca:
|
|
self.model.postprocess = False
|
|
if not use_activation:
|
|
self.model.embeddings = nn.Sequential(*list(self.model.embeddings.children())[:-1])
|
|
self.model.device = self.device
|
|
|
|
elif model_name == "clap":
|
|
|
|
if self.submodel_name == "630k-audioset":
|
|
if self.enable_fusion:
|
|
download_name = "630k-audioset-fusion-best.pt"
|
|
else:
|
|
download_name = "630k-audioset-best.pt"
|
|
elif self.submodel_name == "630k":
|
|
if self.enable_fusion:
|
|
download_name = "630k-fusion-best.pt"
|
|
else:
|
|
download_name = "630k-best.pt"
|
|
elif self.submodel_name == "music_audioset":
|
|
download_name = "music_audioset_epoch_15_esc_90.14.pt"
|
|
elif self.submodel_name == "music_speech":
|
|
download_name = "music_speech_epoch_15_esc_89.25.pt"
|
|
elif self.submodel_name == "music_speech_audioset":
|
|
download_name = "music_speech_audioset_epoch_15_esc_89.98.pt"
|
|
|
|
model_path = os.path.join(self.ckpt_dir, download_name)
|
|
|
|
|
|
if not (os.path.exists(model_path)):
|
|
if self.verbose:
|
|
print("[Frechet Audio Distance] Downloading {}...".format(model_path))
|
|
torch.hub.download_url_to_file(
|
|
url=f"https://huggingface.co/lukewys/laion_clap/resolve/main/{download_name}",
|
|
dst=model_path
|
|
)
|
|
|
|
if self.submodel_name in ["630k-audioset", "630k"]:
|
|
self.model = laion_clap.CLAP_Module(enable_fusion=self.enable_fusion,
|
|
device=self.device)
|
|
elif self.submodel_name in ["music_audioset", "music_speech", "music_speech_audioset"]:
|
|
self.model = laion_clap.CLAP_Module(enable_fusion=self.enable_fusion,
|
|
amodel='HTSAT-base',
|
|
device=self.device)
|
|
self.model.load_ckpt(model_path)
|
elif model_name == "encodec":
|
|
|
|
|
|
if self.sample_rate == 24000:
|
|
self.model = EncodecModel.encodec_model_24khz()
|
|
elif self.sample_rate == 48000:
|
|
self.model = EncodecModel.encodec_model_48khz()
|
|
|
|
|
|
self.model.set_target_bandwidth(24.0)
|
|
|
|
self.model.to(self.device)
|
|
self.model.eval()
|
|
|
|
def get_embeddings(self, x, sr):
|
|
"""
|
|
Get embeddings using VGGish, PANN, CLAP or EnCodec models.
|
|
Params:
|
|
-- x : a list of np.ndarray audio samples
|
|
-- sr : sampling rate.
|
|
"""
|
|
embd_lst = []
|
|
try:
|
|
            for audio in tqdm.tqdm(x, disable=(not self.verbose)):
|
|
if self.model_name == "vggish":
|
|
embd = self.model.forward(audio, sr)
|
|
elif self.model_name == "clap":
|
|
audio = torch.tensor(audio).float().unsqueeze(0)
|
|
embd = self.model.get_audio_embedding_from_data(audio, use_tensor=True)
|
|
elif self.model_name == "encodec":
|
|
|
|
audio = torch.tensor(
|
|
audio).float().unsqueeze(0).unsqueeze(0).to(self.device)
|
|
|
|
if self.model.sample_rate == 48000:
|
|
if audio.shape[-1] != 2:
|
|
if self.verbose:
|
|
print(
|
|
"[Frechet Audio Distance] Audio is mono, converting to stereo for 48khz model..."
|
|
)
|
|
audio = torch.cat((audio, audio), dim=1)
|
|
else:
|
|
|
|
audio = audio[:, 0].transpose(1, 2)
|
|
|
|
if self.verbose:
|
|
print(
|
|
"[Frechet Audio Distance] Audio shape: {}".format(
|
|
audio.shape
|
|
)
|
|
)
|
|
|
|
with torch.no_grad():
|
|
|
|
embd = self.model.encoder(audio)
|
|
embd = embd.squeeze(0)
|
|
|
|
if self.verbose:
|
|
print(
|
|
"[Frechet Audio Distance] Embedding shape: {}".format(
|
|
embd.shape
|
|
)
|
|
)
|
|
|
|
if embd.device != torch.device("cpu"):
|
|
embd = embd.cpu()
|
|
|
|
if torch.is_tensor(embd):
|
|
embd = embd.detach().numpy()
|
|
|
|
embd_lst.append(embd)
|
|
except Exception as e:
|
|
print("[Frechet Audio Distance] get_embeddings throw an exception: {}".format(str(e)))
|
|
|
|
return np.concatenate(embd_lst, axis=0)
|
|
|
|
def calculate_embd_statistics(self, embd_lst):
|
|
if isinstance(embd_lst, list):
|
|
embd_lst = np.array(embd_lst)
|
|
mu = np.mean(embd_lst, axis=0)
|
|
sigma = np.cov(embd_lst, rowvar=False)
|
|
return mu, sigma
|
|
|
|
def calculate_frechet_distance(self, mu1, sigma1, mu2, sigma2, eps=1e-6):
|
|
"""
|
|
Adapted from: https://github.com/mseitzer/pytorch-fid/blob/master/src/pytorch_fid/fid_score.py
|
|
|
|
Numpy implementation of the Frechet Distance.
|
|
The Frechet distance between two multivariate Gaussians X_1 ~ N(mu_1, C_1)
|
|
and X_2 ~ N(mu_2, C_2) is
|
|
d^2 = ||mu_1 - mu_2||^2 + Tr(C_1 + C_2 - 2*sqrt(C_1*C_2)).
|
|
Stable version by Dougal J. Sutherland.
|
|
Params:
|
|
-- mu1 : Numpy array containing the activations of a layer of the
|
|
inception net (like returned by the function 'get_predictions')
|
|
for generated samples.
|
|
-- mu2 : The sample mean over activations, precalculated on an
|
|
representative data set.
|
|
-- sigma1: The covariance matrix over activations for generated samples.
|
|
-- sigma2: The covariance matrix over activations, precalculated on an
|
|
representative data set.
|
|
Returns:
|
|
-- : The Frechet Distance.
|
|
"""
|
|
|
|
mu1 = np.atleast_1d(mu1)
|
|
mu2 = np.atleast_1d(mu2)
|
|
|
|
sigma1 = np.atleast_2d(sigma1)
|
|
sigma2 = np.atleast_2d(sigma2)
|
|
|
|
assert mu1.shape == mu2.shape, \
|
|
'Training and test mean vectors have different lengths'
|
|
assert sigma1.shape == sigma2.shape, \
|
|
'Training and test covariances have different dimensions'
|
|
|
|
diff = mu1 - mu2
|
|
|
|
|
|
covmean, _ = linalg.sqrtm(sigma1.dot(sigma2).astype(complex), disp=False)
|
|
if not np.isfinite(covmean).all():
|
|
msg = ('fid calculation produces singular product; '
|
|
'adding %s to diagonal of cov estimates') % eps
|
|
print(msg)
|
|
offset = np.eye(sigma1.shape[0]) * eps
|
|
covmean = linalg.sqrtm((sigma1 + offset).dot(sigma2 + offset).astype(complex))
|
|
|
|
|
|
if np.iscomplexobj(covmean):
|
|
if not np.allclose(np.diagonal(covmean).imag, 0, atol=1e-3):
|
|
m = np.max(np.abs(covmean.imag))
|
|
raise ValueError('Imaginary component {}'.format(m))
|
|
covmean = covmean.real
|
|
|
|
tr_covmean = np.trace(covmean)
|
|
|
|
return (diff.dot(diff) + np.trace(sigma1)
|
|
+ np.trace(sigma2) - 2 * tr_covmean)
|
|
|
|
def __load_audio_files(self, dir, dtype="float32"):
|
|
task_results = []
|
|
|
|
pool = ThreadPool(self.audio_load_worker)
|
|
        pbar = tqdm.tqdm(total=len(os.listdir(dir)), disable=(not self.verbose))
|
|
|
|
def update(*a):
|
|
pbar.update()
|
|
|
|
if self.verbose:
|
|
print("[Frechet Audio Distance] Loading audio from {}...".format(dir))
|
|
for fname in os.listdir(dir):
|
|
res = pool.apply_async(
|
|
load_audio_task,
|
|
args=(os.path.join(dir, fname), self.sample_rate, self.channels, dtype),
|
|
callback=update,
|
|
)
|
|
task_results.append(res)
|
|
pool.close()
|
|
pool.join()
|
|
|
|
return [k.get() for k in task_results]
|
|
|
|
def score(self,
|
|
background_dir,
|
|
eval_dir,
|
|
background_embds_path=None,
|
|
eval_embds_path=None,
|
|
dtype="float32"
|
|
):
|
|
"""
|
|
Computes the Frechet Audio Distance (FAD) between two directories of audio files.
|
|
|
|
Parameters:
|
|
- background_dir (str): Path to the directory containing background audio files.
|
|
- eval_dir (str): Path to the directory containing evaluation audio files.
|
|
- background_embds_path (str, optional): Path to save/load background audio embeddings (e.g., /folder/bkg_embs.npy). If None, embeddings won't be saved.
|
|
- eval_embds_path (str, optional): Path to save/load evaluation audio embeddings (e.g., /folder/test_embs.npy). If None, embeddings won't be saved.
|
|
- dtype (str, optional): Data type for loading audio. Default is "float32".
|
|
|
|
Returns:
|
|
- float: The Frechet Audio Distance (FAD) score between the two directories of audio files.
|
|
"""
|
|
try:
|
|
|
|
if background_embds_path is not None and os.path.exists(background_embds_path):
|
|
if self.verbose:
|
|
print(f"[Frechet Audio Distance] Loading embeddings from {background_embds_path}...")
|
|
embds_background = np.load(background_embds_path)
|
|
else:
|
|
audio_background = self.__load_audio_files(background_dir, dtype=dtype)
|
|
embds_background = self.get_embeddings(audio_background, sr=self.sample_rate)
|
|
if background_embds_path:
|
|
os.makedirs(os.path.dirname(background_embds_path), exist_ok=True)
|
|
np.save(background_embds_path, embds_background)
|
|
|
|
|
|
if eval_embds_path is not None and os.path.exists(eval_embds_path):
|
|
if self.verbose:
|
|
print(f"[Frechet Audio Distance] Loading embeddings from {eval_embds_path}...")
|
|
embds_eval = np.load(eval_embds_path)
|
|
else:
|
|
audio_eval = self.__load_audio_files(eval_dir, dtype=dtype)
|
|
embds_eval = self.get_embeddings(audio_eval, sr=self.sample_rate)
|
|
if eval_embds_path:
|
|
os.makedirs(os.path.dirname(eval_embds_path), exist_ok=True)
|
|
np.save(eval_embds_path, embds_eval)
|
|
|
|
|
|
if len(embds_background) == 0:
|
|
print("[Frechet Audio Distance] background set dir is empty, exiting...")
|
|
return -1
|
|
if len(embds_eval) == 0:
|
|
print("[Frechet Audio Distance] eval set dir is empty, exiting...")
|
|
return -1
|
|
|
|
|
|
mu_background, sigma_background = self.calculate_embd_statistics(embds_background)
|
|
mu_eval, sigma_eval = self.calculate_embd_statistics(embds_eval)
|
|
|
|
fad_score = self.calculate_frechet_distance(
|
|
mu_background,
|
|
sigma_background,
|
|
mu_eval,
|
|
sigma_eval
|
|
)
|
|
|
|
return fad_score
|
|
except Exception as e:
|
|
print(f"[Frechet Audio Distance] An error occurred: {e}")
|
|
return -1
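
# A small self-contained sketch (not part of the original evaluation flow) showing
# what calculate_frechet_distance computes on synthetic embeddings: two identical
# Gaussian samples give a distance near zero, while shifting the mean by 1 in every
# one of the 8 dimensions raises it to roughly ||shift||^2 = 8.
def _example_frechet_distance():
    rng = np.random.default_rng(0)
    scorer = object.__new__(FrechetAudioDistance)  # skip __init__ so no model is downloaded
    background = rng.normal(size=(256, 8))
    shifted = background + 1.0
    mu_b, sigma_b = scorer.calculate_embd_statistics(background)
    mu_s, sigma_s = scorer.calculate_embd_statistics(shifted)
    print(scorer.calculate_frechet_distance(mu_b, sigma_b, mu_b, sigma_b))  # ~0
    print(scorer.calculate_frechet_distance(mu_b, sigma_b, mu_s, sigma_s))  # ~8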
|
|
|
|
|
|
def calculate_fad_score(background_dir, eval_dir, background_embds_path=None, eval_embds_path=None, dtype="float32", ckpt_dir=None, model_name="clap", submodel_name="630k-audioset", sample_rate=48000, channels=1, use_pca=False, use_activation=False, verbose=False, audio_load_worker=8, enable_fusion=False):
|
|
"""
|
|
Calculate the Frechet Audio Distance (FAD) score between two directories of audio files.
|
|
|
|
Parameters:
|
|
- background_dir: Directory containing background audio files.
|
|
- eval_dir: Directory containing evaluation audio files.
|
|
- background_embds_path: Path to save/load background audio embeddings.
|
|
- eval_embds_path: Path to save/load evaluation audio embeddings.
|
|
- dtype: Data type for loading audio files (default is "float32").
|
|
- ckpt_dir: Directory where the model checkpoints are stored.
|
|
- model_name: Name of the model to use (default is "clap").
|
|
- submodel_name: Submodel name for CLAP (default is "630k-audioset").
|
|
    - sample_rate: Sample rate for audio files (default is 48000, which the default CLAP model requires).
|
|
- channels: Number of channels in the audio files (default is 1).
|
|
- use_pca: Whether to apply PCA to VGGish embeddings (default is False).
|
|
- use_activation: Whether to use output activation in VGGish (default is False).
|
|
- verbose: Whether to print verbose output (default is False).
|
|
- audio_load_worker: Number of workers for loading audio files (default is 8).
|
|
- enable_fusion: Whether to enable fusion for CLAP models (default is False).
|
|
|
|
Returns:
|
|
- FAD score as a float.
|
|
"""
|
|
|
|
fad = FrechetAudioDistance(
|
|
ckpt_dir=ckpt_dir,
|
|
model_name=model_name,
|
|
submodel_name=submodel_name,
|
|
sample_rate=sample_rate,
|
|
channels=channels,
|
|
use_pca=use_pca,
|
|
use_activation=use_activation,
|
|
verbose=verbose,
|
|
audio_load_worker=audio_load_worker,
|
|
enable_fusion=enable_fusion
|
|
)
|
|
|
|
return {
|
|
"FAD_score": fad.score(background_dir, eval_dir, background_embds_path, eval_embds_path, dtype)
|
|
}
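
# A hedged usage sketch for calculate_fad_score: "background_audio/" and
# "generated_audio/" are hypothetical directories of audio files, not paths from
# this repository. The default CLAP backbone expects 48 kHz mono audio.
def _example_calculate_fad_score():
    result = calculate_fad_score(
        background_dir="background_audio/",
        eval_dir="generated_audio/",
        model_name="clap",
        submodel_name="630k-audioset",
        sample_rate=48000,
        channels=1,
        verbose=True,
    )
    print(result["FAD_score"])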
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def int16_to_float32(x):
|
|
return (x / 32767.0).astype('float32')
|
|
|
|
|
|
def float32_to_int16(x):
|
|
x = np.clip(x, a_min=-1., a_max=1.)
|
|
return (x * 32767.).astype('int16')
|
|
|
|
|
|
def calculate_cosine_similarity(embeddings1, embeddings2):
|
|
dot_product = np.dot(embeddings1, embeddings2)
|
|
norm1 = np.linalg.norm(embeddings1)
|
|
norm2 = np.linalg.norm(embeddings2)
|
|
return dot_product / (norm1 * norm2) if norm1 and norm2 else 0.0
|
|
|
|
|
|
def calculate_clap_score(clap_checkpoint=None, model_id=-1, verbose=True, audio_file_list=None, text_file_list=None):
|
|
"""Load the pretrained checkpoint of CLAP model
|
|
|
|
Parameters
|
|
----------
|
|
    clap_checkpoint: str
|
|
if ckpt is specified, the model will load this ckpt, otherwise the model will download the ckpt from zenodo. \n
|
|
For fusion model, it will download the 630k+audioset fusion model (id=3). For non-fusion model, it will download the 630k+audioset model (id=1).
|
|
model_id:
|
|
if model_id is specified, you can download our best ckpt, as:
|
|
id = 0 --> 630k non-fusion ckpt \n
|
|
id = 1 --> 630k+audioset non-fusion ckpt \n
|
|
id = 2 --> 630k fusion ckpt \n
|
|
id = 3 --> 630k+audioset fusion ckpt \n
|
|
        Note that if your model is specified as a non-fusion model but you download a fusion model ckpt, you will face an error.
    audio_file_list: list
        paths of the audio files to embed with CLAP.
    text_file_list: list
        paths of text files (or raw caption strings) to compare against.
|
|
"""
|
|
model = laion_clap.CLAP_Module(enable_fusion=False)
|
|
model.load_ckpt(ckpt = clap_checkpoint, model_id = model_id, verbose=verbose)
|
|
audio_embeddings = []
|
|
for file in audio_file_list:
|
|
        # CLAP operates on 48 kHz audio; quantize/dequantize as in the laion_clap examples
        audio, sr = librosa.load(file, sr=48000)
        audio = int16_to_float32(float32_to_int16(audio))
        embeddings = model.get_audio_embedding_from_data(x=audio.reshape(1, -1), use_tensor=False)
        audio_embeddings.append(embeddings[0])
|
|
|
|
text_embeddings = []
|
|
for file in text_file_list:
|
|
if os.path.exists(file):
|
|
with open(file, 'r') as f:
|
|
text = f.read()
|
|
else:
|
|
text = file
|
|
        embeddings = model.get_text_embedding([text])
        text_embeddings.append(embeddings[0])
|
|
|
|
|
|
scores = []
|
|
for audio_emb, text_emb in zip(audio_embeddings, text_embeddings):
|
|
score = calculate_cosine_similarity(audio_emb, text_emb)
|
|
scores.append(score)
|
|
|
|
|
|
if len(scores) > 0:
|
|
average_score = sum(scores) / len(scores)
|
|
else:
|
|
average_score = 0.0
|
|
|
|
return {"CLAP_score": average_score, "scores": scores}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import whisper
|
|
|
|
|
|
def speech_to_text(model_name="turbo", audio_file="audio.mp3"):
|
|
"""
|
|
Convert speech to text using a speech recognition model.
|
|
"""
|
|
model = whisper.load_model(model_name)
|
|
|
|
|
|
audio = whisper.load_audio(audio_file)
|
|
audio = whisper.pad_or_trim(audio)
|
|
|
|
|
|
mel = whisper.log_mel_spectrogram(audio, n_mels=model.dims.n_mels).to(model.device)
|
|
|
|
|
|
_, probs = model.detect_language(mel)
|
|
print(f"Detected language: {max(probs, key=probs.get)}")
|
|
|
|
|
|
options = whisper.DecodingOptions()
|
|
result = whisper.decode(model, mel, options)
|
|
|
|
|
|
print(result.text)
|
|
return result.text
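
# A hedged usage sketch for speech_to_text: "speech_sample.mp3" is a hypothetical
# file, and the "base" checkpoint is used here only because it is small; any
# openai-whisper model name (including the default "turbo") should work.
def _example_speech_to_text():
    transcript = speech_to_text(model_name="base", audio_file="speech_sample.mp3")
    print(transcript)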
|
|
|
|
|
|
def precook(s, n=4, out=False):
|
|
"""
|
|
Takes a string as input and returns an object that can be given to
|
|
either cook_refs or cook_test. This is optional: cook_refs and cook_test
|
|
can take string arguments as well.
|
|
:param s: string : sentence to be converted into ngrams
|
|
:param n: int : number of ngrams for which representation is calculated
|
|
    :return: term frequency vector for occurring ngrams
|
|
"""
|
|
words = s.split()
|
|
counts = defaultdict(int)
|
|
for k in range(1,n+1):
|
|
for i in range(len(words)-k+1):
|
|
ngram = tuple(words[i:i+k])
|
|
counts[ngram] += 1
|
|
return counts
|
|
|
|
def cook_refs(refs, n=4):
|
|
'''Takes a list of reference sentences for a single segment
|
|
and returns an object that encapsulates everything that BLEU
|
|
needs to know about them.
|
|
:param refs: list of string : reference sentences for some image
|
|
:param n: int : number of ngrams for which (ngram) representation is calculated
|
|
:return: result (list of dict)
|
|
'''
|
|
return [precook(ref, n) for ref in refs]
|
|
|
|
def cook_test(test, n=4):
|
|
'''Takes a test sentence and returns an object that
|
|
encapsulates everything that BLEU needs to know about it.
|
|
:param test: list of string : hypothesis sentence for some image
|
|
:param n: int : number of ngrams for which (ngram) representation is calculated
|
|
:return: result (dict)
|
|
'''
|
|
return precook(test, n, True)
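
# A small sketch of what precook/cook_refs/cook_test produce: term-frequency
# dictionaries keyed by n-gram tuples, which CiderScorer later turns into tf-idf
# vectors. The sentences are made up for illustration.
def _example_ngram_cooking():
    counts = precook("a dog barks at a dog", n=2)
    print(counts[("a",)])         # 2: unigram frequency
    print(counts[("a", "dog")])   # 2: bigram frequency
    refs = cook_refs(["a dog barks", "the dog is barking"], n=2)
    hyp = cook_test("a dog barks loudly", n=2)
    print(len(refs), len(hyp))    # 2 reference count dicts; 7 distinct n-grams in the hypothesis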
|
|
|
|
|
|
|
|
class CiderScorer(object):
|
|
"""CIDEr scorer.
|
|
"""
|
|
|
|
def copy(self):
|
|
''' copy the refs.'''
|
|
new = CiderScorer(n=self.n)
|
|
new.ctest = copy.copy(self.ctest)
|
|
new.crefs = copy.copy(self.crefs)
|
|
return new
|
|
|
|
def __init__(self, test=None, refs=None, n=4, sigma=6.0):
|
|
''' singular instance '''
|
|
self.n = n
|
|
self.sigma = sigma
|
|
self.crefs = []
|
|
self.ctest = []
|
|
self.document_frequency = defaultdict(float)
|
|
self.cook_append(test, refs)
|
|
self.ref_len = None
|
|
|
|
def cook_append(self, test, refs):
|
|
'''called by constructor and __iadd__ to avoid creating new instances.'''
|
|
|
|
if refs is not None:
|
|
self.crefs.append(cook_refs(refs))
|
|
if test is not None:
|
|
self.ctest.append(cook_test(test))
|
|
else:
|
|
self.ctest.append(None)
|
|
|
|
def size(self):
|
|
assert len(self.crefs) == len(self.ctest), "refs/test mismatch! %d<>%d" % (len(self.crefs), len(self.ctest))
|
|
return len(self.crefs)
|
|
|
|
def __iadd__(self, other):
|
|
'''add an instance (e.g., from another sentence).'''
|
|
|
|
if type(other) is tuple:
|
|
|
|
self.cook_append(other[0], other[1])
|
|
else:
|
|
self.ctest.extend(other.ctest)
|
|
self.crefs.extend(other.crefs)
|
|
|
|
return self
|
|
|
|
def compute_doc_freq(self):
|
|
'''
|
|
Compute term frequency for reference data.
|
|
This will be used to compute idf (inverse document frequency later)
|
|
The term frequency is stored in the object
|
|
:return: None
|
|
'''
|
|
for refs in self.crefs:
|
|
|
|
            for ngram in set([ngram for ref in refs for (ngram, count) in ref.items()]):
|
|
self.document_frequency[ngram] += 1
|
|
|
|
|
|
def compute_cider(self, df_mode="corpus"):
|
|
def counts2vec(cnts):
|
|
"""
|
|
Function maps counts of ngram to vector of tfidf weights.
|
|
The function returns vec, an array of dictionary that store mapping of n-gram and tf-idf weights.
|
|
The n-th entry of array denotes length of n-grams.
|
|
:param cnts:
|
|
:return: vec (array of dict), norm (array of float), length (int)
|
|
"""
|
|
vec = [defaultdict(float) for _ in range(self.n)]
|
|
length = 0
|
|
norm = [0.0 for _ in range(self.n)]
|
|
            for (ngram, term_freq) in cnts.items():
|
|
|
|
df = np.log(max(1.0, self.document_frequency[ngram]))
|
|
|
|
n = len(ngram)-1
|
|
|
|
vec[n][ngram] = float(term_freq)*(self.ref_len - df)
|
|
|
|
|
|
norm[n] += pow(vec[n][ngram], 2)
|
|
|
|
if n == 1:
|
|
length += term_freq
|
|
norm = [np.sqrt(n) for n in norm]
|
|
return vec, norm, length
|
|
|
|
def sim(vec_hyp, vec_ref, norm_hyp, norm_ref, length_hyp, length_ref):
|
|
'''
|
|
Compute the cosine similarity of two vectors.
|
|
:param vec_hyp: array of dictionary for vector corresponding to hypothesis
|
|
:param vec_ref: array of dictionary for vector corresponding to reference
|
|
:param norm_hyp: array of float for vector corresponding to hypothesis
|
|
:param norm_ref: array of float for vector corresponding to reference
|
|
:param length_hyp: int containing length of hypothesis
|
|
:param length_ref: int containing length of reference
|
|
:return: array of score for each n-grams cosine similarity
|
|
'''
|
|
delta = float(length_hyp - length_ref)
|
|
|
|
val = np.array([0.0 for _ in range(self.n)])
|
|
for n in range(self.n):
|
|
|
|
                for (ngram, count) in vec_hyp[n].items():
|
|
val[n] += vec_hyp[n][ngram] * vec_ref[n][ngram]
|
|
|
|
if (norm_hyp[n] != 0) and (norm_ref[n] != 0):
|
|
val[n] /= (norm_hyp[n]*norm_ref[n])
|
|
|
|
assert(not math.isnan(val[n]))
|
|
return val
|
|
|
|
|
|
if df_mode == "corpus":
|
|
self.ref_len = np.log(float(len(self.crefs)))
|
|
elif df_mode == "coco-val-df":
|
|
|
|
self.ref_len = np.log(float(40504))
|
|
|
|
scores = []
|
|
for test, refs in zip(self.ctest, self.crefs):
|
|
|
|
vec, norm, length = counts2vec(test)
|
|
|
|
score = np.array([0.0 for _ in range(self.n)])
|
|
for ref in refs:
|
|
vec_ref, norm_ref, length_ref = counts2vec(ref)
|
|
score += sim(vec, vec_ref, norm, norm_ref, length, length_ref)
|
|
|
|
score_avg = np.mean(score)
|
|
|
|
score_avg /= len(refs)
|
|
|
|
score_avg *= 10.0
|
|
|
|
scores.append(score_avg)
|
|
return scores
|
|
|
|
def compute_score(self, df_mode, option=None, verbose=0):
|
|
|
|
if df_mode == "corpus":
|
|
self.compute_doc_freq()
|
|
|
|
assert(len(self.ctest) >= max(self.document_frequency.values()))
|
|
|
|
else:
|
|
            with open(os.path.join('data', df_mode + '.p'), 'rb') as f:
                self.document_frequency = pickle.load(f)
|
|
|
|
score = self.compute_cider(df_mode)
|
|
|
|
|
|
return np.mean(np.array(score)), np.array(score)
|
|
|
|
|
|
|
|
class Cider:
|
|
"""
|
|
Main Class to compute the CIDEr metric
|
|
|
|
"""
|
|
def __init__(self, n=4, df="corpus"):
|
|
"""
|
|
Initialize the CIDEr scoring function
|
|
: param n (int): n-gram size
|
|
: param df (string): specifies where to get the IDF values from
|
|
takes values 'corpus', 'coco-train'
|
|
: return: None
|
|
"""
|
|
|
|
self._n = n
|
|
self._df = df
|
|
|
|
def compute_score(self, gts, res):
|
|
"""
|
|
Main function to compute CIDEr score
|
|
        : param gts (dict) : {image_id: list of reference sentences}
        : param res (list) : list of {"image_id": ..., "caption": [candidate sentence]} entries
|
|
: return: cider (float) : computed CIDEr score for the corpus
|
|
"""
|
|
|
|
cider_scorer = CiderScorer(n=self._n)
|
|
|
|
for res_id in res:
|
|
|
|
hypo = res_id['caption']
|
|
ref = gts[res_id['image_id']]
|
|
|
|
|
|
assert(type(hypo) is list)
|
|
assert(len(hypo) == 1)
|
|
assert(type(ref) is list)
|
|
assert(len(ref) > 0)
|
|
cider_scorer += (hypo[0], ref)
|
|
|
|
(score, scores) = cider_scorer.compute_score(self._df)
|
|
|
|
return score, scores
|
|
|
|
def method(self):
|
|
return "CIDEr"
|
|
|
|
|
|
def calculate_CIDEr_score(audio_file_list=None, text_file_list=None):
|
|
|
|
if audio_file_list is None or text_file_list is None:
|
|
raise ValueError("Both audio_file_list and text_file_list must be provided.")
|
|
if len(audio_file_list) != len(text_file_list):
|
|
raise ValueError("audio_file_list and text_file_list must have the same length.")
|
|
|
|
cider_scorer = Cider(n=4, df="corpus")
|
|
|
|
gts = {}
|
|
res = []
|
|
from spacy.tokenizer import Tokenizer
|
|
from spacy.lang.en import English
|
|
nlp = English()
|
|
|
|
tokenizer = Tokenizer(nlp.vocab)
|
|
|
|
    for audio_file, text_file in zip(audio_file_list, text_file_list):

        # transcribe the generated audio: this is the candidate sentence
        transcribed_text = " ".join(token.text for token in tokenizer(speech_to_text(audio_file=audio_file)))

        # the text file holds the reference sentence
        with open(text_file, 'r') as f:
            reference_text = f.read().strip()
        reference_text = " ".join(token.text for token in tokenizer(reference_text))

        gts[audio_file] = [reference_text]
        res.append({
            'image_id': audio_file,
            'caption': [transcribed_text]
        })
|
|
|
|
score, scores = cider_scorer.compute_score(gts, res)
|
|
return {
|
|
"CIDEr_score": score,
|
|
"scores": scores
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import werpy
|
|
def calculate_wer(audio_file_list: list, text_file_list: list) -> dict:
|
|
"""Calculate the Word Error Rate (WER) between a reference and a hypothesis.
|
|
Args:
|
|
audio_file_list (list): List of audio files to be transcribed.
|
|
text_file_list (list): List of text files containing the reference transcriptions.
|
|
"""
|
|
if len(audio_file_list) != len(text_file_list):
|
|
raise ValueError("audio_file_list and text_file_list must have the same length.")
|
|
|
|
total_wer = 0.0
|
|
for audio_file, text_file in zip(audio_file_list, text_file_list):
|
|
|
|
transcribed_text = speech_to_text(audio_file=audio_file)
|
|
|
|
|
|
with open(text_file, 'r') as f:
|
|
reference_text = f.read().strip()
|
|
|
|
|
|
wer_score = werpy.wer(reference_text, transcribed_text)
|
|
total_wer += wer_score
|
|
|
|
average_wer = total_wer / len(audio_file_list)
|
|
return {"WER_score": average_wer}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from pymcd.mcd import Calculate_MCD
|
|
|
|
def calculate_mcd(reference_audio_list: list, generated_audio_list: list) -> dict:
|
|
"""Calculate the Mel Cepstral Distortion (MCD) between two audio files.
|
|
|
|
Args:
|
|
reference_audio (str): Path to the reference audio file.
|
|
generated_audio (str): Path to the generated audio file.
|
|
|
|
Returns:
|
|
float: The MCD score.
|
|
"""
|
|
|
|
|
|
mcd_toolbox = Calculate_MCD(MCD_mode="plain")
|
|
|
|
|
|
mcd_scores = []
|
|
for ref_audio, gen_audio in zip(reference_audio_list, generated_audio_list):
|
|
|
|
mcd_score = mcd_toolbox.calculate_mcd(ref_audio, gen_audio)
|
|
mcd_scores.append(mcd_score)
|
|
|
|
    if not mcd_scores:
        raise ValueError("MCD score could not be calculated. Please check the audio files.")
    mcd_score = sum(mcd_scores) / len(mcd_scores)
|
|
|
|
return {"MCD_score": mcd_score, "mcd_scores": mcd_scores}
|
|
|
|
|
|
|
|
class AudioGenerationModel:
|
|
def __init__(self, model_name: str):
|
|
self.model_name = model_name
|
|
self.load_model()
|
|
|
|
def load_model(self):
|
|
|
|
pass
|
|
|
|
    def generate(self, input_text: str, **kwargs) -> np.ndarray:
|
|
|
|
|
|
pass
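
# load_model and generate above are intentionally left as stubs. Below is a hedged
# sketch of one way a concrete backend could subclass AudioGenerationModel; the sine
# tone stands in for whatever real generation library gets wired in.
class _ExampleSineWaveModel(AudioGenerationModel):
    """Toy backend that ignores the prompt and returns one second of a 440 Hz tone."""

    def load_model(self):
        self.sample_rate = 16000

    def generate(self, input_text: str, **kwargs) -> np.ndarray:
        t = np.linspace(0, 1.0, self.sample_rate, endpoint=False)
        return 0.1 * np.sin(2 * np.pi * 440.0 * t).astype(np.float32)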
|
|
|
|
|
|
|
|
@dataclass
|
|
class Instance:
|
|
input: Dict[str, Any]
|
|
output: Dict[str, Any]
|
|
id: str
|
|
|
|
|
|
class BaseTask(ABC):
|
|
def __init__(self, task_data: Dict[str, Any], model: AudioGenerationModel, audio_dir: str = None, output_dir: str = None, task_name: str = None):
|
|
self.task_data = read_json(task_data)
|
|
self.model = model
|
|
self.audio_dir = audio_dir
|
|
self.data = self._parse_data(self.task_data)
|
|
self.task_name = os.path.dirname(task_data).split("/")[-1] if task_name is None else task_name
|
|
self.output_dir = output_dir
|
|
        if self.output_dir:
            os.makedirs(self.output_dir, exist_ok=True)
|
|
|
|
self.references = []
|
|
self.predictions = []
|
|
|
|
def save_predictions(self, audio_paths):
|
|
results = []
|
|
for gt, response, audio_path in zip(self.references, self.predictions, audio_paths):
|
|
results.append({
|
|
'gt': gt,
|
|
'response': response,
|
|
'audio_path': audio_path,
|
|
})
|
|
time_prefix = time.strftime('%y%m%d%H%M%S', time.localtime())
|
|
results_file = os.path.join(self.output_dir, f'{self.task_name }_{time_prefix}.json') if self.output_dir else f'{self.task_name }_{time_prefix}.json'
|
|
        with open(results_file, 'w') as f:
            json.dump(results, f)
|
|
|
|
@abstractmethod
|
|
def _get_choice_candidate(self):
|
|
pass
|
|
|
|
@abstractmethod
|
|
def _parse_data(self, task_data: Dict[str, Any]) -> List[Instance]:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def evaluate(self) -> Dict[str, float]:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def run_inference(self):
|
|
pass
|
|
|
|
|
|
class SingleCaptionToAudio(BaseTask):
|
|
def __init__(self, task_data: Dict[str, Any], model: AudioGenerationModel, audio_dir: str = None, output_dir: str = None, task_name: str = None):
|
|
super().__init__(task_data, model, audio_dir, output_dir, task_name)
|
|
self._get_choice_candidate()
|
|
|
|
def _get_choice_candidate(self):
|
|
|
|
pass
|
|
|
|
def _parse_data(self, task_data: Dict[str, Any]) -> List[Instance]:
|
|
return [Instance(input=d["input"], output=d["output"], id=d["id"])
|
|
for d in task_data["data"]]
|
|
|
|
def save_predictions(self, audio_paths):
|
|
results = []
|
|
for gt, response, audio_path in zip(self.references, self.predictions, audio_paths):
|
|
results.append({
|
|
'gt': gt,
|
|
'response': response,
|
|
'audio_path': audio_path,
|
|
})
|
|
time_prefix = time.strftime('%y%m%d%H%M%S', time.localtime())
|
|
results_file = os.path.join(self.output_dir, f'{self.task_name }_{time_prefix}.json') if self.output_dir else f'{self.task_name }_{time_prefix}.json'
|
|
        with open(results_file, 'w') as f:
            json.dump(results, f)
|
|
|
|
|
|
    def run_inference(self):
        """Generate audio for every instance and collect predictions/references."""
        self.predictions = []
        self.references = []
        for inst in tqdm.tqdm(self.data):
            audio_path = os.path.join(self.audio_dir, inst.input["audio_file"])
            prompt = inst.input["prompt"]
            try:
                response = self.model.generate(prompt, audio_path=audio_path)
            except Exception:
                print("error audio {}".format(inst.input["audio_file"]))
                continue

            self.predictions.append(response)
            self.references.append(prompt)

    def evaluate(self) -> Dict[str, float]:
        """Score the generated audio against the text prompts with CLAP."""
        clap_score = calculate_clap_score(audio_file_list=self.predictions,
                                          text_file_list=self.references)
        return clap_score
|
|
|
|
|
|
class VideoToAudio(BaseTask):
|
|
def __init__(self, task_data: Dict[str, Any], model: AudioGenerationModel, audio_dir: str = None, output_dir: str = None, task_name: str = None):
|
|
super().__init__(task_data, model, audio_dir, output_dir, task_name)
|
|
self._get_choice_candidate()
|
|
|
|
def _get_choice_candidate(self):
|
|
|
|
pass
|
|
|
|
def _parse_data(self, task_data: Dict[str, Any]) -> List[Instance]:
|
|
return [Instance(input=d["input"], output=d["output"], id=d["id"])
|
|
for d in task_data["data"]]
|
|
|
|
    def run_inference(self):
        """Generate audio for every video instance and collect predictions/references."""
        self.predictions = []
        self.references = []
        for inst in tqdm.tqdm(self.data):
            video_path = os.path.join(self.audio_dir, inst.input["video_file"])
            prompt = inst.input["prompt"]
            try:
                response = self.model.generate(prompt, video_path=video_path)
            except Exception:
                print("error video {}".format(inst.input["video_file"]))
                continue

            self.predictions.append(response)
            self.references.append(prompt)

    def evaluate(self) -> Dict[str, float]:
        """Score the generated audio against the background set with FAD."""
        fad_score = calculate_fad_score(
            background_dir=self.audio_dir,
            eval_dir=self.output_dir
        )
        return fad_score
|
|
|
|
|
|
class ImageToSpeech(BaseTask):
|
|
def __init__(self, task_data: Dict[str, Any], model: AudioGenerationModel, audio_dir: str = None, output_dir: str = None, task_name: str = None):
|
|
super().__init__(task_data, model, audio_dir, output_dir, task_name)
|
|
self._get_choice_candidate()
|
|
|
|
def _get_choice_candidate(self):
|
|
|
|
pass
|
|
|
|
def _parse_data(self, task_data: Dict[str, Any]) -> List[Instance]:
|
|
return [Instance(input=d["input"], output=d["output"], id=d["id"])
|
|
for d in task_data["data"]]
|
|
|
|
    def run_inference(self):
        """Generate speech for every image instance and collect predictions/references."""
        self.predictions = []
        self.references = []
        for inst in tqdm.tqdm(self.data):
            image_path = os.path.join(self.audio_dir, inst.input["image_file"])
            prompt = inst.input["prompt"]
            try:
                response = self.model.generate(prompt, image_path=image_path)
            except Exception:
                print("error image {}".format(inst.input["image_file"]))
                continue

            self.predictions.append(response)
            self.references.append(prompt)

    def evaluate(self) -> Dict[str, float]:
        """Transcribe the generated speech and score it against the references with CIDEr."""
        CIDEr_score = calculate_CIDEr_score(
            audio_file_list=self.predictions,
            text_file_list=self.references
        )
        return CIDEr_score
|
|
|
|
|
|
def log_performance_csv(model_name, task_name, metric, score, root_path, output_file='prediction.csv'):
|
|
import csv
|
|
file_exists = os.path.isfile(os.path.join(root_path, output_file))
|
|
|
|
row_data = {
|
|
'model': model_name,
|
|
'task': task_name,
|
|
'metric': metric,
|
|
'score': str(score),
|
|
}
|
|
|
|
with open(os.path.join(root_path, output_file), mode='a', newline='', encoding='utf-8') as f:
|
|
writer = csv.DictWriter(f, fieldnames=row_data.keys())
|
|
if not file_exists:
|
|
writer.writeheader()
|
|
|
|
writer.writerow(row_data)
|
|
|
|
|
|
def log_performance_json(model_name, task_name, metric, score, root_path, output_file='prediction.json'):
|
|
import json
|
|
log_data = {
|
|
'model': model_name,
|
|
'task': task_name,
|
|
'metric': metric,
|
|
'score': str(score),
|
|
}
|
|
|
|
log_file_path = os.path.join(root_path, output_file)
|
|
|
|
if os.path.exists(log_file_path):
|
|
with open(log_file_path, 'r') as f:
|
|
existing_data = json.load(f)
|
|
else:
|
|
existing_data = []
|
|
|
|
existing_data.append(log_data)
|
|
|
|
with open(log_file_path, 'w', encoding='utf-8') as f:
|
|
json.dump(existing_data, f, indent=4)
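
# A hedged usage sketch for the two loggers above: "./results/" is a hypothetical
# output directory. log_performance_json appends a record to prediction.json, and
# log_performance_csv appends an equivalent row to a CSV file.
def _example_log_performance():
    log_performance_json(
        model_name="my-audio-model",
        task_name="SingleCaptionToAudio",
        metric="CLAP",
        score=0.31,
        root_path="./results/",
        output_file="prediction.json",
    )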
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description="Run audio generation tasks")
|
|
parser.add_argument('-m', '--model_name', type=str, required=True, help='Name of the audio generation model to use')
|
|
parser.add_argument('-d', '--data_dir', type=str, default='./audio/generation/', help='Directory containing task data')
|
|
parser.add_argument('-o', '--output_dir', type=str, default='./audio/predictions/generation/', help='Directory to save predictions for each task')
|
|
parser.add_argument('-r', '--root_path', type=str, default='./', help='Root path for logging performance')
|
|
parser.add_argument('-t', '--task_names', type=str, nargs='+',
|
|
help='List of task names to run (for example: SingleCaptionToAudio VideoToAudio ImageToSpeech)')
|
|
args = parser.parse_args()
|
|
|
|
|
|
model = AudioGenerationModel(model_name=args.model_name)
|
|
|
|
|
|
|
|
|
|
task_name_list = [
|
|
'SingleCaptionToAudio', 'VideoToAudio', 'ImageToSpeech',
|
|
|
|
]
|
|
|
|
if args.task_names is None or len(args.task_names) == 0:
|
|
args.task_names = task_name_list
|
|
|
|
for task_name in args.task_names:
|
|
|
|
|
|
if task_name in globals():
|
|
task_class = globals()[task_name]
|
|
else:
|
|
|
|
print(f"Task {task_name} is not defined in the current scope.")
|
|
continue
|
|
|
|
|
|
import glob
|
|
json_file_list = glob.glob(os.path.join(args.data_dir, task_name, "*.json"))
|
|
if len(json_file_list) == 0:
|
|
print(f"No JSON files found for task: {task_name}")
|
|
continue
|
|
elif len(json_file_list) > 1:
|
|
print(f"Multiple JSON files found for task: {task_name}, using the first one: {json_file_list[0]}")
|
|
task_annotation_data = json_file_list[0]
|
|
else:
|
|
task_annotation_data = json_file_list[0]
|
|
print(f"Using task annotation data: {task_annotation_data}")
|
|
task = task_class(
|
|
task_data=task_annotation_data,
|
|
model=model,
|
|
audio_dir=os.path.join(args.data_dir, task_name, 'audios'),
|
|
output_dir=args.output_dir
|
|
)
|
|
|
|
|
|
|
|
print(f"Running inference for task: {task_name}")
|
|
task.run_inference()
|
|
|
|
|
|
|
|
|
|
|
|
eval_results = task.evaluate()
|
|
print("Task name: ", task_name, "Evaluation results:", eval_results)
|
|
log_performance_json(
|
|
model_name=args.model_name,
|
|
task_name=task_name,
|
|
metric=list(eval_results.keys())[0].split('_')[0],
|
|
score=eval_results[list(eval_results.keys())[0]],
|
|
root_path=args.data_dir)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|