# hilamanor's picture
# initial commit
# e73da9c
import numpy as np
import torch
from torch import nn as nn
from torchvision.ops.misc import FrozenBatchNorm2d
import logging
# import h5py
from tqdm import tqdm
import random
import json
import os
import pathlib
# TODO(yusong): This is not a good place to store this information and it does not scale; it needs to be fixed later.
# Maps each known dataset name to the list of split names available for it.
dataset_split = {
    "audiocaps": ["train", "valid", "test"],
    "audioset": ["balanced_train", "unbalanced_train", "eval"],
    "BBCSoundEffects": ["train", "test"],
    "Clotho": ["train", "test", "valid"],
    "free_to_use_sounds": ["train", "test"],
    "paramount_motion": ["train", "test"],
    "sonniss_game_effects": ["train", "test"],
    "wesoundeffects": ["train", "test"],
    "MACS": ["train", "test"],
    "freesound": ["train", "test"],
    "FSD50K": ["train", "test", "valid"],
    "fsd50k_class_label": ["train", "test", "valid"],
    "esc50": ["train", "test"],
    "audiostock": ["train", "test"],
    "freesound_no_overlap_noesc50": ["train", "test"],
    "epidemic_sound_effects": ["train", "test"],
    "VGGSound": ["train", "test"],
    "urbansound8k_class_label": ["train", "test"],
    "audioset_t5": ["balanced_train", "unbalanced_train", "eval"],
    "epidemic_sound_effects_t5": ["train", "test"],
    "WavText5K": ["train", "test"],
    "esc50_no_overlap": ["train", "test"],
    "usd8k_no_overlap": ["train", "test"],
    "fsd50k_200_class_label": ["train", "test", "valid"],
}
def freeze_batch_norm_2d(module, module_match=None, name=""):
    """
    Converts all `BatchNorm2d` and `SyncBatchNorm` layers of provided module into `FrozenBatchNorm2d`. If `module` is
    itself an instance of either `BatchNorm2d` or `SyncBatchNorm`, it is converted into `FrozenBatchNorm2d` and
    returned. Otherwise, the module is walked recursively and submodules are converted in place.

    Args:
        module (torch.nn.Module): Any PyTorch module.
        module_match (dict | None): Dictionary of full module names to freeze (all if empty or None)
        name (str): Full module name (prefix)

    Returns:
        torch.nn.Module: Resulting module

    Inspired by https://github.com/pytorch/pytorch/blob/a5895f85be0f10212791145bfedc0261d364f103/torch/nn/modules/batchnorm.py#L762
    """
    res = module
    # An empty (or missing) filter means every batch-norm layer is a match.
    is_match = name in module_match if module_match else True
    if is_match and isinstance(
        module, (nn.modules.batchnorm.BatchNorm2d, nn.modules.batchnorm.SyncBatchNorm)
    ):
        res = FrozenBatchNorm2d(module.num_features)
        res.num_features = module.num_features
        res.affine = module.affine
        if module.affine:
            # Affine parameters are copied so the frozen layer owns its own storage.
            res.weight.data = module.weight.data.clone().detach()
            res.bias.data = module.bias.data.clone().detach()
        # Running statistics are assigned without cloning, i.e. they share
        # storage with the source layer.
        res.running_mean.data = module.running_mean.data
        res.running_var.data = module.running_var.data
        res.eps = module.eps
    else:
        for child_name, child in module.named_children():
            full_child_name = ".".join([name, child_name]) if name else child_name
            new_child = freeze_batch_norm_2d(child, module_match, full_child_name)
            # Only re-register children that were actually replaced.
            if new_child is not child:
                res.add_module(child_name, new_child)
    return res
def exist(dataset_name, dataset_type):
    """
    Tell whether `dataset_type` is one of the splits registered for
    `dataset_name` in `dataset_split`.

    Raises:
        KeyError: if `dataset_name` is not a key of `dataset_split`.
    """
    return dataset_type in dataset_split[dataset_name]
