"""Helpers for computing, plotting and tabulating speech dataset statistics."""
import json
import os
from collections import Counter

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

# TODO: move to consts
buckets_age = ['teens', 'twenties', 'thirties', 'fourties', 'fifties',
               'sixties', 'seventies', 'eighties', 'nineties']
buckets_sex = ["male", "female"]


def _safe_ratio(numerator, denominator, digits=2):
    """Return ``numerator / denominator`` rounded to *digits*, or 0.0 when the denominator is zero."""
    if not denominator:
        return 0.0
    return round(numerator / denominator, digits)


def _reference_texts(dataset_hf, dataset_hf_secret, split):
    """Return the ``ref_orig`` column for *split*; the ``test`` split is read from the secret dataset."""
    source = dataset_hf_secret if split == "test" else dataset_hf
    return source[split]["ref_orig"]


def _buckets_for(meta_field):
    """Return the bucket labels for *meta_field*, raising ``ValueError`` for unsupported fields."""
    if meta_field == 'speaker_age':
        return buckets_age
    if meta_field == 'speaker_sex':
        return buckets_sex
    raise ValueError("Unsupported meta field: {}".format(meta_field))


def _recording_stats(counts, recordings, speakers):
    """Summarise recordings-per-speaker counts as average, std, median, min and max."""
    values = list(counts.values())
    if not values or not speakers:
        # An empty split has no speakers; report zeros instead of crashing.
        return {"average": 0.0, "std": 0.0, "median": 0.0, "min": 0, "max": 0}
    return {
        "average": round(recordings / speakers, 2),
        "std": round(np.std(values), 2),
        "median": np.median(values),
        "min": min(values),
        "max": max(values),
    }


def load_bigos_analyzer_report(fp: str) -> dict:
    """Load the JSON report stored at path *fp* and return its parsed contents."""
    # JSON is UTF-8 by specification; do not depend on the platform default.
    with open(fp, 'r', encoding="utf-8") as f:
        return json.load(f)


def num_of_samples_per_split(dataset_hf):
    """Return ``{split: number of rows}`` plus an ``all_splits`` total.

    *dataset_hf* is a mapping of split name to an object exposing ``num_rows``.
    """
    metric = "samples"
    print("Calculating {}".format(metric))
    out_dict = {split: dataset_hf[split].num_rows for split in dataset_hf.keys()}
    out_dict["all_splits"] = sum(out_dict.values())
    return out_dict


def total_audio_duration_per_split(dataset_hf):
    """Return ``{split: audio hours}`` (2 decimals) plus an ``all_splits`` total.

    Hours are derived from the ``audio_duration_seconds`` column of each split.
    """
    metric = "audio[h]"
    print("Calculating {}".format(metric))
    out_dict = {}
    for split in dataset_hf.keys():
        seconds = sum(dataset_hf[split]["audio_duration_seconds"])
        out_dict[split] = round(seconds / 3600, 2)
    # The total is the sum of the rounded per-split values; round again to
    # strip binary floating point noise such as 0.30000000000000004.
    out_dict["all_splits"] = round(sum(out_dict.values()), 2)
    return out_dict


def average_audio_duration_per_split(dataset_hf):
    """Return ``{split: mean recording length in seconds}`` plus ``all_splits``.

    A split without recordings is reported as 0.0.
    """
    metric = "average_audio_duration[s]"
    print("Calculating {}".format(metric))
    out_dict = {}
    samples_all = 0
    seconds_all = 0
    for split in dataset_hf.keys():
        # Read the column once; it is used for both the sum and the count.
        durations = dataset_hf[split]["audio_duration_seconds"]
        seconds_split = sum(durations)
        samples_split = len(durations)
        seconds_all += seconds_split
        samples_all += samples_split
        out_dict[split] = _safe_ratio(seconds_split, samples_split)
    out_dict["all_splits"] = _safe_ratio(seconds_all, samples_all)
    return out_dict


def speakers_per_split(dataset_hf):
    """Return ``{split: number of distinct speaker ids}`` plus ``all_splits``.

    The speaker id is the fifth ``-``-separated field of each ``audioname``.
    ``all_splits`` is the sum of the per-split counts, so a speaker id that
    occurs in several splits is counted once per split.
    """
    metric = "speakers"
    print("Calculating {}".format(metric))
    out_dict = {}
    for split in dataset_hf.keys():
        speaker_ids = {str(fileid).split("-")[4] for fileid in dataset_hf[split]["audioname"]}
        out_dict[split] = len(speaker_ids)
    out_dict["all_splits"] = sum(out_dict.values())
    return out_dict


