import logging
import math
import time
from pathlib import Path
from typing import Iterator, Tuple

import cv2
import numpy as np
import pandas as pd
import torch
import torchaudio
import torchaudio.transforms as T
import yaml
from PIL import Image
from tqdm import tqdm
from ultralytics import YOLO

# Column order of the dataframe built by `to_dataframe`. Declared once so that
# an empty prediction set still yields a dataframe with the expected schema.
_PREDICTION_COLUMNS = ["probability", "freq_start", "freq_end", "t_start", "t_end"]


def yaml_read(path: Path) -> dict:
    """
    Returns yaml content as a python dict.

    The file is read as UTF-8 (instead of the platform default encoding) and
    parsed with `yaml.safe_load`. The returned object is whatever the top
    level node of the document is.
    """
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)


def clip(
    waveform: torch.Tensor,
    offset: float,
    duration: float,
    sample_rate: int,
) -> torch.Tensor:
    """
    Returns a clipped waveform of `duration` seconds at `offset` in seconds.

    The waveform is sliced along its second dimension. Because this is plain
    slicing, the result is shorter than `duration` when the requested window
    runs past the end of the waveform.
    """
    offset_frames_start = int(offset * sample_rate)
    offset_frames_end = offset_frames_start + int(duration * sample_rate)
    return waveform[:, offset_frames_start:offset_frames_end]


def chunk(
    waveform: torch.Tensor,
    sample_rate: int,
    duration: float,
    overlap: float,
) -> list[torch.Tensor]:
    """
    Returns a list of waveforms as torch.Tensor. Clip number `idx` starts at
    `idx * (duration - overlap)` seconds and requests `duration` seconds.

    NOTE: when `overlap` > 0 the last clips can run past the end of the
    waveform; `clip` then returns fewer than `duration` seconds for them.

    Raises:
      ValueError: if `overlap` is not strictly smaller than `duration`, since
        the step between two consecutive clips would be zero or negative.
    """
    step = duration - overlap
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be strictly smaller than duration ({duration})"
        )

    total_seconds = waveform.shape[1] / sample_rate
    number_spectrograms = math.floor(total_seconds / step)
    return [
        clip(
            waveform=waveform,
            # Same helper as the one used to timestamp the predictions, so the
            # two computations cannot drift apart.
            offset=index_to_relative_offset(
                idx=idx, duration=duration, overlap=overlap
            ),
            duration=duration,
            sample_rate=sample_rate,
        )
        for idx in range(number_spectrograms)
    ]


def load_audio(audio_filepath: Path) -> Tuple[torch.Tensor, int]:
    """
    Loads an audio_filepath and returns the waveform and sample_rate of the
    file. The time spent loading is logged at INFO level.
    """
    # perf_counter is monotonic, unlike time.time(), which makes it the right
    # clock for measuring an elapsed duration.
    start_time = time.perf_counter()
    waveform, sample_rate = torchaudio.load(audio_filepath)
    elapsed_time = time.perf_counter() - start_time
    logging.info(
        f"Elapsed time to load audio file {audio_filepath.name}: {elapsed_time:.2f}s"
    )
    return waveform, sample_rate


def waveform_to_spectrogram(
    waveform: torch.Tensor,
    sample_rate: int,
    n_fft: int,
    hop_length: int,
    freq_max: float,
) -> torch.Tensor:
    """
    Returns a spectrogram as a torch.Tensor given the provided arguments.

    The waveform is low-pass filtered at `freq_max`, turned into a power
    spectrogram, converted to decibels and finally cropped on its frequency
    axis (dimension 1) so that only the bins below `freq_max` are kept.

    See torchaudio.transforms.Spectrogram for more details about the
    parameters.

    Args:
      waveform (torch.Tensor): audio waveform of dimension of `(..., time)`
      sample_rate (int): sampling rate of the waveform, e.g. 44100 (Hz)
      n_fft (int): Size of FFT
      hop_length (int): Length of hop between STFT windows.
      freq_max (float): cutoff frequency (Hz)
    """
    filtered_waveform = torchaudio.functional.lowpass_biquad(
        waveform=waveform, sample_rate=sample_rate, cutoff_freq=freq_max
    )
    transform = T.Spectrogram(n_fft=n_fft, hop_length=hop_length, power=2)
    spectrogram = transform(filtered_waveform)
    spectrogram_db = T.AmplitudeToDB()(spectrogram)

    # The frequency bins span 0 Hz up to the Nyquist frequency. A true division
    # is used so that the axis is not truncated for odd sample rates.
    frequencies = torch.linspace(0, sample_rate / 2, spectrogram_db.size(1))
    max_freq_bin = torch.searchsorted(frequencies, freq_max).item()
    return spectrogram_db[:, :max_freq_bin, :]