def get_tar_path_from_dataset_name(
    dataset_names, dataset_types, islocal, dataset_path, proportion=1, full_dataset=None
):
    """
    Get tar path from dataset name and type

    For every (dataset, split) pair a `sizes.json` file is looked up; its keys
    are taken as the tar file names of that split. Pairs without a
    `sizes.json` are silently skipped.

    Args:
        dataset_names: iterable of dataset names.
        dataset_types: iterable of split names used for every dataset that is
            not listed in `full_dataset`.
        islocal: if truthy, return paths under `dataset_path`; otherwise return
            `pipe:aws s3 ... cp` commands streaming from S3.
        dataset_path: root directory of the local datasets.
        proportion: fraction of tars to keep per (dataset, split); when it is
            not 1 a random sample of `int(proportion * count)` tars is taken.
        full_dataset: optional collection of dataset names for which all
            splits registered in `dataset_split` are used.

    Returns:
        list[str]: flat list of tar paths / pipe commands.
    """
    output = []
    for n in dataset_names:
        if full_dataset is not None and n in full_dataset:
            current_dataset_types = dataset_split[n]
        else:
            current_dataset_types = dataset_types
        for s in current_dataset_types:
            fallback_sizefilepath = f"./json_files/{n}/{s}/sizes.json"
            if islocal:
                sizefilepath_ = f"{dataset_path}/{n}/{s}/sizes.json"
                # Fall back to the bundled json files when the dataset
                # directory does not ship its own sizes.json.
                if not os.path.exists(sizefilepath_):
                    sizefilepath_ = fallback_sizefilepath
            else:
                sizefilepath_ = fallback_sizefilepath
            if not os.path.exists(sizefilepath_):
                continue
            # Context manager so the file handle is closed deterministically.
            with open(sizefilepath_, "r") as size_file:
                sizes = json.load(size_file)
            if islocal:
                tmp = [f"{dataset_path}/{n}/{s}/{k}" for k in sizes]
            else:
                tmp = [
                    f"pipe:aws s3 --cli-connect-timeout 0 cp s3://s-laion-audio/webdataset_tar/{n}/{s}/{k} -"
                    for k in sizes
                ]
            if proportion != 1:
                tmp = random.sample(tmp, int(proportion * len(tmp)))
            # extend() flattens as we go instead of the quadratic sum(output, []).
            output.extend(tmp)
    return output
def get_tar_path_from_txts(txt_path, islocal, proportion=1):
    """
    Get tar path from txt path

    Args:
        txt_path: path (str or os.PathLike) of a text file with one tar entry
            per line, or a list/tuple of such paths (handled recursively and
            concatenated in order).
        islocal: if truthy, the S3 pipe prefix of every line is replaced by
            the local mount point `/mnt/audio_clip/`; otherwise `.tar` is
            replaced by `.tar -` in every line.
        proportion: fraction of lines to keep per txt file; when it is not 1
            a random sample of `int(proportion * count)` lines is returned.

    Returns:
        list[str]: the processed lines.

    Raises:
        TypeError: if `txt_path` is neither a path nor a list/tuple of paths.
    """
    if isinstance(txt_path, (list, tuple)):
        merged = []
        for single_path in txt_path:
            merged.extend(
                get_tar_path_from_txts(
                    single_path, islocal=islocal, proportion=proportion
                )
            )
        return merged
    if not isinstance(txt_path, (str, os.PathLike)):
        # Previously this fell through to an unbound local variable.
        raise TypeError(
            "txt_path must be a path or a list/tuple of paths, got {}".format(
                type(txt_path).__name__
            )
        )
    with open(txt_path) as f:
        # Drop the line terminator of every line.
        lines = [line.rstrip("\n") for line in f]
    if islocal:
        lines = [
            line.replace("pipe:aws s3 cp s3://s-laion-audio/", "/mnt/audio_clip/")
            for line in lines
        ]
    else:
        lines = [line.replace(".tar", ".tar -") for line in lines]
    if proportion != 1:
        print("Sampling tars with proportion of {}".format(proportion))
        lines = random.sample(lines, int(proportion * len(lines)))
    return lines
def get_mix_lambda(mixup_alpha, batch_size):
    """
    Draw one mixup coefficient per sample from Beta(mixup_alpha, mixup_alpha).

    Args:
        mixup_alpha: both shape parameters of the Beta distribution.
        batch_size: number of coefficients to draw.
    Returns:
        np.ndarray of shape (batch_size,) and dtype float32.
    """
    lambdas = np.empty(batch_size, dtype=np.float32)
    for idx in range(batch_size):
        # One independent draw per sample, taken from the global NumPy RNG.
        lambdas[idx] = np.random.beta(mixup_alpha, mixup_alpha, 1)[0]
    return lambdas
def do_mixup(x, mixup_lambda):
    """
    Blend every sample with the sample at the mirrored batch position.

    Args:
        x: (batch_size , ...)
        mixup_lambda: (batch_size,)
    Returns:
        out: (batch_size, ...)
    """
    # Move the batch axis last so the per-sample weights broadcast over it.
    batch_last = x.transpose(0, -1)
    mirrored_batch_last = torch.flip(x, dims=[0]).transpose(0, -1)
    blended = batch_last * mixup_lambda + mirrored_batch_last * (1 - mixup_lambda)
    # Restore the original axis order.
    return blended.transpose(0, -1)
