# SpeechScore / scores / helper_bk.py
# (Hugging Face file-viewer residue: uploaded by alibabasglab, "Upload 73 files",
#  commit 936f6fa, verified, 16.1 kB)
"""
Modifications in Metrics
# Original copyright:
# Copyright (c) Facebook, Inc. and its affiliates.
# Demucs (https://github.com/facebookresearch/denoiser) / author: adefossez
"""
import numpy as np
from scipy.linalg import toeplitz
# ----------------------------- HELPERS ------------------------------------ #
def trim_mos(val):
    """Clamp a MOS-style score to the closed interval [1, 5].

    Args:
        val: Score to clamp; any value that can be compared with ints.

    Returns:
        ``1`` when ``val`` is below 1, ``5`` when it is above 5, otherwise
        ``val`` itself (unchanged, including its type).
    """
    if val < 1:
        return 1
    if val > 5:
        return 5
    return val
def lpcoeff(speech_frame, model_order):
    """Estimate LPC parameters of one frame via the Levinson-Durbin recursion.

    Args:
        speech_frame: 1-D array with the samples of a single frame.
        model_order: Order of the linear-prediction model.

    Returns:
        Tuple ``(acorr, refcoeff, lpparams)`` of float32 arrays:
        the autocorrelation lags 0..model_order (length ``model_order + 1``),
        the reflection coefficients (length ``model_order``) and the
        prediction polynomial ``[1, -a_1, ..., -a_p]``
        (length ``model_order + 1``).
    """
    n_samples = speech_frame.shape[0]

    # (1) Autocorrelation lags 0..model_order.
    lags = [
        np.sum(speech_frame[:(n_samples - lag)] * speech_frame[lag:n_samples])
        for lag in range(model_order + 1)
    ]

    # (2) Levinson-Durbin recursion.
    coeffs = np.ones((model_order,))
    pred_error = np.zeros((model_order + 1,))
    reflection = np.zeros((model_order,))
    pred_error[0] = lags[0]
    for order in range(model_order):
        if order == 0:
            previous = None
            correction = 0
        else:
            # Snapshot of the coefficients from the previous order; they are
            # needed unchanged while the current order overwrites them.
            previous = coeffs[:order].copy()
            correction = np.sum(previous * np.array(lags[order:0:-1]))
        reflection[order] = (lags[order + 1] - correction) / pred_error[order]
        coeffs[order] = reflection[order]
        if previous is not None:
            coeffs[:order] = previous - reflection[order] * previous[::-1]
        pred_error[order + 1] = (
            (1 - reflection[order] * reflection[order]) * pred_error[order]
        )

    acorr = np.array(lags, dtype=np.float32)
    refcoeff = np.array(reflection, dtype=np.float32)
    # Prediction polynomial: leading 1 followed by the negated coefficients.
    lpparams = np.concatenate(([1.0], coeffs * -1)).astype(np.float32)
    return acorr, refcoeff, lpparams