def uniq_utts_per_split(dataset_hf, dataset_hf_secret):
    """Count distinct reference utterances per split.

    Returns a tuple ``(counts, utterances)``: ``counts`` maps each split (and
    ``all_splits``) to its number of distinct ``ref_orig`` values, and
    ``utterances`` is the list of all utterances of all splits, duplicates
    included. The ``test`` split is read from *dataset_hf_secret*.
    """
    metric = "utts_unique"
    print("Calculating {}".format(metric))
    out_dict = {}
    utts_all = []
    for split in dataset_hf.keys():
        utts_split = _reference_texts(dataset_hf, dataset_hf_secret, split)
        utts_all.extend(utts_split)
        out_dict[split] = len(set(utts_split))
    out_dict["all_splits"] = len(set(utts_all))
    return out_dict, utts_all


def words_per_split(dataset_hf, dataset_hf_secret):
    """Return ``{split: number of words}`` plus an ``all_splits`` total.

    Words are obtained by splitting every utterance on a single space, so
    consecutive spaces produce empty "words" that are counted as well.
    The ``test`` split is read from *dataset_hf_secret*.
    """
    metric = "words"
    print("Calculating {}".format(metric))
    out_dict = {}
    for split in dataset_hf.keys():
        utts = _reference_texts(dataset_hf, dataset_hf_secret, split)
        out_dict[split] = sum(len(utt.split(" ")) for utt in utts)
    out_dict["all_splits"] = sum(out_dict.values())
    return out_dict


def uniq_words_per_split(dataset_hf, dataset_hf_secret):
    """Count distinct words per split.

    Returns ``(counts, words)``: ``counts`` maps each split (and
    ``all_splits``) to its number of distinct words, ``words`` is the list of
    distinct words over all splits (in arbitrary order). The ``test`` split
    is read from *dataset_hf_secret*.
    """
    metric = "words_unique"
    print("Calculating {}".format(metric))
    out_dict = {}
    vocabulary = set()
    for split in dataset_hf.keys():
        utts = _reference_texts(dataset_hf, dataset_hf_secret, split)
        words_split = set(" ".join(utts).split(" "))
        vocabulary |= words_split
        out_dict[split] = len(words_split)
    out_words_uniq = list(vocabulary)
    out_dict["all_splits"] = len(out_words_uniq)
    return out_dict, out_words_uniq


def chars_per_split(dataset_hf, dataset_hf_secret):
    """Return ``{split: number of characters}`` plus an ``all_splits`` total.

    The count is the length of all utterances of a split joined by single
    spaces, i.e. spaces are included. The ``test`` split is read from
    *dataset_hf_secret*.
    """
    metric = "chars"
    print("Calculating {}".format(metric))
    out_dict = {}
    for split in dataset_hf.keys():
        utts = _reference_texts(dataset_hf, dataset_hf_secret, split)
        out_dict[split] = len(" ".join(utts))
    out_dict["all_splits"] = sum(out_dict.values())
    return out_dict


def uniq_chars_per_split(dataset_hf, dataset_hf_secret):
    """Count distinct characters per split.

    Returns ``(counts, chars)``: ``counts`` maps each split (and
    ``all_splits``) to a number of distinct characters, ``chars`` is the list
    of distinct non-space characters over all splits (in arbitrary order).
    The ``test`` split is read from *dataset_hf_secret*.
    """
    metric = "chars_unique"
    print("Calculating {}".format(metric))
    out_dict = {}
    alphabet = set()
    for split in dataset_hf.keys():
        utts = _reference_texts(dataset_hf, dataset_hf_secret, split)
        chars_split = set(" ".join(utts)) - {" "}
        # NOTE(review): the per-split count adds 1 (presumably for the space
        # character) while "all_splits" below does not - confirm which one is
        # intended before relying on either number.
        out_dict[split] = len(chars_split) + 1
        alphabet |= chars_split
    out_chars_uniq = list(alphabet)
    out_dict["all_splits"] = len(out_chars_uniq)
    return out_dict, out_chars_uniq