def interpolate(x, ratio):
    """Upsample along the time axis by repeating every frame `ratio` times.
    This is used to compensate the resolution reduction in downsampling of a
    CNN.

    Args:
        x: (batch_size, time_steps, classes_num)
        ratio: int, ratio to interpolate
    Returns:
        upsampled: (batch_size, time_steps * ratio, classes_num)
    """
    batch_size, time_steps, classes_num = x.shape
    # Insert a helper axis after time, repeat along it, then fold it into time.
    repeated = x.unsqueeze(2).repeat(1, 1, ratio, 1)
    return repeated.reshape(batch_size, time_steps * ratio, classes_num)
def pad_framewise_output(framewise_output, frames_num):
    """Pad framewise_output to the same length as input frames. The pad value
    is the same as the value of the last frame.

    Args:
        framewise_output: (batch_size, frames_num, classes_num)
        frames_num: int, total number of frames after padding; the last frame
            is repeated `frames_num - framewise_output.shape[1]` times
    Outputs:
        output: (batch_size, frames_num, classes_num)
    """
    # Tensor for padding: the last frame repeated for every missing frame.
    pad = framewise_output[:, -1:, :].repeat(
        1, frames_num - framewise_output.shape[1], 1
    )
    # (batch_size, frames_num, classes_num)
    output = torch.cat((framewise_output, pad), dim=1)
    # The padded tensor was previously computed but never returned.
    return output
# def process_ipc(index_path, classes_num, filename):
# # load data
# logging.info("Load Data...............")
# ipc = [[] for _ in range(classes_num)]
# with h5py.File(index_path, "r") as f:
# for i in tqdm(range(len(f["target"]))):
# t_class = np.where(f["target"][i])[0]
# for t in t_class:
# ipc[t].append(i)
# print(ipc)
# np.save(filename, ipc)
# logging.info("Load Data Succeed...............")
def save_to_dict(s, o_=None):
    """
    Parse a "key: value" string and store the float value under key in `o_`.

    Args:
        s: string of the form "<key>: <float>".
        o_: dictionary to update in place; a fresh dictionary is created when
            omitted (a shared mutable default would leak entries across calls).
    Returns:
        dict: `o_` (or the new dictionary) containing the parsed entry.
    """
    if o_ is None:
        o_ = {}
    sp = s.split(": ")
    o_[sp[0]] = float(sp[1])
    return o_
def get_data_from_log(txt_path):
    """
    Output dictionary from out.txt log file

    Only lines containing "| INFO |" are considered.

    * "Train Epoch" lines contribute one entry per line: the epoch number that
      follows "Train Epoch: " and the loss that follows "Loss: " (up to the
      next " (").
    * "Eval Epoch" lines that contain "val_loss" contribute one entry per
      epoch: the text after "Eval Epoch: " is read as
      "<epoch> <name>: <value>" followed by further "<name>: <value>" fields.

    NOTE(review): the fields of an eval line are assumed to be tab-separated.
    The separator literal in this file appears to have been whitespace-mangled
    (splitting twice on the same character could never work), so confirm this
    against the code that writes the log.

    Args:
        txt_path: path of the log file.
    Returns:
        (train_data, val_data):
            train_data maps a running index to
            {"num_epoch": int, "train_loss": float};
            val_data maps the epoch number to a {metric_name: float} dict.
    """
    import re

    with open(txt_path) as f:
        lines = f.readlines()
    val_data = {}
    train_losses = []
    train_losses_epoch = []
    for log_line in lines:
        if "| INFO |" not in log_line:
            continue
        if "Eval Epoch" in log_line:
            if "val_loss" not in log_line:
                continue
            line = log_line.split("Eval Epoch: ")[-1]
            fields = line.split("\t")
            # First field looks like "<epoch> <name>: <value>".
            head = fields[0].split(" ")
            num_epoch = int(head[0])
            d = {head[1].replace(":", ""): float(head[-1])}
            for field in fields[1:]:
                if not field.strip():
                    # Tolerate a trailing separator at the end of the line.
                    continue
                sp = field.split(": ")
                d[sp[0]] = float(sp[1])
            val_data[num_epoch] = d
        elif "Train Epoch" in log_line:
            # Read the whole leading integer; taking only the first character
            # truncated every epoch >= 10 to its first digit.
            epoch_match = re.match(r"\s*(\d+)", log_line.split("Train Epoch: ")[1])
            if epoch_match is None:
                raise ValueError(
                    "cannot parse epoch number from line: {!r}".format(log_line)
                )
            num_epoch = int(epoch_match.group(1))
            loss = float(log_line.split("Loss: ")[-1].split(" (")[0])
            train_losses.append(loss)
            train_losses_epoch.append(num_epoch)
    train_data = {
        idx: {"num_epoch": epoch, "train_loss": loss}
        for idx, (epoch, loss) in enumerate(zip(train_losses_epoch, train_losses))
    }
    return train_data, val_data