def normalize(x: np.ndarray, max_value: int = 255) -> np.ndarray:
    """
    Returns the normalized array, value in [0 - max_value]
    Useful for image conversion.

    The result is cast to uint8. A constant array (max == min) has no range to
    stretch and is mapped to an array of zeros instead of dividing by zero.
    """
    _min, _max = x.min(), x.max()
    value_range = _max - _min
    if value_range == 0:
        return np.zeros_like(x, dtype=np.uint8)
    x_normalized = max_value * (x - _min) / value_range
    return x_normalized.astype(np.uint8)


def spectrogram_tensor_to_np_image(
    spectrogram: torch.Tensor, width: int, height: int
) -> np.ndarray:
    """
    Returns a numpy array of shape (height, width) that represents the
    spectrogram tensor as an image.

    Only the first entry along dimension 0 of `spectrogram` is used.
    """
    # detach/cpu make the conversion work for tensors that track gradients or
    # live on an accelerator; they are no-ops for plain CPU tensors.
    spectrogram_db_np = spectrogram[0].detach().cpu().numpy()
    # Normalize to [0, 255] for image conversion
    spectrogram_db_normalized = normalize(spectrogram_db_np, max_value=255)
    resized_spectrogram_array = cv2.resize(
        spectrogram_db_normalized, (width, height), interpolation=cv2.INTER_LINEAR
    )
    # Vertical flip (rows are reversed) to make it show the low frequency range
    # at the bottom left of the image instead of the top left
    return np.flipud(resized_spectrogram_array)


def waveform_to_np_image(
    waveform: torch.Tensor,
    sample_rate: int,
    n_fft: int,
    hop_length: int,
    freq_max: float,
    width: int,
    height: int,
) -> np.ndarray:
    """
    Returns a numpy image of shape (height, width) that represents the waveform
    tensor as an image of its spectrogram.

    Args:
      waveform (torch.Tensor): audio waveform of dimension of `(..., time)`
      sample_rate (int): sampling rate of the waveform, e.g. 44100 (Hz)
      n_fft (int): Size of FFT
      hop_length (int): Length of hop between STFT windows.
      freq_max (float): cutoff frequency (Hz)
      width (int): width of the generated image
      height (int): height of the generated image
    """
    spectrogram = waveform_to_spectrogram(
        waveform=waveform,
        sample_rate=sample_rate,
        n_fft=n_fft,
        hop_length=hop_length,
        freq_max=freq_max,
    )
    return spectrogram_tensor_to_np_image(
        spectrogram=spectrogram,
        width=width,
        height=height,
    )


def batch_sequence(xs: list, batch_size: int) -> Iterator[list]:
    """
    Yields successive n-sized batches from xs.

    The last batch holds the remaining elements and can therefore be smaller
    than `batch_size`.
    """
    for i in range(0, len(xs), batch_size):
        yield xs[i : i + batch_size]


def inference(
    model: YOLO,
    audio_filepath: Path,
    duration: float,
    overlap: float,
    width: int,
    height: int,
    freq_max: float,
    n_fft: int,
    hop_length: int,
    batch_size: int,
    output_dir: Path,
    save_spectrograms: bool,
    save_predictions: bool,
    verbose: bool,
) -> list:
    """
    Inference entry point for running on an entire audio_filepath sound file.

    The audio file is loaded, split into overlapping clips, each clip is turned
    into a spectrogram image, and the images are passed to `model.predict` in
    batches of `batch_size`.

    Args:
      model (YOLO): model used to run the predictions
      audio_filepath (Path): sound file to analyze
      duration (float): duration in seconds of each clip
      overlap (float): overlap in seconds between two consecutive clips
      width (int): width of the generated spectrogram images
      height (int): height of the generated spectrogram images
      freq_max (float): cutoff frequency (Hz) of the spectrograms
      n_fft (int): Size of FFT
      hop_length (int): Length of hop between STFT windows.
      batch_size (int): number of images sent to the model at once
      output_dir (Path): root directory for the optional saved artifacts
      save_spectrograms (bool): save the images in `output_dir/spectrograms`
      save_predictions (bool): save the predictions in `output_dir/predictions`
      verbose (bool): forwarded to `model.predict`

    Returns:
      The list of predictions, one per clip, in clip order.
    """
    logging.info(f"Loading audio filepath {audio_filepath}")
    waveform, sample_rate = load_audio(audio_filepath)
    waveforms = chunk(
        waveform=waveform,
        sample_rate=sample_rate,
        duration=duration,
        overlap=overlap,
    )
    logging.info(f"Chunking the waveform into {len(waveforms)} overlapping clips")
    logging.info(f"Generating {len(waveforms)} spectrograms")
    images = [
        Image.fromarray(
            waveform_to_np_image(
                waveform=y,
                sample_rate=sample_rate,
                n_fft=n_fft,
                hop_length=hop_length,
                freq_max=freq_max,
                width=width,
                height=height,
            )
        )
        for y in tqdm(waveforms)
    ]

    if save_spectrograms:
        save_dir = output_dir / "spectrograms"
        logging.info(f"Saving spectrograms in {save_dir}")
        save_dir.mkdir(exist_ok=True, parents=True)
        for i, image in tqdm(enumerate(images), total=len(images)):
            image.save(save_dir / f"spectrogram_{i}.png")

    results = []
    # Materialized so that the number of batches can be logged and displayed
    # by the progress bar.
    batches = list(batch_sequence(images, batch_size=batch_size))
    logging.info(f"Running inference on the spectrograms, {len(batches)} batches")
    for batch in tqdm(batches):
        results.extend(model.predict(batch, verbose=verbose))

    if save_predictions:
        save_dir = output_dir / "predictions"
        save_dir.mkdir(parents=True, exist_ok=True)
        logging.info(f"Saving predictions in {save_dir}")
        for i, yolov8_prediction in tqdm(enumerate(results), total=len(results)):
            yolov8_prediction.save(str(save_dir / f"prediction_{i}.png"))

    return results