def meta_cov_per_split(dataset_hf, meta_field):
    """Return the share of samples whose *meta_field* is not ``"N/A"``.

    The result maps each split (and ``all_splits``) to a ratio rounded to two
    decimals, or to ``"N/A"`` when no sample carries the metadata. Samples of
    splits without metadata still count towards the ``all_splits``
    denominator.
    """
    # TODO move to config
    metric = "meta_cov_" + meta_field
    print("Calculating {}".format(metric))
    out_dict = {}
    samples_all = 0
    covered_all = 0
    for split in dataset_hf.keys():
        meta_info = dataset_hf[split][meta_field]
        samples_split = len(meta_info)
        samples_all += samples_split
        covered_split = sum(1 for value in meta_info if value != "N/A")
        if covered_split == 0:
            out_dict[split] = "N/A"
            continue
        covered_all += covered_split
        out_dict[split] = round(covered_split / samples_split, 2)
    if covered_all == 0:
        out_dict["all_splits"] = "N/A"
    else:
        out_dict["all_splits"] = round(covered_all / samples_all, 2)
    return out_dict


def speech_rate_words_per_split(dataset_hf, dataset_hf_secret):
    """Return ``{split: words per second}`` plus ``all_splits``.

    Word counts come from ``ref_orig`` (the ``test`` split from
    *dataset_hf_secret*), durations from ``audio_duration_seconds`` of
    *dataset_hf*. A split with zero total duration is reported as 0.0.
    """
    metric = "words_per_second"
    print("Calculating {}".format(metric))
    out_dict = {}
    words_all = 0
    seconds_all = 0
    for split in dataset_hf.keys():
        utts = _reference_texts(dataset_hf, dataset_hf_secret, split)
        words_split = len(" ".join(utts).split(" "))
        seconds_split = sum(dataset_hf[split]["audio_duration_seconds"])
        words_all += words_split
        seconds_all += seconds_split
        out_dict[split] = _safe_ratio(words_split, seconds_split)
    out_dict["all_splits"] = _safe_ratio(words_all, seconds_all)
    return out_dict


def speech_rate_chars_per_split(dataset_hf, dataset_hf_secret):
    """Return ``{split: characters per second}`` plus ``all_splits``.

    Spaces are not counted. Texts come from ``ref_orig`` (the ``test`` split
    from *dataset_hf_secret*), durations from ``audio_duration_seconds`` of
    *dataset_hf*. A split with zero total duration is reported as 0.0.
    """
    metric = "chars_per_second"
    print("Calculating {}".format(metric))
    out_dict = {}
    chars_all = 0
    seconds_all = 0
    for split in dataset_hf.keys():
        utts = _reference_texts(dataset_hf, dataset_hf_secret, split)
        chars_split = len("".join(" ".join(utts).split(" ")))
        seconds_split = sum(dataset_hf[split]["audio_duration_seconds"])
        chars_all += chars_split
        seconds_all += seconds_split
        out_dict[split] = _safe_ratio(chars_split, seconds_split)
    out_dict["all_splits"] = _safe_ratio(chars_all, seconds_all)
    return out_dict


def meta_distribution_text(dataset_hf, meta_field):
    """Return the distribution of *meta_field* values over its buckets.

    *meta_field* must be ``'speaker_age'`` or ``'speaker_sex'``; anything
    else raises ``ValueError``. Each split maps to ``{bucket: share}`` where
    the share is relative to all non-``"N/A"`` values of the split, or to
    ``"N/A"`` when the split has no metadata. ``all_splits`` is ``"N/A"`` as
    soon as one split lacks metadata; otherwise it holds the share of each
    bucket relative to all bucketed values.
    """
    buckets = _buckets_for(meta_field)
    metric = "distribution_" + meta_field
    print("Calculating {}".format(metric))
    out_dict = {}
    no_meta = False
    bucket_totals = {bucket: 0 for bucket in buckets}
    for split in dataset_hf.keys():
        known_values = [value for value in dataset_hf[split][meta_field] if value != "N/A"]
        if not known_values:
            out_dict[split] = "N/A"
            no_meta = True
            continue
        occurrences = Counter(known_values)
        out_dict[split] = {}
        for bucket in buckets:
            bucket_totals[bucket] += occurrences[bucket]
            out_dict[split][bucket] = round(occurrences[bucket] / len(known_values), 2)
    if no_meta:
        out_dict["all_splits"] = "N/A"
        return out_dict
    total_samples = sum(bucket_totals.values())
    # _safe_ratio covers the case where no value matched any bucket.
    out_dict["all_splits"] = {
        bucket: _safe_ratio(bucket_totals[bucket], total_samples) for bucket in buckets
    }
    return out_dict


