"""Module containing common function.
"""
import os
import traceback
import numpy as np
from pathlib import Path
import config as cfg


def collect_audio_files(path: str):
    """Collects all audio files in the given directory.

    Args:
        path: The directory to be searched.

    Returns:
        A sorted list of all audio files in the directory.
    """
    # Get all files in directory with os.walk
    files = []

    for root, _, flist in os.walk(path):
        for f in flist:
            if not f.startswith(".") and f.rsplit(".", 1)[-1].lower() in cfg.ALLOWED_FILETYPES:
                files.append(os.path.join(root, f))

    return sorted(files)
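
# A usage sketch for collect_audio_files (the directory layout below is a
# hypothetical example, and cfg.ALLOWED_FILETYPES is assumed to contain "wav"):
#
#   files = collect_audio_files("datasets/train")
#   # -> ["datasets/train/speciesA/XC0001.wav", "datasets/train/speciesB/XC0002.wav", ...]
#
# Hidden files (names starting with ".") are skipped.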


def readLines(path: str):
    """Reads the lines into a list.

    Opens the file and reads its contents into a list.
    It is expected to have one line for each species or label.

    Args:
        path: Absolute path to the species file.

    Returns:
        A list of all species inside the file.
    """
    return Path(path).read_text(encoding="utf-8").splitlines() if path else []


def list_subdirectories(path: str):
    """Lists all directories inside a path.

    Retrieves all the subdirectories in a given path without recursion.

    Args:
        path: Directory to be searched.

    Returns:
        A filter object yielding the names of all subdirectories (relative to path).
    """
    return filter(lambda el: os.path.isdir(os.path.join(path, el)), os.listdir(path))


def random_split(x, y, val_ratio=0.2):
    """Splits the data into training and validation data.

    Makes sure that each class keeps at least one sample in the training set;
    very small classes may therefore end up without validation samples.

    Args:
        x: Samples.
        y: One-hot labels.
        val_ratio: The ratio of validation data.

    Returns:
        A tuple of (x_train, y_train, x_val, y_val).
    """
    # Set numpy random seed
    np.random.seed(cfg.RANDOM_SEED)

    # Get number of classes
    num_classes = y.shape[1]

    # Initialize training and validation data
    x_train, y_train, x_val, y_val = [], [], [], []

    # Split data
    for i in range(num_classes):
        # Get indices of current class
        indices = np.where(y[:, i] == 1)[0]

        # Get number of samples for each set
        num_samples = len(indices)
        num_samples_train = max(1, int(num_samples * (1 - val_ratio)))
        num_samples_val = max(0, num_samples - num_samples_train)

        # Randomly choose samples for training and validation
        np.random.shuffle(indices)
        train_indices = indices[:num_samples_train]
        val_indices = indices[num_samples_train:num_samples_train + num_samples_val]

        # Append samples to training and validation data
        x_train.append(x[train_indices])
        y_train.append(y[train_indices])
        x_val.append(x[val_indices])
        y_val.append(y[val_indices])

    # Concatenate data
    x_train = np.concatenate(x_train)
    y_train = np.concatenate(y_train)
    x_val = np.concatenate(x_val)
    y_val = np.concatenate(y_val)

    # Shuffle data
    indices = np.arange(len(x_train))
    np.random.shuffle(indices)
    x_train = x_train[indices]
    y_train = y_train[indices]

    indices = np.arange(len(x_val))
    np.random.shuffle(indices)
    x_val = x_val[indices]
    y_val = y_val[indices]

    return x_train, y_train, x_val, y_val
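
# A minimal usage sketch for random_split (shapes are illustrative assumptions,
# not values used elsewhere in this module):
#
#   x = np.random.rand(100, 320)                  # 100 samples, 320-dim embeddings
#   y = np.eye(4)[np.random.randint(0, 4, 100)]   # one-hot labels for 4 classes
#   x_train, y_train, x_val, y_val = random_split(x, y, val_ratio=0.2)
#
# With roughly 25 samples per class, each class contributes about 20 training
# and 5 validation samples.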


def mixup(x, y, augmentation_ratio=0.25, alpha=0.2):
    """Apply mixup to the given data.

    Mixup is a data augmentation technique that generates new samples by
    mixing two samples and their labels.

    Args:
        x: Samples.
        y: One-hot labels.
        augmentation_ratio: The ratio of augmented samples.
        alpha: The beta distribution parameter.

    Returns:
        Augmented data.
    """
    # Calculate the number of samples to augment based on the ratio
    num_samples_to_augment = int(len(x) * augmentation_ratio)

    for _ in range(num_samples_to_augment):
        # Randomly choose one instance from the dataset
        index = np.random.choice(len(x))
        x1, y1 = x[index], y[index]

        # Randomly choose a different instance from the dataset
        second_index = np.random.choice(len(x))
        while second_index == index:
            second_index = np.random.choice(len(x))
        x2, y2 = x[second_index], y[second_index]

        # Generate a random mixing coefficient (lambda)
        lambda_ = np.random.beta(alpha, alpha)

        # Mix the embeddings and labels
        mixed_x = lambda_ * x1 + (1 - lambda_) * x2
        mixed_y = lambda_ * y1 + (1 - lambda_) * y2

        # Replace one of the original samples and labels with the augmented sample and labels
        x[index] = mixed_x
        y[index] = mixed_y

    return x, y
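
# Illustrative arithmetic for one mixup step: with lambda_ = 0.7, the mixed
# sample is 0.7 * x1 + 0.3 * x2 and its label is 0.7 * y1 + 0.3 * y2, so mixing
# the one-hot labels [1, 0] and [0, 1] yields the soft label [0.7, 0.3].
# Note that mixup() overwrites samples in place rather than appending new ones,
# so the dataset size stays the same.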


def label_smoothing(y, alpha=0.1):
    """Applies label smoothing to one-hot labels.

    Args:
        y: One-hot labels.
        alpha: The smoothing strength.

    Returns:
        The smoothed labels.
    """
    # Subtract alpha from the correct label when it is > 0
    y[y > 0] -= alpha

    # Distribute alpha evenly over the classes for all other labels
    y[y == 0] = alpha / y.shape[1]

    return y
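
# Worked example with 4 classes and alpha = 0.1: the one-hot row [1, 0, 0, 0]
# becomes [0.9, 0.025, 0.025, 0.025]. The row sums to 0.975 rather than 1
# because alpha is spread over all classes instead of only the incorrect ones.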
def upsampling(x, y, ratio=0.5, mode="repeat"):
"""Balance data through upsampling.
We upsample minority classes to have at least 10% (ratio=0.1) of the samples of the majority class.
Args:
x: Samples.
y: One-hot labels.
ratio: The minimum ratio of minority to majority samples.
mode: The upsampling mode. Either 'repeat', 'mean' or 'smote'.
Returns:
Upsampled data.
"""
# Set numpy random seed
np.random.seed(cfg.RANDOM_SEED)
# Determin min number of samples
min_samples = int(np.max(y.sum(axis=0)) * ratio)
x_temp = []
y_temp = []
if mode == 'repeat':
# For each class with less than min_samples ranomdly repeat samples
for i in range(y.shape[1]):
while y[:, i].sum() + len(y_temp) < min_samples:
# Randomly choose a sample from the minority class
random_index = np.random.choice(np.where(y[:, i] == 1)[0])
# Append the sample and label to a temp list
x_temp.append(x[random_index])
y_temp.append(y[random_index])
elif mode == 'mean':
# For each class with less than min_samples
# select two random samples and calculate the mean
for i in range(y.shape[1]):
x_temp = []
y_temp = []
while y[:, i].sum() + len(y_temp) < min_samples:
# Randomly choose two samples from the minority class
random_indices = np.random.choice(np.where(y[:, i] == 1)[0], 2)
# Calculate the mean of the two samples
mean = np.mean(x[random_indices], axis=0)
# Append the mean and label to a temp list
x_temp.append(mean)
y_temp.append(y[random_indices[0]])
elif mode == 'smote':
# For each class with less than min_samples apply SMOTE
for i in range(y.shape[1]):
x_temp = []
y_temp = []
while y[:, i].sum() + len(y_temp) < min_samples:
# Randomly choose a sample from the minority class
random_index = np.random.choice(np.where(y[:, i] == 1)[0])
# Get the k nearest neighbors
k = 5
distances = np.sqrt(np.sum((x - x[random_index])**2, axis=1))
indices = np.argsort(distances)[1:k+1]
# Randomly choose one of the neighbors
random_neighbor = np.random.choice(indices)
# Calculate the difference vector
diff = x[random_neighbor] - x[random_index]
# Randomly choose a weight between 0 and 1
weight = np.random.uniform(0, 1)
# Calculate the new sample
new_sample = x[random_index] + weight * diff
# Append the new sample and label to a temp list
x_temp.append(new_sample)
y_temp.append(y[random_index])
# Append the temp list to the original data
if len(x_temp) > 0:
x = np.vstack((x, np.array(x_temp)))
y = np.vstack((y, np.array(y_temp)))
# Shuffle data
indices = np.arange(len(x))
np.random.shuffle(indices)
x = x[indices]
y = y[indices]
return x, y
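
# A minimal usage sketch (the numbers are illustrative): with ratio=0.5 and a
# majority class of 200 samples, every class is upsampled to at least 100.
#
#   x_bal, y_bal = upsampling(x, y, ratio=0.5, mode="repeat")
#
# 'repeat' duplicates existing samples, 'mean' averages random pairs, and
# 'smote' interpolates between a sample and one of its k nearest neighbors.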


def saveToCache(cache_file: str, x_train: np.ndarray, y_train: np.ndarray, labels: list[str]):
    """Saves the training data to a cache file.

    Args:
        cache_file: The path to the cache file.
        x_train: The training samples.
        y_train: The training labels.
        labels: The list of labels.
    """
    # Create cache directory
    os.makedirs(os.path.dirname(cache_file), exist_ok=True)

    # Save to cache
    np.savez_compressed(cache_file, x_train=x_train, y_train=y_train, labels=labels)


def loadFromCache(cache_file: str):
    """Loads the training data from a cache file.

    Args:
        cache_file: The path to the cache file.

    Returns:
        A tuple of (x_train, y_train, labels).
    """
    # Load from cache
    cache = np.load(cache_file, allow_pickle=True)

    # Get data
    x_train = cache["x_train"]
    y_train = cache["y_train"]
    labels = cache["labels"]

    return x_train, y_train, labels
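
# Round-trip sketch (the path below is an assumed example, not a config value):
#
#   saveToCache("cache/train.npz", x_train, y_train, labels)
#   x_train, y_train, labels = loadFromCache("cache/train.npz")
#
# np.savez_compressed appends ".npz" to the filename if it is missing, so the
# load should use the same ".npz" suffix that was actually written.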


def clearErrorLog():
    """Clears the error log file.

    For debugging purposes.
    """
    if os.path.isfile(cfg.ERROR_LOG_FILE):
        os.remove(cfg.ERROR_LOG_FILE)


def writeErrorLog(ex: Exception):
    """Writes an exception to the error log.

    Formats the stacktrace and writes it in the error log file configured in the config.

    Args:
        ex: An exception that occurred.
    """
    with open(cfg.ERROR_LOG_FILE, "a") as elog:
        elog.write("".join(traceback.TracebackException.from_exception(ex).format()) + "\n")