Spaces:
Runtime error
Runtime error
File size: 5,450 Bytes
6bc94ac |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
from scipy.ndimage.morphology import binary_dilation
from encoder.params_data import *
from pathlib import Path
from typing import Optional, Union
from warnings import warn
import numpy as np
import librosa
import struct
import os
from pydub import AudioSegment
import noisereduce
# webrtcvad is an optional dependency: when it cannot be imported the module
# still loads, and the name is bound to None so callers can test for it.
try:
    import webrtcvad
except ImportError:
    # Only import failures are tolerated here; anything else (including
    # KeyboardInterrupt / SystemExit) should propagate normally.
    warn("Unable to import 'webrtcvad'. This package enables noise removal and is recommended.")
    webrtcvad = None
# Largest value representable by a signed 16-bit integer (32767); used to
# scale float waveforms to 16-bit PCM.
int16_max = (1 << 15) - 1
def preprocess_wav(fpath_or_wav: Union[str, Path, np.ndarray],
                   source_sr: Optional[int] = None,
                   normalize: Optional[bool] = True,
                   trim_silence: Optional[bool] = True):
    """
    Applies the preprocessing operations used in training the Speaker Encoder to a waveform
    either on disk or in memory. The waveform will be resampled to match the data hyperparameters.

    :param fpath_or_wav: either a filepath to an audio file (many extensions are supported, not
    just .wav), or the waveform as a numpy array of floats.
    :param source_sr: if passing an audio waveform, the sampling rate of the waveform before
    preprocessing. After preprocessing, the waveform's sampling rate will match the data
    hyperparameters. If passing a filepath, the sampling rate will be automatically detected and
    this argument will be ignored.
    :param normalize: if True, the volume is normalized towards audio_norm_target_dBFS
    (increase only: the waveform is never made quieter).
    :param trim_silence: if True and webrtcvad could be imported, long silences are removed
    with trim_long_silences. Without webrtcvad this step is silently skipped.
    :return: the preprocessed waveform
    """
    # Load the wav from disk if needed; sr=None keeps the file's native sampling rate
    if isinstance(fpath_or_wav, (str, Path)):
        wav, source_sr = librosa.load(str(fpath_or_wav), sr=None)
    else:
        wav = fpath_or_wav

    # Resample the wav if needed. The rates are passed by keyword because recent
    # librosa releases no longer accept them positionally.
    if source_sr is not None and source_sr != sampling_rate:
        wav = librosa.resample(wav, orig_sr=source_sr, target_sr=sampling_rate)

    # Apply the preprocessing: normalize volume and shorten long silences
    if normalize:
        wav = normalize_volume(wav, audio_norm_target_dBFS, increase_only=True)
    if webrtcvad and trim_silence:
        wav = trim_long_silences(wav)

    return wav
def wav_to_mel_spectrogram(wav):
    """
    Derives a mel spectrogram ready to be used by the encoder from a preprocessed audio waveform.
    Note: this is not a log-mel spectrogram.

    :param wav: the preprocessed waveform; it is interpreted as being sampled at sampling_rate
    :return: the mel spectrogram as a float32 array, transposed with respect to librosa's
    output (for a mono waveform this presumably yields (n_frames, mel_n_channels))
    """
    # Window length and step are given in milliseconds in the data hyperparameters and are
    # converted to sample counts here. The signal and rate are passed by keyword because
    # recent librosa releases no longer accept them positionally.
    frames = librosa.feature.melspectrogram(
        y=wav,
        sr=sampling_rate,
        n_fft=int(sampling_rate * mel_window_length / 1000),
        hop_length=int(sampling_rate * mel_window_step / 1000),
        n_mels=mel_n_channels
    )
    return frames.astype(np.float32).T
def trim_long_silences(wav):
    """
    Ensures that segments without voice in the waveform remain no longer than a
    threshold determined by the VAD parameters in params.py.

    Requires webrtcvad to have been imported successfully.

    :param wav: the raw waveform as a numpy array of floats
    :return: the same waveform with silences trimmed away (length <= original wav length)
    """
    # Compute the voice detection window size
    samples_per_window = (vad_window_length * sampling_rate) // 1000

    # Trim the end of the audio to have a multiple of the window size
    wav = wav[:len(wav) - (len(wav) % samples_per_window)]

    # Audio shorter than a single window leaves nothing to analyse
    if len(wav) == 0:
        return wav

    # Convert the float waveform to 16-bit mono PCM (native byte order, 2 bytes per sample)
    pcm_wave = np.round(wav * int16_max).astype(np.int16).tobytes()

    # Perform voice activation detection, one flag per window.
    # Byte offsets into pcm_wave are twice the sample offsets.
    vad = webrtcvad.Vad(mode=3)
    voice_flags = np.array([
        vad.is_speech(pcm_wave[window_start * 2:(window_start + samples_per_window) * 2],
                      sample_rate=sampling_rate)
        for window_start in range(0, len(wav), samples_per_window)
    ])

    # Smooth the voice detection with a moving average
    def moving_average(array, width):
        # Zero-pad both ends so the output has the same length as the input
        array_padded = np.concatenate((np.zeros((width - 1) // 2), array, np.zeros(width // 2)))
        ret = np.cumsum(array_padded, dtype=float)
        ret[width:] = ret[width:] - ret[:-width]
        return ret[width - 1:] / width

    audio_mask = moving_average(voice_flags, vad_moving_average_width)
    # The builtin bool is used because the np.bool alias no longer exists in current NumPy
    audio_mask = np.round(audio_mask).astype(bool)

    # Dilate the voiced regions
    audio_mask = binary_dilation(audio_mask, np.ones(vad_max_silence_length + 1))

    # Expand the per-window mask to a per-sample mask and keep the voiced samples
    audio_mask = np.repeat(audio_mask, samples_per_window)
    return wav[audio_mask]
def normalize_volume(wav, target_dBFS, increase_only=False, decrease_only=False):
    """
    Scales a waveform so that its mean power matches a target level in dBFS.

    :param wav: the waveform as a numpy array of floats
    :param target_dBFS: the desired level, compared against 10 * log10(mean(wav ** 2))
    :param increase_only: if True, the waveform is returned unchanged when reaching the
    target would require lowering its volume
    :param decrease_only: if True, the waveform is returned unchanged when reaching the
    target would require raising its volume
    :return: the rescaled waveform, or the input itself when no change is applied
    :raises ValueError: if both increase_only and decrease_only are set
    """
    if increase_only and decrease_only:
        raise ValueError("Both increase only and decrease only are set")

    # Empty or fully silent audio has no measurable level: log10(0) is -inf, which
    # would turn into an infinite gain and NaN samples. Leave such input untouched.
    mean_power = np.mean(wav ** 2) if np.size(wav) else 0.0
    if not mean_power > 0:
        return wav

    dBFS_change = target_dBFS - 10 * np.log10(mean_power)
    if (dBFS_change < 0 and increase_only) or (dBFS_change > 0 and decrease_only):
        return wav

    # A change expressed in dB maps to an amplitude gain of 10 ** (dB / 20)
    return wav * (10 ** (dBFS_change / 20))
|