def index_to_relative_offset(idx: int, duration: float, overlap: float) -> float:
    """
    Returns the relative offset in seconds based on the provided spectrogram
    index, the duration and the overlap.
    """
    return idx * (duration - overlap)


def from_yolov8_prediction(
    yolov8_prediction,
    idx: int,
    duration: float,
    overlap: float,
    freq_min: float,
    freq_max: float,
) -> list[dict]:
    """
    Converts the boxes of one yolov8 prediction into a list of dicts with the
    keys `probability`, `freq_start`, `freq_end`, `t_start` and `t_end`.

    Args:
      yolov8_prediction: prediction for the spectrogram image number `idx`;
        its `boxes.xyxyn` and `boxes.conf` attributes are read.
      idx (int): index of the spectrogram, used to compute its time offset
      duration (float): duration in seconds covered by one spectrogram
      overlap (float): overlap in seconds between consecutive spectrograms
      freq_min (float): frequency (Hz) at the bottom edge of the image
      freq_max (float): frequency (Hz) at the top edge of the image

    The normalized y coordinates of a box are measured from the top of the
    image, whereas `spectrogram_tensor_to_np_image` flips the spectrogram so
    that low frequencies sit at the bottom. A frequency is therefore obtained
    with `freq_min + (1 - y) * (freq_max - freq_min)`.
    """
    boxes = yolov8_prediction.boxes
    freq_range = freq_max - freq_min
    # The offset only depends on the spectrogram index: compute it once.
    offset = index_to_relative_offset(idx=idx, duration=duration, overlap=overlap)

    results = []
    for box_xyxyn, box_conf in zip(boxes.xyxyn, boxes.conf):
        # .cpu() keeps this working when the model ran on an accelerator;
        # tolist() yields plain python floats.
        x1, y1, x2, y2 = box_xyxyn.cpu().tolist()
        xmin, xmax = min(x1, x2), max(x1, x2)
        ymin, ymax = min(y1, y2), max(y1, y2)
        results.append(
            {
                "probability": box_conf.item(),
                # ymax is the bottom edge of the box, i.e. its lowest frequency.
                "freq_start": freq_min + (1.0 - ymax) * freq_range,
                "freq_end": freq_min + (1.0 - ymin) * freq_range,
                "t_start": xmin * duration + offset,
                "t_end": xmax * duration + offset,
            }
        )
    return results


def to_dataframe(
    yolov8_predictions,
    duration: float,
    overlap: float,
    freq_min: float,
    freq_max: float,
) -> pd.DataFrame:
    """
    Turns the yolov8 predictions into a pandas dataframe, taking into account
    the relative offset of each prediction.

    The dataframe contains the following columns, even when there is no
    prediction at all:
      probability (float): float in 0-1 that represents the probability that
        this is an actual rumble
      freq_start (float): Hz - where the box starts on the frequency axis
      freq_end (float): Hz - where the box ends on the frequency axis
      t_start (float): seconds - where the box starts on the time axis
      t_end (float): seconds - where the box ends on the time axis
    """
    results = []
    for idx, yolov8_prediction in enumerate(yolov8_predictions):
        results.extend(
            from_yolov8_prediction(
                yolov8_prediction,
                idx=idx,
                duration=duration,
                overlap=overlap,
                freq_min=freq_min,
                freq_max=freq_max,
            )
        )
    return pd.DataFrame(results, columns=_PREDICTION_COLUMNS)


def bgr_to_rgb(a: np.ndarray) -> np.ndarray:
    """
    Turn a BGR numpy array into a RGB numpy array when the array `a` represents
    an image.

    The third axis of `a` is reversed; the result is a view on `a`, not a copy.
    """
    return a[:, :, ::-1]


def get_concat_v(im1: Image.Image, im2: Image.Image) -> Image.Image:
    """
    Concatenate vertically two PIL images.

    `im1` is pasted at the top and `im2` right below it, on a new RGB image.
    The result has the width of `im1`: a wider `im2` is cropped on its right
    side.
    """
    dst = Image.new("RGB", (im1.width, im1.height + im2.height))
    dst.paste(im1, (0, 0))
    dst.paste(im2, (0, im1.height))
    return dst