# -------------------------------------------------------------------------- #
def SSNR(ref_wav, deg_wav, srate=16000, eps=1e-10):
    """Compute the overall and the segmental signal-to-noise ratio in dB.

    This function implements the segmental signal-to-noise ratio as defined
    in [1, p. 45] (see Equation 2.12); the reference list itself is not part
    of this file.

    Before comparison both signals have their mean (DC offset) removed and
    the degraded signal is rescaled so that its peak magnitude matches the
    reference. All of this happens on fresh arrays: ``ref_wav`` and
    ``deg_wav`` are never modified.

    Args:
        ref_wav: 1-D array with the clean reference signal.
        deg_wav: 1-D array with the processed/degraded signal; it must
            cover at least the same number of samples as ``ref_wav``.
        srate: Sampling rate in Hz; it determines the 30 ms frame length.
        eps: Small constant added to the noise energy and to the ratio
            inside the logarithm to avoid division by zero and ``log(0)``.

    Returns:
        Tuple ``(overall_snr, segmental_snr)``: the SNR over the whole
        (normalized) signal and a list with one SNR value per frame, each
        limited to the range [-10, 35] dB.
    """
    ref_wav = np.asarray(ref_wav)
    deg_wav = np.asarray(deg_wav)
    clean_length = ref_wav.shape[0]

    # Remove DC and bring both signals to the same dynamic range. Out-of-place
    # arithmetic keeps the caller's buffers intact and also works for integer
    # sample arrays.
    clean_speech = ref_wav - ref_wav.mean()
    processed_speech = deg_wav - deg_wav.mean()
    processed_peak = np.max(np.abs(processed_speech))
    if processed_peak > 0:
        # An all-zero processed signal cannot be rescaled; leave it as zeros
        # instead of turning it into NaNs through a division by zero.
        processed_speech = processed_speech * (
            np.max(np.abs(clean_speech)) / processed_peak
        )

    # Overall signal-to-noise ratio of the normalized signals.
    dif = clean_speech - processed_speech
    overall_snr = 10 * np.log10(
        np.sum(clean_speech ** 2) / (np.sum(dif ** 2) + 10e-20)
    )

    # Framing parameters.
    winlength = int(np.round(30 * srate / 1000))  # 30 msecs
    skiprate = winlength // 4
    MIN_SNR = -10
    MAX_SNR = 35

    num_frames = int(clean_length / skiprate - (winlength / skiprate))
    time = np.linspace(1, winlength, winlength) / (winlength + 1)
    window = 0.5 * (1 - np.cos(2 * np.pi * time))

    segmental_snr = []
    for frame_count in range(num_frames):
        start = frame_count * skiprate
        # (1) Cut the frames out of both signals and apply the Hanning window.
        clean_frame = clean_speech[start:start + winlength] * window
        processed_frame = processed_speech[start:start + winlength] * window

        # (2) Segmental SNR of this frame, limited to [MIN_SNR, MAX_SNR].
        signal_energy = np.sum(clean_frame ** 2)
        noise_energy = np.sum((clean_frame - processed_frame) ** 2)
        frame_snr = 10 * np.log10(signal_energy / (noise_energy + eps) + eps)
        frame_snr = max(frame_snr, MIN_SNR)
        frame_snr = min(frame_snr, MAX_SNR)
        segmental_snr.append(frame_snr)
    return overall_snr, segmental_snr
def _wss_critical_band_filters(max_freq, n_fftby2):
    """Build the Gaussian-shaped critical-band filterbank used by ``wss``.

    Args:
        max_freq: Highest analysed frequency in Hz (half the sampling rate).
        n_fftby2: Number of FFT bins covering 0..max_freq.

    Returns:
        Array of shape ``(25, n_fftby2)``; row ``i`` holds the weights of
        critical band ``i``. Weights below the -30 dB point are set to zero.
    """
    # Critical band filter definitions (center frequency and bandwidth in Hz).
    cent_freq = [50., 120, 190, 260, 330, 400, 470, 540, 617.372,
                 703.378, 798.717, 904.128, 1020.38, 1148.30,
                 1288.72, 1442.54, 1610.70, 1794.16, 1993.93,
                 2211.08, 2446.71, 2701.97, 2978.04, 3276.17,
                 3597.63]
    bandwidth = [70., 70, 70, 70, 70, 70, 70, 77.3724, 86.0056,
                 95.3398, 105.411, 116.256, 127.914, 140.423,
                 153.823, 168.154, 183.457, 199.776, 217.153,
                 235.631, 255.255, 276.072, 298.126, 321.465,
                 346.136]
    bw_min = bandwidth[0]  # minimum critical bandwidth
    min_factor = np.exp(-30. / (2 * 2.303))  # -30 dB point of a filter

    bins = np.arange(n_fftby2)
    filters = np.zeros((len(cent_freq), n_fftby2))
    for band, (centre, width) in enumerate(zip(cent_freq, bandwidth)):
        f0 = (centre / max_freq) * n_fftby2
        bw = (width / max_freq) * n_fftby2
        # The normalisation makes the summed filter weights equivalent
        # across bands.
        norm_factor = np.log(bw_min) - np.log(width)
        response = np.exp(
            -11 * (((bins - np.floor(f0)) / bw) ** 2) + norm_factor
        )
        filters[band, :] = response * (response > min_factor)
    return filters


