Spaces:
Running
Running
"""Module containing common function. | |
""" | |
import os | |
import traceback | |
import numpy as np | |
from pathlib import Path | |
import config as cfg | |
def collect_audio_files(path: str):
    """Recursively collect all audio files under a directory.

    Walks the directory tree and keeps every non-hidden file whose
    extension appears in cfg.ALLOWED_FILETYPES.

    Args:
        path: The directory to be searched.

    Returns:
        A sorted list of all audio files in the directory.
    """
    found = []

    for root, _, filenames in os.walk(path):
        for filename in filenames:
            # Skip hidden files (e.g. .DS_Store)
            if filename.startswith("."):
                continue

            extension = filename.rsplit(".", 1)[-1].lower()

            if extension in cfg.ALLOWED_FILETYPES:
                found.append(os.path.join(root, filename))

    return sorted(found)
def readLines(path: str):
    """Read a text file into a list of lines.

    The file is expected to contain one species or label per line.

    Args:
        path: Absolute path to the species file.

    Returns:
        A list of all species inside the file; empty list for a falsy path.
    """
    if not path:
        return []

    with open(path, encoding="utf-8") as species_file:
        return species_file.read().splitlines()
def list_subdirectories(path: str):
    """List the immediate subdirectories of a path.

    Non-recursive: only direct children of the given directory are
    considered.

    Args:
        path: Directory to be searched.

    Returns:
        A filter object yielding the names of all child directories.
    """

    def _is_directory(entry):
        # Entries from os.listdir are bare names, so join with the parent
        return os.path.isdir(os.path.join(path, entry))

    return filter(_is_directory, os.listdir(path))
def random_split(x, y, val_ratio=0.2):
    """Splits the data into training and validation data.

    Performs a stratified split: the samples of every class are split
    separately so each class is represented in both sets.

    Args:
        x: Samples.
        y: One-hot labels, shape (num_samples, num_classes).
        val_ratio: The ratio of validation data.

    Returns:
        A tuple of (x_train, y_train, x_val, y_val).
    """
    # Set numpy random seed so the split is reproducible across runs
    np.random.seed(cfg.RANDOM_SEED)

    # Get number of classes
    num_classes = y.shape[1]

    # Initialize training and validation data
    x_train, y_train, x_val, y_val = [], [], [], []

    # Split data per class
    for i in range(num_classes):
        # Get indices of current class
        # NOTE(review): a sample positive for several classes is picked up
        # once per class and therefore duplicated in the output — confirm
        # this is intended for multi-label data.
        indices = np.where(y[:, i] == 1)[0]

        # Get number of samples for each set; at least one sample always
        # goes to training, so a single-sample class yields no validation
        num_samples = len(indices)
        num_samples_train = max(1, int(num_samples * (1 - val_ratio)))
        num_samples_val = max(0, num_samples - num_samples_train)

        # Randomly choose samples for training and validation
        np.random.shuffle(indices)
        train_indices = indices[:num_samples_train]
        val_indices = indices[num_samples_train:num_samples_train + num_samples_val]

        # Append samples to training and validation data
        x_train.append(x[train_indices])
        y_train.append(y[train_indices])
        x_val.append(x[val_indices])
        y_val.append(y[val_indices])

    # Concatenate the per-class chunks into single arrays
    x_train = np.concatenate(x_train)
    y_train = np.concatenate(y_train)
    x_val = np.concatenate(x_val)
    y_val = np.concatenate(y_val)

    # Shuffle data so samples of one class are not grouped together
    indices = np.arange(len(x_train))
    np.random.shuffle(indices)
    x_train = x_train[indices]
    y_train = y_train[indices]

    indices = np.arange(len(x_val))
    np.random.shuffle(indices)
    x_val = x_val[indices]
    y_val = y_val[indices]

    return x_train, y_train, x_val, y_val
