import os

import librosa
import numpy as np
import torch
from tqdm import tqdm

from metrics.pipelines import sample_pipeline, inpaint_pipeline, sample_pipeline_GAN
from metrics.pipelines_STFT import sample_pipeline_STFT, sample_pipeline_GAN_STFT
from tools import rms_normalize, pad_STFT, encode_stft
from webUI.natural_language_guided.utils import InputBatch2Encode_STFT


def get_inception_score_for_AudioLDM(device, timbre_encoder, VAE, AudioLDM_signals_directory_path):
    """Compute the Inception Score for a directory of AudioLDM-generated wavs.

    Loads every ``.wav`` in ``AudioLDM_signals_directory_path`` (skipping
    macOS ``._`` resource-fork files), truncates each signal to its first
    4 seconds at 16 kHz, RMS-normalizes it, encodes it to an STFT
    representation, pushes batches of 8 through the VAE encoder + quantizer,
    classifies the quantized latents with ``timbre_encoder``, and derives the
    Inception Score from the softmaxed instrument logits.

    Args:
        device: torch device the VAE and timbre encoder run on.
        timbre_encoder: callable returning ``(feature, instrument_logits,
            instrument_family_logits, velocity_logits, qualities)``.
        VAE: VQ-VAE exposing ``_encoder``, ``_vq_vae`` and ``_decoder``.
        AudioLDM_signals_directory_path: directory containing the wav files.

    Returns:
        float: the Inception Score over all loaded signals.

    Raises:
        ValueError: if any file is shorter than 4 seconds, or the loaded
            signals do not all share the same length.
    """
    # Decoder is unpacked for symmetry with the VAE layout but is not needed here.
    VAE_encoder, VAE_quantizer, _VAE_decoder = VAE._encoder, VAE._vq_vae, VAE._decoder

    diffuSynth_probabilities = []

    # Step 1: load all wav files, truncating each to exactly 4 s @ 16 kHz.
    AudioLDM_signals = []
    signal_lengths = set()
    target_length = 4 * 16000  # 4 seconds * 16000 samples per second

    for file_name in os.listdir(AudioLDM_signals_directory_path):
        # '._*' entries are macOS metadata files, not audio.
        if file_name.endswith('.wav') and not file_name.startswith('._'):
            file_path = os.path.join(AudioLDM_signals_directory_path, file_name)
            signal, sr = librosa.load(file_path, sr=16000)  # resample to 16 kHz
            if len(signal) >= target_length:
                signal = signal[:target_length]  # keep only the first 4 seconds
            else:
                raise ValueError(f"The file {file_name} is shorter than 4 seconds.")
            # Normalize to a common RMS level before STFT encoding.
            AudioLDM_signals.append(rms_normalize(signal))
            signal_lengths.add(len(signal))

    # Step 2: batched STFT encoding requires every signal to share one length.
    if len(signal_lengths) != 1:
        raise ValueError("Not all signals have the same length. Please ensure all audio files are of the same length.")

    encoded_audios = []
    for origin_audio in AudioLDM_signals:
        D = librosa.stft(origin_audio, n_fft=1024, hop_length=256, win_length=1024)
        padded_D = pad_STFT(D)
        encoded_D = encode_stft(padded_D)
        encoded_audios.append(encoded_D)
    encoded_audios_np = np.array(encoded_audios)
    origin_spectrogram_batch_tensor = torch.from_numpy(encoded_audios_np).float().to(device)

    # Step 3: split into batches of 8 spectrograms (last batch may be smaller).
    batch_size = 8
    num_batches = int(np.ceil(origin_spectrogram_batch_tensor.shape[0] / batch_size))
    spectrogram_batches = [
        origin_spectrogram_batch_tensor[i * batch_size:(i + 1) * batch_size]
        for i in range(num_batches)
    ]

    for spectrogram_batch in tqdm(spectrogram_batches):
        spectrogram_batch = spectrogram_batch.to(device)
        _, _, _, _, quantized_latent_representations = InputBatch2Encode_STFT(
            VAE_encoder, spectrogram_batch, quantizer=VAE_quantizer, squared=False)
        _, instrument_logits, _, _, _ = timbre_encoder(quantized_latent_representations)
        probabilities = torch.nn.functional.softmax(instrument_logits, dim=1)
        diffuSynth_probabilities.extend(probabilities.to("cpu").detach().numpy())

    return inception_score(np.array(diffuSynth_probabilities))


