import glob
import json
import os
from pathlib import Path

import numpy as np
import pandas as pd
import scipy
import soundfile
import torch
from thop import clever_format, profile

from desed_task.evaluation.evaluation_measures import compute_sed_eval_metrics
from sed_scores_eval.base_modules.scores import create_score_dataframe
|
|
def batched_decode_preds(
    strong_preds, filenames, encoder, thresholds=[0.5], median_filter=7, pad_indx=None,
):
    """ Decode a batch of predictions to dataframes. Each threshold gives a different
    dataframe, stored in a dictionary.

    Args:
        strong_preds: torch.Tensor, batch of strong predictions.
        filenames: list, the list of filenames of the current batch.
        encoder: ManyHotEncoder object, object used to decode predictions.
        thresholds: list, the list of thresholds to be used for predictions.
        median_filter: int, the number of frames for which to apply median window (smoothing).
        pad_indx: list, the list of indexes which have been used for padding.

    Returns:
        tuple, (scores_raw, scores_postprocessed, prediction_dfs): the first two map
        each audio id to a score DataFrame; the last maps each threshold to a
        DataFrame of decoded predictions.
    """
    scores_raw = {}
    scores_postprocessed = {}
    prediction_dfs = {}
    for threshold in thresholds:
        prediction_dfs[threshold] = pd.DataFrame()

    for j in range(strong_preds.shape[0]):
        audio_id = Path(filenames[j]).stem
        filename = audio_id + ".wav"
        c_scores = strong_preds[j]
        if pad_indx is not None:
            # c_scores is (n_classes, n_frames), so the padded frames must be
            # dropped along the last (frame) dimension.
            true_len = int(c_scores.shape[-1] * pad_indx[j].item())
            c_scores = c_scores[..., :true_len]
        c_scores = c_scores.transpose(0, 1).detach().cpu().numpy()
        scores_raw[audio_id] = create_score_dataframe(
            scores=c_scores,
            timestamps=encoder._frame_to_time(np.arange(len(c_scores) + 1)),
            event_classes=encoder.labels,
        )
        # scipy.ndimage.filters is deprecated; call scipy.ndimage directly.
        c_scores = scipy.ndimage.median_filter(c_scores, (median_filter, 1))
        scores_postprocessed[audio_id] = create_score_dataframe(
            scores=c_scores,
            timestamps=encoder._frame_to_time(np.arange(len(c_scores) + 1)),
            event_classes=encoder.labels,
        )
        for c_th in thresholds:
            pred = c_scores > c_th
            pred = encoder.decode_strong(pred)
            pred = pd.DataFrame(pred, columns=["event_label", "onset", "offset"])
            pred["filename"] = filename
            prediction_dfs[c_th] = pd.concat(
                [prediction_dfs[c_th], pred], ignore_index=True
            )

    return scores_raw, scores_postprocessed, prediction_dfs
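
# Illustrative usage (a sketch, not part of the recipe): `sed_model`,
# `mel_batch`, `batch_filenames` and `many_hot_encoder` below are assumptions
# standing in for objects provided by the calling validation loop.
#
#   strong_preds = torch.sigmoid(sed_model(mel_batch))  # (batch, n_classes, n_frames)
#   scores_raw, scores_post, dfs = batched_decode_preds(
#       strong_preds, batch_filenames, many_hot_encoder, thresholds=[0.5]
#   )
#   dfs[0.5].to_csv("predictions.tsv", sep="\t", index=False)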
|
|
def convert_to_event_based(weak_dataframe):
    """ Convert a weakly labeled DataFrame ('filename', 'event_labels') to a strongly
    labeled DataFrame ('filename', 'onset', 'offset', 'event_label').

    Args:
        weak_dataframe: pd.DataFrame, the dataframe to be converted.

    Returns:
        pd.DataFrame, the strongly labeled dataframe.
    """
    new = []
    for i, r in weak_dataframe.iterrows():
        events = r["event_labels"].split(",")
        for e in events:
            new.append(
                {"filename": r["filename"], "event_label": e, "onset": 0, "offset": 1}
            )
    return pd.DataFrame(new)
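
# Illustrative input/output (a sketch): each comma-separated weak label becomes
# one strong row spanning the placeholder interval [0, 1].
#
#   weak = pd.DataFrame([{"filename": "a.wav", "event_labels": "Dog,Speech"}])
#   strong = convert_to_event_based(weak)
#   # -> rows ("a.wav", "Dog", 0, 1) and ("a.wav", "Speech", 0, 1)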
|
|
def log_sedeval_metrics(predictions, ground_truth, save_dir=None):
    """ Return the set of metrics from sed_eval.

    Args:
        predictions: pd.DataFrame, the dataframe of predictions.
        ground_truth: str, path to the ground truth tsv file (read with pd.read_csv).
        save_dir: str, path to the folder where to save the event- and segment-based
            metrics outputs.

    Returns:
        tuple, event-based macro-F1 and micro-F1, segment-based macro-F1 and micro-F1.
    """
    if predictions.empty:
        return 0.0, 0.0, 0.0, 0.0

    gt = pd.read_csv(ground_truth, sep="\t")

    event_res, segment_res = compute_sed_eval_metrics(predictions, gt)

    if save_dir is not None:
        os.makedirs(save_dir, exist_ok=True)
        with open(os.path.join(save_dir, "event_f1.txt"), "w") as f:
            f.write(str(event_res))

        with open(os.path.join(save_dir, "segment_f1.txt"), "w") as f:
            f.write(str(segment_res))

    return (
        event_res.results()["class_wise_average"]["f_measure"]["f_measure"],
        event_res.results()["overall"]["f_measure"]["f_measure"],
        segment_res.results()["class_wise_average"]["f_measure"]["f_measure"],
        segment_res.results()["overall"]["f_measure"]["f_measure"],
    )
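
# Illustrative usage (a sketch; the tsv path and save_dir are placeholders):
#
#   event_macro, event_micro, seg_macro, seg_micro = log_sedeval_metrics(
#       dfs[0.5], "metadata/validation/validation.tsv", save_dir="exp/metrics"
#   )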
|
|
def parse_jams(jams_list, encoder, out_json):
    """ Parse a list of Scaper JAMS annotation files into a single json file listing
    the background and foreground source audio files with their events.

    Args:
        jams_list: list, paths of the jams files to parse.
        encoder: ManyHotEncoder object, used to check that event labels are known.
        out_json: str, path of the output json file.
    """
    if len(jams_list) == 0:
        raise IndexError("JAMS list is empty! Wrong path?")

    backgrounds = []
    sources = []
    for jamfile in jams_list:
        with open(jamfile, "r") as f:
            jdata = json.load(f)

        # There must be one isolated-event audio file per annotated sound.
        assert len(jdata["annotations"][0]["data"]) == len(
            jdata["annotations"][-1]["sandbox"]["scaper"]["isolated_events_audio_path"]
        )

        for indx, sound in enumerate(jdata["annotations"][0]["data"]):
            source_name = Path(
                jdata["annotations"][-1]["sandbox"]["scaper"][
                    "isolated_events_audio_path"
                ][indx]
            ).stem
            source_file = os.path.join(
                Path(jamfile).parent,
                Path(jamfile).stem + "_events",
                source_name + ".wav",
            )

            if sound["value"]["role"] == "background":
                backgrounds.append(source_file)
            else:
                # Normalize sub-class labels (e.g. "Frying_pan") to the classes
                # known by the encoder.
                if sound["value"]["label"] not in encoder.labels:
                    if sound["value"]["label"].startswith("Frying"):
                        sound["value"]["label"] = "Frying"
                    elif sound["value"]["label"].startswith("Vacuum_cleaner"):
                        sound["value"]["label"] = "Vacuum_cleaner"
                    else:
                        raise NotImplementedError

                sources.append(
                    {
                        "filename": source_file,
                        "onset": sound["value"]["event_time"],
                        "offset": sound["value"]["event_time"]
                        + sound["value"]["event_duration"],
                        "event_label": sound["value"]["label"],
                    }
                )

    os.makedirs(Path(out_json).parent, exist_ok=True)
    with open(out_json, "w") as f:
        json.dump({"backgrounds": backgrounds, "sources": sources}, f, indent=4)
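
# Illustrative usage (a sketch; the paths and `many_hot_encoder` are placeholders):
#
#   parse_jams(glob.glob("soundscapes/*.jams"), many_hot_encoder, "parsed/sources.json")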
|
|
def generate_tsv_wav_durations(audio_dir, out_tsv):
    """ Generate a dataframe with the filename and duration of each audio file.

    Args:
        audio_dir: str, the path of the folder where audio files are (used by glob.glob).
        out_tsv: str, the path of the output tsv file.

    Returns:
        pd.DataFrame: the dataframe containing filenames and durations.
    """
    meta_list = []
    for file in glob.glob(os.path.join(audio_dir, "*.wav")):
        d = soundfile.info(file).duration
        meta_list.append([os.path.basename(file), d])
    meta_df = pd.DataFrame(meta_list, columns=["filename", "duration"])
    if out_tsv is not None:
        meta_df.to_csv(out_tsv, sep="\t", index=False, float_format="%.1f")

    return meta_df
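
# Illustrative usage (a sketch; both paths are placeholders):
#
#   durations_df = generate_tsv_wav_durations("audio/validation", "durations.tsv")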
|
|
def calculate_macs(model, config, dataset=None):
    """ Calculate the multiply-accumulate operations (MACs) of the model given as input.

    Args:
        model: deep learning model to calculate the MACs for.
        config: config used to train the model.
        dataset: dataset used to train the model (only needed when the model uses
            pretrained embeddings).

    Returns:
        tuple(str, str): the MACs and parameter counts, formatted by thop.clever_format.
    """
    n_frames = int(
        (config["feats"]["sample_rate"] * config["data"]["audio_max_len"])
        / config["feats"]["hop_length"]
        + 1
    )
    batch_size = sum(config["training"]["batch_size"])
    # Profile on a random batch with the same shape as the training features.
    dummy_input = torch.randn(batch_size, config["feats"]["n_mels"], n_frames)

    if "use_embeddings" in config["net"] and config["net"]["use_embeddings"]:
        audio, label, padded_indxs, path, embeddings = dataset[0]
        embeddings = embeddings.repeat(batch_size, 1, 1)
        macs, params = profile(model, inputs=(dummy_input, None, embeddings))
    else:
        macs, params = profile(model, inputs=(dummy_input,))

    macs, params = clever_format([macs, params], "%.3f")
    return macs, params
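
# Illustrative usage (a sketch): `crnn` and `config` stand in for the model and
# configuration of the training recipe; `dataset` is only needed when the model
# consumes pretrained embeddings.
#
#   macs, params = calculate_macs(crnn, config)
#   print(f"MACs: {macs}, params: {params}")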