from io import BytesIO
import json
import os
import re
import struct
import warnings
from collections import OrderedDict

import librosa
import numpy as np
import parselmouth
import pyloudnorm as pyln
import pyworld as world
import resampy
import torch
import torchcrepe
import webrtcvad
from scipy.ndimage import binary_dilation
from skimage.transform import resize

from utils import audio
from utils.pitch_utils import f0_to_coarse
from utils.text_encoder import TokenTextEncoder

warnings.filterwarnings("ignore")

PUNCS = '!,.?;:'

int16_max = (2 ** 15) - 1


def trim_long_silences(path, sr=None, return_raw_wav=False, norm=True, vad_max_silence_length=12):
    """
    Ensures that segments without voice in the waveform remain no longer than a
    threshold determined by the VAD parameters below.

    :param path: path to the audio file (the raw waveform is loaded as a numpy array of floats)
    :param vad_max_silence_length: maximum number of consecutive silent VAD windows a segment can have
    :return: the same waveform with silences trimmed away (length <= original wav length)
    """
    ## Voice Activity Detection
    # Window size of the VAD. Must be either 10, 20 or 30 milliseconds.
    # This sets the granularity of the VAD. Should not need to be changed.
    sampling_rate = 16000
    wav_raw, sr = librosa.core.load(path, sr=sr)

    if norm:
        meter = pyln.Meter(sr)  # create BS.1770 meter
        loudness = meter.integrated_loudness(wav_raw)
        wav_raw = pyln.normalize.loudness(wav_raw, loudness, -20.0)
        if np.abs(wav_raw).max() > 1.0:
            wav_raw = wav_raw / np.abs(wav_raw).max()

    # webrtcvad only accepts 8/16/32/48 kHz input, so run the VAD pass at 16 kHz.
    wav = librosa.resample(wav_raw, orig_sr=sr, target_sr=sampling_rate, res_type='kaiser_best')

    vad_window_length = 30  # In milliseconds
    # Number of frames to average together when performing the moving average smoothing.
    # The larger this value, the larger the VAD variations must be to not get smoothed out.
    vad_moving_average_width = 8

    # Compute the voice detection window size
    samples_per_window = (vad_window_length * sampling_rate) // 1000

    # Trim the end of the audio to have a multiple of the window size
    wav = wav[:len(wav) - (len(wav) % samples_per_window)]

    # Convert the float waveform to 16-bit mono PCM
    pcm_wave = struct.pack("%dh" % len(wav), *(np.round(wav * int16_max)).astype(np.int16))

    # Perform voice activity detection
    voice_flags = []
    vad = webrtcvad.Vad(mode=3)
    for window_start in range(0, len(wav), samples_per_window):
        window_end = window_start + samples_per_window
        voice_flags.append(vad.is_speech(pcm_wave[window_start * 2:window_end * 2],
                                         sample_rate=sampling_rate))
    voice_flags = np.array(voice_flags)

    # Smooth the voice detection with a moving average
    def moving_average(array, width):
        array_padded = np.concatenate((np.zeros((width - 1) // 2), array, np.zeros(width // 2)))
        ret = np.cumsum(array_padded, dtype=float)
        ret[width:] = ret[width:] - ret[:-width]
        return ret[width - 1:] / width

    audio_mask = moving_average(voice_flags, vad_moving_average_width)
    audio_mask = np.round(audio_mask).astype(bool)

    # Dilate the voiced regions
    audio_mask = binary_dilation(audio_mask, np.ones(vad_max_silence_length + 1))
    audio_mask = np.repeat(audio_mask, samples_per_window)
    # Project the 16 kHz window mask back onto the original-rate waveform
    audio_mask = resize(audio_mask, (len(wav_raw),)) > 0
    if return_raw_wav:
        return wav_raw, audio_mask, sr
    return wav_raw[audio_mask], audio_mask, sr
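
# Minimal usage sketch for trim_long_silences(). The path 'example.wav' is
# hypothetical; any mono audio file readable by librosa should work.
def _demo_trim_long_silences():
    trimmed, mask, sr = trim_long_silences('example.wav')
    print(f'kept {mask.mean():.1%} of samples at {sr} Hz, '
          f'trimmed length: {len(trimmed)}')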

def process_utterance(wav_path,
                      fft_size=1024,
                      hop_size=256,
                      win_length=1024,
                      window="hann",
                      num_mels=80,
                      fmin=80,
                      fmax=7600,
                      eps=1e-6,
                      sample_rate=22050,
                      loud_norm=False,
                      min_level_db=-100,
                      return_linear=False,
                      trim_long_sil=False,
                      vocoder='pwg'):
    if isinstance(wav_path, (str, BytesIO)):
        if trim_long_sil:
            wav, _, _ = trim_long_silences(wav_path, sample_rate)
        else:
            wav, _ = librosa.core.load(wav_path, sr=sample_rate)
    else:
        wav = wav_path

    if loud_norm:
        meter = pyln.Meter(sample_rate)  # create BS.1770 meter
        loudness = meter.integrated_loudness(wav)
        wav = pyln.normalize.loudness(wav, loudness, -22.0)
        if np.abs(wav).max() > 1:
            wav = wav / np.abs(wav).max()

    # get amplitude spectrogram
    x_stft = librosa.stft(wav, n_fft=fft_size, hop_length=hop_size,
                          win_length=win_length, window=window, pad_mode="constant")
    spc = np.abs(x_stft)  # (n_bins, T)

    # get mel basis
    fmin = 0 if fmin == -1 else fmin
    fmax = sample_rate / 2 if fmax == -1 else fmax
    mel_basis = librosa.filters.mel(sr=sample_rate, n_fft=fft_size, n_mels=num_mels,
                                    fmin=fmin, fmax=fmax)
    mel = mel_basis @ spc

    if vocoder == 'pwg':
        mel = np.log10(np.maximum(eps, mel))  # (n_mel_bins, T)
    else:
        assert False, f'"{vocoder}" is not in ["pwg"].'

    # Pad the waveform so its length is an exact multiple of hop_size,
    # then trim it to match the number of mel frames.
    l_pad, r_pad = audio.librosa_pad_lr(wav, fft_size, hop_size, 1)
    wav = np.pad(wav, (l_pad, r_pad), mode='constant', constant_values=0.0)
    wav = wav[:mel.shape[1] * hop_size]

    if not return_linear:
        return wav, mel
    else:
        spc = audio.amp_to_db(spc)
        spc = audio.normalize(spc, {'min_level_db': min_level_db})
        return wav, mel, spc
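
# Minimal usage sketch for process_utterance(). 'example.wav' is a hypothetical
# path; the defaults match a 22.05 kHz Parallel WaveGAN ('pwg') setup.
def _demo_process_utterance():
    wav, mel = process_utterance('example.wav', sample_rate=22050)
    # wav is padded/trimmed so that len(wav) == mel.shape[1] * hop_size
    print(wav.shape, mel.shape)  # e.g. (T * 256,), (80, T)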

def get_pitch_parselmouth(wav_data, mel, hparams):
    """
    :param wav_data: [T]
    :param mel: [T, 80]
    :param hparams:
    :return:
    """
    # The original Parselmouth-based extractor, kept for reference:
    # time_step = hparams['hop_size'] / hparams['audio_sample_rate']
    # f0_min = hparams['f0_min']
    # f0_max = hparams['f0_max']
    # if hparams['hop_size'] == 128:
    #     pad_size = 4
    # elif hparams['hop_size'] == 256:
    #     pad_size = 2
    # else:
    #     assert False
    # f0 = parselmouth.Sound(wav_data, hparams['audio_sample_rate']).to_pitch_ac(
    #     time_step=time_step, voicing_threshold=0.6,
    #     pitch_floor=f0_min, pitch_ceiling=f0_max).selected_array['frequency']
    # lpad = pad_size * 2
    # rpad = len(mel) - len(f0) - lpad
    # f0 = np.pad(f0, [[lpad, rpad]], mode='constant')
    # # mel and f0 are extracted by 2 different libraries, so we must force them to have the same length.
    # # Attention: we find that new versions of some libraries could cause ``rpad'' to be a negative value...
    # # Just to be sure, we recommend users to set up the same environments as in requirements_auto.txt (by Anaconda).
    # delta_l = len(mel) - len(f0)
    # assert np.abs(delta_l) <= 8
    # if delta_l > 0:
    #     f0 = np.concatenate([f0, [f0[-1]] * delta_l], 0)
    # f0 = f0[:len(mel)]
    # pad_size = (int(len(wav_data) // hparams['hop_size']) - len(f0) + 1) // 2
    # f0 = np.pad(f0, [[pad_size, len(mel) - len(f0) - pad_size]], mode='constant')
    # pitch_coarse = f0_to_coarse(f0, hparams)
    # return f0, pitch_coarse

    # Bye bye Parselmouth! Delegate to the WORLD-based extractor instead.
    return get_pitch_world(wav_data, mel, hparams)


def get_pitch_world(wav_data, mel, hparams):
    """
    :param wav_data: [T]
    :param mel: [T, 80]
    :param hparams:
    :return:
    """
    time_step = 1000 * hparams['hop_size'] / hparams['audio_sample_rate']  # frame period in ms
    f0_min = hparams['f0_min']
    f0_max = hparams['f0_max']

    # WORLD's Harvest algorithm expects float64 input.
    f0, _ = world.harvest(wav_data.astype(np.double), hparams['audio_sample_rate'],
                          f0_min, f0_max, time_step)

    # Pad or trim f0 symmetrically so that it has exactly len(mel) frames.
    len_diff = len(mel) - len(f0)
    if len_diff > 0:
        pad_len = (len_diff + 1) // 2
        f0 = np.pad(f0, [[pad_len, len_diff - pad_len]])
    else:
        pad_len = (1 - len_diff) // 2
        rpad = pad_len + len_diff
        if rpad != 0:
            f0 = f0[pad_len:rpad]
        else:
            f0 = f0[pad_len:]

    pitch_coarse = f0_to_coarse(f0, hparams)
    return f0, pitch_coarse
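
# Alignment sketch for get_pitch_world(): with hop_size 256 at 22050 Hz the
# frame period is ~11.6 ms, and Harvest's frame count can differ from the mel
# frame count by a frame or two, hence the symmetric pad/trim above. The
# hparams dict below is a hypothetical minimal configuration; the exact keys
# required by f0_to_coarse (e.g. 'f0_bin') depend on utils.pitch_utils.
def _demo_get_pitch_world():
    hparams = {'audio_sample_rate': 22050, 'hop_size': 256,
               'f0_min': 80, 'f0_max': 750, 'f0_bin': 256}
    wav = np.random.uniform(-0.1, 0.1, 22050)  # 1 s of noise as placeholder audio
    mel = np.zeros([len(wav) // hparams['hop_size'], 80])  # dummy mel; only its length is used
    f0, coarse = get_pitch_world(wav, mel, hparams)
    assert len(f0) == len(mel)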

def get_pitch_crepe(wav_data, mel, hparams, threshold=0.05):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # CREPE only supports a 16 kHz sampling rate, so resample first.
    wav16k = resampy.resample(wav_data, hparams['audio_sample_rate'], 16000)
    wav16k_torch = torch.FloatTensor(wav16k).unsqueeze(0).to(device)

    # Frequency range
    f0_min = hparams['f0_min']
    f0_max = hparams['f0_max']

    # After resampling, analyze f0 with hop size 80, i.e. one frame every 5 ms.
    f0, pd = torchcrepe.predict(wav16k_torch, 16000, 80, f0_min, f0_max,
                                pad=True, model='full', batch_size=1024,
                                device=device, return_periodicity=True)

    # Filter, remove silence, and apply the unvoiced threshold
    # (see the torchcrepe README for details).
    pd = torchcrepe.filter.median(pd, 3)
    pd = torchcrepe.threshold.Silence(-60.)(pd, wav16k_torch, 16000, 80)
    f0 = torchcrepe.threshold.At(threshold)(f0, pd)
    f0 = torchcrepe.filter.mean(f0, 3)

    # Convert NaN frequencies (unvoiced parts) to 0 Hz.
    f0 = torch.where(torch.isnan(f0), torch.full_like(f0, 0), f0)

    # Debug dump (disabled):
    # np.savetxt('问棋-crepe.csv',
    #            np.array([0.005 * np.arange(len(f0[0])), f0[0].cpu().numpy()]).transpose(),
    #            delimiter=',')

    # Drop zero-frequency (unvoiced) frames and linearly interpolate over them.
    nzindex = torch.nonzero(f0[0]).squeeze()
    f0 = torch.index_select(f0[0], dim=0, index=nzindex).cpu().numpy()
    time_org = 0.005 * nzindex.cpu().numpy()
    time_frame = np.arange(len(mel)) * hparams['hop_size'] / hparams['audio_sample_rate']

    if f0.shape[0] == 0:
        f0 = np.zeros(time_frame.shape[0])  # keep the return type a NumPy array
        print('f0 all zero!')
    else:
        f0 = np.interp(time_frame, time_org, f0, left=f0[0], right=f0[-1])

    pitch_coarse = f0_to_coarse(f0, hparams)
    return f0, pitch_coarse


def remove_empty_lines(text):
    """remove empty lines"""
    assert len(text) > 0
    assert isinstance(text, list)
    text = [t.strip() for t in text]
    # Drop every empty line, not just the first occurrence.
    text = [t for t in text if t != ""]
    return text


class TextGrid(object):
    def __init__(self, text):
        text = remove_empty_lines(text)
        self.text = text
        self.line_count = 0
        self._get_type()
        self._get_time_intval()
        self._get_size()
        self.tier_list = []
        self._get_item_list()

    def _extract_pattern(self, pattern, inc):
        """
        Parameters
        ----------
        pattern : regex to extract pattern
        inc : increment of line count after extraction

        Returns
        -------
        group : extracted info
        """
        try:
            group = re.match(pattern, self.text[self.line_count]).group(1)
            self.line_count += inc
        except AttributeError:
            raise ValueError("File format error at line %d:%s" %
                             (self.line_count, self.text[self.line_count]))
        return group

    def _get_type(self):
        self.file_type = self._extract_pattern(r"File type = \"(.*)\"", 2)

    def _get_time_intval(self):
        self.xmin = self._extract_pattern(r"xmin = (.*)", 1)
        self.xmax = self._extract_pattern(r"xmax = (.*)", 2)

    def _get_size(self):
        self.size = int(self._extract_pattern(r"size = (.*)", 2))

    def _get_item_list(self):
        """Only supports IntervalTier currently"""
        for itemIdx in range(1, self.size + 1):
            tier = OrderedDict()
            item_list = []
            tier_idx = self._extract_pattern(r"item \[(.*)\]:", 1)
            tier_class = self._extract_pattern(r"class = \"(.*)\"", 1)
            if tier_class != "IntervalTier":
                raise NotImplementedError("Only IntervalTier class is supported currently")
            tier_name = self._extract_pattern(r"name = \"(.*)\"", 1)
            tier_xmin = self._extract_pattern(r"xmin = (.*)", 1)
            tier_xmax = self._extract_pattern(r"xmax = (.*)", 1)
            tier_size = self._extract_pattern(r"intervals: size = (.*)", 1)
            for i in range(int(tier_size)):
                item = OrderedDict()
                item["idx"] = self._extract_pattern(r"intervals \[(.*)\]", 1)
                item["xmin"] = self._extract_pattern(r"xmin = (.*)", 1)
                item["xmax"] = self._extract_pattern(r"xmax = (.*)", 1)
                item["text"] = self._extract_pattern(r"text = \"(.*)\"", 1)
                item_list.append(item)
            tier["idx"] = tier_idx
            tier["class"] = tier_class
            tier["name"] = tier_name
            tier["xmin"] = tier_xmin
            tier["xmax"] = tier_xmax
            tier["size"] = tier_size
            tier["items"] = item_list
            self.tier_list.append(tier)

    def toJson(self):
        _json = OrderedDict()
        _json["file_type"] = self.file_type
        _json["xmin"] = self.xmin
        _json["xmax"] = self.xmax
        _json["size"] = self.size
        _json["tiers"] = self.tier_list
        return json.dumps(_json, ensure_ascii=False, indent=2)
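
# Usage sketch for TextGrid: parse an MFA-style .TextGrid file into JSON.
# 'example.TextGrid' is a hypothetical path.
def _demo_textgrid():
    with open('example.TextGrid', 'r', encoding='utf-8') as f:
        lines = f.readlines()
    tg = TextGrid(remove_empty_lines(lines))
    print(tg.toJson())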
(.*)", 1) tier_size = self._extract_pattern(r"intervals: size = (.*)", 1) for i in range(int(tier_size)): item = OrderedDict() item["idx"] = self._extract_pattern(r"intervals \[(.*)\]", 1) item["xmin"] = self._extract_pattern(r"xmin = (.*)", 1) item["xmax"] = self._extract_pattern(r"xmax = (.*)", 1) item["text"] = self._extract_pattern(r"text = \"(.*)\"", 1) item_list.append(item) tier["idx"] = tier_idx tier["class"] = tier_class tier["name"] = tier_name tier["xmin"] = tier_xmin tier["xmax"] = tier_xmax tier["size"] = tier_size tier["items"] = item_list self.tier_list.append(tier) def toJson(self): _json = OrderedDict() _json["file_type"] = self.file_type _json["xmin"] = self.xmin _json["xmax"] = self.xmax _json["size"] = self.size _json["tiers"] = self.tier_list return json.dumps(_json, ensure_ascii=False, indent=2) def get_mel2ph(tg_fn, ph, mel, hparams): ph_list = ph.split(" ") with open(tg_fn, "r", encoding='utf-8') as f: tg = f.readlines() tg = remove_empty_lines(tg) tg = TextGrid(tg) tg = json.loads(tg.toJson()) split = np.ones(len(ph_list) + 1, np.float) * -1 tg_idx = 0 ph_idx = 0 tg_align = [x for x in tg['tiers'][-1]['items']] tg_align_ = [] for x in tg_align: x['xmin'] = float(x['xmin']) x['xmax'] = float(x['xmax']) if x['text'] in ['sil', 'sp', '', 'SIL', 'PUNC']: x['text'] = '' if len(tg_align_) > 0 and tg_align_[-1]['text'] == '': tg_align_[-1]['xmax'] = x['xmax'] continue tg_align_.append(x) tg_align = tg_align_ tg_len = len([x for x in tg_align if x['text'] != '']) ph_len = len([x for x in ph_list if not is_sil_phoneme(x)]) assert tg_len == ph_len, (tg_len, ph_len, tg_align, ph_list, tg_fn) while tg_idx < len(tg_align) or ph_idx < len(ph_list): if tg_idx == len(tg_align) and is_sil_phoneme(ph_list[ph_idx]): split[ph_idx] = 1e8 ph_idx += 1 continue x = tg_align[tg_idx] if x['text'] == '' and ph_idx == len(ph_list): tg_idx += 1 continue assert ph_idx < len(ph_list), (tg_len, ph_len, tg_align, ph_list, tg_fn) ph = ph_list[ph_idx] if x['text'] == '' and not is_sil_phoneme(ph): assert False, (ph_list, tg_align) if x['text'] != '' and is_sil_phoneme(ph): ph_idx += 1 else: assert (x['text'] == '' and is_sil_phoneme(ph)) \ or x['text'].lower() == ph.lower() \ or x['text'].lower() == 'sil', (x['text'], ph) split[ph_idx] = x['xmin'] if ph_idx > 0 and split[ph_idx - 1] == -1 and is_sil_phoneme(ph_list[ph_idx - 1]): split[ph_idx - 1] = split[ph_idx] ph_idx += 1 tg_idx += 1 assert tg_idx == len(tg_align), (tg_idx, [x['text'] for x in tg_align]) assert ph_idx >= len(ph_list) - 1, (ph_idx, ph_list, len(ph_list), [x['text'] for x in tg_align], tg_fn) mel2ph = np.zeros([mel.shape[0]], np.int) split[0] = 0 split[-1] = 1e8 for i in range(len(split) - 1): assert split[i] != -1 and split[i] <= split[i + 1], (split[:-1],) split = [int(s * hparams['audio_sample_rate'] / hparams['hop_size'] + 0.5) for s in split] for ph_idx in range(len(ph_list)): mel2ph[split[ph_idx]:split[ph_idx + 1]] = ph_idx + 1 mel2ph_torch = torch.from_numpy(mel2ph) T_t = len(ph_list) dur = mel2ph_torch.new_zeros([T_t + 1]).scatter_add(0, mel2ph_torch, torch.ones_like(mel2ph_torch)) dur = dur[1:].numpy() return mel2ph, dur def build_phone_encoder(data_dir): phone_list_file = os.path.join(data_dir, 'phone_set.json') phone_list = json.load(open(phone_list_file, encoding='utf-8')) return TokenTextEncoder(None, vocab_list=phone_list, replace_oov=',') def is_sil_phoneme(p): return not p[0].isalpha()