import gradio as gr
from transformers import Wav2Vec2FeatureExtractor
from transformers import AutoModel
import torch
from torch import nn
import torchaudio
import torchaudio.transforms as T
import logging
import json
import os
import re
import random
import pandas as pd

import importlib
modeling_MERT = importlib.import_module("MERT-v1-95M.modeling_MERT")

from Prediction_Head.MTGGenre_head import MLPProberBase

# input cr: https://huggingface.co/spaces/thealphhamerc/audio-to-text/blob/main/app.py

logger = logging.getLogger("MERT-v1-95M-app")
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter(
    "%(asctime)s;%(levelname)s;%(message)s", "%Y-%m-%d %H:%M:%S")
ch.setFormatter(formatter)
logger.addHandler(ch)

inputs = [
    gr.components.Audio(type="filepath", label="Add music audio file"),
]

title = "Isai - toward better music understanding"
description = "This space uses the MERT-v1-95M model to perform various music information retrieval tasks."

audio_examples = [
    ["samples/143.mp3"],
    ["samples/205.mp3"],
    ["samples/429.mp3"],
    ["samples/997.mp3"],
]

df_init = pd.DataFrame(columns=['Task', 'Top 1', 'Top 2', 'Top 3', 'Top 4', 'Top 5'])
transcription_df = gr.DataFrame(value=df_init, label="Model Results", row_count=(
    0, "dynamic"), max_rows=30, wrap=True, overflow_row_behaviour='paginate')
# outputs = [gr.components.Textbox()]

df_init_live = pd.DataFrame(columns=['Task', 'Top 1', 'Top 2', 'Top 3', 'Top 4', 'Top 5'])
transcription_df_live = gr.DataFrame(value=df_init_live, label="Model Results", row_count=(
    0, "dynamic"), max_rows=30, wrap=True, overflow_row_behaviour='paginate')
outputs_live = transcription_df_live

# Load the model and the corresponding preprocessor config
# model = AutoModel.from_pretrained("m-a-p/MERT-v0-public", trust_remote_code=True)
# processor = Wav2Vec2FeatureExtractor.from_pretrained("m-a-p/MERT-v0-public", trust_remote_code=True)
model = modeling_MERT.MERTModel.from_pretrained("./MERT-v1-95M")
processor = Wav2Vec2FeatureExtractor.from_pretrained("./MERT-v1-95M")

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Best-performing MERT layer per downstream probing task ('all' means the probe
# consumes the representations from every layer).
MERT_BEST_LAYER_IDX = {
    'EMO': 5,
    'GS': 8,
    'GTZAN': 7,
    'MTGGenre': 7,
    'MTGInstrument': 'all',
    'MTGMood': 6,
    'MTGTop50': 6,
    'MTT': 'all',
    'NSynthI': 6,
    'NSynthP': 1,
    'VocalSetS': 2,
    'VocalSetT': 9,
}

CLASSIFIERS = {}

ID2CLASS = {}

# TASKS = ['GS', 'MTGInstrument', 'MTGGenre', 'MTGTop50', 'MTGMood', 'NSynthI', 'NSynthP', 'VocalSetS', 'VocalSetT', 'EMO']
TASKS = ['GS', 'MTGInstrument', 'MTGGenre', 'MTGMood', 'EMO']
TASK_LABELS = {
    'GS': 'Scale',
    'MTGInstrument': 'Instruments',
    'MTGGenre': 'Genre',
    'MTGMood': 'Mood',
    'EMO': 'Emotion (Valence/Arousal)',
}
Regression_TASKS = ['EMO']

# Load the pre-trained MLP probing head and label mapping for each task.
head_dir = './Prediction_Head/best-layer-MERT-v1-95M'
for task in TASKS:
    print('loading', task)
    with open(os.path.join(head_dir, f'{task}.id2class.json'), 'r') as f:
        ID2CLASS[task] = json.load(f)
    num_class = len(ID2CLASS[task].keys())
    CLASSIFIERS[task] = MLPProberBase(d=768, layer=MERT_BEST_LAYER_IDX[task], num_outputs=num_class)
    CLASSIFIERS[task].load_state_dict(torch.load(f'{head_dir}/{task}.ckpt')['state_dict'])
    CLASSIFIERS[task].to(device)

model.to(device)


def model_inference(inputs):
    waveform, sample_rate = torchaudio.load(inputs)

    resample_rate = processor.sampling_rate

    # make sure the sample rate is aligned with what the model expects
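    # (MERT-v1 checkpoints were trained on 24 kHz audio, so processor.sampling_rate
    # carries the model's target rate; clips loaded at any other rate are resampled below.)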
    if resample_rate != sample_rate:
        # print(f'setting rate from {sample_rate} to {resample_rate}')
        resampler = T.Resample(sample_rate, resample_rate)
        waveform = resampler(waveform)

    waveform = waveform.mean(dim=0)  # downmix multi-channel audio to a mono (n_samples,) signal
    model_inputs = processor(waveform, sampling_rate=resample_rate, return_tensors="pt")
    model_inputs = model_inputs.to(device)
    with torch.no_grad():
        model_outputs = model(**model_inputs, output_hidden_states=True)

    # take a look at the output shape: there are 13 layers of representation, and
    # each layer performs differently on different downstream tasks, so the layer
    # used per task is chosen empirically (see MERT_BEST_LAYER_IDX above)
    all_layer_hidden_states = torch.stack(model_outputs.hidden_states).squeeze()[1:, :, :].unsqueeze(0)
    print(all_layer_hidden_states.shape)  # [batch, layers, time steps, 768 feature_dim]
    all_layer_hidden_states = all_layer_hidden_states.mean(dim=2)  # average over time

    task_output_texts = ""
    df_objects = []

    for task in TASKS:
        num_class = len(ID2CLASS[task].keys())
        if MERT_BEST_LAYER_IDX[task] == 'all':
            logits = CLASSIFIERS[task](all_layer_hidden_states)  # [1, 87]
        else:
            logits = CLASSIFIERS[task](all_layer_hidden_states[:, MERT_BEST_LAYER_IDX[task]])
        # print(f'task {task} logits:', logits.shape, 'num class:', num_class)
        sorted_idx = torch.argsort(logits, dim=-1, descending=True)[0]  # batch = 1
        sorted_prob, _ = torch.sort(nn.functional.softmax(logits[0], dim=-1), dim=-1, descending=True)
        # print(sorted_prob)
        # print(sorted_prob.shape)

        top_n_show = 5 if num_class >= 5 else num_class
        # task_output_texts = task_output_texts + f"TASK {task} output:\n" + "\n".join([str(ID2CLASS[task][str(sorted_idx[idx].item())])+f', probability: {sorted_prob[idx].item():.2%}' for idx in range(top_n_show)]) + '\n'
        # task_output_texts = task_output_texts + '----------------------\n'

        row_elements = [TASK_LABELS[task]]
        for idx in range(top_n_show):
            # print(ID2CLASS[task])
            # print('id', str(sorted_idx[idx].item()))
            output_class_name = str(ID2CLASS[task][str(sorted_idx[idx].item())])
            # strip MTG-style prefixes such as "genre---" or "mood/theme---"
            output_class_name = re.sub(r'^\w+---', '', output_class_name)
            output_class_name = re.sub(r'^\w+\/\w+---', '', output_class_name)
            # print('output name', output_class_name)
            output_prob = f' {sorted_prob[idx].item():.2%}'
            row_elements.append(output_class_name + output_prob)
        # pad with blanks so every row has a Task column plus five result columns
        for _ in range(5 + 1 - len(row_elements)):
            row_elements.append(' ')
        df_objects.append(row_elements)

    df = pd.DataFrame(df_objects, columns=['Task', 'Top 1', 'Top 2', 'Top 3', 'Top 4', 'Top 5'])
    return df


COLORS = [
    ["#ff0000", "#00ff00"],
    ["#00ff00", "#0000ff"],
    ["#0000ff", "#ff0000"],
]


def convert_audio(audio):
    df = model_inference(audio)
    waveform = gr.make_waveform(audio, bars_color=random.choice(COLORS), bar_count=80, bar_width=0.4)
    return df, waveform


def audio_waveform(audio):
    return gr.make_waveform(audio)


def live_convert_audio(microphone):
    if microphone is None:
        return None
    return model_inference(microphone)


outputs = [transcription_df, gr.Video(label="Waveform")]

audio_chunked = gr.Interface(
    fn=convert_audio,
    inputs=inputs,
    outputs=outputs,
    allow_flagging="never",
    title=title,
    description=description,
    # article=article,
    examples=audio_examples,
)

# live_audio_chunked = gr.Interface(
#     fn=live_convert_audio,
#     inputs=live_inputs,
#     outputs=outputs_live,
#     allow_flagging="never",
#     title=title,
#     description=description,
#     # article=article,
#     # examples=audio_examples,
#     live=True,
# )

# demo = gr.Blocks()
# with demo:
#     gr.TabbedInterface(
#         [
#             audio_chunked,
#             live_audio_chunked,
#         ],
#         [
#             "Audio File or Recording",
"Live Streaming Music" # ] # ) # demo.queue(concurrency_count=1, max_size=5) server_name = os.environ.get('GRADIO_SERVER_NAME', "127.0.0.1") audio_chunked.launch(server_name=server_name, show_api=False)