|
from io import BytesIO |
|
import json |
|
import os |
|
import re |
|
import struct |
|
import warnings |
|
from collections import OrderedDict |
|
|
|
import librosa |
|
import numpy as np |
|
import parselmouth |
|
import pyloudnorm as pyln |
|
import resampy |
|
import torch |
|
import torchcrepe |
|
import webrtcvad |
|
from scipy.ndimage.morphology import binary_dilation |
|
from skimage.transform import resize |
|
import pyworld as world |
|
|
|
from utils import audio |
|
from utils.pitch_utils import f0_to_coarse |
|
from utils.text_encoder import TokenTextEncoder |
|
|
|
# Importing this module suppresses all Python warnings process-wide.
warnings.filterwarnings("ignore")

# Punctuation characters.
PUNCS = "!,.?;:"

# Largest positive value of a signed 16-bit PCM sample (32767).
int16_max = 2 ** 15 - 1
|
|
|
|
|
def trim_long_silences(path, sr=None, return_raw_wav=False, norm=True, vad_max_silence_length=12):
    """
    Load an audio file and mask out long silences using WebRTC voice activity detection.

    The VAD runs on a 16 kHz copy of the signal in 30 ms windows. The per-window
    decisions are smoothed with a moving average, dilated so that short silent
    gaps are kept, and finally stretched to the length of the waveform that was
    loaded at ``sr``.

    :param path: audio source passed to ``librosa.core.load``
    :param sr: sampling rate to load the audio at (``None`` is forwarded to librosa as is)
    :param return_raw_wav: if True, return the untrimmed waveform together with the mask
    :param norm: if True, loudness-normalize the waveform to -20.0 (pyloudnorm target
        loudness) and rescale it when the peak exceeds 1.0
    :param vad_max_silence_length: Maximum number of consecutive silent frames a segment can have.
    :return: ``(wav, audio_mask, sr)`` where ``wav`` is the loaded waveform (only the
        samples selected by ``audio_mask`` unless ``return_raw_wav`` is True),
        ``audio_mask`` is a boolean array with one entry per loaded sample, and
        ``sr`` is the sampling rate reported by librosa
    """
    sampling_rate = 16000  # rate at which the VAD is run
    wav_raw, sr = librosa.core.load(path, sr=sr)

    if norm:
        meter = pyln.Meter(sr)
        loudness = meter.integrated_loudness(wav_raw)
        wav_raw = pyln.normalize.loudness(wav_raw, loudness, -20.0)
        peak = np.abs(wav_raw).max()
        if peak > 1.0:
            wav_raw = wav_raw / peak

    # Keyword arguments: librosa >= 0.10 no longer accepts the rates positionally.
    wav = librosa.resample(wav_raw, orig_sr=sr, target_sr=sampling_rate, res_type='kaiser_best')

    vad_window_length = 30  # VAD window size in milliseconds
    vad_moving_average_width = 8  # number of windows averaged when smoothing
    samples_per_window = (vad_window_length * sampling_rate) // 1000

    # Trim the end of the audio so that it holds a whole number of windows.
    wav = wav[:len(wav) - (len(wav) % samples_per_window)]

    # Convert the float waveform to 16-bit mono PCM bytes for webrtcvad.
    pcm_wave = struct.pack("%dh" % len(wav), *(np.round(wav * int16_max)).astype(np.int16))

    # One voice/no-voice decision per window (2 bytes per sample in the PCM buffer).
    vad = webrtcvad.Vad(mode=3)
    voice_flags = np.array([
        vad.is_speech(pcm_wave[window_start * 2:(window_start + samples_per_window) * 2],
                      sample_rate=sampling_rate)
        for window_start in range(0, len(wav), samples_per_window)
    ])

    def moving_average(array, width):
        # Zero-padded moving average computed with a cumulative sum.
        array_padded = np.concatenate((np.zeros((width - 1) // 2), array, np.zeros(width // 2)))
        ret = np.cumsum(array_padded, dtype=float)
        ret[width:] = ret[width:] - ret[:-width]
        return ret[width - 1:] / width

    # Smooth the decisions; `bool` replaces `np.bool`, which was removed in NumPy 1.24.
    audio_mask = moving_average(voice_flags, vad_moving_average_width)
    audio_mask = np.round(audio_mask).astype(bool)

    # Dilate the voiced regions, expand to sample resolution at 16 kHz, then
    # stretch the mask to the length of the waveform loaded at `sr`.
    audio_mask = binary_dilation(audio_mask, np.ones(vad_max_silence_length + 1))
    audio_mask = np.repeat(audio_mask, samples_per_window)
    audio_mask = resize(audio_mask, (len(wav_raw),)) > 0

    if return_raw_wav:
        return wav_raw, audio_mask, sr
    return wav_raw[audio_mask], audio_mask, sr
|
|
|
|
|
def process_utterance(wav_path,
                      fft_size=1024,
                      hop_size=256,
                      win_length=1024,
                      window="hann",
                      num_mels=80,
                      fmin=80,
                      fmax=7600,
                      eps=1e-6,
                      sample_rate=22050,
                      loud_norm=False,
                      min_level_db=-100,
                      return_linear=False,
                      trim_long_sil=False, vocoder='pwg'):
    """
    Compute a log10 mel-spectrogram (and optionally a linear spectrogram) for one utterance.

    :param wav_path: a ``str``, path-like object or ``BytesIO`` to load audio from;
        any other value is used directly as the waveform (assumed to already be at
        ``sample_rate`` -- this is not checked)
    :param fft_size: FFT size for the STFT and the mel filter bank
    :param hop_size: STFT hop length in samples
    :param win_length: STFT window length in samples
    :param window: STFT window name passed to ``librosa.stft``
    :param num_mels: number of mel bands
    :param fmin: lowest mel filter frequency; ``-1`` means 0
    :param fmax: highest mel filter frequency; ``-1`` means ``sample_rate / 2``
    :param eps: floor applied to the mel magnitudes before taking ``log10``
    :param sample_rate: sampling rate used for loading, loudness metering and the mel basis
    :param loud_norm: if True, loudness-normalize to -22.0 (pyloudnorm target loudness)
        and rescale when the peak exceeds 1
    :param min_level_db: forwarded to ``audio.normalize`` when ``return_linear`` is True
    :param return_linear: if True, additionally return the processed linear spectrogram
    :param trim_long_sil: if True, load through ``trim_long_silences`` (file inputs only)
    :param vocoder: only ``'pwg'`` is supported
    :return: ``(wav, mel)`` or ``(wav, mel, spc)``; ``mel`` has shape
        ``[num_mels, frames]`` and ``wav`` is padded/truncated to ``frames * hop_size`` samples
    :raises AssertionError: if ``vocoder`` is not ``'pwg'``
    """
    if isinstance(wav_path, (str, os.PathLike, BytesIO)):
        if trim_long_sil:
            wav, _, _ = trim_long_silences(wav_path, sample_rate)
        else:
            wav, _ = librosa.core.load(wav_path, sr=sample_rate)
    else:
        wav = wav_path

    if loud_norm:
        meter = pyln.Meter(sample_rate)
        loudness = meter.integrated_loudness(wav)
        wav = pyln.normalize.loudness(wav, loudness, -22.0)
        peak = np.abs(wav).max()
        if peak > 1:
            wav = wav / peak

    # Magnitude spectrogram, shape [fft_size // 2 + 1, frames].
    x_stft = librosa.stft(wav, n_fft=fft_size, hop_length=hop_size,
                          win_length=win_length, window=window, pad_mode="constant")
    spc = np.abs(x_stft)

    # Mel filter bank; keyword arguments are required by librosa >= 0.10.
    fmin = 0 if fmin == -1 else fmin
    fmax = sample_rate / 2 if fmax == -1 else fmax
    mel_basis = librosa.filters.mel(sr=sample_rate, n_fft=fft_size, n_mels=num_mels,
                                    fmin=fmin, fmax=fmax)
    mel = mel_basis @ spc

    if vocoder == 'pwg':
        mel = np.log10(np.maximum(eps, mel))
    else:
        # Raised explicitly (same exception type as the former `assert False`) so the
        # check is not stripped under `python -O`.
        raise AssertionError(f'"{vocoder}" is not in ["pwg"].')

    # Pad the waveform (amounts come from audio.librosa_pad_lr) and cut it so that
    # it spans exactly one hop per mel frame.
    l_pad, r_pad = audio.librosa_pad_lr(wav, fft_size, hop_size, 1)
    wav = np.pad(wav, (l_pad, r_pad), mode='constant', constant_values=0.0)
    wav = wav[:mel.shape[1] * hop_size]

    if not return_linear:
        return wav, mel

    spc = audio.amp_to_db(spc)
    spc = audio.normalize(spc, {'min_level_db': min_level_db})
    return wav, mel, spc
|
|
|
|
|
def get_pitch_parselmouth(wav_data, mel, hparams):
    """
    Extract pitch for an utterance.

    Despite its name, this function performs no parselmouth analysis itself: it
    forwards its arguments unchanged to ``get_pitch_world`` and returns that
    function's result.

    :param wav_data: [T]
    :param mel: [T, 80]
    :param hparams: hyper-parameter mapping, passed through untouched
    :return: the result of ``get_pitch_world(wav_data, mel, hparams)``
    """
    pitch_result = get_pitch_world(wav_data, mel, hparams)
    return pitch_result
|
|
|
def get_pitch_world(wav_data, mel, hparams):
    """
    Extract f0 with WORLD's ``harvest`` and align it to the mel frames.

    :param wav_data: [T] waveform
    :param mel: [T, 80]; only ``len(mel)`` (the number of frames) is used
    :param hparams: mapping providing ``hop_size``, ``audio_sample_rate``,
        ``f0_min`` and ``f0_max``
    :return: ``(f0, pitch_coarse)`` where ``f0`` has exactly ``len(mel)`` entries and
        ``pitch_coarse`` is ``f0_to_coarse(f0, hparams)``
    """
    sample_rate = hparams['audio_sample_rate']
    # Frame period in milliseconds, i.e. one f0 value per mel hop.
    time_step = 1000 * hparams['hop_size'] / sample_rate
    f0_min = hparams['f0_min']
    f0_max = hparams['f0_max']

    f0, _ = world.harvest(wav_data.astype(np.double), sample_rate, f0_min, f0_max, time_step)

    # Make f0 exactly as long as mel: zero-pad when it is too short, otherwise
    # trim it, splitting the difference between both ends.
    n_frames = len(mel)
    len_diff = n_frames - len(f0)
    if len_diff > 0:
        pad_len = (len_diff + 1) // 2
        f0 = np.pad(f0, [[pad_len, len_diff - pad_len]])
    else:
        pad_len = (1 - len_diff) // 2
        # A single slice: the previous code sliced twice (f0[pad_len:rpad] followed
        # by f0[pad_len:]), which dropped `pad_len` extra frames and left f0
        # shorter than mel.
        f0 = f0[pad_len:pad_len + n_frames]

    pitch_coarse = f0_to_coarse(f0, hparams)
    return f0, pitch_coarse
|
|
|
|
|
def get_pitch_crepe(wav_data, mel, hparams, threshold=0.05):
    """
    Extract f0 with torchcrepe and interpolate it onto the mel frame grid.

    The waveform is resampled to 16 kHz and analysed with a hop of 80 samples
    (5 ms). Frames rejected by the periodicity/silence thresholds are dropped and
    the remaining voiced values are linearly interpolated at the mel frame times.

    :param wav_data: [T] waveform sampled at ``hparams['audio_sample_rate']``
    :param mel: only ``len(mel)`` (the number of frames) is used
    :param hparams: mapping providing ``audio_sample_rate``, ``hop_size``,
        ``f0_min`` and ``f0_max``
    :param threshold: periodicity threshold passed to ``torchcrepe.threshold.At``
    :return: ``(f0, pitch_coarse)``; ``f0`` is a numpy array with ``len(mel)``
        entries (all zeros when no voiced frame was found) and ``pitch_coarse``
        is ``f0_to_coarse(f0, hparams)``
    """
    # Use the GPU when present, but do not crash on CPU-only machines.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    wav16k = resampy.resample(wav_data, hparams['audio_sample_rate'], 16000)
    wav16k_torch = torch.FloatTensor(wav16k).unsqueeze(0).to(device)

    f0_min = hparams['f0_min']
    f0_max = hparams['f0_max']

    f0, pd = torchcrepe.predict(wav16k_torch, 16000, 80, f0_min, f0_max, pad=True, model='full', batch_size=1024,
                                device=device, return_periodicity=True)

    # Smooth the periodicity, silence quiet frames, then threshold and smooth f0.
    pd = torchcrepe.filter.median(pd, 3)
    pd = torchcrepe.threshold.Silence(-60.)(pd, wav16k_torch, 16000, 80)
    f0 = torchcrepe.threshold.At(threshold)(f0, pd)
    f0 = torchcrepe.filter.mean(f0, 3)

    # Rejected frames are NaN at this point; mark them with 0 instead.
    f0 = torch.where(torch.isnan(f0), torch.full_like(f0, 0), f0)

    # Keep only the non-zero frames. squeeze(-1) keeps the index 1-D even when
    # exactly one frame survives (a bare squeeze() would yield a 0-d tensor and
    # break np.interp below).
    nzindex = torch.nonzero(f0[0]).squeeze(-1)
    f0 = torch.index_select(f0[0], dim=0, index=nzindex).cpu().numpy()
    time_org = 0.005 * nzindex.cpu().numpy()  # 80 samples / 16000 Hz = 5 ms per frame
    time_frame = np.arange(len(mel)) * hparams['hop_size'] / hparams['audio_sample_rate']

    if f0.shape[0] == 0:
        # Return a numpy array here as well, matching the type of the normal branch.
        f0 = np.zeros(time_frame.shape[0])
        print('f0 all zero!')
    else:
        f0 = np.interp(time_frame, time_org, f0, left=f0[0], right=f0[-1])

    pitch_coarse = f0_to_coarse(f0, hparams)
    return f0, pitch_coarse
|
|
|
|
|
def remove_empty_lines(text):
    """
    Strip every line and drop all lines that are empty after stripping.

    The input list is not modified; a new list is returned.

    :param text: non-empty list of strings
    :return: list of stripped, non-empty strings (may be empty)
    :raises AssertionError: if ``text`` is empty or is not a list
    """
    assert (len(text) > 0)
    assert (isinstance(text, list))
    stripped = (line.strip() for line in text)
    # Filter out every blank line; list.remove("") only deleted the first one.
    return [line for line in stripped if line]
|
|
|
|
|
class TextGrid(object):
    """
    Line-oriented parser for TextGrid files written as ``key = value`` lines.

    Parsing happens in the constructor and walks ``self.text`` with a cursor
    (``self.line_count``). Only tiers of class ``IntervalTier`` are supported.
    All extracted values are kept as strings, except ``self.size`` which is an int.
    """

    def __init__(self, text):
        """
        :param text: list of lines of a TextGrid file; lines are stripped and
            empty lines removed by ``remove_empty_lines`` before parsing
        :raises ValueError: if a line does not match the expected layout
        :raises NotImplementedError: if a tier is not an ``IntervalTier``
        """
        text = remove_empty_lines(text)
        self.text = text
        self.line_count = 0  # index of the next line to parse
        self._get_type()
        self._get_time_intval()
        self._get_size()
        self.tier_list = []
        self._get_item_list()

    def _extract_pattern(self, pattern, inc):
        """
        Match ``pattern`` against the current line and advance the cursor.

        Parameters
        ----------
        pattern : regex with one capture group, matched at the start of the line
        inc : increment of line count after extraction

        Returns
        -------
        group : extracted info (the first capture group)

        Raises
        ------
        ValueError : if the current line does not match, or the file ends early
        """
        # A truncated file used to surface as a bare IndexError; report it like
        # every other format error instead.
        if self.line_count >= len(self.text):
            raise ValueError("File format error at line %d: unexpected end of file" % self.line_count)
        line = self.text[self.line_count]
        matched = re.match(pattern, line)
        if matched is None:
            raise ValueError("File format error at line %d:%s" % (self.line_count, line))
        self.line_count += inc
        return matched.group(1)

    def _get_type(self):
        # Advance by 2 to also skip the line following the file type.
        self.file_type = self._extract_pattern(r"File type = \"(.*)\"", 2)

    def _get_time_intval(self):
        self.xmin = self._extract_pattern(r"xmin = (.*)", 1)
        # Advance by 2 to also skip the line following xmax.
        self.xmax = self._extract_pattern(r"xmax = (.*)", 2)

    def _get_size(self):
        # Advance by 2 to also skip the line following the tier count.
        self.size = int(self._extract_pattern(r"size = (.*)", 2))

    def _get_item_list(self):
        """Only supports IntervalTier currently"""
        for _ in range(self.size):
            tier_idx = self._extract_pattern(r"item \[(.*)\]:", 1)
            tier_class = self._extract_pattern(r"class = \"(.*)\"", 1)
            if tier_class != "IntervalTier":
                raise NotImplementedError("Only IntervalTier class is supported currently")
            tier_name = self._extract_pattern(r"name = \"(.*)\"", 1)
            tier_xmin = self._extract_pattern(r"xmin = (.*)", 1)
            tier_xmax = self._extract_pattern(r"xmax = (.*)", 1)
            tier_size = self._extract_pattern(r"intervals: size = (.*)", 1)

            item_list = []
            for _ in range(int(tier_size)):
                item = OrderedDict()
                item["idx"] = self._extract_pattern(r"intervals \[(.*)\]", 1)
                item["xmin"] = self._extract_pattern(r"xmin = (.*)", 1)
                item["xmax"] = self._extract_pattern(r"xmax = (.*)", 1)
                item["text"] = self._extract_pattern(r"text = \"(.*)\"", 1)
                item_list.append(item)

            tier = OrderedDict()
            tier["idx"] = tier_idx
            tier["class"] = tier_class
            tier["name"] = tier_name
            tier["xmin"] = tier_xmin
            tier["xmax"] = tier_xmax
            tier["size"] = tier_size
            tier["items"] = item_list
            self.tier_list.append(tier)

    def toJson(self):
        """Return the parsed TextGrid as an indented JSON string."""
        _json = OrderedDict()
        _json["file_type"] = self.file_type
        _json["xmin"] = self.xmin
        _json["xmax"] = self.xmax
        _json["size"] = self.size
        _json["tiers"] = self.tier_list
        return json.dumps(_json, ensure_ascii=False, indent=2)
|
|
|
|
|
def get_mel2ph(tg_fn, ph, mel, hparams):
    """
    Align a phoneme sequence to mel frames using the last tier of a TextGrid file.

    :param tg_fn: path of the TextGrid file (read as UTF-8)
    :param ph: phoneme sequence as a single space-separated string
    :param mel: mel-spectrogram; ``mel.shape[0]`` is used as the number of frames
    :param hparams: mapping providing ``audio_sample_rate`` and ``hop_size``
    :return: ``(mel2ph, dur)`` where ``mel2ph`` is an int64 array with one entry per
        frame holding the 1-based index of the phoneme covering it (0 when no
        phoneme covers the frame) and ``dur`` holds the number of frames assigned
        to each phoneme
    :raises AssertionError: if the TextGrid intervals and the phoneme list cannot
        be matched up
    """
    ph_list = ph.split(" ")
    with open(tg_fn, "r", encoding='utf-8') as f:
        tg = f.readlines()
    tg = remove_empty_lines(tg)
    tg = TextGrid(tg)
    tg = json.loads(tg.toJson())

    # split[i] is the start time (seconds) of phoneme i; -1 means "not set yet".
    # `float` replaces `np.float`, which was removed in NumPy 1.24.
    split = np.full(len(ph_list) + 1, -1, dtype=float)
    tg_idx = 0
    ph_idx = 0

    # Normalize the intervals: silence-like labels become '' and consecutive
    # silent intervals are merged into one.
    tg_align = list(tg['tiers'][-1]['items'])
    tg_align_ = []
    for x in tg_align:
        x['xmin'] = float(x['xmin'])
        x['xmax'] = float(x['xmax'])
        if x['text'] in ['sil', 'sp', '', 'SIL', 'PUNC']:
            x['text'] = ''
            if len(tg_align_) > 0 and tg_align_[-1]['text'] == '':
                tg_align_[-1]['xmax'] = x['xmax']
                continue
        tg_align_.append(x)
    tg_align = tg_align_

    tg_len = len([x for x in tg_align if x['text'] != ''])
    ph_len = len([x for x in ph_list if not is_sil_phoneme(x)])
    assert tg_len == ph_len, (tg_len, ph_len, tg_align, ph_list, tg_fn)

    # Walk both sequences in lockstep, tolerating silences present in only one of them.
    while tg_idx < len(tg_align) or ph_idx < len(ph_list):
        if tg_idx == len(tg_align) and is_sil_phoneme(ph_list[ph_idx]):
            # Trailing silent phoneme with no interval left: push it past the end.
            split[ph_idx] = 1e8
            ph_idx += 1
            continue
        x = tg_align[tg_idx]
        if x['text'] == '' and ph_idx == len(ph_list):
            # Trailing silent interval with no phoneme left: skip it.
            tg_idx += 1
            continue
        assert ph_idx < len(ph_list), (tg_len, ph_len, tg_align, ph_list, tg_fn)
        cur_ph = ph_list[ph_idx]
        if x['text'] == '' and not is_sil_phoneme(cur_ph):
            assert False, (ph_list, tg_align)
        if x['text'] != '' and is_sil_phoneme(cur_ph):
            # Silent phoneme without a matching silent interval: its start is
            # filled in from the next phoneme further below.
            ph_idx += 1
        else:
            assert (x['text'] == '' and is_sil_phoneme(cur_ph)) \
                   or x['text'].lower() == cur_ph.lower() \
                   or x['text'].lower() == 'sil', (x['text'], cur_ph)
            split[ph_idx] = x['xmin']
            if ph_idx > 0 and split[ph_idx - 1] == -1 and is_sil_phoneme(ph_list[ph_idx - 1]):
                split[ph_idx - 1] = split[ph_idx]
            ph_idx += 1
            tg_idx += 1
    assert tg_idx == len(tg_align), (tg_idx, [x['text'] for x in tg_align])
    assert ph_idx >= len(ph_list) - 1, (ph_idx, ph_list, len(ph_list), [x['text'] for x in tg_align], tg_fn)

    # np.int64 replaces `np.int` (removed in NumPy 1.24) and guarantees the int64
    # index tensor that scatter_add requires on every platform.
    mel2ph = np.zeros([mel.shape[0]], np.int64)
    split[0] = 0
    split[-1] = 1e8
    for i in range(len(split) - 1):
        assert split[i] != -1 and split[i] <= split[i + 1], (split[:-1],)

    # Convert boundary times to frame indices (rounded to nearest).
    split = [int(s * hparams['audio_sample_rate'] / hparams['hop_size'] + 0.5) for s in split]
    for ph_idx in range(len(ph_list)):
        mel2ph[split[ph_idx]:split[ph_idx + 1]] = ph_idx + 1

    # Count frames per phoneme index; slot 0 collects unassigned frames and is dropped.
    mel2ph_torch = torch.from_numpy(mel2ph)
    T_t = len(ph_list)
    dur = mel2ph_torch.new_zeros([T_t + 1]).scatter_add(0, mel2ph_torch, torch.ones_like(mel2ph_torch))
    dur = dur[1:].numpy()
    return mel2ph, dur
|
|
|
|
|
def build_phone_encoder(data_dir):
    """
    Build a ``TokenTextEncoder`` from the phoneme list stored in ``data_dir``.

    :param data_dir: directory containing ``phone_set.json`` (read as UTF-8)
    :return: ``TokenTextEncoder(None, vocab_list=<loaded list>, replace_oov=',')``
    """
    phone_list_file = os.path.join(data_dir, 'phone_set.json')
    # Use a context manager so the file handle is closed deterministically.
    with open(phone_list_file, encoding='utf-8') as f:
        phone_list = json.load(f)
    return TokenTextEncoder(None, vocab_list=phone_list, replace_oov=',')
|
|
|
|
|
def is_sil_phoneme(p):
    """
    Tell whether a phoneme token counts as silence/punctuation.

    A token is treated as silent when its first character is not alphabetic.

    :param p: non-empty phoneme string
    :return: True if ``p[0]`` is not an alphabetic character, else False
    """
    leading_char = p[0]
    if leading_char.isalpha():
        return False
    return True
|
|