FlexSED / src /local /utils.py
OpenSound's picture
Upload 544 files
3b6a091 verified
import os
from pathlib import Path
import numpy as np
import pandas as pd
import scipy
import torch
from desed_task.evaluation.evaluation_measures import compute_sed_eval_metrics
import json
import soundfile
import glob
from thop import profile, clever_format
from sed_scores_eval.base_modules.scores import create_score_dataframe
def batched_decode_preds(
strong_preds, filenames, encoder, thresholds=[0.5], median_filter=7, pad_indx=None,
):
""" Decode a batch of predictions to dataframes. Each threshold gives a different dataframe and stored in a
dictionary
Args:
strong_preds: torch.Tensor, batch of strong predictions.
filenames: list, the list of filenames of the current batch.
encoder: ManyHotEncoder object, object used to decode predictions.
thresholds: list, the list of thresholds to be used for predictions.
median_filter: int, the number of frames for which to apply median window (smoothing).
pad_indx: list, the list of indexes which have been used for padding.
Returns:
dict of predictions, each keys is a threshold and the value is the DataFrame of predictions.
"""
# Init a dataframe per threshold
scores_raw = {}
scores_postprocessed = {}
prediction_dfs = {}
for threshold in thresholds:
prediction_dfs[threshold] = pd.DataFrame()
for j in range(strong_preds.shape[0]): # over batches
audio_id = Path(filenames[j]).stem
filename = audio_id + ".wav"
c_scores = strong_preds[j]
if pad_indx is not None:
true_len = int(c_scores.shape[-1] * pad_indx[j].item())
c_scores = c_scores[:true_len]
c_scores = c_scores.transpose(0, 1).detach().cpu().numpy()
scores_raw[audio_id] = create_score_dataframe(
scores=c_scores,
timestamps=encoder._frame_to_time(np.arange(len(c_scores)+1)),
event_classes=encoder.labels,
)
c_scores = scipy.ndimage.filters.median_filter(c_scores, (median_filter, 1))
scores_postprocessed[audio_id] = create_score_dataframe(
scores=c_scores,
timestamps=encoder._frame_to_time(np.arange(len(c_scores)+1)),
event_classes=encoder.labels,
)
for c_th in thresholds:
pred = c_scores > c_th
pred = encoder.decode_strong(pred)
pred = pd.DataFrame(pred, columns=["event_label", "onset", "offset"])
pred["filename"] = filename
prediction_dfs[c_th] = pd.concat([prediction_dfs[c_th], pred], ignore_index=True)
return scores_raw, scores_postprocessed, prediction_dfs
def convert_to_event_based(weak_dataframe):
""" Convert a weakly labeled DataFrame ('filename', 'event_labels') to a DataFrame strongly labeled
('filename', 'onset', 'offset', 'event_label').
Args:
weak_dataframe: pd.DataFrame, the dataframe to be converted.
Returns:
pd.DataFrame, the dataframe strongly labeled.
"""
new = []
for i, r in weak_dataframe.iterrows():
events = r["event_labels"].split(",")
for e in events:
new.append(
{"filename": r["filename"], "event_label": e, "onset": 0, "offset": 1}
)
return pd.DataFrame(new)
def log_sedeval_metrics(predictions, ground_truth, save_dir=None):
""" Return the set of metrics from sed_eval
Args:
predictions: pd.DataFrame, the dataframe of predictions.
ground_truth: pd.DataFrame, the dataframe of groundtruth.
save_dir: str, path to the folder where to save the event and segment based metrics outputs.
Returns:
tuple, event-based macro-F1 and micro-F1, segment-based macro-F1 and micro-F1
"""
if predictions.empty:
return 0.0, 0.0, 0.0, 0.0
gt = pd.read_csv(ground_truth, sep="\t")
event_res, segment_res = compute_sed_eval_metrics(predictions, gt)
if save_dir is not None:
os.makedirs(save_dir, exist_ok=True)
with open(os.path.join(save_dir, "event_f1.txt"), "w") as f:
f.write(str(event_res))
with open(os.path.join(save_dir, "segment_f1.txt"), "w") as f:
f.write(str(segment_res))
return (
event_res.results()["class_wise_average"]["f_measure"]["f_measure"],
event_res.results()["overall"]["f_measure"]["f_measure"],
segment_res.results()["class_wise_average"]["f_measure"]["f_measure"],
segment_res.results()["overall"]["f_measure"]["f_measure"],
) # return also segment measures
def parse_jams(jams_list, encoder, out_json):
if len(jams_list) == 0:
raise IndexError("jams list is empty ! Wrong path ?")
backgrounds = []
sources = []
for jamfile in jams_list:
with open(jamfile, "r") as f:
jdata = json.load(f)
# check if we have annotations for each source in scaper
assert len(jdata["annotations"][0]["data"]) == len(
jdata["annotations"][-1]["sandbox"]["scaper"]["isolated_events_audio_path"]
)
for indx, sound in enumerate(jdata["annotations"][0]["data"]):
source_name = Path(
jdata["annotations"][-1]["sandbox"]["scaper"][
"isolated_events_audio_path"
][indx]
).stem
source_file = os.path.join(
Path(jamfile).parent,
Path(jamfile).stem + "_events",
source_name + ".wav",
)
if sound["value"]["role"] == "background":
backgrounds.append(source_file)
else: # it is an event
if (
sound["value"]["label"] not in encoder.labels
): # correct different labels
if sound["value"]["label"].startswith("Frying"):
sound["value"]["label"] = "Frying"
elif sound["value"]["label"].startswith("Vacuum_cleaner"):
sound["value"]["label"] = "Vacuum_cleaner"
else:
raise NotImplementedError
sources.append(
{
"filename": source_file,
"onset": sound["value"]["event_time"],
"offset": sound["value"]["event_time"]
+ sound["value"]["event_duration"],
"event_label": sound["value"]["label"],
}
)
os.makedirs(Path(out_json).parent, exist_ok=True)
with open(out_json, "w") as f:
json.dump({"backgrounds": backgrounds, "sources": sources}, f, indent=4)
def generate_tsv_wav_durations(audio_dir, out_tsv):
"""
Generate a dataframe with filename and duration of the file
Args:
audio_dir: str, the path of the folder where audio files are (used by glob.glob)
out_tsv: str, the path of the output tsv file
Returns:
pd.DataFrame: the dataframe containing filenames and durations
"""
meta_list = []
for file in glob.glob(os.path.join(audio_dir, "*.wav")):
d = soundfile.info(file).duration
meta_list.append([os.path.basename(file), d])
meta_df = pd.DataFrame(meta_list, columns=["filename", "duration"])
if out_tsv is not None:
meta_df.to_csv(out_tsv, sep="\t", index=False, float_format="%.1f")
return meta_df
def calculate_macs(model, config, dataset=None):
"""
The function calculate the multiply–accumulate operation (MACs) of the model given as input.
Args:
model: deep learning model to calculate the macs for
config: config used to train the model
dataset: dataset used to train the model
Returns:
"""
n_frames = int(((config["feats"]["sample_rate"] * config["data"]["audio_max_len"]) / config["feats"]["hop_length"])+1)
input_size = [sum(config["training"]["batch_size"]), config["feats"]["n_mels"], n_frames]
input = torch.randn(input_size)
if "use_embeddings" in config["net"] and config["net"]["use_embeddings"]:
audio, label, padded_indxs, path, embeddings = dataset[0]
embeddings = embeddings.repeat((sum(config["training"]["batch_size"])), 1, 1)
macs, params = profile(model, inputs=(input, None, embeddings))
else:
macs, params = profile(model, inputs=(input,))
macs, params = clever_format([macs, params], "%.3f")
return macs, params