Spaces:
Runtime error
Runtime error
# -*- coding: utf-8 -*- | |
from spectrogram import logmelspectrogram | |
import numpy as np | |
from joblib import Parallel, delayed | |
import librosa | |
import soundfile as sf | |
import os | |
from glob import glob | |
from tqdm import tqdm | |
import random | |
import json | |
import resampy | |
import pyworld as pw | |
def extract_logmel(wav_path, sr=16000): | |
# wav, fs = librosa.load(wav_path, sr=sr) | |
wav, fs = sf.read(wav_path) | |
wav, _ = librosa.effects.trim(wav, top_db=60) | |
if fs != sr: | |
wav = resampy.resample(wav, fs, sr, axis=0) | |
fs = sr | |
# duration = len(wav)/fs | |
assert fs == 16000 | |
peak = np.abs(wav).max() | |
if peak > 1.0: | |
wav /= peak | |
mel = logmelspectrogram( | |
x=wav, | |
fs=fs, | |
n_mels=80, | |
n_fft=400, | |
n_shift=160, | |
win_length=400, | |
window='hann', | |
fmin=80, | |
fmax=7600, | |
) | |
tlen = mel.shape[0] | |
frame_period = 160/fs*1000 | |
f0, timeaxis = pw.dio(wav.astype('float64'), fs, frame_period=frame_period) | |
f0 = pw.stonemask(wav.astype('float64'), f0, timeaxis, fs) | |
f0 = f0[:tlen].reshape(-1).astype('float32') | |
nonzeros_indices = np.nonzero(f0) | |
lf0 = f0.copy() | |
lf0[nonzeros_indices] = np.log(f0[nonzeros_indices]) # for f0(Hz), lf0 > 0 when f0 != 0 | |
wav_name = os.path.basename(wav_path).split('.')[0] | |
# print(wav_name, mel.shape, duration) | |
return wav_name, mel, lf0, mel.shape[0] | |
def normalize_logmel(wav_name, mel, mean, std): | |
mel = (mel - mean) / (std + 1e-8) | |
return wav_name, mel | |
def save_one_file(save_path, arr): | |
os.makedirs(os.path.dirname(save_path), exist_ok=True) | |
np.save(save_path, arr) | |
def save_logmel(save_root, wav_name, melinfo, mode): | |
mel, lf0, mel_len = melinfo | |
spk = wav_name.split('_')[0] | |
mel_save_path = f'{save_root}/{mode}/mels/{spk}/{wav_name}.npy' | |
lf0_save_path = f'{save_root}/{mode}/lf0/{spk}/{wav_name}.npy' | |
save_one_file(mel_save_path, mel) | |
save_one_file(lf0_save_path, lf0) | |
return mel_len, mel_save_path, lf0_save_path | |
# def get_wavs_names(spks, data_root) | |
data_root = '/Dataset/VCTK-Corpus/wav48_silence_trimmed' | |
save_root = 'data' | |
os.makedirs(save_root, exist_ok=True) | |
spk_info_txt = '/Dataset/VCTK-Corpus/speaker-info.txt' | |
f = open(spk_info_txt, 'r') | |
gen2spk = {} | |
all_spks = [] | |
for i, line in enumerate(f): | |
if i == 0: | |
continue | |
else: | |
tmp = line.split() | |
# print(tmp) | |
spk = tmp[0] | |
all_spks.append(spk) | |
gen = tmp[2] | |
if gen not in gen2spk: | |
gen2spk[gen] = [spk] | |
else: | |
gen2spk[gen].append(spk) | |
random.shuffle(all_spks) | |
train_spks = all_spks[:-20] | |
test_spks = all_spks[-20:] | |
train_wavs_names = [] | |
valid_wavs_names = [] | |
test_wavs_names = [] | |
print('all_spks:', all_spks) | |
for spk in train_spks: | |
spk_wavs = glob(f'{data_root}/{spk}/*mic1.flac') | |
print('len(spk_wavs):', len(spk_wavs)) | |
spk_wavs_names = [os.path.basename(p).split('.')[0] for p in spk_wavs] | |
valid_names = random.sample(spk_wavs_names, int(len(spk_wavs_names)*0.1)) | |
train_names = [n for n in spk_wavs_names if n not in valid_names] | |
train_wavs_names += train_names | |
valid_wavs_names += valid_names | |
for spk in test_spks: | |
spk_wavs = glob(f'{data_root}/{spk}/*mic1.flac') | |
print('len(spk_wavs):', len(spk_wavs)) | |
spk_wavs_names = [os.path.basename(p).split('.')[0] for p in spk_wavs] | |
test_wavs_names += spk_wavs_names | |
print(len(train_wavs_names)) | |
print(len(valid_wavs_names)) | |
print(len(test_wavs_names)) | |
# extract log-mel | |
print('extract log-mel...') | |
all_wavs = glob(f'{data_root}/*/*mic1.flac') | |
results = Parallel(n_jobs=-1)(delayed(extract_logmel)(wav_path) for wav_path in tqdm(all_wavs)) | |
wn2mel = {} | |
for r in results: | |
wav_name, mel, lf0, mel_len = r | |
# print(wav_name, mel.shape, duration) | |
wn2mel[wav_name] = [mel, lf0, mel_len] | |
# normalize log-mel | |
print('normalize log-mel...') | |
mels = [] | |
spk2lf0 = {} | |
for wav_name in train_wavs_names: | |
mel, _, _ = wn2mel[wav_name] | |
mels.append(mel) | |
mels = np.concatenate(mels, 0) | |
mean = np.mean(mels, 0) | |
std = np.std(mels, 0) | |
mel_stats = np.concatenate([mean.reshape(1,-1), std.reshape(1,-1)], 0) | |
np.save(f'{save_root}/mel_stats.npy', mel_stats) | |
results = Parallel(n_jobs=-1)(delayed(normalize_logmel)(wav_name, wn2mel[wav_name][0], mean, std) for wav_name in tqdm(wn2mel.keys())) | |
wn2mel_new = {} | |
for r in results: | |
wav_name, mel = r | |
lf0 = wn2mel[wav_name][1] | |
mel_len = wn2mel[wav_name][2] | |
wn2mel_new[wav_name] = [mel, lf0, mel_len] | |
# save log-mel | |
print('save log-mel...') | |
train_results = Parallel(n_jobs=-1)(delayed(save_logmel)(save_root, wav_name, wn2mel_new[wav_name], 'train') for wav_name in tqdm(train_wavs_names)) | |
valid_results = Parallel(n_jobs=-1)(delayed(save_logmel)(save_root, wav_name, wn2mel_new[wav_name], 'valid') for wav_name in tqdm(valid_wavs_names)) | |
test_results = Parallel(n_jobs=-1)(delayed(save_logmel)(save_root, wav_name, wn2mel_new[wav_name], 'test') for wav_name in tqdm(test_wavs_names)) | |
def save_json(save_root, results, mode): | |
fp = open(f'{save_root}/{mode}.json', 'w') | |
json.dump(results, fp, indent=4) | |
fp.close() | |
save_json(save_root, train_results, 'train') | |
save_json(save_root, valid_results, 'valid') | |
save_json(save_root, test_results, 'test') | |