"""Normalize audio to mono 16-bit WAV via ffmpeg and load WAV samples into NumPy."""

import asyncio
import logging
import os
import subprocess
import tempfile
import wave

import numpy as np

logger = logging.getLogger(__name__)

FFMPEG_CMD = "ffmpeg"

# WAV sample width (bytes per sample) -> NumPy dtype.
# WAV PCM data is little-endian, so the multi-byte dtypes are pinned to
# little-endian instead of relying on the host's native byte order. On
# little-endian hosts these are identical to np.int16 / np.int32.
_SAMPLE_WIDTH_DTYPES = {
    1: np.dtype(np.uint8),
    2: np.dtype("<i2"),
    4: np.dtype("<i4"),
}


def _run_ffmpeg(input_path, output_path):
    """Convert *input_path* to a mono, 44.1 kHz, 16-bit PCM WAV at *output_path*.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        OSError: If the ffmpeg executable cannot be started.
    """
    subprocess.run(
        [
            FFMPEG_CMD,
            # Never read from the inherited stdin (no interactive prompts).
            '-nostdin',
            # Overwrite an existing output; otherwise a stale file from an
            # earlier run would be left in place and mistaken for success.
            '-y',
            '-i', input_path,
            '-ac', '1',
            '-ar', '44100',
            '-acodec', 'pcm_s16le',
            '-f', 'wav',
            output_path,
        ],
        check=True,
    )


async def audio_normalize(full_filename, file_data):
    """Transcode raw audio bytes to a normalized WAV file using ffmpeg.

    The bytes are written to a temporary file (keeping the original
    extension), converted by ffmpeg, and the temporary file is removed
    afterwards whether or not the conversion succeeded.

    Args:
        full_filename: Original file name; its stem names the output and its
            lower-cased extension is used as the temporary file's suffix.
        file_data: Bytes of the audio file to convert.

    Returns:
        The output path ``norm_<stem>.wav`` (relative to the current working
        directory when *full_filename* is a bare name), or ``None`` if
        anything failed. Failures are logged, not raised.
    """
    try:
        file_name, file_extension = os.path.splitext(full_filename)
        file_extension = file_extension.lower()
        logger.debug("normalizing %s", full_filename)

        temp_out_filename = f"norm_{file_name}.wav"
        temp_file = tempfile.NamedTemporaryFile(
            suffix=file_extension, delete=False)
        try:
            # Close the handle before ffmpeg runs so the data is fully
            # written to disk.
            with temp_file:
                temp_file.write(file_data)

            # ffmpeg is a blocking call; run it in the default executor so
            # the event loop stays responsive while it works.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None, _run_ffmpeg, temp_file.name, temp_out_filename)
        finally:
            # delete=False means nobody else will remove the input file.
            try:
                os.unlink(temp_file.name)
            except OSError:
                logger.warning(
                    "could not remove temporary file %s", temp_file.name)

        if not os.path.exists(temp_out_filename):
            raise FileNotFoundError("Unable to make the file")
        return temp_out_filename
    except Exception as e:
        logger.exception(e)
        return None


def read_wav_file_to_numpy_array(filename):
    """Read all frames of a WAV file into a one-dimensional NumPy array.

    Args:
        filename: Path of the WAV file to read.

    Returns:
        A ``(sampling_rate, numpy_data)`` tuple. ``numpy_data`` holds the
        samples in the order they are stored in the file; frames are not
        reshaped per channel.

    Raises:
        ValueError: If the sample width is not 1, 2 or 4 bytes.
    """
    with wave.open(filename, 'rb') as wav_file:
        sampling_rate = wav_file.getframerate()
        bytes_data = wav_file.readframes(wav_file.getnframes())
        sample_width = wav_file.getsampwidth()

    data_type = _SAMPLE_WIDTH_DTYPES.get(sample_width)
    if data_type is None:
        raise ValueError(f"Unsupported sample width: {sample_width}")

    numpy_data = np.frombuffer(bytes_data, dtype=data_type)
    return sampling_rate, numpy_data