def _wss_band_energies_db(frame, crit_filter, n_fft):
    """Return the critical-band energies (in dB) of one windowed frame."""
    n_bins = crit_filter.shape[1]
    power_spec = np.abs(np.fft.fft(frame, n_fft)) ** 2
    energies = np.array(
        [np.sum(power_spec[:n_bins] * band) for band in crit_filter]
    )
    # Floor at 1e-10 so that empty bands do not produce log10(0).
    return 10 * np.log10(np.maximum(energies, 1e-10))


def _wss_band_weights(energy, slope, kmax, klocmax):
    """Compute the per-band weights for one signal.

    For every band the nearest spectral peak is located by walking to the
    right while the slope is positive, or to the left otherwise. The weight
    combines the distance to the global maximum with the distance to that
    local peak.
    """
    n_slopes = len(slope)
    loc_peak = []
    for band in range(n_slopes):
        n = band
        if slope[band] > 0:
            # search to the right
            while n < n_slopes and slope[n] > 0:
                n += 1
            loc_peak.append(energy[n - 1])
        else:
            # search to the left
            while n >= 0 and slope[n] <= 0:
                n -= 1
            loc_peak.append(energy[n + 1])
    loc_peak = np.array(loc_peak)

    db_max = max(energy)
    lower_bands = energy[:n_slopes]
    w_max = kmax / (kmax + db_max - lower_bands)
    w_locmax = klocmax / (klocmax + loc_peak - lower_bands)
    return w_max * w_locmax


def wss(ref_wav, deg_wav, srate):
    """Compute the frame-wise Weighted Spectral Slope (WSS) distortion.

    Both signals are cut into 30 ms frames advanced by a quarter frame.
    Each frame is windowed and reduced to 25 critical-band energies in dB.
    The weighted squared difference of the band-to-band slopes is
    accumulated per frame. The weighting follows the procedure attributed
    to Klatt's 1982 ICASSP paper (page 1280) in the original comments. The
    division by the sum of the weights is an extra normalisation that is
    not part of that paper.

    Args:
        ref_wav: 1-D array with the clean reference signal.
        deg_wav: 1-D array with the processed signal; same length as
            ``ref_wav``.
        srate: Sampling rate in Hz.

    Returns:
        List with one distortion value per analysed frame.

    Raises:
        AssertionError: If the two signals differ in length.
    """
    clean_length = ref_wav.shape[0]
    processed_length = deg_wav.shape[0]
    assert clean_length == processed_length, clean_length

    winlength = round(30 * srate / 1000.)  # window length in samples
    skiprate = np.floor(winlength / 4)
    max_freq = srate / 2
    n_fft = int(2 ** np.ceil(np.log(2 * winlength) / np.log(2)))
    n_fftby2 = int(n_fft / 2)
    Kmax = 20
    Klocmax = 1

    crit_filter = _wss_critical_band_filters(max_freq, n_fftby2)

    # For each frame of input speech, compute the WSS measure.
    num_frames = int(clean_length / skiprate - (winlength / skiprate))
    time = np.linspace(1, winlength, winlength) / (winlength + 1)
    window = 0.5 * (1 - np.cos(2 * np.pi * time))
    hop = int(skiprate)

    distortion = []
    for frame_count in range(num_frames):
        begin = frame_count * hop
        # (1) Get the frames of the test and reference speech and apply the
        #     Hanning window.
        clean_frame = ref_wav[begin:begin + winlength] * window
        processed_frame = deg_wav[begin:begin + winlength] * window

        # (2)-(3) Power spectrum and filterbank output energies (in dB).
        clean_energy = _wss_band_energies_db(clean_frame, crit_filter, n_fft)
        processed_energy = _wss_band_energies_db(
            processed_frame, crit_filter, n_fft
        )

        # (4) Spectral slope (dB[i+1] - dB[i]).
        clean_slope = np.diff(clean_energy)
        processed_slope = np.diff(processed_energy)

        # (5)-(6) Weights averaged over the clean and the processed frame;
        #         they emphasise spectral peaks over spectral valleys.
        W_clean = _wss_band_weights(clean_energy, clean_slope, Kmax, Klocmax)
        W_processed = _wss_band_weights(
            processed_energy, processed_slope, Kmax, Klocmax
        )
        W = (W_clean + W_processed) / 2

        frame_distortion = np.sum(W * (clean_slope - processed_slope) ** 2)
        # Scale the measure by the sum of the weights.
        distortion.append(frame_distortion / np.sum(W))
    return distortion
