Spaces:
Sleeping
Sleeping
import os | |
import random | |
import json | |
import tgt | |
import librosa | |
import numpy as np | |
import pyworld as pw | |
from scipy.interpolate import interp1d | |
from sklearn.preprocessing import StandardScaler | |
from tqdm import tqdm | |
import audio as Audio | |
class Preprocessor:
    """Extract duration, pitch, energy and mel features for a speech corpus.

    The raw corpus is read from ``config["path"]["raw_path"]`` (one
    sub-directory per speaker, holding ``<basename>.wav`` and
    ``<basename>.lab`` files).  Phone alignments are read from
    ``<preprocessed_path>/TextGrid/<speaker>/<basename>.TextGrid``.  All
    outputs are written below ``config["path"]["preprocessed_path"]``.
    """

    def __init__(self, config):
        """Read the preprocessing settings from ``config``.

        Args:
            config: Nested mapping.  The keys read here are
                ``path.{emo_path, raw_path, preprocessed_path}`` and
                ``preprocessing.{val_size, audio.sampling_rate,
                stft.{filter_length, hop_length, win_length},
                mel.{n_mel_channels, mel_fmin, mel_fmax},
                pitch.{feature, normalization},
                energy.{feature, normalization}}``.

        Raises:
            AssertionError: If the pitch or energy ``feature`` setting is
                neither ``"phoneme_level"`` nor ``"frame_level"``.
        """
        self.config = config

        paths = config["path"]
        self.emo_dir = paths["emo_path"]
        self.in_dir = paths["raw_path"]
        self.out_dir = paths["preprocessed_path"]

        prep = config["preprocessing"]
        self.val_size = prep["val_size"]
        self.sampling_rate = prep["audio"]["sampling_rate"]
        self.hop_length = prep["stft"]["hop_length"]

        feature_levels = ["phoneme_level", "frame_level"]
        assert prep["pitch"]["feature"] in feature_levels, (
            "pitch feature must be one of {}".format(feature_levels)
        )
        assert prep["energy"]["feature"] in feature_levels, (
            "energy feature must be one of {}".format(feature_levels)
        )
        self.pitch_phoneme_averaging = (
            prep["pitch"]["feature"] == "phoneme_level"
        )
        self.energy_phoneme_averaging = (
            prep["energy"]["feature"] == "phoneme_level"
        )

        self.pitch_normalization = prep["pitch"]["normalization"]
        self.energy_normalization = prep["energy"]["normalization"]

        self.STFT = Audio.stft.TacotronSTFT(
            prep["stft"]["filter_length"],
            prep["stft"]["hop_length"],
            prep["stft"]["win_length"],
            prep["mel"]["n_mel_channels"],
            prep["audio"]["sampling_rate"],
            prep["mel"]["mel_fmin"],
            prep["mel"]["mel_fmax"],
        )

    def _load_emotions(self):
        """Map the first token of each non-blank line of the emotion file to an index.

        Indices are assigned in file order, starting at 0.  Blank lines are
        skipped instead of raising ``IndexError``.
        """
        emotions = {}
        index = 0
        with open(self.emo_dir, "r", encoding="utf-8") as f:
            for line in f:
                tokens = line.strip().split(None, 1)
                if not tokens:
                    continue
                emotions[tokens[0]] = index
                index += 1
        return emotions

    def build_from_path(self):
        """Process every aligned utterance and write features, stats and splits.

        Side effects (all below ``self.out_dir``):
            * ``mel/``, ``pitch/``, ``energy/``, ``duration/`` ``.npy`` files
              (written by :meth:`process_utterance`; pitch and energy are
              then rewritten in place by :meth:`normalize`),
            * ``speakers.json``, ``emotions.json``, ``stats.json``,
            * ``train.txt`` and ``val.txt``: the shuffled metadata lines,
              with the first ``val_size`` lines going to ``val.txt``.

        Returns:
            list[str]: The shuffled metadata lines.
        """
        for sub_dir in ("mel", "pitch", "energy", "duration"):
            os.makedirs(os.path.join(self.out_dir, sub_dir), exist_ok=True)

        print("Processing Data ...")
        out = []
        n_frames = 0
        pitch_scaler = StandardScaler()
        energy_scaler = StandardScaler()

        # add emotion dictionary
        emotions = self._load_emotions()

        # Compute pitch, energy, duration, and mel-spectrogram
        # Only directories are speakers; a stray file in the raw folder
        # would otherwise make the inner os.listdir() raise.
        speaker_names = [
            name
            for name in os.listdir(self.in_dir)
            if os.path.isdir(os.path.join(self.in_dir, name))
        ]
        speakers = {}
        for i, speaker in enumerate(tqdm(speaker_names)):
            speakers[speaker] = i
            for wav_name in os.listdir(os.path.join(self.in_dir, speaker)):
                # Match on the real extension and keep dots inside the stem,
                # so "a.b.wav" maps to "a.b" and "x.wav.bak" is ignored.
                if not wav_name.endswith(".wav"):
                    continue
                basename = os.path.splitext(wav_name)[0]

                tg_path = os.path.join(
                    self.out_dir,
                    "TextGrid",
                    speaker,
                    "{}.TextGrid".format(basename),
                )
                # Utterances without an alignment are silently skipped.
                if not os.path.exists(tg_path):
                    continue

                ret = self.process_utterance(speaker, basename)
                if ret is None:
                    continue
                info, pitch, energy, n = ret
                out.append(info)

                if len(pitch) > 0:
                    pitch_scaler.partial_fit(pitch.reshape((-1, 1)))
                if len(energy) > 0:
                    energy_scaler.partial_fit(energy.reshape((-1, 1)))
                n_frames += n

        print("Computing statistic quantities ...")
        # Perform normalization if necessary
        if self.pitch_normalization:
            pitch_mean = pitch_scaler.mean_[0]
            pitch_std = pitch_scaler.scale_[0]
        else:
            # A numerical trick to avoid normalization...
            pitch_mean = 0
            pitch_std = 1
        if self.energy_normalization:
            energy_mean = energy_scaler.mean_[0]
            energy_std = energy_scaler.scale_[0]
        else:
            energy_mean = 0
            energy_std = 1

        pitch_min, pitch_max = self.normalize(
            os.path.join(self.out_dir, "pitch"), pitch_mean, pitch_std
        )
        energy_min, energy_max = self.normalize(
            os.path.join(self.out_dir, "energy"), energy_mean, energy_std
        )

        # Save files
        with open(os.path.join(self.out_dir, "speakers.json"), "w") as f:
            f.write(json.dumps(speakers))

        # Save emotions in a json file
        with open(os.path.join(self.out_dir, "emotions.json"), "w") as f:
            f.write(json.dumps(emotions))

        stats = {
            "pitch": [
                float(pitch_min),
                float(pitch_max),
                float(pitch_mean),
                float(pitch_std),
            ],
            "energy": [
                float(energy_min),
                float(energy_max),
                float(energy_mean),
                float(energy_std),
            ],
        }
        with open(os.path.join(self.out_dir, "stats.json"), "w") as f:
            f.write(json.dumps(stats))

        print(
            "Total time: {} hours".format(
                n_frames * self.hop_length / self.sampling_rate / 3600
            )
        )

        # `out` only ever receives metadata strings, so no None filtering
        # is needed before the split.
        random.shuffle(out)

        # Write metadata
        train_path = os.path.join(self.out_dir, "train.txt")
        with open(train_path, "w", encoding="utf-8") as f:
            for m in out[self.val_size:]:
                f.write(m + "\n")
        val_path = os.path.join(self.out_dir, "val.txt")
        with open(val_path, "w", encoding="utf-8") as f:
            for m in out[: self.val_size]:
                f.write(m + "\n")

        return out

    @staticmethod
    def _phoneme_average(values, durations):
        """Average ``values`` over consecutive spans of ``durations`` frames.

        Returns a new array with one entry per duration; zero-length spans
        yield 0.  The result is written to a separate array so that no
        frame is overwritten before it has been read, which matters as
        soon as any duration is 0.
        """
        values = np.asarray(values)
        averaged = np.zeros(len(durations), dtype=values.dtype)
        pos = 0
        for i, d in enumerate(durations):
            if d > 0:
                averaged[i] = np.mean(values[pos: pos + d])
            pos += d
        return averaged

    def process_utterance(self, speaker, basename):
        """Extract and save the features of a single utterance.

        Args:
            speaker: Name of the speaker sub-directory.
            basename: File stem shared by the ``.wav``, ``.lab`` and
                ``.TextGrid`` files.

        Returns:
            ``None`` when the alignment contains no non-silent phone or the
            pitch track has at most one voiced frame.  Otherwise a tuple
            ``(info, pitch, energy, n_frames)`` where ``info`` is the
            ``|``-joined metadata line
            ``basename|speaker|{phones}|raw_text|<prefix>``; ``<prefix>`` is
            the lower-cased part of ``basename`` before the first ``_``
            (presumably the emotion label; confirm against the dataset
            naming).  ``pitch`` and ``energy`` are the outlier-filtered
            values used for statistics, and ``n_frames`` is the number of
            mel frames kept.
        """
        speaker_dir = os.path.join(self.in_dir, speaker)
        wav_path = os.path.join(speaker_dir, "{}.wav".format(basename))
        text_path = os.path.join(speaker_dir, "{}.lab".format(basename))
        tg_path = os.path.join(
            self.out_dir, "TextGrid", speaker, "{}.TextGrid".format(basename)
        )

        # Get alignments
        textgrid = tgt.io.read_textgrid(tg_path)
        phone, duration, start, end = self.get_alignment(
            textgrid.get_tier_by_name("phones")
        )
        text = "{" + " ".join(phone) + "}"
        if start >= end:
            return None

        # Read and trim wav files.  Load at the configured rate: the trim
        # indices, pitch extraction and STFT below all assume it, whereas
        # librosa's default would resample to 22050 Hz.
        wav, _ = librosa.load(wav_path, sr=self.sampling_rate)
        wav = wav[
            int(self.sampling_rate * start): int(self.sampling_rate * end)
        ].astype(np.float32)

        # Read raw text (UTF-8, like the other text files of this class)
        with open(text_path, "r", encoding="utf-8") as f:
            raw_text = f.readline().strip("\n")

        total_frames = sum(duration)

        # Compute fundamental frequency
        wav_f64 = wav.astype(np.float64)
        pitch, t = pw.dio(
            wav_f64,
            self.sampling_rate,
            frame_period=self.hop_length / self.sampling_rate * 1000,
        )
        pitch = pw.stonemask(wav_f64, pitch, t, self.sampling_rate)

        pitch = pitch[:total_frames]
        if np.sum(pitch != 0) <= 1:
            return None

        # Compute mel-scale spectrogram and energy
        mel_spectrogram, energy = Audio.tools.get_mel_from_wav(wav, self.STFT)
        mel_spectrogram = mel_spectrogram[:, :total_frames]
        energy = energy[:total_frames]

        if self.pitch_phoneme_averaging:
            # Fill unvoiced (zero) frames by linear interpolation between
            # voiced ones; edges take the nearest voiced value.
            nonzero_ids = np.where(pitch != 0)[0]
            interp_fn = interp1d(
                nonzero_ids,
                pitch[nonzero_ids],
                fill_value=(pitch[nonzero_ids[0]], pitch[nonzero_ids[-1]]),
                bounds_error=False,
            )
            pitch = interp_fn(np.arange(0, len(pitch)))

            # Phoneme-level average
            pitch = self._phoneme_average(pitch, duration)

        if self.energy_phoneme_averaging:
            # Phoneme-level average
            energy = self._phoneme_average(energy, duration)

        # Save files
        dur_filename = "{}-duration-{}.npy".format(speaker, basename)
        np.save(os.path.join(self.out_dir, "duration", dur_filename), duration)

        pitch_filename = "{}-pitch-{}.npy".format(speaker, basename)
        np.save(os.path.join(self.out_dir, "pitch", pitch_filename), pitch)

        energy_filename = "{}-energy-{}.npy".format(speaker, basename)
        np.save(os.path.join(self.out_dir, "energy", energy_filename), energy)

        mel_filename = "{}-mel-{}.npy".format(speaker, basename)
        np.save(
            os.path.join(self.out_dir, "mel", mel_filename),
            mel_spectrogram.T,
        )

        info = "|".join(
            [basename, speaker, text, raw_text, basename.split("_")[0].lower()]
        )
        return (
            info,
            self.remove_outlier(pitch),
            self.remove_outlier(energy),
            mel_spectrogram.shape[1],
        )

    def get_alignment(self, tier):
        """Convert a phone tier into phones, frame durations and trim times.

        Leading and trailing silences (``sil``, ``sp``, ``spn``) are
        dropped; silences between ordinary phones are kept.

        Args:
            tier: Object whose ``_objects`` is a sequence of intervals with
                ``start_time``, ``end_time`` and ``text`` attributes.

        Returns:
            ``(phones, durations, start_time, end_time)``: the kept phone
            labels, their lengths in hop-sized frames, and the start / end
            time of the kept region.  Both times are 0 when the tier holds
            no ordinary phone.
        """
        sil_phones = ("sil", "sp", "spn")

        phones = []
        durations = []
        start_time = 0
        end_time = 0
        end_idx = 0
        for interval in tier._objects:
            s, e, p = interval.start_time, interval.end_time, interval.text
            is_silence = p in sil_phones

            # Trim leading silences
            if not phones:
                if is_silence:
                    continue
                start_time = s

            phones.append(p)
            if not is_silence:
                # Remember where the last ordinary phone ends so that
                # trailing silences can be cut off afterwards.
                end_time = e
                end_idx = len(phones)

            # Differencing rounded boundaries (rather than rounding the
            # length) keeps the durations summing to the rounded total.
            durations.append(
                int(
                    np.round(e * self.sampling_rate / self.hop_length)
                    - np.round(s * self.sampling_rate / self.hop_length)
                )
            )

        # Trim tailing silences
        return phones[:end_idx], durations[:end_idx], start_time, end_time

    def remove_outlier(self, values):
        """Return the entries of ``values`` strictly inside the 1.5*IQR fences."""
        values = np.array(values)
        p25 = np.percentile(values, 25)
        p75 = np.percentile(values, 75)
        spread = 1.5 * (p75 - p25)
        lower = p25 - spread
        upper = p75 + spread
        return values[np.logical_and(values > lower, values < upper)]

    def normalize(self, in_dir, mean, std):
        """Standardise every array file in ``in_dir`` in place.

        Each file is loaded with ``np.load``, transformed to
        ``(values - mean) / std`` and saved back to the same path, so
        calling this twice normalises twice.  Every entry of ``in_dir`` is
        processed, including files left over from earlier runs.

        Returns:
            ``(min_value, max_value)`` over all normalised values.  When no
            values were seen, these are the float64 maximum and minimum
            respectively.
        """
        max_value = np.finfo(np.float64).min
        min_value = np.finfo(np.float64).max
        for name in os.listdir(in_dir):
            path = os.path.join(in_dir, name)
            values = (np.load(path) - mean) / std
            np.save(path, values)

            if values.size == 0:
                # Nothing to contribute; ndarray.max() would raise.
                continue
            max_value = max(max_value, values.max())
            min_value = min(min_value, values.min())

        return min_value, max_value