def _inception_score_from_pipeline(device, generator_model, VAE, MMM, CLAP_tokenizer,
                                   timbre_encoder, num_batches, pipe, positive_prompts,
                                   negative_prompts, CFG, sample_steps):
    """Sample ``num_batches`` latent batches with ``pipe`` and return their Inception Score.

    Shared implementation for :func:`get_inception_score` and
    :func:`get_inception_score_GAN`; ``generator_model`` is the uNet or GAN
    generator the pipeline expects as its second argument.
    """
    diffuSynth_probabilities = []
    for _ in tqdm(range(num_batches)):
        quantized_latent_representations = pipe(
            device, generator_model, VAE, MMM, CLAP_tokenizer,
            positive_prompts=positive_prompts, negative_prompts=negative_prompts,
            batchsize=8, sample_steps=sample_steps, CFG=CFG, seed=None)
        quantized_latent_representations = quantized_latent_representations.to(device)
        _, instrument_logits, _, _, _ = timbre_encoder(quantized_latent_representations)
        probabilities = torch.nn.functional.softmax(instrument_logits, dim=1)
        diffuSynth_probabilities.extend(probabilities.to("cpu").detach().numpy())
    return inception_score(np.array(diffuSynth_probabilities))


def get_inception_score(device, uNet, VAE, MMM, CLAP_tokenizer, timbre_encoder,
                        num_batches, positive_prompts, negative_prompts="",
                        CFG=1, sample_steps=10, task="spectrograms"):
    """Compute the Inception Score for diffusion-sampled latents.

    Args:
        device: torch device models run on.
        uNet: diffusion denoiser passed to the sampling pipeline.
        VAE, MMM, CLAP_tokenizer: models forwarded to the pipeline.
        timbre_encoder: classifier producing instrument logits from latents.
        num_batches: number of batches of 8 samples to draw.
        positive_prompts / negative_prompts: text conditioning.
        CFG: classifier-free guidance scale.
        sample_steps: diffusion sampling steps.
        task: "spectrograms" or "STFT" — selects the sampling pipeline.

    Returns:
        float: the Inception Score.

    Raises:
        NotImplementedError: for an unknown ``task``.
    """
    if task == "spectrograms":
        pipe = sample_pipeline
    elif task == "STFT":
        pipe = sample_pipeline_STFT
    else:
        raise NotImplementedError
    return _inception_score_from_pipeline(device, uNet, VAE, MMM, CLAP_tokenizer,
                                          timbre_encoder, num_batches, pipe,
                                          positive_prompts, negative_prompts,
                                          CFG, sample_steps)


def get_inception_score_GAN(device, gan_generator, VAE, MMM, CLAP_tokenizer, timbre_encoder,
                            num_batches, positive_prompts, negative_prompts="",
                            CFG=1, sample_steps=10, task="spectrograms"):
    """Compute the Inception Score for GAN-sampled latents.

    Same contract as :func:`get_inception_score`, but samples with the GAN
    pipelines using ``gan_generator`` instead of a diffusion uNet.

    Raises:
        NotImplementedError: for an unknown ``task``.
    """
    if task == "spectrograms":
        pipe = sample_pipeline_GAN
    elif task == "STFT":
        pipe = sample_pipeline_GAN_STFT
    else:
        raise NotImplementedError
    return _inception_score_from_pipeline(device, gan_generator, VAE, MMM, CLAP_tokenizer,
                                          timbre_encoder, num_batches, pipe,
                                          positive_prompts, negative_prompts,
                                          CFG, sample_steps)