def llr(ref_wav, deg_wav, srate):
    """Compute the frame-wise Log Likelihood Ratio (LLR) distortion.

    Both signals are cut into 30 ms frames advanced by a quarter frame and
    windowed. For every frame the LPC polynomials of the clean and the
    processed signal are evaluated against the autocorrelation matrix of the
    clean frame; the logarithm of the ratio of the two quadratic forms is
    the LLR of that frame.

    Args:
        ref_wav: 1-D array with the clean reference signal.
        deg_wav: 1-D array with the processed signal; same length as
            ``ref_wav``.
        srate: Sampling rate in Hz. LPC order 10 is used below 10 kHz,
            order 16 otherwise.

    Returns:
        1-D array with one LLR value per frame; NaN and infinite values are
        replaced via ``np.nan_to_num``.

    Raises:
        AssertionError: If the two signals differ in length.
    """
    clean_length = ref_wav.shape[0]
    processed_length = deg_wav.shape[0]
    assert clean_length == processed_length, clean_length

    winlength = round(30 * srate / 1000.)  # window length in samples
    skiprate = np.floor(winlength / 4)
    # LPC analysis order
    P = 10 if srate < 10000 else 16

    # For each frame of input speech, calculate the Log Likelihood Ratio.
    num_frames = int(clean_length / skiprate - (winlength / skiprate))
    time = np.linspace(1, winlength, winlength) / (winlength + 1)
    window = 0.5 * (1 - np.cos(2 * np.pi * time))
    hop = int(skiprate)

    distortion = []
    for frame_count in range(num_frames):
        begin = frame_count * hop
        # (1) Get the frames of the test and reference speech and apply the
        #     Hanning window.
        clean_frame = ref_wav[begin:begin + winlength] * window
        processed_frame = deg_wav[begin:begin + winlength] * window

        # (2) Autocorrelation lags and LPC parameters; only the clean
        #     autocorrelation and the two LPC polynomials are needed.
        R_clean, _, A_clean = lpcoeff(clean_frame, P)
        _, _, A_processed = lpcoeff(processed_frame, P)
        A_clean = A_clean.reshape(1, -1)
        A_processed = A_processed.reshape(1, -1)

        # (3) Compute the LLR measure.
        clean_autocorr = toeplitz(R_clean)
        numerator = A_processed.dot(clean_autocorr).dot(A_processed.T)
        denominator = A_clean.dot(clean_autocorr).dot(A_clean.T)
        if (numerator / denominator) <= 0:
            print(f'Numerator: {numerator}')
            print(f'Denominator: {denominator}')
        log_ = np.log(numerator / denominator)
        distortion.append(np.squeeze(log_))
    return np.nan_to_num(np.array(distortion))
# -------------------------------------------------------------------------- #
#!/usr/bin/env python3
# Copyright 2020 Wen-Chin Huang and Tomoki Hayashi
# Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
# ported from https://github.com/espnet/espnet/blob/master/utils/mcd_calculate.py
"""Evaluate MCD between generated and groundtruth audios with SPTK-based mcep."""
from typing import Tuple
import numpy as np
import pysptk
from fastdtw import fastdtw
from scipy import spatial
def sptk_extract(
    x: np.ndarray,
    fs: int,
    n_fft: int = 512,
    n_shift: int = 256,
    mcep_dim: int = 25,
    mcep_alpha: float = 0.41,
    is_padding: bool = False,
) -> np.ndarray:
    """Extract SPTK-based mel-cepstrum.

    Args:
        x (ndarray): 1D waveform array.
        fs (int): Sampling rate.
        n_fft (int): FFT length in point (default=512).
        n_shift (int): Shift length in point (default=256).
        mcep_dim (int): Dimension of mel-cepstrum (default=25). When this
            or ``mcep_alpha`` is None, both are looked up from ``fs``.
        mcep_alpha (float): All pass filter coefficient (default=0.41).
        is_padding (bool): Whether to pad the end of signal (default=False).

    Returns:
        ndarray: Mel-cepstrum with the size (N, mcep_dim + 1), where N is
        the number of analysed frames.

    Raises:
        ValueError: If the (optionally padded) signal is shorter than one
            analysis frame of ``n_fft`` samples, or if the parameters have
            to be looked up and no setting exists for ``fs``.
    """
    # perform padding
    if is_padding:
        n_pad = n_fft - (len(x) - n_fft) % n_shift
        x = np.pad(x, (0, n_pad), "reflect")

    # get number of frames
    n_frame = (len(x) - n_fft) // n_shift + 1
    if n_frame < 1:
        # Without this check the failure would surface later as an opaque
        # "need at least one array to stack" ValueError from np.stack.
        raise ValueError(
            f"Signal of length {len(x)} is too short for n_fft={n_fft}."
        )

    # get window function
    win = pysptk.sptk.hamming(n_fft)

    # check mcep and alpha
    if mcep_dim is None or mcep_alpha is None:
        mcep_dim, mcep_alpha = _get_best_mcep_params(fs)

    # calculate the mel-cepstrum of every windowed frame
    mcep = [
        pysptk.mcep(
            x[n_shift * i : n_shift * i + n_fft] * win,
            mcep_dim,
            mcep_alpha,
            eps=1e-6,
            etype=1,
        )
        for i in range(n_frame)
    ]

    return np.stack(mcep)