def recordings_per_speaker(dataset_hf):
    """Compute how many recordings each speaker contributed.

    A speaker is identified by the first five ``-``-separated fields of the
    ``audioname`` value. Returns ``(stats, contents)``:

    * ``contents`` maps each split (and ``all_splits``) to
      ``{speaker prefix: number of recordings}``;
    * ``stats`` maps each split (and ``all_splits``) to the ``average``,
      ``std``, ``median``, ``min`` and ``max`` of those counts.

    For ``all_splits`` the per-split dictionaries are merged, so a prefix that
    occurs in several splits keeps the count of the last split.
    """
    metric = "recordings_per_speaker"
    print("Calculating {}".format(metric))
    out_dict_stats = {}
    out_dict_contents = {}
    counts_all = {}
    recordings_total = 0
    speakers_total = 0
    for split in dataset_hf.keys():
        audio_names = dataset_hf[split]["audioname"]
        # Count by exact speaker prefix. Substring matching would attribute
        # the files of speaker "...-12" to speaker "...-1" as well.
        counts_split = dict(
            Counter("-".join(str(name).split("-")[0:5]) for name in audio_names)
        )
        out_dict_contents[split] = counts_split
        speakers_split = len(counts_split)
        recordings_split = len(audio_names)
        speakers_total += speakers_split
        recordings_total += recordings_split
        out_dict_stats[split] = _recording_stats(counts_split, recordings_split, speakers_split)
        counts_all = counts_all | counts_split
    out_dict_stats["all_splits"] = _recording_stats(counts_all, recordings_total, speakers_total)
    out_dict_contents["all_splits"] = counts_all
    return out_dict_stats, out_dict_contents


def meta_distribution_bar_plot(dataset_hf, output_dir, dimension="speaker_sex"):
    """Placeholder for a bar plot of *dimension*; not implemented yet."""
    pass


def meta_distribution_violin_plot(dataset_hf, output_dir, metric="audio_duration_seconds", dimension="speaker_sex"):
    """Save one violin plot of *metric* grouped by *dimension* per split.

    Rows whose *dimension* is ``"N/A"``, ``"other"`` or ``"unknown"`` are
    dropped; splits left without rows are skipped. At most 5000 randomly
    sampled rows are plotted. Each figure is written to
    ``<output_dir>/<metric>-<dimension>-<split>.png``.
    """
    print("Generating violin plat for metric {} for dimension {}".format(metric, dimension))
    excluded_values = ["N/A", "other", "unknown"]
    for split in dataset_hf.keys():
        df_dataset = pd.DataFrame(dataset_hf[split])
        df_filtered = df_dataset[~df_dataset[dimension].isin(excluded_values)]
        if df_filtered.empty:
            print("No data for split {} and dimension {}".format(split, dimension))
            continue
        if len(df_filtered) >= 5000:
            sample_size = 5000
            print("Selecting sample of size {}".format(sample_size))
        else:
            sample_size = len(df_filtered)
            print("Selecting full split of size {}".format(sample_size))
        df = df_filtered.sample(sample_size)
        print("Generating plot")
        fig = plt.figure(figsize=(20, 15))
        try:
            plot = sns.violinplot(data=df, hue=dimension, x='dataset', y=metric,
                                  split=True, fill=False, inner='quart',
                                  legend='auto', common_norm=True)
            plot.set_xticklabels(plot.get_xticklabels(), rotation=30, horizontalalignment='right')
            plt.title('Violin plot of {} by {} for split {}'.format(metric, dimension, split))
            plt.xlabel(dimension)
            plt.ylabel(metric)
            os.makedirs(output_dir, exist_ok=True)
            output_fn = os.path.join(output_dir, metric + "-" + dimension + "-" + split + ".png")
            plt.savefig(output_fn)
        finally:
            # Release the figure; pyplot otherwise keeps every figure alive.
            plt.close(fig)
        print("Plot generation completed")


def read_reports(dataset_name):
    """Load the statistics and contents reports of *dataset_name*.

    Returns ``(stats_dict, contents_dict)`` read from
    ``reports/<dataset_name>/dataset_statistics.json`` and
    ``./reports/<dataset_name>/dataset_contents.json``.
    """
    json_contents = "./reports/{}/dataset_contents.json".format(dataset_name)
    json_stats = "reports/{}/dataset_statistics.json".format(dataset_name)
    with open(json_contents, 'r', encoding="utf-8") as file:
        contents_dict = json.load(file)
    with open(json_stats, 'r', encoding="utf-8") as file:
        stats_dict = json.load(file)
    return stats_dict, contents_dict