def mixup(x, y, augmentation_ratio=0.25, alpha=0.2):
    """Apply mixup augmentation to the given data in place.

    Mixup generates new samples by linearly interpolating two samples and
    their labels with a coefficient drawn from a Beta(alpha, alpha)
    distribution. Each augmented sample overwrites a randomly chosen
    original, so x and y are modified in place.

    Args:
        x: Samples (first axis indexes samples).
        y: One-hot labels.
        augmentation_ratio: The ratio of samples to augment.
        alpha: The beta distribution parameter.

    Returns:
        The augmented data as a tuple (x, y) — the same objects passed in.
    """
    # Mixing requires two distinct samples; with fewer than two, the
    # "choose a different instance" loop below could never terminate.
    if len(x) < 2:
        return x, y

    # Calculate the number of samples to augment based on the ratio
    num_samples_to_augment = int(len(x) * augmentation_ratio)

    for _ in range(num_samples_to_augment):
        # Randomly choose one instance from the dataset
        index = np.random.choice(len(x))
        x1, y1 = x[index], y[index]

        # Randomly choose a different instance from the dataset
        second_index = np.random.choice(len(x))
        while second_index == index:
            second_index = np.random.choice(len(x))
        x2, y2 = x[second_index], y[second_index]

        # Generate a random mixing coefficient (lambda)
        lambda_ = np.random.beta(alpha, alpha)

        # Mix the embeddings and labels
        mixed_x = lambda_ * x1 + (1 - lambda_) * x2
        mixed_y = lambda_ * y1 + (1 - lambda_) * y2

        # Replace one of the original samples and labels with the mix
        x[index] = mixed_x
        y[index] = mixed_y

    return x, y
def label_smoothing(y, alpha=0.1):
    """Apply label smoothing to one-hot labels in place.

    Positive labels are reduced by alpha and the removed probability mass
    is spread uniformly over the remaining (zero) labels.

    Args:
        y: One-hot label array, shape (num_samples, num_classes). Modified
            in place.
        alpha: Smoothing strength.

    Returns:
        The smoothed label array (the same object passed in).
    """
    # Subtract alpha from the positive labels
    y[y > 0] -= alpha

    # Spread alpha over the classes. The previous code divided by
    # y.shape[0] (the batch size), which made the smoothing value depend
    # on dataset size instead of the number of classes.
    y[y == 0] = alpha / y.shape[-1]

    return y