def _get_best_mcep_params(fs: int) -> Tuple[int, float]:
    """Look up the mel-cepstrum order and all-pass constant for ``fs``.

    Args:
        fs: Sampling rate in Hz.

    Returns:
        Tuple ``(mcep_dim, mcep_alpha)`` for the given sampling rate.

    Raises:
        ValueError: If no setting is stored for ``fs``.
    """
    # https://sp-nitech.github.io/sptk/latest/main/mgcep.html#_CPPv4N4sptk19MelCepstralAnalysisE
    presets = (
        (8000, (13, 0.31)),
        (16000, (23, 0.42)),
        (22050, (34, 0.45)),
        (24000, (34, 0.46)),
        (32000, (36, 0.50)),
        (44100, (39, 0.53)),
        (48000, (39, 0.55)),
    )
    for rate, params in presets:
        if fs == rate:
            return params
    raise ValueError(f"Not found the setting for {fs}.")
def calculate_mcd(
    inf_audio,
    ref_audio,
    fs,
    n_fft=1024,
    n_shift=256,
    mcep_dim=None,
    mcep_alpha=None,
):
    """Calculate the mel-cepstral distortion (MCD) between two waveforms.

    Mel-cepstra are extracted from both signals with ``sptk_extract``,
    aligned with dynamic time warping, and the per-frame distortion
    ``10 / ln(10) * sqrt(2 * sum(diff ** 2))`` is averaged over the aligned
    frames.

    Args:
        inf_audio: Generated (inferred) waveform.
        ref_audio: Ground-truth waveform.
        fs: Sampling rate in Hz.
        n_fft: FFT length in point (default=1024).
        n_shift: Shift length in point (default=256).
        mcep_dim: Dimension of mel-cepstrum; None defers the choice to
            ``sptk_extract``.
        mcep_alpha: All pass filter coefficient; None defers the choice to
            ``sptk_extract``.

    Returns:
        The mean MCD over the aligned frames.
    """
    # Both signals are analysed with exactly the same settings.
    extract_kwargs = dict(
        fs=fs,
        n_fft=n_fft,
        n_shift=n_shift,
        mcep_dim=mcep_dim,
        mcep_alpha=mcep_alpha,
    )
    gen_mcep = sptk_extract(x=inf_audio, **extract_kwargs)
    gt_mcep = sptk_extract(x=ref_audio, **extract_kwargs)

    # DTW: row 0 of the warping function indexes the generated frames,
    # row 1 the ground-truth frames.
    _, path = fastdtw(gen_mcep, gt_mcep, dist=spatial.distance.euclidean)
    warping = np.array(path).T
    aligned_gen = gen_mcep[warping[0]]
    aligned_gt = gt_mcep[warping[1]]

    # MCD
    squared_dist = np.sum((aligned_gen - aligned_gt) ** 2, 1)
    frame_mcd = 10.0 / np.log(10.0) * np.sqrt(2 * squared_dist)
    return np.mean(frame_mcd, 0)