"""Module containing common function.
"""
import os
import traceback
from pathlib import Path

import numpy as np

import config as cfg


def collect_audio_files(path: str):
    """Collects all audio files in the given directory.

    Args:
        path: The directory to be searched.

    Returns:
        A sorted list of all audio files in the directory.
    """
    # Get all files in directory with os.walk
    files = []

    for root, _, flist in os.walk(path):
        for f in flist:
            if not f.startswith(".") and f.rsplit(".", 1)[-1].lower() in cfg.ALLOWED_FILETYPES:
                files.append(os.path.join(root, f))

    return sorted(files)
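
# Example usage (a sketch; "datasets/train" is a hypothetical folder and the
# accepted extensions come from cfg.ALLOWED_FILETYPES):
#
#     audio_files = collect_audio_files("datasets/train")
#     print(f"Found {len(audio_files)} audio files")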


def readLines(path: str):
    """Reads the lines into a list.

    Opens the file and reads its contents into a list.
    It is expected to have one line for each species or label.

    Args:
        path: Absolute path to the species file.

    Returns:
        A list of all species inside the file.
    """
    return Path(path).read_text(encoding="utf-8").splitlines() if path else []


def list_subdirectories(path: str):
    """Lists all directories inside a path.

    Retrieves all the subdirectories in a given path without recursion.

    Args:
        path: Directory to be searched.

    Returns:
        A filter sequence containing the names of all subdirectories
        (relative to `path`, as returned by os.listdir).
    """
    return filter(lambda el: os.path.isdir(os.path.join(path, el)), os.listdir(path))


def random_split(x, y, val_ratio=0.2):
    """Splits the data into training and validation data.

    Makes sure that each class is represented in both sets whenever it has
    enough samples; classes with a single sample end up in the training set only.

    Args:
        x: Samples.
        y: One-hot labels.
        val_ratio: The ratio of validation data.

    Returns:
        A tuple of (x_train, y_train, x_val, y_val).
    """
    # Set numpy random seed
    np.random.seed(cfg.RANDOM_SEED)

    # Get number of classes
    num_classes = y.shape[1]

    # Initialize training and validation data
    x_train, y_train, x_val, y_val = [], [], [], []

    # Split data
    for i in range(num_classes):
        # Get indices of current class
        indices = np.where(y[:, i] == 1)[0]

        # Get number of samples for each set
        num_samples = len(indices)
        num_samples_train = max(1, int(num_samples * (1 - val_ratio)))
        num_samples_val = max(0, num_samples - num_samples_train)

        # Randomly choose samples for training and validation
        np.random.shuffle(indices)
        train_indices = indices[:num_samples_train]
        val_indices = indices[num_samples_train:num_samples_train + num_samples_val]

        # Append samples to training and validation data
        x_train.append(x[train_indices])
        y_train.append(y[train_indices])
        x_val.append(x[val_indices])
        y_val.append(y[val_indices])

    # Concatenate data
    x_train = np.concatenate(x_train)
    y_train = np.concatenate(y_train)
    x_val = np.concatenate(x_val)
    y_val = np.concatenate(y_val)

    # Shuffle data
    indices = np.arange(len(x_train))
    np.random.shuffle(indices)
    x_train = x_train[indices]
    y_train = y_train[indices]

    indices = np.arange(len(x_val))
    np.random.shuffle(indices)
    x_val = x_val[indices]
    y_val = y_val[indices]

    return x_train, y_train, x_val, y_val
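
# Example (a sketch; shapes and class counts are illustrative, not from the
# original module):
#
#     x = np.random.rand(12, 320)                 # 12 embeddings
#     y = np.eye(2)[np.array([0] * 8 + [1] * 4)]  # two classes, 8 + 4 samples
#     x_t, y_t, x_v, y_v = random_split(x, y, val_ratio=0.25)
#     # class 0: 6 train / 2 val, class 1: 3 train / 1 val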


def mixup(x, y, augmentation_ratio=0.25, alpha=0.2):
    """Apply mixup to the given data.

    Mixup is a data augmentation technique that generates new samples by
    mixing two samples and their labels.

    Args:
        x: Samples.
        y: One-hot labels.
        augmentation_ratio: The ratio of augmented samples.
        alpha: The beta distribution parameter.

    Returns:
        Augmented data.
    """
    # Calculate the number of samples to augment based on the ratio
    num_samples_to_augment = int(len(x) * augmentation_ratio)

    for _ in range(num_samples_to_augment):
        # Randomly choose one instance from the dataset
        index = np.random.choice(len(x))
        x1, y1 = x[index], y[index]

        # Randomly choose a different instance from the dataset
        second_index = np.random.choice(len(x))
        while second_index == index:
            second_index = np.random.choice(len(x))
        x2, y2 = x[second_index], y[second_index]

        # Generate a random mixing coefficient (lambda)
        lambda_ = np.random.beta(alpha, alpha)

        # Mix the embeddings and labels
        mixed_x = lambda_ * x1 + (1 - lambda_) * x2
        mixed_y = lambda_ * y1 + (1 - lambda_) * y2

        # Replace one of the original samples and labels with the augmented sample and labels
        x[index] = mixed_x
        y[index] = mixed_y

    return x, y
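
# Worked example (a sketch): with lambda = 0.7, a mixed sample is
# 0.7 * x1 + 0.3 * x2 and its label 0.7 * y1 + 0.3 * y2, e.g. mixing the
# one-hot labels [1, 0] and [0, 1] yields [0.7, 0.3]. Note that mixed samples
# overwrite x[index] in place, so pass copies to keep the originals:
#
#     x_aug, y_aug = mixup(x.copy(), y.copy(), augmentation_ratio=0.25)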


def label_smoothing(y, alpha=0.1):
    """Applies label smoothing to the given one-hot labels in place."""
    # Subtract alpha from the correct label when it is >0
    y[y > 0] -= alpha

    # Assign alpha to all other labels, spread over the number of classes
    # (y.shape[1]; the original divided by the batch size y.shape[0], which
    # made the smoothing depend on the number of samples)
    y[y == 0] = alpha / y.shape[1]

    return y
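
# Worked example (a sketch): for four classes and alpha=0.1, the one-hot label
# [1, 0, 0, 0] becomes [0.9, 0.025, 0.025, 0.025]:
#
#     y_smooth = label_smoothing(np.eye(4)[[0]], alpha=0.1)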


def upsampling(x, y, ratio=0.5, mode="repeat"):
    """Balance data through upsampling.

    Minority classes are upsampled until they have at least `ratio` times the
    number of samples of the majority class (e.g. ratio=0.1 means at least 10%).

    Args:
        x: Samples.
        y: One-hot labels.
        ratio: The minimum ratio of minority to majority samples.
        mode: The upsampling mode. Either 'repeat', 'mean' or 'smote'.

    Returns:
        Upsampled data.
    """
    # Set numpy random seed
    np.random.seed(cfg.RANDOM_SEED)

    # Determine min number of samples
    min_samples = int(np.max(y.sum(axis=0)) * ratio)

    x_temp = []
    y_temp = []

    if mode == "repeat":
        # For each class with less than min_samples, randomly repeat samples
        for i in range(y.shape[1]):
            num_missing = min_samples - int(y[:, i].sum())

            for _ in range(num_missing):
                # Randomly choose a sample from the minority class
                random_index = np.random.choice(np.where(y[:, i] == 1)[0])

                # Append the sample and label to a temp list
                x_temp.append(x[random_index])
                y_temp.append(y[random_index])

    elif mode == "mean":
        # For each class with less than min_samples,
        # select two random samples and calculate the mean
        for i in range(y.shape[1]):
            num_missing = min_samples - int(y[:, i].sum())

            for _ in range(num_missing):
                # Randomly choose two samples from the minority class
                random_indices = np.random.choice(np.where(y[:, i] == 1)[0], 2)

                # Calculate the mean of the two samples
                mean = np.mean(x[random_indices], axis=0)

                # Append the mean and label to a temp list
                x_temp.append(mean)
                y_temp.append(y[random_indices[0]])

    elif mode == "smote":
        # For each class with less than min_samples, apply SMOTE
        for i in range(y.shape[1]):
            num_missing = min_samples - int(y[:, i].sum())

            for _ in range(num_missing):
                # Randomly choose a sample from the minority class
                random_index = np.random.choice(np.where(y[:, i] == 1)[0])

                # Get the k nearest neighbors (note: searched over the whole
                # dataset, not just the minority class)
                k = 5
                distances = np.sqrt(np.sum((x - x[random_index]) ** 2, axis=1))
                indices = np.argsort(distances)[1:k + 1]

                # Randomly choose one of the neighbors
                random_neighbor = np.random.choice(indices)

                # Calculate the difference vector
                diff = x[random_neighbor] - x[random_index]

                # Randomly choose a weight between 0 and 1
                weight = np.random.uniform(0, 1)

                # Calculate the new sample
                new_sample = x[random_index] + weight * diff

                # Append the new sample and label to a temp list
                x_temp.append(new_sample)
                y_temp.append(y[random_index])

    # Append the temp list to the original data
    # (the temp lists accumulate across classes; the original reset them per
    # class in 'mean' and 'smote' mode, discarding all but the last class)
    if len(x_temp) > 0:
        x = np.vstack((x, np.array(x_temp)))
        y = np.vstack((y, np.array(y_temp)))

    # Shuffle data
    indices = np.arange(len(x))
    np.random.shuffle(indices)
    x = x[indices]
    y = y[indices]

    return x, y
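
# Example (a sketch; counts are illustrative): with a majority class of 100
# samples and ratio=0.5, every other class is topped up to at least 50
# samples by repeating randomly chosen members:
#
#     x_up, y_up = upsampling(x_train, y_train, ratio=0.5, mode="repeat")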


def saveToCache(cache_file: str, x_train: np.ndarray, y_train: np.ndarray, labels: list[str]):
    """Saves the training data to a cache file.

    Args:
        cache_file: The path to the cache file.
        x_train: The training samples.
        y_train: The training labels.
        labels: The list of labels.
    """
    # Create cache directory
    os.makedirs(os.path.dirname(cache_file), exist_ok=True)

    # Save to cache
    np.savez_compressed(cache_file, x_train=x_train, y_train=y_train, labels=labels)


def loadFromCache(cache_file: str):
    """Loads the training data from a cache file.

    Args:
        cache_file: The path to the cache file.

    Returns:
        A tuple of (x_train, y_train, labels).
    """
    # Load from cache
    cache = np.load(cache_file, allow_pickle=True)

    # Get data
    x_train = cache["x_train"]
    y_train = cache["y_train"]
    labels = cache["labels"]

    return x_train, y_train, labels
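
# Example round trip (a sketch; the path is hypothetical):
#
#     saveToCache("cache/train.npz", x_train, y_train, labels)
#     x_train, y_train, labels = loadFromCache("cache/train.npz")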


def clearErrorLog():
    """Clears the error log file.

    For debugging purposes.
    """
    if os.path.isfile(cfg.ERROR_LOG_FILE):
        os.remove(cfg.ERROR_LOG_FILE)


def writeErrorLog(ex: Exception):
    """Writes an exception to the error log.

    Formats the stacktrace and writes it to the error log file configured in the config.

    Args:
        ex: An exception that occurred.
    """
    with open(cfg.ERROR_LOG_FILE, "a") as elog:
        elog.write("".join(traceback.TracebackException.from_exception(ex).format()) + "\n")