import os | |
import subprocess | |
import tempfile | |
import logging | |
import wave | |
import numpy as np | |
logger = logging.getLogger(__name__) | |
FFMPEG_CMD = "ffmpeg" | |
async def audio_normalize(full_filename, file_data): | |
try: | |
file_name = os.path.splitext(full_filename)[0] | |
file_extension = os.path.splitext(full_filename)[1].lower() | |
logger.debug(f"normalizing {full_filename}") | |
with tempfile.NamedTemporaryFile(suffix=file_extension, delete=False) as temp_file: | |
temp_file.write(file_data) | |
temp_file.flush() | |
temp_out_filename = f"norm_{file_name}.wav" | |
subprocess.call( | |
[FFMPEG_CMD, | |
'-i', temp_file.name, | |
'-ac', '1', | |
'-ar', '44100', | |
'-acodec', 'pcm_s16le', | |
'-f', 'wav', | |
temp_out_filename]) | |
if os.path.exists(temp_out_filename): | |
return temp_out_filename | |
else: | |
raise FileNotFoundError("Unable to make the file") | |
except Exception as e: | |
logger.exception(e) | |
return None | |
def read_wav_file_to_numpy_array(filename): | |
with wave.open(filename, 'rb') as wav_file: | |
sampling_rate = wav_file.getframerate() | |
bytes_data = wav_file.readframes(wav_file.getnframes()) | |
sample_width = wav_file.getsampwidth() | |
if sample_width == 1: | |
data_type = np.uint8 | |
elif sample_width == 2: | |
data_type = np.int16 | |
elif sample_width == 4: | |
data_type = np.int32 | |
else: | |
raise ValueError(f"Unsupported sample width: {sample_width}") | |
numpy_data = np.frombuffer(bytes_data, dtype=data_type) | |
return sampling_rate, numpy_data | |