Spaces:
Running
Running
""" | |
Modifications in Metrics | |
# Original copyright: | |
# Copyright (c) Facebook, Inc. and its affiliates. | |
# Demucs (https://github.com/facebookresearch/denoiser) / author: adefossez | |
""" | |
import numpy as np | |
from scipy.linalg import toeplitz | |
# ----------------------------- HELPERS ------------------------------------ # | |
def trim_mos(val): | |
return min(max(val, 1), 5) | |
def lpcoeff(speech_frame, model_order): | |
# (1) Compute Autocor lags | |
winlength = speech_frame.shape[0] | |
R = [] | |
for k in range(model_order + 1): | |
first = speech_frame[:(winlength - k)] | |
second = speech_frame[k:winlength] | |
R.append(np.sum(first * second)) | |
# (2) Lev-Durbin | |
a = np.ones((model_order,)) | |
E = np.zeros((model_order + 1,)) | |
rcoeff = np.zeros((model_order,)) | |
E[0] = R[0] | |
for i in range(model_order): | |
if i == 0: | |
sum_term = 0 | |
else: | |
a_past = a[:i] | |
sum_term = np.sum(a_past * np.array(R[i:0:-1])) | |
rcoeff[i] = (R[i+1] - sum_term)/E[i] | |
a[i] = rcoeff[i] | |
if i > 0: | |
a[:i] = a_past[:i] - rcoeff[i] * a_past[::-1] | |
E[i+1] = (1-rcoeff[i]*rcoeff[i])*E[i] | |
acorr = np.array(R, dtype=np.float32) | |
refcoeff = np.array(rcoeff, dtype=np.float32) | |
a = a * -1 | |
lpparams = np.array([1] + list(a), dtype=np.float32) | |
acorr = np.array(acorr, dtype=np.float32) | |
refcoeff = np.array(refcoeff, dtype=np.float32) | |
lpparams = np.array(lpparams, dtype=np.float32) | |
return acorr, refcoeff, lpparams | |
# -------------------------------------------------------------------------- # | |
def SSNR(ref_wav, deg_wav, srate=16000, eps=1e-10): | |
""" Segmental Signal-to-Noise Ratio Objective Speech Quality Measure | |
This function implements the segmental signal-to-noise ratio | |
as defined in [1, p. 45] (see Equation 2.12). | |
""" | |
clean_speech = ref_wav | |
processed_speech = deg_wav | |
clean_length = ref_wav.shape[0] | |
processed_length = deg_wav.shape[0] | |
# scale both to have same dynamic range. Remove DC too. | |
clean_speech -= clean_speech.mean() | |
processed_speech -= processed_speech.mean() | |
processed_speech *= (np.max(np.abs(clean_speech)) / np.max(np.abs(processed_speech))) | |
# Signal-to-Noise Ratio | |
dif = ref_wav - deg_wav | |
overall_snr = 10 * np.log10(np.sum(ref_wav ** 2) / (np.sum(dif ** 2) + | |
10e-20)) | |
# global variables | |
winlength = int(np.round(30 * srate / 1000)) # 30 msecs | |
skiprate = winlength // 4 | |
MIN_SNR = -10 | |
MAX_SNR = 35 | |
# For each frame, calculate SSNR | |
num_frames = int(clean_length / skiprate - (winlength/skiprate)) | |
start = 0 | |
time = np.linspace(1, winlength, winlength) / (winlength + 1) | |
window = 0.5 * (1 - np.cos(2 * np.pi * time)) | |
segmental_snr = [] | |
for frame_count in range(int(num_frames)): | |
# (1) get the frames for the test and ref speech. | |
# Apply Hanning Window | |
clean_frame = clean_speech[start:start+winlength] | |
processed_frame = processed_speech[start:start+winlength] | |
clean_frame = clean_frame * window | |
processed_frame = processed_frame * window | |
# (2) Compute Segmental SNR | |
signal_energy = np.sum(clean_frame ** 2) | |
noise_energy = np.sum((clean_frame - processed_frame) ** 2) | |
segmental_snr.append(10 * np.log10(signal_energy / (noise_energy + eps)+ eps)) | |
segmental_snr[-1] = max(segmental_snr[-1], MIN_SNR) | |
segmental_snr[-1] = min(segmental_snr[-1], MAX_SNR) | |
start += int(skiprate) | |
return overall_snr, segmental_snr | |
def wss(ref_wav, deg_wav, srate): | |
clean_speech = ref_wav | |
processed_speech = deg_wav | |
clean_length = ref_wav.shape[0] | |
processed_length = deg_wav.shape[0] | |
assert clean_length == processed_length, clean_length | |
winlength = round(30 * srate / 1000.) # 240 wlen in samples | |
skiprate = np.floor(winlength / 4) | |
max_freq = srate / 2 | |
num_crit = 25 # num of critical bands | |
USE_FFT_SPECTRUM = 1 | |
n_fft = int(2 ** np.ceil(np.log(2*winlength)/np.log(2))) | |
n_fftby2 = int(n_fft / 2) | |
Kmax = 20 | |
Klocmax = 1 | |
# Critical band filter definitions (Center frequency and BW in Hz) | |
cent_freq = [50., 120, 190, 260, 330, 400, 470, 540, 617.372, | |
703.378, 798.717, 904.128, 1020.38, 1148.30, | |
1288.72, 1442.54, 1610.70, 1794.16, 1993.93, | |
2211.08, 2446.71, 2701.97, 2978.04, 3276.17, | |
3597.63] | |
bandwidth = [70., 70, 70, 70, 70, 70, 70, 77.3724, 86.0056, | |
95.3398, 105.411, 116.256, 127.914, 140.423, | |
153.823, 168.154, 183.457, 199.776, 217.153, | |
235.631, 255.255, 276.072, 298.126, 321.465, | |
346.136] | |
bw_min = bandwidth[0] # min critical bandwidth | |
# set up critical band filters. Note here that Gaussianly shaped filters | |
# are used. Also, the sum of the filter weights are equivalent for each | |
# critical band filter. Filter less than -30 dB and set to zero. | |
min_factor = np.exp(-30. / (2 * 2.303)) # -30 dB point of filter | |
crit_filter = np.zeros((num_crit, n_fftby2)) | |
all_f0 = [] | |
for i in range(num_crit): | |
f0 = (cent_freq[i] / max_freq) * (n_fftby2) | |
all_f0.append(np.floor(f0)) | |
bw = (bandwidth[i] / max_freq) * (n_fftby2) | |
norm_factor = np.log(bw_min) - np.log(bandwidth[i]) | |
j = list(range(n_fftby2)) | |
crit_filter[i, :] = np.exp(-11 * (((j - np.floor(f0)) / bw) ** 2) + \ | |
norm_factor) | |
crit_filter[i, :] = crit_filter[i, :] * (crit_filter[i, :] > \ | |
min_factor) | |
# For each frame of input speech, compute Weighted Spectral Slope Measure | |
num_frames = int(clean_length / skiprate - (winlength / skiprate)) | |
start = 0 # starting sample | |
time = np.linspace(1, winlength, winlength) / (winlength + 1) | |
window = 0.5 * (1 - np.cos(2 * np.pi * time)) | |
distortion = [] | |
for frame_count in range(num_frames): | |
# (1) Get the Frames for the test and reference speeech. | |
# Multiply by Hanning window. | |
clean_frame = clean_speech[start:start+winlength] | |
processed_frame = processed_speech[start:start+winlength] | |
clean_frame = clean_frame * window | |
processed_frame = processed_frame * window | |
# (2) Compuet Power Spectrum of clean and processed | |
clean_spec = (np.abs(np.fft.fft(clean_frame, n_fft)) ** 2) | |
processed_spec = (np.abs(np.fft.fft(processed_frame, n_fft)) ** 2) | |
clean_energy = [None] * num_crit | |
processed_energy = [None] * num_crit | |
# (3) Compute Filterbank output energies (in dB) | |
for i in range(num_crit): | |
clean_energy[i] = np.sum(clean_spec[:n_fftby2] * \ | |
crit_filter[i, :]) | |
processed_energy[i] = np.sum(processed_spec[:n_fftby2] * \ | |
crit_filter[i, :]) | |
clean_energy = np.array(clean_energy).reshape(-1, 1) | |
eps = np.ones((clean_energy.shape[0], 1)) * 1e-10 | |
clean_energy = np.concatenate((clean_energy, eps), axis=1) | |
clean_energy = 10 * np.log10(np.max(clean_energy, axis=1)) | |
processed_energy = np.array(processed_energy).reshape(-1, 1) | |
processed_energy = np.concatenate((processed_energy, eps), axis=1) | |
processed_energy = 10 * np.log10(np.max(processed_energy, axis=1)) | |
# (4) Compute Spectral Shape (dB[i+1] - dB[i]) | |
clean_slope = clean_energy[1:num_crit] - clean_energy[:num_crit-1] | |
processed_slope = processed_energy[1:num_crit] - \ | |
processed_energy[:num_crit-1] | |
# (5) Find the nearest peak locations in the spectra to each | |
# critical band. If the slope is negative, we search | |
# to the left. If positive, we search to the right. | |
clean_loc_peak = [] | |
processed_loc_peak = [] | |
for i in range(num_crit - 1): | |
if clean_slope[i] > 0: | |
# search to the right | |
n = i | |
while n < num_crit - 1 and clean_slope[n] > 0: | |
n += 1 | |
clean_loc_peak.append(clean_energy[n - 1]) | |
else: | |
# search to the left | |
n = i | |
while n >= 0 and clean_slope[n] <= 0: | |
n -= 1 | |
clean_loc_peak.append(clean_energy[n + 1]) | |
# find the peaks in the processed speech signal | |
if processed_slope[i] > 0: | |
n = i | |
while n < num_crit - 1 and processed_slope[n] > 0: | |
n += 1 | |
processed_loc_peak.append(processed_energy[n - 1]) | |
else: | |
n = i | |
while n >= 0 and processed_slope[n] <= 0: | |
n -= 1 | |
processed_loc_peak.append(processed_energy[n + 1]) | |
# (6) Compuet the WSS Measure for this frame. This includes | |
# determination of the weighting functino | |
dBMax_clean = max(clean_energy) | |
dBMax_processed = max(processed_energy) | |
# The weights are calculated by averaging individual | |
# weighting factors from the clean and processed frame. | |
# These weights W_clean and W_processed should range | |
# from 0 to 1 and place more emphasis on spectral | |
# peaks and less emphasis on slope differences in spectral | |
# valleys. This procedure is described on page 1280 of | |
# Klatt's 1982 ICASSP paper. | |
clean_loc_peak = np.array(clean_loc_peak) | |
processed_loc_peak = np.array(processed_loc_peak) | |
Wmax_clean = Kmax / (Kmax + dBMax_clean - clean_energy[:num_crit-1]) | |
Wlocmax_clean = Klocmax / (Klocmax + clean_loc_peak - \ | |
clean_energy[:num_crit-1]) | |
W_clean = Wmax_clean * Wlocmax_clean | |
Wmax_processed = Kmax / (Kmax + dBMax_processed - \ | |
processed_energy[:num_crit-1]) | |
Wlocmax_processed = Klocmax / (Klocmax + processed_loc_peak - \ | |
processed_energy[:num_crit-1]) | |
W_processed = Wmax_processed * Wlocmax_processed | |
W = (W_clean + W_processed) / 2 | |
distortion.append(np.sum(W * (clean_slope[:num_crit - 1] - \ | |
processed_slope[:num_crit - 1]) ** 2)) | |
# this normalization is not part of Klatt's paper, but helps | |
# to normalize the meaasure. Here we scale the measure by the sum of the | |
# weights | |
distortion[frame_count] = distortion[frame_count] / np.sum(W) | |
start += int(skiprate) | |
return distortion | |
def llr(ref_wav, deg_wav, srate): | |
clean_speech = ref_wav | |
processed_speech = deg_wav | |
clean_length = ref_wav.shape[0] | |
processed_length = deg_wav.shape[0] | |
assert clean_length == processed_length, clean_length | |
winlength = round(30 * srate / 1000.) # 240 wlen in samples | |
skiprate = np.floor(winlength / 4) | |
if srate < 10000: | |
# LPC analysis order | |
P = 10 | |
else: | |
P = 16 | |
# For each frame of input speech, calculate the Log Likelihood Ratio | |
num_frames = int(clean_length / skiprate - (winlength / skiprate)) | |
start = 0 | |
time = np.linspace(1, winlength, winlength) / (winlength + 1) | |
window = 0.5 * (1 - np.cos(2 * np.pi * time)) | |
distortion = [] | |
for frame_count in range(num_frames): | |
# (1) Get the Frames for the test and reference speeech. | |
# Multiply by Hanning window. | |
clean_frame = clean_speech[start:start+winlength] | |
processed_frame = processed_speech[start:start+winlength] | |
clean_frame = clean_frame * window | |
processed_frame = processed_frame * window | |
# (2) Get the autocorrelation logs and LPC params used | |
# to compute the LLR measure | |
R_clean, Ref_clean, A_clean = lpcoeff(clean_frame, P) | |
R_processed, Ref_processed, A_processed = lpcoeff(processed_frame, P) | |
A_clean = A_clean[None, :] | |
A_processed = A_processed[None, :] | |
# (3) Compute the LLR measure | |
numerator = A_processed.dot(toeplitz(R_clean)).dot(A_processed.T) | |
denominator = A_clean.dot(toeplitz(R_clean)).dot(A_clean.T) | |
if (numerator/denominator) <= 0: | |
print(f'Numerator: {numerator}') | |
print(f'Denominator: {denominator}') | |
log_ = np.log(numerator / denominator) | |
distortion.append(np.squeeze(log_)) | |
start += int(skiprate) | |
return np.nan_to_num(np.array(distortion)) | |
# -------------------------------------------------------------------------- # | |