def predict_qualities_with_diffuSynth_sample(device, uNet, VAE, MMM, CLAP_tokenizer,
                                             timbre_encoder, num_batches, positive_prompts,
                                             negative_prompts="", CFG=6, sample_steps=10):
    """Sample latents and return the mean predicted quality vector.

    Draws ``num_batches`` batches of 8 samples with :func:`sample_pipeline`,
    runs the timbre encoder on each batch, and averages the raw quality
    outputs over all samples.

    Returns:
        np.ndarray: per-quality mean over all sampled latents.
    """
    diffuSynth_qualities = []
    for _ in tqdm(range(num_batches)):
        quantized_latent_representations = sample_pipeline(
            device, uNet, VAE, MMM, CLAP_tokenizer,
            positive_prompts=positive_prompts, negative_prompts=negative_prompts,
            batchsize=8, sample_steps=sample_steps, CFG=CFG, seed=None)
        quantized_latent_representations = quantized_latent_representations.to(device)
        _, _, _, _, qualities = timbre_encoder(quantized_latent_representations)
        qualities = qualities.to("cpu").detach().numpy()
        # Intentionally averaging the raw (un-thresholded) quality scores:
        # qualities = np.where(qualities > 0.5, 1, 0)
        diffuSynth_qualities.extend(qualities)
    return np.mean(diffuSynth_qualities, axis=0)


def generate_probabilities_with_diffuSynth_inpaint(device, uNet, VAE, MMM, CLAP_tokenizer,
                                                   timbre_encoder, num_batches, guidance,
                                                   duration, use_dynamic_mask, noising_strength,
                                                   positive_prompts, negative_prompts="",
                                                   CFG=6, sample_steps=10):
    """Run the inpainting pipeline and collect instrument probabilities + signals.

    For each of ``num_batches`` batches of 8, inpaints latents with
    :func:`inpaint_pipeline`, classifies the result with ``timbre_encoder``,
    and accumulates both the softmaxed instrument probabilities and the
    reconstructed audio signals returned by the pipeline.

    Returns:
        tuple: ``(probabilities, signals)`` where ``probabilities`` is an
        ``np.ndarray`` of shape (num_samples, num_instruments) and
        ``signals`` is a list of reconstructed signals.
    """
    inpaint_probabilities, signals = [], []
    for _ in tqdm(range(num_batches)):
        quantized_latent_representations, _, rec_signals = inpaint_pipeline(
            device, uNet, VAE, MMM, CLAP_tokenizer,
            use_dynamic_mask=use_dynamic_mask, noising_strength=noising_strength,
            guidance=guidance, positive_prompts=positive_prompts,
            negative_prompts=negative_prompts, batchsize=8,
            sample_steps=sample_steps, CFG=CFG, seed=None, duration=duration,
            mask_flexivity=0.999, return_latent=False)
        quantized_latent_representations = quantized_latent_representations.to(device)
        _, instrument_logits, _, _, _ = timbre_encoder(quantized_latent_representations)
        probabilities = torch.nn.functional.softmax(instrument_logits, dim=1)
        inpaint_probabilities.extend(probabilities.to("cpu").detach().numpy())
        signals.extend(rec_signals)
    return np.array(inpaint_probabilities), signals


def inception_score(pred):
    """Compute the Inception Score from per-sample class probabilities.

    Args:
        pred: array of shape (N, num_classes); each row is (proportional to)
            a predicted class distribution P(y|x).

    Returns:
        float: ``exp(mean_x KL(P(y|x) || P(y)))``.
    """
    # Conditional distribution P(y|x) for each sample (row-normalized).
    pyx = pred / np.sum(pred, axis=1, keepdims=True)
    # Marginal distribution P(y) over the whole set.
    py = np.mean(pyx, axis=0, keepdims=True)
    # KL divergence of each P(y|x) from P(y); epsilon guards against log(0).
    kl_div = pyx * (np.log(pyx + 1e-11) - np.log(py + 1e-11))
    # Sum over classes, average over samples, exponentiate.
    kl_div_sum = np.sum(kl_div, axis=1)
    score = np.exp(np.mean(kl_div_sum))
    return score