def save_p(obj, filename):
    """
    Pickle `obj` to `filename` and verify the file by reading it back.

    The round-tripped object is compared with the original using
    `deepdiff.DeepDiff` (string case ignored). If `deepdiff` is not
    importable, a best-effort `pip install deepdiff` is attempted first.

    Args:
        obj: picklable object to store.
        filename: destination path; the file is overwritten.
    Raises:
        ImportError: if `deepdiff` is still unavailable after the install
            attempt.
        AssertionError: if the object read back differs from `obj`.
    """
    import pickle

    try:
        from deepdiff import DeepDiff
    except ImportError:
        import importlib
        import subprocess
        import sys

        # Install into the interpreter that is actually running; a bare
        # "pip" on PATH may belong to a different Python environment.
        subprocess.run(
            [sys.executable, "-m", "pip", "install", "deepdiff"], check=False
        )
        # Make the freshly installed package visible to the import system.
        importlib.invalidate_caches()
        from deepdiff import DeepDiff
    with open(filename, "wb") as file:
        pickle.dump(obj, file, protocol=pickle.HIGHEST_PROTOCOL)  # highest protocol
    with open(filename, "rb") as file:
        z = pickle.load(file)
    # Explicit raise instead of `assert` so the check survives `python -O`.
    if DeepDiff(obj, z, ignore_string_case=True) != {}:
        raise AssertionError("there is something wrong with the saving process")
def load_p(filename):
    """
    Read and return the object pickled in `filename`.

    NOTE: unpickling can execute arbitrary code; only load trusted files.
    """
    import pickle

    with open(filename, "rb") as pickled:
        return pickle.load(pickled)
def save_json(data, name="data.json"):
    """
    Serialize `data` as JSON into the file `name` (overwriting it).
    """
    import json

    with open(name, "w") as out_file:
        json.dump(data, out_file)
def load_json(name):
    """
    Parse the JSON file `name` and return the decoded object.
    """
    import json

    with open(name, "r") as in_file:
        return json.load(in_file)
from multiprocessing import Process, Manager
from multiprocessing import Process, Value, Array
from ctypes import c_wchar
def load_class_label(path):
    """
    Load a class-label file, choosing the loader from the file suffix.

    `.pkl`/`.pickle` -> `load_p`, `.json`/`.txt` -> `load_json`,
    `.npy`/`.npz` -> `np.load`, `.csv` -> `pandas.read_csv`.

    Returns:
        The loaded object, or None when `path` is None or the suffix is not
        one of the above.
    """
    # https://stackoverflow.com/questions/48004243/how-to-share-large-read-only-dictionary-list-across-processes-in-multiprocessing
    # https://stackoverflow.com/questions/45693949/storing-strings-in-a-multiprocessing-sharedctypes-array
    if path is None:
        return None
    suffix = pathlib.Path(path).suffix
    if suffix in (".pkl", ".pickle"):
        return load_p(path)
    if suffix in (".json", ".txt"):
        return load_json(path)
    if suffix in (".npy", ".npz"):
        return np.load(path)
    if suffix == ".csv":
        # Imported lazily so pandas is only needed for csv label files.
        import pandas as pd

        return pd.read_csv(path)
    return None
# if out is None:
# return None
# else:
# key = Array(c_wchar, '\n'.join(list(out.keys())), lock=False)
# val = Array('i', out.values(), lock=False)
# return (key, val)
from torch import optim
def get_optimizer(params, lr, betas, eps, momentum, optimizer_name):
    """
    Build a torch optimizer selected by (case-insensitive) name.

    Args:
        params: parameters (or parameter groups) to optimize.
        lr: learning rate.
        betas: passed to AdamW / Adam only.
        eps: passed to AdamW / Adam only.
        momentum: passed to SGD only.
        optimizer_name: one of "adamw", "sgd" or "adam" (any letter case).
    Raises:
        ValueError: if `optimizer_name` is none of the supported names.
    """
    kind = optimizer_name.lower()
    if kind == "adamw":
        return optim.AdamW(params, lr=lr, betas=betas, eps=eps)
    if kind == "sgd":
        return optim.SGD(params, lr=lr, momentum=momentum)
    if kind == "adam":
        return optim.Adam(params, lr=lr, betas=betas, eps=eps)
    raise ValueError("optimizer name is not correct")