def upsampling(x, y, ratio=0.5, mode="repeat"):
    """Balance data through upsampling of minority classes.

    Minority classes are upsampled so that each has at least ``ratio``
    times the number of samples of the majority class (e.g. ratio=0.1
    means at least 10%).

    Args:
        x: Samples.
        y: One-hot labels, shape (num_samples, num_classes).
        ratio: The minimum ratio of minority to majority samples.
        mode: The upsampling mode. Either 'repeat', 'mean' or 'smote'.
            Any other value performs no upsampling (data is only shuffled).

    Returns:
        Upsampled data as a tuple (x, y).
    """
    # Set numpy random seed so upsampling is reproducible
    np.random.seed(cfg.RANDOM_SEED)

    # Determine min number of samples from the majority class
    min_samples = int(np.max(y.sum(axis=0)) * ratio)

    # Synthetic samples for ALL classes accumulate here. The previous code
    # reset these lists per class in 'mean'/'smote' mode (so only the last
    # class survived), and in 'repeat' mode counted the global list length
    # in the per-class while-condition (undersampling later classes).
    x_temp = []
    y_temp = []

    if mode == 'repeat':
        # For each class with less than min_samples randomly repeat samples
        for i in range(y.shape[1]):
            class_indices = np.where(y[:, i] == 1)[0]

            # Nothing to repeat for a class with no positive samples
            if len(class_indices) == 0:
                continue

            num_new = 0  # synthetic samples generated for THIS class only

            while y[:, i].sum() + num_new < min_samples:
                # Randomly choose a sample from the minority class
                random_index = np.random.choice(class_indices)

                # Append the sample and label to the temp lists
                x_temp.append(x[random_index])
                y_temp.append(y[random_index])
                num_new += 1

    elif mode == 'mean':
        # For each class with less than min_samples
        # select two random samples and calculate the mean
        for i in range(y.shape[1]):
            class_indices = np.where(y[:, i] == 1)[0]

            if len(class_indices) == 0:
                continue

            num_new = 0

            while y[:, i].sum() + num_new < min_samples:
                # Randomly choose two samples from the minority class
                random_indices = np.random.choice(class_indices, 2)

                # Calculate the mean of the two samples
                mean = np.mean(x[random_indices], axis=0)

                # Append the mean and label to the temp lists
                x_temp.append(mean)
                y_temp.append(y[random_indices[0]])
                num_new += 1

    elif mode == 'smote':
        # For each class with less than min_samples apply SMOTE
        for i in range(y.shape[1]):
            class_indices = np.where(y[:, i] == 1)[0]

            if len(class_indices) == 0:
                continue

            num_new = 0

            while y[:, i].sum() + num_new < min_samples:
                # Randomly choose a sample from the minority class
                random_index = np.random.choice(class_indices)

                # Get the k nearest neighbors (excluding the sample itself)
                k = 5
                distances = np.sqrt(np.sum((x - x[random_index]) ** 2, axis=1))
                indices = np.argsort(distances)[1:k + 1]

                # Randomly choose one of the neighbors
                random_neighbor = np.random.choice(indices)

                # Interpolate between the sample and the chosen neighbor
                diff = x[random_neighbor] - x[random_index]
                weight = np.random.uniform(0, 1)
                new_sample = x[random_index] + weight * diff

                # Append the new sample and label to the temp lists
                x_temp.append(new_sample)
                y_temp.append(y[random_index])
                num_new += 1

    # Append the synthetic samples to the original data
    if len(x_temp) > 0:
        x = np.vstack((x, np.array(x_temp)))
        y = np.vstack((y, np.array(y_temp)))

    # Shuffle data
    indices = np.arange(len(x))
    np.random.shuffle(indices)
    x = x[indices]
    y = y[indices]

    return x, y
def saveToCache(cache_file: str, x_train: np.ndarray, y_train: np.ndarray, labels: list[str]):
    """Saves the training data to a compressed cache file.

    Args:
        cache_file: The path to the cache file.
        x_train: The training samples.
        y_train: The training labels.
        labels: The list of labels.
    """
    # Create the cache directory only when the path actually contains one:
    # os.makedirs("") raises FileNotFoundError for a bare filename.
    cache_dir = os.path.dirname(cache_file)
    if cache_dir:
        os.makedirs(cache_dir, exist_ok=True)

    # Save to cache
    np.savez_compressed(cache_file, x_train=x_train, y_train=y_train, labels=labels)
def loadFromCache(cache_file: str):
    """Loads training data from a cache file.

    Args:
        cache_file: The path to the cache file.

    Returns:
        A tuple of (x_train, y_train, labels).
    """
    # allow_pickle is needed because the labels array holds Python strings
    cached = np.load(cache_file, allow_pickle=True)

    return cached["x_train"], cached["y_train"], cached["labels"]
def clearErrorLog():
    """Remove the error log file if it exists.

    For debugging purposes.
    """
    log_path = cfg.ERROR_LOG_FILE

    if os.path.isfile(log_path):
        os.remove(log_path)
def writeErrorLog(ex: Exception):
    """Append an exception's stacktrace to the error log.

    Formats the traceback and appends it to the error log file configured
    in the config module.

    Args:
        ex: An exception that occurred.
    """
    stacktrace = "".join(traceback.TracebackException.from_exception(ex).format())

    with open(cfg.ERROR_LOG_FILE, "a") as log_file:
        log_file.write(stacktrace + "\n")