def add_test_split_stats_from_secret_dataset(stats_dict_public, stats_dict_secret):
    """Overwrite the ``test`` split values of the public stats with the secret ones.

    Both arguments are nested ``{dataset: {metric: {split: value}}}`` dicts.
    *stats_dict_public* is modified in place and returned. Every dataset of
    the public dict must exist in the secret dict.
    """
    for dataset in stats_dict_public.keys():
        print(dataset)
        for metric, splits in stats_dict_secret[dataset].items():
            if "test" in splits:
                stats_dict_public[dataset][metric]["test"] = splits["test"]
    return stats_dict_public


def dict_to_multindex_df(dict_in, all_splits=False):
    """Flatten ``{dataset: {metric: {split: value}}}`` into a MultiIndex frame.

    The result is indexed by ``(dataset, metric, split)`` with a single
    ``value`` column. The dataset named ``"all"`` is skipped. With
    ``all_splits`` false only the real splits are kept; with it true only the
    ``"all_splits"`` rows are kept.
    """
    want_aggregate = bool(all_splits)
    rows = [
        (dataset, metric, split, value)
        for dataset, metrics in dict_in.items()
        if dataset != "all"
        for metric, splits in metrics.items()
        for split, value in splits.items()
        if (split == "all_splits") == want_aggregate
    ]
    df = pd.DataFrame(rows, columns=['dataset', 'metric', 'split', 'value'])
    df.set_index(['dataset', 'metric', 'split'], inplace=True)
    return df


def dict_to_multindex_df_all_splits(dict_in):
    """Return the ``(dataset, metric, split)`` frame holding only the ``"all_splits"`` rows."""
    return dict_to_multindex_df(dict_in, all_splits=True)


def extract_stats_to_agg(df_multindex_per_split, metrics, add_total=True):
    """Aggregate the selected *metrics* over splits into one row per dataset.

    *df_multindex_per_split* is indexed by ``(dataset, metric, split)``. The
    values of every metric are summed across splits; the result has one row
    per dataset and one column per metric. With ``add_total`` a final
    ``total`` row holding the column sums is appended.
    """
    # Select only the relevant metrics.
    selected = df_multindex_per_split.loc[(slice(None), metrics), :]
    # Move the per-split rows to columns and sum them.
    per_split = selected.unstack(level='split')
    per_split['value', 'total'] = per_split['value'].sum(axis=1)
    per_split.columns = per_split.columns.droplevel(0)
    # Drop the individual split columns; tolerate datasets lacking a split.
    per_split = per_split.drop(columns=['test', 'train', 'validation'], errors="ignore")
    # Move the per-metric rows to columns.
    per_metric = per_split.unstack(level='metric')
    per_metric.columns = per_metric.columns.droplevel(0)
    if add_total:
        per_metric.loc['total'] = per_metric.sum()
    return per_metric


def extract_stats_all_splits(df_multiindex_all_splits, metrics):
    """Return one row per dataset and one column per metric from an all-splits frame.

    *df_multiindex_all_splits* is indexed by ``(dataset, metric, split)``;
    the ``split`` level is dropped from the result.
    """
    selected = df_multiindex_all_splits.loc[(slice(None), metrics), :]
    per_metric = selected.unstack(level='metric')
    per_metric.columns = per_metric.columns.droplevel(0)
    return per_metric.droplevel('split', axis=0)


def extract_stats_for_dataset_card(df_multindex_per_split, subset, metrics, add_total=False):
    """Build the per-split statistics table of one dataset *subset*.

    The result is indexed by ``metric`` with the columns ``train``,
    ``validation`` and ``test`` (in that order) and, when ``add_total`` is
    true, a ``total`` column holding their row-wise sum.
    """
    per_split = df_multindex_per_split.unstack(level='split')
    per_split.columns = per_split.columns.droplevel(0)
    per_split = per_split.loc[(slice(None), metrics), :]
    # Filter on the index level directly so that subset names containing
    # quotes cannot break a string-built query expression.
    subset_mask = per_split.index.get_level_values('dataset') == subset
    table = per_split[subset_mask].reset_index()
    if add_total:
        new_columns = ['metric', 'train', 'validation', 'test', 'total']
        table['total'] = table[['train', 'validation', 'test']].sum(axis=1)
    else:
        new_columns = ['metric', 'train', 'validation', 'test']
    # Fix the column order to train, validation, test.
    table = table.reindex(columns=new_columns)
    table.set_index('metric', inplace=True)
    return table