# PronunciationChecker: src/audio_preprocessing.py
# SPDX-FileContributor: Karl El Hajal
import os
import subprocess
from pathlib import Path

import numpy as np
import webrtcvad
from pydub import AudioSegment

VAD_SR = 16000
VAD_MODE = 3 # Aggressiveness level (0-3, where 3 is the most aggressive)
VAD_FRAME_DURATION = 10 # Frame duration in milliseconds
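
# webrtcvad accepts 8/16/32/48 kHz audio in 10, 20, or 30 ms frames. At
# VAD_SR = 16000 with VAD_FRAME_DURATION = 10, each frame is
# 16000 * 10 / 1000 = 160 samples, i.e. 320 bytes of 16-bit PCM.
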
def get_speech_segments_webrtcvad(audio_array, sample_rate, frame_duration, vad_mode):
    """Return (start_sample, end_sample) pairs for speech regions found by WebRTC VAD."""
    vad = webrtcvad.Vad(vad_mode)
    # Convert the frame duration from milliseconds to samples
    frame_duration_samples = int(sample_rate * frame_duration / 1000)
    # Detect speech regions frame by frame
    speech_segments = []
    start = -1
    for i in range(0, len(audio_array), frame_duration_samples):
        frame = audio_array[i : i + frame_duration_samples]
        if len(frame) < frame_duration_samples:
            # webrtcvad requires complete frames; treat the partial tail frame as silence
            is_speech = False
        else:
            is_speech = vad.is_speech(frame.tobytes(), sample_rate)
        if is_speech and start == -1:
            start = i
        elif not is_speech and start != -1:
            speech_segments.append((start, i))
            start = -1
    # Close a segment that runs to the very end of the audio
    if start != -1:
        speech_segments.append((start, len(audio_array)))
    return speech_segments
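
# Illustrative usage sketch (values are placeholders): the input must be
# 16-bit mono PCM samples at one of the rates webrtcvad supports.
#
#   sr = 16000
#   samples = np.zeros(sr, dtype=np.int16)  # one second of silence
#   segments = get_speech_segments_webrtcvad(samples, sr, VAD_FRAME_DURATION, VAD_MODE)
#   # -> [] since no frame contains speech
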
def get_start_end_using_vad(audio, sample_rate):
    """Return (start_time, end_time) in seconds bounding the detected speech."""
    audio_array = np.array(audio.get_array_of_samples())
    speech_segments = get_speech_segments_webrtcvad(audio_array, sample_rate, VAD_FRAME_DURATION, VAD_MODE)
    if len(speech_segments) == 0:
        # Retry with a less aggressive mode if nothing was detected
        speech_segments = get_speech_segments_webrtcvad(audio_array, sample_rate, VAD_FRAME_DURATION, VAD_MODE - 1)
    if len(speech_segments) == 0:
        # Still nothing: fall back to the full clip instead of raising an IndexError
        return 0.0, len(audio_array) / sample_rate
    start_sample = speech_segments[0][0]
    end_sample = speech_segments[-1][1]
    start_time = start_sample / sample_rate
    end_time = end_sample / sample_rate
    return start_time, end_time
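
# e.g. segments [(1600, 8000), (9600, 24000)] at 16 kHz bound the speech
# between 1600 / 16000 = 0.1 s and 24000 / 16000 = 1.5 s.
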
def trim_silences(audio, target_sr):
    """Trim leading and trailing silence from a mono AudioSegment sampled at target_sr."""
    # Run the VAD on a 16 kHz copy (set_frame_rate returns a new segment)
    vad_audio = audio.set_frame_rate(VAD_SR)
    start_time, end_time = get_start_end_using_vad(vad_audio, VAD_SR)
    # Map the detected times back to sample indices at the original rate
    start_sample_orig_sr = int(start_time * target_sr)
    end_sample_orig_sr = int(end_time * target_sr)
    filtered_audio_array = np.array(audio.get_array_of_samples())
    filtered_audio_array = filtered_audio_array[start_sample_orig_sr:end_sample_orig_sr]
    filtered_audio = AudioSegment(
        filtered_audio_array.tobytes(),
        frame_rate=target_sr,
        sample_width=audio.sample_width,
        channels=audio.channels,
    )
    return filtered_audio
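
# Sketch (the file name is a placeholder): trim a mono 16-bit recording
# while keeping its native sample rate.
#
#   audio = AudioSegment.from_file("recording.wav").set_channels(1).set_sample_width(2)
#   trimmed = trim_silences(audio, audio.frame_rate)
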
def match_target_amplitude(audio, target_dBFS):
    """Apply gain so the segment's average loudness matches target_dBFS."""
    change_in_dBFS = target_dBFS - audio.dBFS
    return audio.apply_gain(change_in_dBFS)
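
# e.g. a quiet clip at -26.3 dBFS receives +6.3 dB of gain to reach a
# -20.0 dBFS target, while a hot clip at -12.0 dBFS is attenuated by 8 dB.
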
def process_wav(wav_path, target_sr, do_trim_silences=True):
    """Load an audio file and return a mono, 16-bit, loudness-normalized AudioSegment at target_sr."""
audio = AudioSegment.from_file(wav_path)
# Convert audio to mono
if audio.channels > 1:
audio = audio.set_channels(1)
# Resample audio
audio = audio.set_frame_rate(target_sr)
# Convert the audio to 16-bit PCM format
audio = audio.set_sample_width(2)
# Remove silences
if do_trim_silences:
audio = trim_silences(audio, target_sr)
    # Loudness normalization to -20 dBFS
audio = match_target_amplitude(audio, -20.0)
return audio
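
# Sketch of the full preprocessing step (paths are illustrative):
#
#   processed = process_wav("recordings/utterance.wav", target_sr=16000)
#   processed.export("recordings/utterance_clean.wav", format="wav")
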
def get_red_green_segments(dist_matrix, path, wav_type="ref", threshold=0.4):
    """Split the frames of one signal into poorly matched (red) and well matched (green).

    `dist_matrix` holds pairwise frame distances and `path` is the alignment
    (two parallel index sequences, e.g. from DTW). `wav_type` selects which
    axis the frames belong to: "ref" uses the rows, anything else the columns.
    """
    if wav_type == "ref":
        num_wav_frames = len(dist_matrix)
    else:
        num_wav_frames = len(dist_matrix[0])
    wav_distances = [0] * num_wav_frames
    for (i, j) in zip(*path):
        # Index along the axis that corresponds to the selected wav
        k = i if wav_type == "ref" else j
        if k == num_wav_frames - 2 and wav_distances[k] > 0:
            # Special case: keep the first distance assigned to the second-to-last frame
            continue
        wav_distances[k] = dist_matrix[i, j]
    red_segments = [k for k, d in enumerate(wav_distances) if d >= threshold]
    green_segments = [k for k, d in enumerate(wav_distances) if d < threshold]
    return red_segments, green_segments, wav_distances
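
# Toy example (assumed 3x3 distance matrix with a diagonal alignment path):
#
#   dist_matrix = np.array([[0.1, 0.9, 0.9],
#                           [0.9, 0.2, 0.9],
#                           [0.9, 0.9, 0.6]])
#   path = ([0, 1, 2], [0, 1, 2])
#   red, green, dists = get_red_green_segments(dist_matrix, path)
#   # dists == [0.1, 0.2, 0.6] -> red == [2], green == [0, 1] at threshold 0.4
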
def assess_pronunciation_quality(dist_matrix, path, threshold=0.4, wav_type="ref"):
    """Score pronunciation from the aligned frame distances; returns (quality_score, needs_repeat)."""
    # Only the red segments and per-frame distances are needed here (green is discarded)
    red_segments, _, wav_distances = get_red_green_segments(dist_matrix, path, wav_type=wav_type, threshold=threshold)
    # Fraction of frames whose aligned distance exceeds the threshold
    num_red_segments = len(red_segments)
    total_segments = len(wav_distances)
    red_percentage = num_red_segments / total_segments if total_segments > 0 else 0.0
    # Quality is the complement; suggest a repeat when more than half the frames are red
    quality_score = 1 - red_percentage
    needs_repeat = red_percentage > 0.5
    # Print debug information (skip the per-frame stats when there are no frames)
    if wav_distances:
        print("Raw distance stats:")
        print(f"  Min distance: {min(wav_distances):.4f}")
        print(f"  Max distance: {max(wav_distances):.4f}")
        print(f"  Mean distance: {np.mean(wav_distances):.4f}")
    print("\nNormalized distance stats:")
    print(f"  Number of red segments (>= {threshold}): {num_red_segments}")
    print(f"  Total segments: {total_segments}")
    print(f"\nRed percentage: {red_percentage * 100:.2f}%")
    return quality_score, needs_repeat
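
# Continuing the toy example above: one red frame out of three gives
# red_percentage = 1/3, so quality_score = 1 - 1/3 (about 0.67) and
# needs_repeat == False, since the 0.5 repeat threshold is not exceeded.
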
def denoise_audio(input_audio_path):
    """Denoise a WAV file via the external `denoise` CLI; returns the output path.

    Falls back to returning the input path unchanged if denoising fails.
    """
    assert isinstance(input_audio_path, (str, Path)), "Input path must be a string or a Path object"
    input_audio_path = str(input_audio_path)
    output_audio_path = input_audio_path.replace(".wav", "_denoised.wav")
    temp_wav = "temp_audio.wav"
    try:
        # Load the audio and convert it to the format the denoiser expects
        audio = AudioSegment.from_wav(input_audio_path)
        audio = audio.set_frame_rate(48000)  # Resample to 48 kHz
        audio = audio.set_channels(1)  # Convert to mono
        audio = audio.set_sample_width(2)  # Convert to 16-bit PCM
        # Export as WAV in that format for the external tool
        audio.export(temp_wav, format="wav")
        # Run the denoising command
        result = subprocess.run(
            ["denoise", temp_wav, output_audio_path, "--plot"],
            check=True,
            capture_output=True,
            text=True,
        )
        print(result.stdout)
    except subprocess.CalledProcessError as e:
        print(f"Error: {e}")
        print(f"Stdout: {e.stdout}")
        print(f"Stderr: {e.stderr}")
        return input_audio_path
    except Exception as e:
        print(f"Unexpected error: {e}")
        return input_audio_path
    finally:
        # Clean up the temporary file whether or not denoising succeeded
        if os.path.exists(temp_wav):
            os.remove(temp_wav)
    return output_audio_path
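
if __name__ == "__main__":
    # Minimal end-to-end sketch of the preprocessing pipeline. The input file
    # name is a placeholder; denoising is left out because it requires the
    # external `denoise` CLI to be installed.
    example_wav = "example.wav"  # hypothetical input file
    if os.path.exists(example_wav):
        cleaned = process_wav(example_wav, target_sr=16000, do_trim_silences=True)
        cleaned.export("example_processed.wav", format="wav")
        print(f"Saved example_processed.wav ({cleaned.duration_seconds:.2f} s)")
    else:
        print(f"Place a WAV file at '{example_wav}' to try the pipeline.")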