import numpy as np import matplotlib.pyplot as plt from matplotlib.cm import ScalarMappable from matplotlib.patches import Rectangle from librosa.sequence import viterbi_discriminative, dtw from librosa import note_to_hz, midi_to_hz from numba import jit from scipy.stats import norm from scipy.ndimage import gaussian_filter1d from scipy.signal import medfilt, upfirdn, argrelmax from torchaudio.models.conformer import ConformerLayer from torchaudio import load as torchaudio_load from torchaudio.functional import resample as torchaudio_functional_resample from torch import cat as torch_cat, load as torch_load, Tensor as torch_Tensor, from_numpy as torch_from_numpy, no_grad as torch_no_grad, mean as torch_mean, std as torch_std, sigmoid as torch_sigmoid, nan_to_num as torch_nan_to_num, nn from sklearn.metrics.pairwise import euclidean_distances from mir_eval.melody import hz2cents from pretty_midi import PrettyMIDI, Instrument, Note, PitchBend, instrument_name_to_program, note_name_to_number from time import perf_counter from collections import defaultdict from typing import DefaultDict, Dict, List, Optional, Tuple from pathlib import Path from mido import MidiFile, MidiTrack class PitchEstimator(nn.Module): """ This is the base class that everything else inherits from. The hierarchy is: PitchEstimator -> Transcriber -> Synchronizer -> AutonomousAgent -> The n-Head Music Performance Analysis Models. PitchEstimator handles reading the audio, predicting all the features, and estimating a single frame-level f0 using Viterbi decoding; it also provides MIDI pitch bend creation for the predicted note events when used inside a Transcriber, and score-informed f0 estimation when used inside a Synchronizer. """ def __init__(self, labeling, instrument='Violin', sr=16000, window_size=1024, hop_length=160): super().__init__() self.labeling = labeling self.sr = sr self.window_size = window_size self.hop_length = hop_length self.instrument = instrument self.f0_bins_per_semitone = int(np.round(100/self.labeling.f0_granularity_c)) def read_audio(self, audio): """ Read and resample an audio file, convert it to mono, and unfold it into representation frames. The time array represents the center of each small frame with a 5.8 ms hop length. This is different from the chunk-level frames: the chunk-level frames represent the entire sequence the model sees, whereas predictions are made at the small-frame interval (5.8 ms).
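A minimal usage sketch (assuming a trained concrete subclass instance ``model``; the subclass is expected to define ``chunk_size``, ``frame_overlap`` and ``max_window_size``, which this method relies on):

>>> frames, times = model.read_audio('performance.wav')
>>> frames.shape         # (n_big_frames, model.max_window_size)
>>> times[1] - times[0]  # small-frame hop in seconds, i.e. hop_length / sr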
:param audio: str, pathlib.Path, np.ndarray, or torch.Tensor :return: frames: (n_big_frames, frame_length), times: (n_small_frames,) """ if isinstance(audio, str) or isinstance(audio, Path): audio, sample_rate = torchaudio_load(audio, normalize=True) audio = audio.mean(axis=0) # convert to mono if sample_rate != self.sr: audio = torchaudio_functional_resample(audio, sample_rate, self.sr) elif isinstance(audio, np.ndarray): audio = torch_from_numpy(audio) else: assert isinstance(audio, torch_Tensor) len_audio = audio.shape[-1] n_frames = int(np.ceil((len_audio + sum(self.frame_overlap)) / (self.hop_length * self.chunk_size))) audio = nn.functional.pad(audio, (self.frame_overlap[0], self.frame_overlap[1] + (n_frames * self.hop_length * self.chunk_size) - len_audio)) frames = audio.unfold(0, self.max_window_size, self.hop_length*self.chunk_size) times = np.arange(0, len_audio, self.hop_length) / self.sr # not tensor, we don't compute anything with it return frames, times def predict(self, audio, batch_size): frames, times = self.read_audio(audio) performance = {'f0': [], 'note': [], 'onset': [], 'offset': []} self.eval() device = self.main.conv0.conv2d.weight.device with torch_no_grad(): for i in range(0, len(frames), batch_size): f = frames[i:min(i + batch_size, len(frames))].to(device) f -= (torch_mean(f, axis=1).unsqueeze(-1)) f /= (torch_std(f, axis=1).unsqueeze(-1)) out = self.forward(f) for key, value in out.items(): value = torch_sigmoid(value) value = torch_nan_to_num(value) # the model outputs nan when the frame is silent (this is an expected behavior due to normalization) value = value.view(-1, value.shape[-1]) value = value.detach().cpu().numpy() performance[key].append(value) performance = {key: np.concatenate(value, axis=0)[:len(times)] for key, value in performance.items()} performance['time'] = times return performance def estimate_pitch(self, audio, batch_size, viterbi=False): out = self.predict(audio, batch_size) f0_hz = self.out2f0(out, viterbi) return out['time'], f0_hz def out2f0(self, out, viterbi=False): """ Monophonic f0 estimation from the model output. The viterbi postprocessing is specialized for the violin family. 
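:param out: dictionary returned by ``predict``; uses 'f0' and 'time', and additionally 'note' and 'onset' when ``viterbi='constrained'``
:param viterbi: False for a frame-wise argmax with a local weighted average, True for Viterbi decoding with a smoothness-inducing transition matrix, or 'constrained' to restrict the decoded path to the note events returned by ``spotify_create_notes``
:return: f0 contour in Hz, one value per small frame (NaNs are replaced with 0)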
""" salience = out['f0'] if viterbi == 'constrained': assert hasattr(self, 'out2note') notes = spotify_create_notes( out["note"], out["onset"], note_low=self.labeling.midi_centers[0], note_high=self.labeling.midi_centers[-1], onset_thresh=0.5, frame_thresh=0.3, infer_onsets=True, melodia_trick=True, min_note_len=int(np.round(127.70 / 1000 * (self.sr / self.hop_length)))) note_cents = self.get_pitch_bends(salience, notes, to_midi=False, timing_refinement_range=0) cents = np.zeros_like(out['time']) cents[note_cents[:,0].astype(int)] = note_cents[:,1] elif viterbi: # transition probabilities inducing continuous pitch # big changes are penalized with one order of magnitude transition = gaussian_filter1d(np.eye(self.labeling.f0_n_bins), 30) + 99 * gaussian_filter1d( np.eye(self.labeling.f0_n_bins), 2) transition = transition / np.sum(transition, axis=1)[:, None] p = salience / salience.sum(axis=1)[:, None] p[np.isnan(p.sum(axis=1)), :] = np.ones(self.labeling.f0_n_bins) * 1 / self.labeling.f0_n_bins path = viterbi_discriminative(p.T, transition) cents = np.array([self.labeling.f0_label2c(salience[i, :], path[i]) for i in range(len(path))]) else: cents = self.labeling.f0_label2c(salience, center=None) # use argmax for center f0_hz = self.labeling.f0_c2hz(cents) f0_hz[np.isnan(f0_hz)] = 0 return f0_hz def get_pitch_bends( self, contours: np.ndarray, note_events: List[Tuple[int, int, int, float]], timing_refinement_range: int = 0, to_midi: bool = True, ) -> List[Tuple[int, int, int, float, Optional[List[int]]]]: """Modified version of an excellent script from Spotify/basic_pitch!! Thank you!!!! Given note events and contours, estimate pitch bends per note. Pitch bends are represented as a sequence of evenly spaced midi pitch bend control units. The time stamps of each pitch bend can be inferred by computing an evenly spaced grid between the start and end times of each note event. Args: contours: Matrix of estimated pitch contours note_events: note event tuple timing_refinement_range: if > 0, refine onset/offset boundaries with f0 confidence to_midi: whether to convert pitch bends to midi pitch bends. If False, return pitch estimates in the format [time (index), pitch (Hz), confidence in range [0, 1]]. 
Returns: note events with pitch bends """ f0_matrix = [] # [time (index), pitch (Hz), confidence in range [0, 1]] note_events_with_pitch_bends = [] for start_idx, end_idx, pitch_midi, amplitude in note_events: if timing_refinement_range: start_idx = np.max([0, start_idx - timing_refinement_range]) end_idx = np.min([contours.shape[0], end_idx + timing_refinement_range]) freq_idx = int(np.round(self.midi_pitch_to_contour_bin(pitch_midi))) freq_start_idx = np.max([freq_idx - self.labeling.f0_tolerance_bins, 0]) freq_end_idx = np.min([self.labeling.f0_n_bins, freq_idx + self.labeling.f0_tolerance_bins + 1]) trans_start_idx = np.max([0, self.labeling.f0_tolerance_bins - freq_idx]) trans_end_idx = (2 * self.labeling.f0_tolerance_bins + 1) - \ np.max([0, freq_idx - (self.labeling.f0_n_bins - self.labeling.f0_tolerance_bins - 1)]) # apply regional viterbi to estimate the intonation # observation probabilities come from the f0_roll matrix observation = contours[start_idx:end_idx, freq_start_idx:freq_end_idx] observation = observation / observation.sum(axis=1)[:, None] observation[np.isnan(observation.sum(axis=1)), :] = np.ones(freq_end_idx - freq_start_idx) * 1 / ( freq_end_idx - freq_start_idx) # transition probabilities assure continuity transition = self.labeling.f0_transition_matrix[trans_start_idx:trans_end_idx, trans_start_idx:trans_end_idx] + 1e-6 transition = transition / np.sum(transition, axis=1)[:, None] path = viterbi_discriminative(observation.T / observation.sum(axis=1), transition) + freq_start_idx cents = np.array([self.labeling.f0_label2c(contours[i + start_idx, :], path[i]) for i in range(len(path))]) bends = cents - self.labeling.midi_centers_c[pitch_midi - self.labeling.midi_centers[0]] if to_midi: bends = (bends * 4096 / 100).astype(int) bends[bends > 8191] = 8191 bends[bends < -8192] = -8192 if timing_refinement_range: confidences = np.array([contours[i + start_idx, path[i]] for i in range(len(path))]) threshold = np.median(confidences) threshold = (np.median(confidences > threshold) + threshold) / 2 # some magic median_kernel = 2 * (timing_refinement_range // 2) + 1 # some more magic confidences = medfilt(confidences, kernel_size=median_kernel) conf_bool = confidences > threshold onset_idx = np.argmax(conf_bool) offset_idx = len(confidences) - np.argmax(conf_bool[::-1]) bends = bends[onset_idx:offset_idx] start_idx = start_idx + onset_idx end_idx = start_idx + offset_idx note_events_with_pitch_bends.append((start_idx, end_idx, pitch_midi, amplitude, bends)) else: confidences = np.array([contours[i + start_idx, path[i]] for i in range(len(path))]) time_idx = np.arange(len(path)) + start_idx # f0_hz = self.labeling.f0_c2hz(cents) possible_f0s = np.array([time_idx, cents, confidences]).T f0_matrix.append(possible_f0s[np.abs(bends)<100]) # filter out pitch bends that are too large if not to_midi: return np.vstack(f0_matrix) else: return note_events_with_pitch_bends def midi_pitch_to_contour_bin(self, pitch_midi: int) -> np.array: """Convert midi pitch to corresponding index in contour matrix Args: pitch_midi: pitch in midi Returns: index in contour matrix """ pitch_hz = midi_to_hz(pitch_midi) return np.argmin(np.abs(self.labeling.f0_centers_hz - pitch_hz)) # SPOTIFY def get_inferred_onsets(onset_roll: np.array, note_roll: np.array, n_diff: int = 2) -> np.array: """ Infer onsets from large changes in note roll matrix amplitudes. Modified from https://github.com/spotify/basic-pitch/blob/main/basic_pitch/note_creation.py :param onset_roll: Onset activation matrix (n_times, n_freqs). 
:param note_roll: Frame-level note activation matrix (n_times, n_freqs). :param n_diff: Differences used to detect onsets. :return: The maximum between the predicted onsets and its differences. """ diffs = [] for n in range(1, n_diff + 1): frames_appended = np.concatenate([np.zeros((n, note_roll.shape[1])), note_roll]) diffs.append(frames_appended[n:, :] - frames_appended[:-n, :]) frame_diff = np.min(diffs, axis=0) frame_diff[frame_diff < 0] = 0 frame_diff[:n_diff, :] = 0 frame_diff = np.max(onset_roll) * frame_diff / np.max(frame_diff) # rescale to have the same max as onsets max_onsets_diff = np.max([onset_roll, frame_diff], axis=0) # use the max of the predicted onsets and the differences return max_onsets_diff def spotify_create_notes( note_roll: np.array, onset_roll: np.array, onset_thresh: float, frame_thresh: float, min_note_len: int, infer_onsets: bool, note_low : int, #self.labeling.midi_centers[0] note_high : int, #self.labeling.midi_centers[-1], melodia_trick: bool = True, energy_tol: int = 11, ) -> List[Tuple[int, int, int, float]]: """Decode raw model output to polyphonic note events Modified from https://github.com/spotify/basic-pitch/blob/main/basic_pitch/note_creation.py Args: note_roll: Frame activation matrix (n_times, n_freqs). onset_roll: Onset activation matrix (n_times, n_freqs). onset_thresh: Minimum amplitude of an onset activation to be considered an onset. frame_thresh: Minimum amplitude of a frame activation for a note to remain "on". min_note_len: Minimum allowed note length in frames. infer_onsets: If True, add additional onsets when there are large differences in frame amplitudes. melodia_trick : Whether to use the melodia trick to better detect notes. energy_tol: Drop notes below this energy. Returns: list of tuples [(start_time_frames, end_time_frames, pitch_midi, amplitude)] representing the note events, where amplitude is a number between 0 and 1 """ n_frames = note_roll.shape[0] # use onsets inferred from frames in addition to the predicted onsets if infer_onsets: onset_roll = get_inferred_onsets(onset_roll, note_roll) peak_thresh_mat = np.zeros(onset_roll.shape) peaks = argrelmax(onset_roll, axis=0) peak_thresh_mat[peaks] = onset_roll[peaks] onset_idx = np.where(peak_thresh_mat >= onset_thresh) onset_time_idx = onset_idx[0][::-1] # sort to go backwards in time onset_freq_idx = onset_idx[1][::-1] # sort to go backwards in time remaining_energy = np.zeros(note_roll.shape) remaining_energy[:, :] = note_roll[:, :] # loop over onsets note_events = [] for note_start_idx, freq_idx in zip(onset_time_idx, onset_freq_idx): # if we're too close to the end of the audio, continue if note_start_idx >= n_frames - 1: continue # find time index at this frequency band where the frames drop below an energy threshold i = note_start_idx + 1 k = 0 # number of frames since energy dropped below threshold while i < n_frames - 1 and k < energy_tol: if remaining_energy[i, freq_idx] < frame_thresh: k += 1 else: k = 0 i += 1 i -= k # go back to frame above threshold # if the note is too short, skip it if i - note_start_idx <= min_note_len: continue remaining_energy[note_start_idx:i, freq_idx] = 0 if freq_idx < note_high: remaining_energy[note_start_idx:i, freq_idx + 1] = 0 if freq_idx > note_low: remaining_energy[note_start_idx:i, freq_idx - 1] = 0 # add the note amplitude = np.mean(note_roll[note_start_idx:i, freq_idx]) note_events.append( ( note_start_idx, i, freq_idx + note_low, amplitude, ) ) if melodia_trick: energy_shape = remaining_energy.shape while np.max(remaining_energy) 
> frame_thresh: i_mid, freq_idx = np.unravel_index(np.argmax(remaining_energy), energy_shape) remaining_energy[i_mid, freq_idx] = 0 # forward pass i = i_mid + 1 k = 0 while i < n_frames - 1 and k < energy_tol: if remaining_energy[i, freq_idx] < frame_thresh: k += 1 else: k = 0 remaining_energy[i, freq_idx] = 0 if freq_idx < note_high: remaining_energy[i, freq_idx + 1] = 0 if freq_idx > note_low: remaining_energy[i, freq_idx - 1] = 0 i += 1 i_end = i - 1 - k # go back to frame above threshold # backward pass i = i_mid - 1 k = 0 while i > 0 and k < energy_tol: if remaining_energy[i, freq_idx] < frame_thresh: k += 1 else: k = 0 remaining_energy[i, freq_idx] = 0 if freq_idx < note_high: remaining_energy[i, freq_idx + 1] = 0 if freq_idx > note_low: remaining_energy[i, freq_idx - 1] = 0 i -= 1 i_start = i + 1 + k # go back to frame above threshold assert i_start >= 0, "{}".format(i_start) assert i_end < n_frames if i_end - i_start <= min_note_len: # note is too short, skip it continue # add the note amplitude = np.mean(note_roll[i_start:i_end, freq_idx]) note_events.append( ( i_start, i_end, freq_idx + note_low, amplitude, ) ) return note_events # TIKTOK def note_detection_with_onset_offset_regress(frame_output, onset_output, onset_shift_output, offset_output, offset_shift_output, velocity_output, frame_threshold): """Process prediction matrices to note events information. First, detect onsets with onset outputs. Then, detect offsets with frame and offset outputs. Args: frame_output: (frames_num,) onset_output: (frames_num,) onset_shift_output: (frames_num,) offset_output: (frames_num,) offset_shift_output: (frames_num,) velocity_output: (frames_num,) frame_threshold: float Returns: output_tuples: list of [bgn, fin, onset_shift, offset_shift, normalized_velocity], e.g., [ [1821, 1909, 0.47498, 0.3048533, 0.72119445], [1909, 1947, 0.30730522, -0.45764327, 0.64200014], ...] """ output_tuples = [] bgn = None frame_disappear = None offset_occur = None for i in range(onset_output.shape[0]): if onset_output[i] == 1: """Onset detected""" if bgn: """Consecutive onsets. 
E.g., pedal is not released, but two consecutive notes being played.""" fin = max(i - 1, 0) output_tuples.append([bgn, fin, onset_shift_output[bgn], 0, velocity_output[bgn]]) frame_disappear, offset_occur = None, None bgn = i if bgn and i > bgn: """If onset found, then search offset""" if frame_output[i] <= frame_threshold and not frame_disappear: """Frame disappear detected""" frame_disappear = i if offset_output[i] == 1 and not offset_occur: """Offset detected""" offset_occur = i if frame_disappear: if offset_occur and offset_occur - bgn > frame_disappear - offset_occur: """bgn --------- offset_occur --- frame_disappear""" fin = offset_occur else: """bgn --- offset_occur --------- frame_disappear""" fin = frame_disappear output_tuples.append([bgn, fin, onset_shift_output[bgn], offset_shift_output[fin], velocity_output[bgn]]) bgn, frame_disappear, offset_occur = None, None, None if bgn and (i - bgn >= 600 or i == onset_output.shape[0] - 1): """Offset not detected""" fin = i output_tuples.append([bgn, fin, onset_shift_output[bgn], offset_shift_output[fin], velocity_output[bgn]]) bgn, frame_disappear, offset_occur = None, None, None # Sort pairs by onsets output_tuples.sort(key=lambda pair: pair[0]) return output_tuples class RegressionPostProcessor(object): def __init__(self, frames_per_second, classes_num, onset_threshold, offset_threshold, frame_threshold, pedal_offset_threshold, begin_note): """Postprocess the output probabilities of a transcription model to MIDI events. Args: frames_per_second: float classes_num: int onset_threshold: float offset_threshold: float frame_threshold: float pedal_offset_threshold: float begin_note: int """ self.frames_per_second = frames_per_second self.classes_num = classes_num self.onset_threshold = onset_threshold self.offset_threshold = offset_threshold self.frame_threshold = frame_threshold self.pedal_offset_threshold = pedal_offset_threshold self.begin_note = begin_note self.velocity_scale = 128 def output_dict_to_midi_events(self, output_dict): """Main function. Post process model outputs to MIDI events. Args: output_dict: { 'reg_onset_output': (segment_frames, classes_num), 'reg_offset_output': (segment_frames, classes_num), 'frame_output': (segment_frames, classes_num), 'velocity_output': (segment_frames, classes_num), 'reg_pedal_onset_output': (segment_frames, 1), 'reg_pedal_offset_output': (segment_frames, 1), 'pedal_frame_output': (segment_frames, 1)} Outputs: est_note_events: list of dict, e.g. [ {'onset_time': 39.74, 'offset_time': 39.87, 'midi_note': 27, 'velocity': 83}, {'onset_time': 11.98, 'offset_time': 12.11, 'midi_note': 33, 'velocity': 88}] est_pedal_events: list of dict, e.g.
[ {'onset_time': 0.17, 'offset_time': 0.96}, {'onset_time': 1.17, 'offset_time': 2.65}] """ output_dict['frame_output'] = output_dict['note'] output_dict['velocity_output'] = output_dict['note'] output_dict['reg_onset_output'] = output_dict['onset'] output_dict['reg_offset_output'] = output_dict['offset'] # Post process piano note outputs to piano note and pedal events information (est_on_off_note_vels, est_pedal_on_offs) = \ self.output_dict_to_note_pedal_arrays(output_dict) """est_on_off_note_vels: (events_num, 4), the four columns are: [onset_time, offset_time, piano_note, velocity], est_pedal_on_offs: (pedal_events_num, 2), the two columns are: [onset_time, offset_time]""" # Reformat notes to MIDI events est_note_events = self.detected_notes_to_events(est_on_off_note_vels) if est_pedal_on_offs is None: est_pedal_events = None else: est_pedal_events = self.detected_pedals_to_events(est_pedal_on_offs) return est_note_events, est_pedal_events def output_dict_to_note_pedal_arrays(self, output_dict): """Postprocess the output probabilities of a transcription model to MIDI events. Args: output_dict: dict, { 'reg_onset_output': (frames_num, classes_num), 'reg_offset_output': (frames_num, classes_num), 'frame_output': (frames_num, classes_num), 'velocity_output': (frames_num, classes_num), ...} Returns: est_on_off_note_vels: (events_num, 4), the 4 columns are onset_time, offset_time, piano_note and velocity. E.g. [ [39.74, 39.87, 27, 0.65], [11.98, 12.11, 33, 0.69], ...] est_pedal_on_offs: (pedal_events_num, 2), the 2 columns are onset_time and offset_time. E.g. [ [0.17, 0.96], [1.17, 2.65], ...] """ # ------ 1. Process regression outputs to binarized outputs ------ # For example, onset or offset of [0., 0., 0.15, 0.30, 0.40, 0.35, 0.20, 0.05, 0., 0.] # will be processed to [0., 0., 0., 0., 1., 0., 0., 0., 0., 0.] # Calculate binarized onset output from regression output (onset_output, onset_shift_output) = \ self.get_binarized_output_from_regression( reg_output=output_dict['reg_onset_output'], threshold=self.onset_threshold, neighbour=2) output_dict['onset_output'] = onset_output # Values are 0 or 1 output_dict['onset_shift_output'] = onset_shift_output # Calculate binarized offset output from regression output (offset_output, offset_shift_output) = \ self.get_binarized_output_from_regression( reg_output=output_dict['reg_offset_output'], threshold=self.offset_threshold, neighbour=4) output_dict['offset_output'] = offset_output # Values are 0 or 1 output_dict['offset_shift_output'] = offset_shift_output if 'reg_pedal_onset_output' in output_dict.keys(): """Pedal onsets are not used in inference. Instead, frame-wise pedal predictions are used to detect onsets. We empirically found this to be more accurate for detecting pedal onsets.""" pass if 'reg_pedal_offset_output' in output_dict.keys(): # Calculate binarized pedal offset output from regression output (pedal_offset_output, pedal_offset_shift_output) = \ self.get_binarized_output_from_regression( reg_output=output_dict['reg_pedal_offset_output'], threshold=self.pedal_offset_threshold, neighbour=4) output_dict['pedal_offset_output'] = pedal_offset_output # Values are 0 or 1 output_dict['pedal_offset_shift_output'] = pedal_offset_shift_output # ------ 2.
Process matrices results to event results ------ # Detect piano notes from output_dict est_on_off_note_vels = self.output_dict_to_detected_notes(output_dict) est_pedal_on_offs = None return est_on_off_note_vels, est_pedal_on_offs def get_binarized_output_from_regression(self, reg_output, threshold, neighbour): """Calculate binarized output and shifts of onsets or offsets from the regression results. Args: reg_output: (frames_num, classes_num) threshold: float neighbour: int Returns: binary_output: (frames_num, classes_num) shift_output: (frames_num, classes_num) """ binary_output = np.zeros_like(reg_output) shift_output = np.zeros_like(reg_output) (frames_num, classes_num) = reg_output.shape for k in range(classes_num): x = reg_output[:, k] for n in range(neighbour, frames_num - neighbour): if x[n] > threshold and self.is_monotonic_neighbour(x, n, neighbour): binary_output[n, k] = 1 """See Section III-D in [1] for deduction. [1] Q. Kong, et al., High-resolution Piano Transcription with Pedals by Regressing Onsets and Offsets Times, 2020.""" if x[n - 1] > x[n + 1]: shift = (x[n + 1] - x[n - 1]) / (x[n] - x[n + 1]) / 2 else: shift = (x[n + 1] - x[n - 1]) / (x[n] - x[n - 1]) / 2 shift_output[n, k] = shift return binary_output, shift_output def is_monotonic_neighbour(self, x, n, neighbour): """Detect if values are monotonic in both side of x[n]. Args: x: (frames_num,) n: int neighbour: int Returns: monotonic: bool """ monotonic = True for i in range(neighbour): if x[n - i] < x[n - i - 1]: monotonic = False if x[n + i] < x[n + i + 1]: monotonic = False return monotonic def output_dict_to_detected_notes(self, output_dict): """Postprocess output_dict to piano notes. Args: output_dict: dict, e.g. { 'onset_output': (frames_num, classes_num), 'onset_shift_output': (frames_num, classes_num), 'offset_output': (frames_num, classes_num), 'offset_shift_output': (frames_num, classes_num), 'frame_output': (frames_num, classes_num), 'onset_output': (frames_num, classes_num), ...} Returns: est_on_off_note_vels: (notes, 4), the four columns are onsets, offsets, MIDI notes and velocities. E.g., [[39.7375, 39.7500, 27., 0.6638], [11.9824, 12.5000, 33., 0.6892], ...] 
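The note boundaries are refined with the sub-frame shifts estimated by ``get_binarized_output_from_regression``: onset_time = (onset_frame + onset_shift) / frames_per_second, and analogously for offsets. For example, onset frame 3974 with a shift of -0.25 at 100 frames per second gives 39.7375 s.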
""" est_tuples = [] est_midi_notes = [] classes_num = output_dict['frame_output'].shape[-1] for piano_note in range(classes_num): """Detect piano notes""" est_tuples_per_note = note_detection_with_onset_offset_regress( frame_output=output_dict['frame_output'][:, piano_note], onset_output=output_dict['onset_output'][:, piano_note], onset_shift_output=output_dict['onset_shift_output'][:, piano_note], offset_output=output_dict['offset_output'][:, piano_note], offset_shift_output=output_dict['offset_shift_output'][:, piano_note], velocity_output=output_dict['velocity_output'][:, piano_note], frame_threshold=self.frame_threshold) est_tuples += est_tuples_per_note est_midi_notes += [piano_note + self.begin_note] * len(est_tuples_per_note) est_tuples = np.array(est_tuples) # (notes, 5) """(notes, 5), the five columns are onset, offset, onset_shift, offset_shift and normalized_velocity""" est_midi_notes = np.array(est_midi_notes) # (notes,) onset_times = (est_tuples[:, 0] + est_tuples[:, 2]) / self.frames_per_second offset_times = (est_tuples[:, 1] + est_tuples[:, 3]) / self.frames_per_second velocities = est_tuples[:, 4] est_on_off_note_vels = np.stack((onset_times, offset_times, est_midi_notes, velocities), axis=-1) """(notes, 3), the three columns are onset_times, offset_times and velocity.""" est_on_off_note_vels = est_on_off_note_vels.astype(np.float32) return est_on_off_note_vels def detected_notes_to_events(self, est_on_off_note_vels): """Reformat detected notes to midi events. Args: est_on_off_vels: (notes, 3), the three columns are onset_times, offset_times and velocity. E.g. [[32.8376, 35.7700, 0.7932], [37.3712, 39.9300, 0.8058], ...] Returns: midi_events, list, e.g., [{'onset_time': 39.7376, 'offset_time': 39.75, 'midi_note': 27, 'velocity': 84}, {'onset_time': 11.9824, 'offset_time': 12.50, 'midi_note': 33, 'velocity': 88}, ...] 
""" midi_events = [] for i in range(est_on_off_note_vels.shape[0]): midi_events.append({ 'onset_time': est_on_off_note_vels[i][0], 'offset_time': est_on_off_note_vels[i][1], 'midi_note': int(est_on_off_note_vels[i][2]), 'velocity': int(est_on_off_note_vels[i][3] * self.velocity_scale)}) return midi_events def sync_visualize_step1(cost_matrices: List, num_rows: int, num_cols: int, anchors: np.ndarray, wp: np.ndarray) -> Tuple[plt.Figure, plt.Axes]: fig, ax = plt.subplots(1, 1, dpi=72) ax = __visualize_cost_matrices(ax, cost_matrices) __visualize_constraint_rectangles(anchors[[1, 0], :], edgecolor='firebrick') __visualize_path_in_matrix(ax=ax, wp=wp, axisX=np.arange(0, num_rows), axisY=np.arange(0, num_cols), path_color='firebrick') return fig, ax def sync_visualize_step2(ax: plt.Axes, cost_matrices: list, wp_step2: np.ndarray, wp_step1: np.ndarray, num_rows_step1: int, num_cols_step1: int, anchors_step1: np.ndarray, neighboring_anchors: np.ndarray, plot_title: str = ""): offset_x = neighboring_anchors[0, 0] - 1 offset_y = neighboring_anchors[1, 0] - 1 ax = __visualize_cost_matrices(ax=ax, cost_matrices=cost_matrices, offset_x=offset_x, offset_y=offset_y) __visualize_constraint_rectangles(anchors_step1[[1, 0], :], edgecolor='firebrick') __visualize_path_in_matrix(ax=ax, wp=wp_step1, axisX=np.arange(0, num_rows_step1), axisY=np.arange(0, num_cols_step1), path_color='firebrick') __visualize_constraint_rectangles(neighboring_anchors[[1, 0], :] - 1, edgecolor='orangered', linestyle='--') __visualize_path_in_matrix(ax=ax, wp=wp_step2, axisX=np.arange(0, num_rows_step1), axisY=np.arange(0, num_cols_step1), path_color='orangered') ax.set_title(plot_title) ax.set_ylabel("Version 1 (frames)") ax.set_xlabel("Version 2 (frames)") ax = plt.gca() # get the current axes pcm = None for pcm in ax.get_children(): if isinstance(pcm, ScalarMappable): break plt.colorbar(pcm, ax=ax) plt.tight_layout() plt.show() def __size_dtw_matrices(dtw_matrices: List) -> Tuple[List[np.ndarray], List[np.ndarray]]: """Gives information about the dimensionality of a DTW matrix given in form of a list matrix Parameters ---------- dtw_matrices: list The DTW matrix (cost matrix or accumulated cost matrix) given in form a list. Returns ------- axisX_list: list A list containing a horizontal axis for each of the sub matrices which specifies the horizontal position of the respective submatrix in the overall cost matrix. axis_y_list: list A list containing a vertical axis for each of the sub matrices which specifies the vertical position of the respective submatrix in the overall cost matrix. """ num_matrices = len(dtw_matrices) size_list = [dtw_mat.shape for dtw_mat in dtw_matrices] axis_x_list = list() axis_y_list = list() x_acc = 0 y_acc = 0 for i in range(num_matrices): curr_size_list = size_list[i] axis_x_list.append(np.arange(x_acc, x_acc + curr_size_list[0])) axis_y_list.append(np.arange(y_acc, y_acc + curr_size_list[1])) x_acc += curr_size_list[0] - 1 y_acc += curr_size_list[1] - 1 return axis_x_list, axis_y_list def __visualize_cost_matrices(ax: plt.Axes, cost_matrices: list = None, offset_x: float = 0.0, offset_y: float = 0.0) -> plt.Axes: """Visualizes cost matrices Parameters ---------- ax : axes The Axes instance to plot on cost_matrices : list List of DTW cost matrices. offset_x : float Offset on the x axis. offset_y : float Offset on the y axis. 
Returns ------- ax: axes The Axes instance to plot on """ x_ax, y_ax = __size_dtw_matrices(dtw_matrices=cost_matrices) for i, cur_cost in enumerate(cost_matrices[::-1]): curr_x_ax = x_ax[i] + offset_x curr_y_ax = y_ax[i] + offset_y cur_cost = cost_matrices[i] ax.imshow(cur_cost, cmap='gray_r', aspect='auto', origin='lower', extent=[curr_y_ax[0], curr_y_ax[-1], curr_x_ax[0], curr_x_ax[-1]]) return ax def __visualize_path_in_matrix(ax, wp: np.ndarray = None, axisX: np.ndarray = None, axisY: np.ndarray = None, path_color: str = 'r'): """Plots a warping path on top of a given matrix. The matrix is usually an accumulated cost matrix. Parameters ---------- ax : axes The Axes instance to plot on wp : np.ndarray Warping path axisX : np.ndarray Array of X axis axisY : np.ndarray Array of Y axis path_color : str Color of the warping path to be plotted. (default: r) """ assert axisX is not None and isinstance(axisX, np.ndarray), 'axisX must be a numpy array!' assert axisY is not None and isinstance(axisY, np.ndarray), 'axisY must be a numpy array!' wp = wp.astype(int) ax.plot(axisY[wp[1, :]], axisX[wp[0, :]], '-k', linewidth=5) ax.plot(axisY[wp[1, :]], axisX[wp[0, :]], color=path_color, linewidth=3) def __visualize_constraint_rectangles(anchors: np.ndarray, linestyle: str = '-', edgecolor: str = 'royalblue', linewidth: float = 1.0): for k in range(anchors.shape[1]-1): a1 = anchors[:, k] a2 = anchors[:, k + 1] # a rectangle is defined by [x y width height] x = a1[0] y = a1[1] w = a2[0] - a1[0] + np.finfo(float).eps h = a2[1] - a1[1] + np.finfo(float).eps rect = Rectangle((x, y), w, h, linewidth=linewidth, edgecolor=edgecolor, linestyle=linestyle, facecolor='none') plt.gca().add_patch(rect) def project_alignment_on_a_new_feature_rate(alignment: np.ndarray, feature_rate_old: int, feature_rate_new: int, cost_matrix_size_old: tuple = (), cost_matrix_size_new: tuple = ()) -> np.ndarray: """Projects an alignment computed for a cost matrix on a certain feature resolution on a cost matrix having a different feature resolution. Parameters ---------- alignment : np.ndarray [shape=(2, N)] Alignment matrix feature_rate_old : int Feature rate of the old cost matrix feature_rate_new : int Feature rate of the new cost matrix cost_matrix_size_old : tuple Size of the old cost matrix. Possibly needed to deal with border cases cost_matrix_size_new : tuple Size of the new cost matrix. Possibly needed to deal with border cases Returns ------- np.ndarray [shape=(2, N)] Anchor sequence for the new cost matrix """ # Project the alignment on the new feature rate fac = feature_rate_new / feature_rate_old anchors = np.round(alignment * fac) + 1 # In case the sizes of the cost matrices are given explicitly and the # alignment specifies to align the first and last elements, handle this case # separately since this might cause problems in the general projection # procedure. if cost_matrix_size_old is not None and cost_matrix_size_new is not None: if np.array_equal(alignment[:, 0], np.array([0, 0])): anchors[:, 0] = np.array([1, 1]) if np.array_equal(alignment[:, -1], np.array(cost_matrix_size_old) - 1): anchors[:, -1] = np.array(cost_matrix_size_new) return anchors - 1 def derive_anchors_from_projected_alignment(projected_alignment: np.ndarray, threshold: int) -> np.ndarray: """Derive anchors from a projected alignment such that the area of the rectangle defined by two subsequent anchors a1 and a2 is below a given threshold. 
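The area spanned by two anchors a1 and a2 is (a2[0] - a1[0] + 1) * (a2[1] - a1[1] + 1), see ``__compute_area``. As an illustration, anchors (0, 0) and (99, 149) span 100 * 150 = 15000 cells, so with a threshold of 10000 the alignment is split at its midpoint and both halves are processed recursively.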
Parameters ---------- projected_alignment : np.ndarray [shape=(2, N)] Projected alignment array threshold : int Maximum area of the constraint rectangle Returns ------- anchors_res : np.ndarray [shape=(2, M)] Resulting anchor sequence """ L = projected_alignment.shape[1] a1 = np.array(projected_alignment[:, 0], copy=True).reshape(-1, 1) a2 = np.array(projected_alignment[:, -1], copy=True).reshape(-1, 1) if __compute_area(a1, a2) <= threshold: anchors_res = np.concatenate([a1, a2], axis=1) elif L > 2: center = int(np.floor(L/2 + 1)) a1 = np.array(projected_alignment[:, 0], copy=True).reshape(-1, 1) a2 = np.array(projected_alignment[:, center - 1], copy=True).reshape(-1, 1) a3 = np.array(projected_alignment[:, -1], copy=True).reshape(-1, 1) if __compute_area(a1, a2) > threshold: anchors_1 = derive_anchors_from_projected_alignment(projected_alignment[:, 0:center], threshold) else: anchors_1 = np.concatenate([a1, a2], axis=1) if __compute_area(a2, a3) > threshold: anchors_2 = derive_anchors_from_projected_alignment(projected_alignment[:, center - 1:], threshold) else: anchors_2 = np.concatenate([a2, a3], axis=1) anchors_res = np.concatenate([anchors_1, anchors_2[:, 1:]], axis=1) else: if __compute_area(a1, a2) > threshold: print('Only two anchor points are given which do not fulfill the constraint.') anchors_res = np.concatenate([a1, a2], axis=1) return anchors_res def derive_neighboring_anchors(warping_path: np.ndarray, anchor_indices: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """Compute anchor points in the neighborhood of previous anchor points. Parameters ---------- warping_path : np.ndarray [shape=(2, N)] Warping path anchor_indices : np.ndarray Indices corresponding to the anchor points in the ``warping_path`` Returns ------- neighboring_anchors : np.ndarray [shape=(2, N-1)] Sequence of neighboring anchors neighboring_anchor_indices : np.ndarray Indices into ``warping path`` corresponding to ``neighboring_anchors`` """ L = anchor_indices.shape[0] neighboring_anchor_indices = np.zeros(L-1, dtype=int) neighboring_anchors = np.zeros((2, L-1), dtype=int) for k in range(1, L): i1 = anchor_indices[k-1] i2 = anchor_indices[k] neighboring_anchor_indices[k-1] = i1 + np.floor((i2 - i1) / 2) neighboring_anchors[:, k-1] = warping_path[:, neighboring_anchor_indices[k - 1]] return neighboring_anchors, neighboring_anchor_indices @jit(nopython=True) def __compute_area(a: tuple, b: tuple): """Computes the area between two points, given as tuples""" return (b[0] - a[0] + 1) * (b[1] - a[1] + 1) class Transcriber(PitchEstimator): def __init__(self, labeling, instrument='Violin', sr=16000, window_size=1024, hop_length=160): super().__init__(labeling, instrument=instrument, sr=sr, window_size=window_size, hop_length=hop_length) def transcribe(self, audio, batch_size=128, postprocessing='spotify', include_pitch_bends=True, to_midi=True, debug=False): """ Transcribe an audio file or mono waveform in numpy or torch into MIDI with pitch bends. :param audio: str, pathlib.Path, np.ndarray, or torch.Tensor :param batch_size: frames to process at once :param postprocessing: note creation method. 
'spotify' (default), 'rebab', or 'tiktok' :param include_pitch_bends: whether to include pitch bends in the MIDI file :param to_midi: whether to return a MIDI file or a list of note events (as tuple) :return: transcribed MIDI file as a pretty_midi.PrettyMIDI object """ out = self.predict(audio, batch_size) if debug: plt.imshow(out['f0'].T, aspect='auto', origin='lower') plt.show() plt.imshow(out['note'].T, aspect='auto', origin='lower') plt.show() plt.imshow(out['onset'].T, aspect='auto', origin='lower') plt.show() plt.imshow(out['offset'].T, aspect='auto', origin='lower') plt.show() if to_midi: return self.out2midi(out, postprocessing, include_pitch_bends) else: return self.out2note(out, postprocessing, include_pitch_bends) def out2note(self, output: Dict[str, np.array], postprocessing='spotify', include_pitch_bends: bool = True, ) -> List[Tuple[float, float, int, float, Optional[List[int]]]]: """Convert model output to notes """ if postprocessing == 'spotify': estimated_notes = spotify_create_notes( output["note"], output["onset"], note_low=self.labeling.midi_centers[0], note_high=self.labeling.midi_centers[-1], onset_thresh=0.5, frame_thresh=0.3, infer_onsets=True, min_note_len=int(np.round(127.70 / 1000 * (self.sr / self.hop_length))), #127.70 melodia_trick=True, ) elif postprocessing == 'rebab': estimated_notes = spotify_create_notes( output["note"], output["onset"], note_low=self.labeling.midi_centers[0], note_high=self.labeling.midi_centers[-1], onset_thresh=0.2, frame_thresh=0.2, infer_onsets=True, min_note_len=int(np.round(127.70 / 1000 * (self.sr / self.hop_length))), #127.70 melodia_trick=True, ) elif postprocessing == 'tiktok': postprocessor = RegressionPostProcessor( frames_per_second=self.sr / self.hop_length, classes_num=self.labeling.midi_centers.shape[0], begin_note=self.labeling.midi_centers[0], onset_threshold=0.2, offset_threshold=0.2, frame_threshold=0.3, pedal_offset_threshold=0.5, ) tiktok_note_dict, _ = postprocessor.output_dict_to_midi_events(output) estimated_notes = [] for list_item in tiktok_note_dict: if list_item['offset_time'] > 0.6 + list_item['onset_time']: estimated_notes.append((int(np.floor(list_item['onset_time']/(output['time'][1]))), int(np.ceil(list_item['offset_time']/(output['time'][1]))), list_item['midi_note'], list_item['velocity']/128)) if include_pitch_bends: estimated_notes_with_pitch_bend = self.get_pitch_bends(output["f0"], estimated_notes) else: estimated_notes_with_pitch_bend = [(note[0], note[1], note[2], note[3], None) for note in estimated_notes] times_s = output['time'] estimated_notes_time_seconds = [ (times_s[note[0]], times_s[note[1]], note[2], note[3], note[4]) for note in estimated_notes_with_pitch_bend ] return estimated_notes_time_seconds def out2midi(self, output: Dict[str, np.array], postprocessing: str = 'spotify', include_pitch_bends: bool = True, ) -> PrettyMIDI: """Convert model output to MIDI Args: output: the dictionary returned by ``predict``, with keys 'note', 'onset', 'offset' and 'f0' (each an array of shape (n_times, n_bins)) plus the 'time' vector. postprocessing: 'spotify', 'rebab', or 'tiktok' postprocessing. include_pitch_bends: If True, include pitch bends.
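A usage sketch of the surrounding transcription pipeline (assuming a trained ``Transcriber`` instance ``model``):

>>> midi = model.transcribe('performance.wav', postprocessing='spotify')  # pretty_midi.PrettyMIDI
>>> midi.write('performance_transcribed.mid')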
Returns: note_events: A list of note event tuples (start_time_s, end_time_s, pitch_midi, amplitude) """ estimated_notes_time_seconds = self.out2note(output, postprocessing, include_pitch_bends) midi_tempo = 120 # todo: infer tempo from the onsets return self.note2midi(estimated_notes_time_seconds, midi_tempo) def note2midi( self, note_events_with_pitch_bends: List[Tuple[float, float, int, float, Optional[List[int]]]], midi_tempo: float = 120, ): """Create a pretty_midi object from note events :param note_events_with_pitch_bends: list of tuples [(start_time_seconds, end_time_seconds, pitch_midi, amplitude)] :param midi_tempo: #todo: infer tempo from the onsets :return: transcribed MIDI file as a pretty_midi.PrettyMIDI object """ mid = PrettyMIDI(initial_tempo=midi_tempo) program = instrument_name_to_program(self.instrument) instruments: DefaultDict[int, Instrument] = defaultdict( lambda: Instrument(program=program) ) for start_time, end_time, note_number, amplitude, pitch_bend in note_events_with_pitch_bends: instrument = instruments[note_number] note = Note( velocity=int(np.round(127 * amplitude)), pitch=note_number, start=start_time, end=end_time, ) instrument.notes.append(note) if not isinstance(pitch_bend, np.ndarray): continue pitch_bend_times = np.linspace(start_time, end_time, len(pitch_bend)) for pb_time, pb_midi in zip(pitch_bend_times, pitch_bend): instrument.pitch_bends.append(PitchBend(pb_midi, pb_time)) mid.instruments.extend(instruments.values()) return mid def sync_via_mrmsdtw_with_anchors(f_chroma1: np.ndarray, f_chroma2: np.ndarray, f_onset1: np.ndarray = None, f_onset2: np.ndarray = None, input_feature_rate: float = 50, step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int32), step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64), threshold_rec: int = 10000, win_len_smooth: np.ndarray = np.array([201, 101, 21, 1]), downsamp_smooth: np.ndarray = np.array([50, 25, 5, 1]), verbose: bool = False, dtw_implementation: str = 'synctoolbox', normalize_chroma: bool = True, chroma_norm_ord: int = 2, chroma_norm_threshold: float = 0.001, visualization_title: str = "MrMsDTW result", anchor_pairs: List[Tuple] = None, linear_inp_idx: List[int] = [], alpha=0.5) -> np.ndarray: """Compute memory-restricted multi-scale DTW (MrMsDTW) using chroma and (optionally) onset features. MrMsDTW is performed on multiple levels that get progressively finer, with rectangular constraint regions defined by the alignment found on the previous, coarser level. If onset features are provided, these are used on the finest level in addition to chroma to provide higher synchronization accuracy. 
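A call sketch (assuming 12 x N and 12 x M chroma matrices computed at 50 Hz; the anchor pair is illustrative):

>>> wp = sync_via_mrmsdtw_with_anchors(f_chroma1, f_chroma2, input_feature_rate=50, anchor_pairs=[(10.0, 12.5)])
>>> wp.shape  # (2, T): row 0 indexes frames of version 1, row 1 frames of version 2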
Parameters ---------- f_chroma1 : np.ndarray [shape=(12, N)] Chroma feature matrix of the first sequence f_chroma2 : np.ndarray [shape=(12, M)] Chroma feature matrix of the second sequence f_onset1 : np.ndarray [shape=(L, N)] Onset feature matrix of the first sequence (optional, default: None) f_onset2 : np.ndarray [shape=(L, M)] Onset feature matrix of the second sequence (optional, default: None) input_feature_rate: int Input feature rate of the chroma features (default: 50) step_sizes: np.ndarray DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]])) step_weights: np.ndarray DTW step weights (np.array([1.0, 1.0, 1.0])) threshold_rec: int Defines the maximum area that is spanned by the rectangle of two consecutive elements in the alignment (default: 10000) win_len_smooth : np.ndarray Window lengths for chroma feature smoothing (default: np.array([201, 101, 21, 1])) downsamp_smooth : np.ndarray Downsampling factors (default: np.array([50, 25, 5, 1])) verbose : bool Set `True` for visualization (default: False) dtw_implementation : str DTW implementation, librosa or synctoolbox (default: synctoolbox) normalize_chroma : bool Set `True` to normalize input chroma features after each downsampling and smoothing operation. chroma_norm_ord: int Order of chroma normalization, relevant if ``normalize_chroma`` is True. (default: 2) chroma_norm_threshold: float If the norm falls below threshold for a feature vector, then the normalized feature vector is set to be the unit vector. Relevant, if ``normalize_chroma`` is True (default: 0.001) visualization_title : str Title for the visualization plots. Only relevant if 'verbose' is True (default: "MrMsDTW result") anchor_pairs: List[Tuple] Anchor pairs given in seconds. Note that * (0, 0) and (, ) are not allowed. * Anchors must be monotonously increasing. linear_inp_idx: List[int] List of the indices of intervals created by anchor pairs, for which MrMsDTW shouldn't be run, e.g., if the interval only involves silence. 0 ap1 ap2 ap3 | | | | | idx0 | idx1 | idx2 | idx3 OR idx-1 | | | | Note that index -1 corresponds to the last interval, which begins with the last anchor pair until the end of the audio files. alpha: float Coefficient for the Chroma cost matrix in the finest scale of the MrMsDTW algorithm. C = alpha * C_Chroma + (1 - alpha) * C_act (default: 0.5) Returns ------- wp : np.ndarray [shape=(2, T)] Resulting warping path which indicates synchronized indices. 
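For instance, ``anchor_pairs=[(10.0, 12.5), (60.0, 63.2)]`` with ``linear_inp_idx=[1]`` runs MrMsDTW on the segments before the first and after the second anchor pair, while the interval between the two anchors (e.g. a silent break) is filled with a plain diagonal warping path.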
""" if anchor_pairs is None: wp = sync_via_mrmsdtw(f_chroma1=f_chroma1, f_chroma2=f_chroma2, f_onset1=f_onset1, f_onset2=f_onset2, input_feature_rate=input_feature_rate, step_sizes=step_sizes, step_weights=step_weights, threshold_rec=threshold_rec, win_len_smooth=win_len_smooth, downsamp_smooth=downsamp_smooth, verbose=verbose, dtw_implementation=dtw_implementation, normalize_chroma=normalize_chroma, chroma_norm_ord=chroma_norm_ord, chroma_norm_threshold=chroma_norm_threshold, visualization_title=visualization_title, alpha=alpha) else: # constant_intervals = [((0, x1), (0, y1), False), # ((x1, x2), (y1, y2), True), # ((x2, -1), (y2, -1), False)] wp = None if verbose: print('Anchor points are given!') __check_anchor_pairs(anchor_pairs, f_chroma1.shape[1], f_chroma2.shape[1], input_feature_rate) # Add ending as the anchor point anchor_pairs.append((-1, -1)) prev_a1 = 0 prev_a2 = 0 for idx, anchor_pair in enumerate(anchor_pairs): cur_a1, cur_a2 = anchor_pair # Split the features f_chroma1_split, f_onset1_split, f_chroma2_split, f_onset2_split = __split_features(f_chroma1, f_onset1, f_chroma2, f_onset2, cur_a1, cur_a2, prev_a1, prev_a2, input_feature_rate) if idx in linear_inp_idx or idx == len(anchor_pairs) - 1 and -1 in linear_inp_idx: # Generate a diagonal warping path, if the algorithm is not supposed to executed. # A typical scenario is the silence breaks which are enclosed by two anchor points. if verbose: print('A diagonal warping path is generated for the interval \n\t Feature sequence 1: %.2f - %.2f' '\n\t Feature sequence 2: %.2f - %.2f\n' % (prev_a1, cur_a1, prev_a2, cur_a2)) wp_cur = __diagonal_warping_path(f_chroma1_split, f_chroma2_split) else: if verbose: if cur_a1 != -1 and cur_a2 != -1: print('MrMsDTW is applied for the interval \n\t Feature sequence 1: %.2f - %.2f' '\n\t Feature sequence 2: %.2f - %.2f\n' % (prev_a1, cur_a1, prev_a2, cur_a2)) else: print('MrMsDTW is applied for the interval \n\t Feature sequence 1: %.2f - end' '\n\t Feature sequence 2: %.2f - end\n' % (prev_a1, prev_a2)) wp_cur = sync_via_mrmsdtw(f_chroma1=f_chroma1_split, f_chroma2=f_chroma2_split, f_onset1=f_onset1_split, f_onset2=f_onset2_split, input_feature_rate=input_feature_rate, step_sizes=step_sizes, step_weights=step_weights, threshold_rec=threshold_rec, win_len_smooth=win_len_smooth, downsamp_smooth=downsamp_smooth, verbose=verbose, dtw_implementation=dtw_implementation, normalize_chroma=normalize_chroma, chroma_norm_ord=chroma_norm_ord, chroma_norm_threshold=chroma_norm_threshold, alpha=alpha) if wp is None: wp = np.array(wp_cur, copy=True) # Concatenate warping paths else: wp = np.concatenate([wp, wp_cur + wp[:, -1].reshape(2, 1) + 1], axis=1) prev_a1 = cur_a1 prev_a2 = cur_a2 anchor_pairs.pop() return wp def sync_via_mrmsdtw(f_chroma1: np.ndarray, f_chroma2: np.ndarray, f_onset1: np.ndarray = None, f_onset2: np.ndarray = None, input_feature_rate: float = 50, step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int32), step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64), threshold_rec: int = 10000, win_len_smooth: np.ndarray = np.array([201, 101, 21, 1]), downsamp_smooth: np.ndarray = np.array([50, 25, 5, 1]), verbose: bool = False, dtw_implementation: str = 'synctoolbox', normalize_chroma: bool = True, chroma_norm_ord: int = 2, chroma_norm_threshold: float = 0.001, visualization_title: str = "MrMsDTW result", alpha=0.5) -> np.ndarray: """Compute memory-restricted multi-scale DTW (MrMsDTW) using chroma and (optionally) onset features. 
MrMsDTW is performed on multiple levels that get progressively finer, with rectangular constraint regions defined by the alignment found on the previous, coarser level. If onset features are provided, these are used on the finest level in addition to chroma to provide higher synchronization accuracy. Parameters ---------- f_chroma1 : np.ndarray [shape=(12, N)] Chroma feature matrix of the first sequence f_chroma2 : np.ndarray [shape=(12, M)] Chroma feature matrix of the second sequence f_onset1 : np.ndarray [shape=(L, N)] Onset feature matrix of the first sequence (optional, default: None) f_onset2 : np.ndarray [shape=(L, M)] Onset feature matrix of the second sequence (optional, default: None) input_feature_rate: int Input feature rate of the chroma features (default: 50) step_sizes: np.ndarray DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]])) step_weights: np.ndarray DTW step weights (np.array([1.0, 1.0, 1.0])) threshold_rec: int Defines the maximum area that is spanned by the rectangle of two consecutive elements in the alignment (default: 10000) win_len_smooth : np.ndarray Window lengths for chroma feature smoothing (default: np.array([201, 101, 21, 1])) downsamp_smooth : np.ndarray Downsampling factors (default: np.array([50, 25, 5, 1])) verbose : bool Set `True` for visualization (default: False) dtw_implementation : str DTW implementation, librosa or synctoolbox (default: synctoolbox) normalize_chroma : bool Set `True` to normalize input chroma features after each downsampling and smoothing operation. chroma_norm_ord: int Order of chroma normalization, relevant if ``normalize_chroma`` is True. (default: 2) chroma_norm_threshold: float If the norm falls below threshold for a feature vector, then the normalized feature vector is set to be the unit vector. Relevant, if ``normalize_chroma`` is True (default: 0.001) visualization_title : str Title for the visualization plots. Only relevant if 'verbose' is True (default: "MrMsDTW result") alpha: float Coefficient for the Chroma cost matrix in the finest scale of the MrMsDTW algorithm. C = alpha * C_Chroma + (1 - alpha) * C_act (default: 0.5) Returns ------- alignment: np.ndarray [shape=(2, T)] Resulting warping path which indicates synchronized indices. """ # If onset features are given as input, high resolution MrMsDTW is activated. high_res = False if f_onset1 is not None and f_onset2 is not None: high_res = True if high_res and (f_chroma1.shape[1] != f_onset1.shape[1] or f_chroma2.shape[1] != f_onset2.shape[1]): raise ValueError('Chroma and onset features must be of the same length.') if downsamp_smooth[-1] != 1 or win_len_smooth[-1] != 1: raise ValueError('The downsampling factor of the last iteration must be equal to 1, i.e.' 'at the last iteration, it is computed at the input feature rate!') num_iterations = win_len_smooth.shape[0] cost_matrix_size_old = tuple() feature_rate_old = input_feature_rate / downsamp_smooth[0] alignment = None total_computation_time = 0.0 # If the area is less than the threshold_rec, don't apply the multiscale DTW. 
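# Coarse-to-fine scheme: with the defaults win_len_smooth=[201, 101, 21, 1] and
# downsamp_smooth=[50, 25, 5, 1], a 50 Hz input feature rate yields levels at
# 1, 2, 10 and 50 Hz. If the full cost matrix is already smaller than
# threshold_rec, the loop below starts directly at the finest level.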
it = (num_iterations - 1) if __compute_area(f_chroma1, f_chroma2) < threshold_rec else 0 while it < num_iterations: tic1 = perf_counter() # Smooth and downsample given raw features f_chroma1_cur, _ = smooth_downsample_feature(f_chroma1, input_feature_rate=input_feature_rate, win_len_smooth=win_len_smooth[it], downsamp_smooth=downsamp_smooth[it]) f_chroma2_cur, feature_rate_new = smooth_downsample_feature(f_chroma2, input_feature_rate=input_feature_rate, win_len_smooth=win_len_smooth[it], downsamp_smooth=downsamp_smooth[it]) if normalize_chroma: f_chroma1_cur = normalize_feature(f_chroma1_cur, norm_ord=chroma_norm_ord, threshold=chroma_norm_threshold) f_chroma2_cur = normalize_feature(f_chroma2_cur, norm_ord=chroma_norm_ord, threshold=chroma_norm_threshold) # Project path onto new resolution cost_matrix_size_new = (f_chroma1_cur.shape[1], f_chroma2_cur.shape[1]) if alignment is None: # Initialize the alignment with the start and end frames of the feature sequence anchors = np.array([[0, f_chroma1_cur.shape[1] - 1], [0, f_chroma2_cur.shape[1] - 1]]) else: projected_alignment = project_alignment_on_a_new_feature_rate(alignment=alignment, feature_rate_old=feature_rate_old, feature_rate_new=feature_rate_new, cost_matrix_size_old=cost_matrix_size_old, cost_matrix_size_new=cost_matrix_size_new) anchors = derive_anchors_from_projected_alignment(projected_alignment=projected_alignment, threshold=threshold_rec) # Cost matrix and warping path computation if high_res and it == num_iterations - 1: # Compute cost considering chroma and pitch onset features and alignment only in the last iteration, # where the features are at the finest level. cost_matrices_step1 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur, f_chroma2=f_chroma2_cur, f_onset1=f_onset1, f_onset2=f_onset2, anchors=anchors, alpha=alpha) else: cost_matrices_step1 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur, f_chroma2=f_chroma2_cur, anchors=anchors, alpha=alpha) wp_list = compute_warping_paths_from_cost_matrices(cost_matrices_step1, step_sizes=step_sizes, step_weights=step_weights, implementation=dtw_implementation) # Concatenate warping paths wp = build_path_from_warping_paths(warping_paths=wp_list, anchors=anchors) anchors_step1 = None wp_step1 = None num_rows_step1 = 0 num_cols_step1 = 0 ax = None toc1 = perf_counter() if verbose and cost_matrices_step1 is not None: anchors_step1 = np.array(anchors, copy=True) wp_step1 = np.array(wp, copy=True) num_rows_step1, num_cols_step1 = np.sum(np.array([dtw_mat.shape for dtw_mat in cost_matrices_step1], int), axis=0) fig, ax = sync_visualize_step1(cost_matrices_step1, num_rows_step1, num_cols_step1, anchors, wp) tic2 = perf_counter() # Compute neighboring anchors and refine alignment using local path between neighboring anchors anchor_indices_in_warping_path = find_anchor_indices_in_warping_path(wp, anchors=anchors) # Compute neighboring anchors for refinement neighboring_anchors, neighboring_anchor_indices = \ derive_neighboring_anchors(wp, anchor_indices=anchor_indices_in_warping_path) if neighboring_anchor_indices.shape[0] > 1 \ and it == num_iterations - 1 and high_res: cost_matrices_step2 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur, f_chroma2=f_chroma2_cur, f_onset1=f_onset1, f_onset2=f_onset2, anchors=neighboring_anchors, alpha=alpha) else: cost_matrices_step2 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur, f_chroma2=f_chroma2_cur, anchors=neighboring_anchors, alpha=alpha) wp_list_refine = 
compute_warping_paths_from_cost_matrices(cost_matrices=cost_matrices_step2, step_sizes=step_sizes, step_weights=step_weights, implementation=dtw_implementation) wp = __refine_wp(wp, anchors, wp_list_refine, neighboring_anchors, neighboring_anchor_indices) toc2 = perf_counter() computation_time_it = toc2 - tic2 + toc1 - tic1 total_computation_time += computation_time_it alignment = wp feature_rate_old = feature_rate_new cost_matrix_size_old = cost_matrix_size_new if verbose and cost_matrices_step2 is not None: sync_visualize_step2(ax, cost_matrices_step2, wp, wp_step1, num_rows_step1, num_cols_step1, anchors_step1, neighboring_anchors, plot_title=f"{visualization_title} - Level {it + 1}") print('Level {} computation time: {:.2f} seconds'.format(it, computation_time_it)) it += 1 if verbose: print('Computation time of MrMsDTW: {:.2f} seconds'.format(total_computation_time)) return alignment def __diagonal_warping_path(f1: np.ndarray, f2: np.ndarray) -> np.ndarray: """Generates a diagonal warping path given two feature sequences. Parameters ---------- f1: np.ndarray [shape=(_, N)] First feature sequence f2: np.ndarray [shape=(_, M)] Second feature sequence Returns ------- np.ndarray: Diagonal warping path [shape=(2, T)] """ max_size = np.maximum(f1.shape[1], f2.shape[1]) min_size = np.minimum(f1.shape[1], f2.shape[1]) if min_size == 1: return np.array([max_size - 1, 0]).reshape(-1, 1) elif max_size == f1.shape[1]: return np.array([np.round(np.linspace(0, max_size - 1, min_size)), np.linspace(0, min_size - 1, min_size)]) else: return np.array([np.linspace(0, min_size-1, min_size), np.round(np.linspace(0, max_size - 1, min_size))]) @jit(nopython=True) def __compute_area(f1, f2): """Computes the area of the cost matrix given two feature sequences Parameters ---------- f1: np.ndarray First feature sequence f2: np.ndarray Second feature sequence Returns ------- int: Area of the cost matrix """ return f1.shape[1] * f2.shape[1] def __split_features(f_chroma1: np.ndarray, f_onset1: np.ndarray, f_chroma2: np.ndarray, f_onset2: np.ndarray, cur_a1: float, cur_a2: float, prev_a1: float, prev_a2: float, feature_rate: int) -> Tuple[np.ndarray, Optional[np.ndarray], np.ndarray, Optional[np.ndarray]]: if cur_a1 == -1: f_chroma1_split = f_chroma1[:, int(prev_a1 * feature_rate):] if f_onset1 is not None: f_onset1_split = f_onset1[:, int(prev_a1 * feature_rate):] else: f_onset1_split = None else: # Split the features f_chroma1_split = f_chroma1[:, int(prev_a1 * feature_rate):int(cur_a1 * feature_rate)] if f_onset1 is not None: f_onset1_split = f_onset1[:, int(prev_a1 * feature_rate):int(cur_a1 * feature_rate)] else: f_onset1_split = None if cur_a2 == -1: f_chroma2_split = f_chroma2[:, int(prev_a2 * feature_rate):] if f_onset2 is not None: f_onset2_split = f_onset2[:, int(prev_a2 * feature_rate):] else: f_onset2_split = None else: f_chroma2_split = f_chroma2[:, int(prev_a2 * feature_rate):int(cur_a2 * feature_rate)] if f_onset2 is not None: f_onset2_split = f_onset2[:, int(prev_a2 * feature_rate):int(cur_a2 * feature_rate)] else: f_onset2_split = None return f_chroma1_split, f_onset1_split, f_chroma2_split, f_onset2_split def __refine_wp(wp: np.ndarray, anchors: np.ndarray, wp_list_refine: List, neighboring_anchors: np.ndarray, neighboring_anchor_indices: np.ndarray) -> np.ndarray: wp_length = wp[:, neighboring_anchor_indices[-1]:].shape[1] last_list = wp[:, neighboring_anchor_indices[-1]:] - np.tile( wp[:, neighboring_anchor_indices[-1]].reshape(-1, 1), wp_length) wp_list_tmp = [wp[:, 
:neighboring_anchor_indices[0] + 1]] + wp_list_refine + [last_list] A_tmp = np.concatenate([anchors[:, 0].reshape(-1, 1), neighboring_anchors, anchors[:, -1].reshape(-1, 1)], axis=1) wp_res = build_path_from_warping_paths(warping_paths=wp_list_tmp, anchors=A_tmp) return wp_res def __check_anchor_pairs(anchor_pairs: List, f_len1: int, f_len2: int, feature_rate: int): """Ensures that the anchor pairs satisfy the required conditions Parameters ---------- anchor_pairs: List[Tuple] List of anchor pairs f_len1: int Length of the first feature sequence f_len2: int Length of the second feature sequence feature_rate: int Input feature rate of the features """ prev_a1 = 0 prev_a2 = 0 for anchor_pair in anchor_pairs: a1, a2 = anchor_pair if a1 <= 0 or a2 <= 0: raise ValueError('Starting point must be a positive number!') if a1 > f_len1 / feature_rate or a2 > f_len2 / feature_rate: raise ValueError('Anchor points cannot be greater than the length of the input audio files!') if a1 == f_len1 and a2 == f_len2: raise ValueError('Both anchor points cannot be equal to the length of the audio files.') if a1 == prev_a1 and a2 == prev_a2: raise ValueError('Duplicate anchor pairs are not allowed!') if a1 < prev_a1 or a2 < prev_a2: raise ValueError('Anchor points must be monotonically increasing.') prev_a1 = a1 prev_a2 = a2 class PerformanceLabel: """ The dataset labeling class for performance representations. Currently includes onset, note, and fine-grained f0 representations. note_min, note_max, and f0_bins_per_semitone values are to be arranged per instrument. The default values are for violin performance analysis. Fretted instruments might not require such a fine f0 resolution per semitone. """ def __init__(self, note_min='F#3', note_max='C8', f0_bins_per_semitone=9, f0_smooth_std_c=None, onset_smooth_std=0.7, f0_tolerance_c=200): midi_min = note_name_to_number(note_min) midi_max = note_name_to_number(note_max) self.midi_centers = np.arange(midi_min, midi_max) self.onset_smooth_std = onset_smooth_std # onset smoothing along time axis (compensate for alignment) f0_hz_range = note_to_hz([note_min, note_max]) f0_c_min, f0_c_max = hz2cents(f0_hz_range) self.f0_granularity_c = 100/f0_bins_per_semitone if not f0_smooth_std_c: f0_smooth_std_c = self.f0_granularity_c * 5/4 # Keep the ratio from the CREPE paper (20 cents and 25 cents) self.f0_smooth_std_c = f0_smooth_std_c self.f0_centers_c = np.arange(f0_c_min, f0_c_max, self.f0_granularity_c) self.f0_centers_hz = 10 * 2 ** (self.f0_centers_c / 1200) self.f0_n_bins = len(self.f0_centers_c) self.pdf_normalizer = norm.pdf(0) self.f0_c2hz = lambda c: 10*2**(c/1200) self.f0_hz2c = hz2cents self.midi_centers_c = self.f0_hz2c(midi_to_hz(self.midi_centers)) self.f0_tolerance_bins = int(f0_tolerance_c/self.f0_granularity_c) self.f0_transition_matrix = gaussian_filter1d(np.eye(2*self.f0_tolerance_bins + 1), 25/self.f0_granularity_c) def f0_c2label(self, pitch_c): """ Convert a single f0 value in cents to a one-hot label vector with smoothing (i.e., create a gaussian blur around the target f0 bin for regularization and training stability). The blur is controlled by self.f0_smooth_std_c. :param pitch_c: a single pitch value in cents :return: one-hot label vector with frequency blur """ result = norm.pdf((self.f0_centers_c - pitch_c) / self.f0_smooth_std_c).astype(np.float32) result /= self.pdf_normalizer return result def f0_label2c(self, salience, center=None): """ Convert the salience predictions to monophonic f0 in cents. Only outputs a single f0 value per frame!
:param salience: f0 activations :param center: f0 center bin to calculate the weighted average. Use argmax if empty :return: f0 array per frame (in cents). """ if salience.ndim == 1: if center is None: center = int(np.argmax(salience)) start = max(0, center - 4) end = min(len(salience), center + 5) salience = salience[start:end] product_sum = np.sum(salience * self.f0_centers_c[start:end]) weight_sum = np.sum(salience) return product_sum / np.clip(weight_sum, 1e-8, None) if salience.ndim == 2: return np.array([self.f0_label2c(salience[i, :]) for i in range(salience.shape[0])]) raise Exception("label should be either 1d or 2d ndarray") def fill_onset_matrix(self, onsets, window, feature_rate): """ Create a sparse onset matrix from window and onsets (per-semitone). Apply a gaussian smoothing (along time) so that we can tolerate better the alignment problems. This is similar to the frequency smoothing for the f0. The temporal smoothing is controlled by the parameter self.onset_smooth_std :param onsets: A 2d np.array of individual note onsets with their respective time values (Nx2: time in seconds - midi number) :param window: Timestamps for the frame centers of the sparse matrix :param feature_rate: Window timestamps are integer, this is to convert them to seconds :return: onset_roll: A sparse matrix filled with temporally blurred onsets. """ onsets = self.get_window_feats(onsets, window, feature_rate) onset_roll = np.zeros((len(window), len(self.midi_centers))) for onset in onsets: onset, note = onset # it was a pair with time and midi note if self.midi_centers[0] < note < self.midi_centers[-1]: # midi note should be in the range defined note = int(note) - self.midi_centers[0] # find the note index in our range onset = (onset*feature_rate)-window[0] # onset index (as float but in frames, not in seconds!) start = max(0, int(onset) - 3) end = min(len(window) - 1, int(onset) + 3) try: vals = norm.pdf(np.linspace(start - onset, end - onset, end - start + 1) / self.onset_smooth_std) # if you increase 0.7 you smooth the peak # if you decrease it, e.g., 0.1, it becomes too peaky! around 0.5-0.7 seems ok vals /= self.pdf_normalizer onset_roll[start:end + 1, note] += vals except ValueError: print('start',start, 'onset', onset, 'end', end) return onset_roll, onsets def fill_note_matrix(self, notes, window, feature_rate): """ Create the note matrix (piano roll) from window timestamps and note values per frame. :param notes: A 2d np.array of individual notes with their active time values Nx2 :param window: Timestamps for the frame centers of the output :param feature_rate: Window timestamps are integer, this is to convert them to seconds :return note_roll: The piano roll in the defined range of [note_min, note_max). """ notes = self.get_window_feats(notes, window, feature_rate) # take the notes in the midi range defined notes = notes[np.logical_and(notes[:,1]>=self.midi_centers[0], notes[:,1]<=self.midi_centers[-1]),:] times = (notes[:,0]*feature_rate - window[0]).astype(int) # in feature samples (fs:self.hop/self.sr) notes = (notes[:,1] - self.midi_centers[0]).astype(int) note_roll = np.zeros((len(window), len(self.midi_centers))) note_roll[(times, notes)] = 1 return note_roll, notes def fill_f0_matrix(self, f0s, window, feature_rate): """ Unlike the labels for onsets and notes, f0 label is only relevant for strictly monophonic regions! Thus, this function returns a boolean which represents where to apply the given values. Never back-propagate without the boolean! 
Empty frames mean that the label is not that reliable. :param f0s: A 2d np.array of f0 values with the time they belong to (2xN: time in seconds - f0 in Hz) :param window: Timestamps for the frame centers of the output :param feature_rate: Window timestamps are integer, this is to convert them to seconds :return f0_roll: f0 label matrix and f0_hz: f0 values in Hz annotation_bool: A boolean array representing which frames have reliable f0 annotations. """ f0s = self.get_window_feats(f0s, window, feature_rate) f0_cents = np.zeros_like(window, dtype=float) f0s[:,1] = self.f0_hz2c(f0s[:,1]) # convert f0 in hz to cents annotation_bool = np.zeros_like(window, dtype=bool) f0_roll = np.zeros((len(window), len(self.f0_centers_c))) times_in_frame = f0s[:, 0]*feature_rate - window[0] for t, f0 in enumerate(f0s): t = times_in_frame[t] if t%1 < 0.25: # only consider it as annotation if the f0 values is really close to the frame center t = int(np.round(t)) f0_roll[t] = self.f0_c2label(f0[1]) annotation_bool[t] = True f0_cents[t] = f0[1] return f0_roll, f0_cents, annotation_bool @staticmethod def get_window_feats(time_feature_matrix, window, feature_rate): """ Restrict the feature matrix to the features that are inside the window :param window: Timestamps for the frame centers of the output :param time_feature_matrix: A 2d array of Nx2 per the entire file. :param feature_rate: Window timestamps are integer, this is to convert them to seconds :return: window_features: the features inside the given window """ start = time_feature_matrix[:,0]>(window[0]-0.5)/feature_rate end = time_feature_matrix[:,0]<(window[-1]+0.5)/feature_rate window_features = np.logical_and(start, end) window_features = np.array(time_feature_matrix[window_features,:]) return window_features def represent_midi(self, midi, feature_rate): """ Represent a midi file as sparse matrices of onsets, offsets, and notes. No f0 is included. 
:param midi: A midi file (either a path or a pretty_midi.PrettyMIDI object) :param feature_rate: The feature rate in Hz :return: dict {onset, offset, note, time}: Same format with the model's learning and outputs """ def _get_onsets_offsets_frames(midi_content): if isinstance(midi_content, str): midi_content = PrettyMIDI(midi_content) onsets = [] offsets = [] frames = [] for instrument in midi_content.instruments: for note in instrument.notes: start = int(np.round(note.start * feature_rate)) end = int(np.round(note.end * feature_rate)) note_times = (np.arange(start, end+0.5)/feature_rate)[:, np.newaxis] note_pitch = np.full_like(note_times, fill_value=note.pitch) onsets.append([note.start, note.pitch]) offsets.append([note.end, note.pitch]) frames.append(np.hstack([note_times, note_pitch])) onsets = np.vstack(onsets) offsets = np.vstack(offsets) frames = np.vstack(frames) return onsets, offsets, frames, midi_content onset_array, offset_array, frame_array, midi_object = _get_onsets_offsets_frames(midi) window = np.arange(frame_array[0, 0]*feature_rate, frame_array[-1, 0]*feature_rate, dtype=int) onset_roll, _ = self.fill_onset_matrix(onset_array, window, feature_rate) offset_roll, _ = self.fill_onset_matrix(offset_array, window, feature_rate) note_roll, _ = self.fill_note_matrix(frame_array, window, feature_rate) start_anchor = onset_array[onset_array[:, 0]==np.min(onset_array[:, 0])] end_anchor = offset_array[offset_array[:, 0]==np.max(offset_array[:, 0])] return { 'midi': midi_object, 'note': note_roll, 'onset': onset_roll, 'offset': offset_roll, 'time': window/feature_rate, 'start_anchor': start_anchor, 'end_anchor': end_anchor } class Synchronizer(Transcriber): def __init__(self, labeling, instrument='Violin', sr=16000, window_size=1024, hop_length=160): super().__init__(labeling, instrument=instrument, sr=sr, window_size=window_size, hop_length=hop_length) def synchronize(self, audio, midi, batch_size=128, include_pitch_bends=True, to_midi=True, debug=False, include_velocity=False, alignment_padding=50, timing_refinement_range_with_f0s=0): """ Synchronize an audio file or mono waveform in numpy or torch with a MIDI file. 
:param audio: str, pathlib.Path, np.ndarray, or torch.Tensor :param midi: str, pathlib.Path, or pretty_midi.PrettyMIDI :param batch_size: frames to process at once :param include_pitch_bends: whether to include pitch bends in the MIDI file :param to_midi: whether to return a MIDI file or a list of note events (as tuple) :param debug: whether to plot the alignment path and compare the alignment with the predicted notes :param include_velocity: whether to embed the note confidence in place of the velocity in the MIDI file :param alignment_padding: how many frames to pad the audio and MIDI representations with :param timing_refinement_range_with_f0s: how many frames to refine the alignment with the f0 confidence :return: aligned MIDI file as a pretty_midi.PrettyMIDI object Args: debug: to_midi: include_pitch_bends: """ audio = self.predict(audio, batch_size) notes_and_midi = self.out2sync(audio, midi, include_velocity=include_velocity, alignment_padding=alignment_padding) if notes_and_midi: # it might be none notes, midi = notes_and_midi if debug: import pandas as pd estimated_notes = self.out2note(audio, postprocessing='spotify', include_pitch_bends=True) est_df = pd.DataFrame(estimated_notes).sort_values(by=0) note_df = pd.DataFrame(notes).sort_values(by=0) fig, ax = plt.subplots(figsize=(20, 10)) for row in notes: t_start = row[0] # sec t_end = row[1] # sec freq = row[2] # Hz ax.hlines(freq, t_start, t_end, color='k', linewidth=3, zorder=2, alpha=0.5) for row in estimated_notes: t_start = row[0] # sec t_end = row[1] # sec freq = row[2] # Hz ax.hlines(freq, t_start, t_end, color='r', linewidth=3, zorder=2, alpha=0.5) fig.suptitle('alignment (black) vs. estimated (red)') fig.show() if not include_pitch_bends: if to_midi: return midi['midi'] else: return notes else: notes = [(np.argmin(np.abs(audio['time']-note[0])), np.argmin(np.abs(audio['time']-note[1])), note[2], note[3]) for note in notes] notes = self.get_pitch_bends(audio["f0"], notes, timing_refinement_range_with_f0s) notes = [ (audio['time'][note[0]], audio['time'][note[1]], note[2], note[3], note[4]) for note in notes ] if to_midi: return self.note2midi(notes, 120) #int(midi['midi'].estimate_tempo())) else: return notes def out2sync_old(self, out: Dict[str, np.array], midi, include_velocity=False, alignment_padding=50, debug=False): """ Synchronizes the output of the model with the MIDI file. Args: out: Model output dictionary midi: Path to the MIDI file or PrettyMIDI object include_velocity: Whether to encode the note confidence in place of velocity alignment_padding: Number of frames to pad the MIDI features with zeros debug: Visualize the alignment Returns: note events and the aligned PrettyMIDI object """ midi = self.labeling.represent_midi(midi, self.sr/self.hop_length) audio_midi_anchors = self.prepare_for_synchronization(out, midi, feature_rate=self.sr/self.hop_length, pad_length=alignment_padding) if isinstance(audio_midi_anchors, str): print(audio_midi_anchors) return None # the file is corrupted! 
no possible alignment at all else: audio, midi, anchor_pairs = audio_midi_anchors ALPHA = 0.6 # This is the coefficient of onsets, 1 - ALPHA for offsets wp = sync_via_mrmsdtw_with_anchors(f_chroma1=audio['note'].T, f_onset1=np.hstack([ALPHA * audio['onset'], (1 - ALPHA) * audio['offset']]).T, f_chroma2=midi['note'].T, f_onset2=np.hstack([ALPHA * midi['onset'], (1 - ALPHA) * midi['offset']]).T, input_feature_rate=self.sr/self.hop_length, step_weights=np.array([1.5, 1.5, 2.0]), threshold_rec=10 ** 6, verbose=debug, normalize_chroma=False, anchor_pairs=anchor_pairs) wp = make_path_strictly_monotonic(wp).astype(int) audio_time = np.take(audio['time'], wp[0]) midi_time = np.take(midi['time'], wp[1]) notes = [] for instrument in midi['midi'].instruments: for note in instrument.notes: note.start = np.interp(note.start, midi_time, audio_time) note.end = np.interp(note.end, midi_time, audio_time) if note.end - note.start <= 0.012: # notes should be at least 12 ms (i.e. 2 frames) note.start = note.start - 0.003 note.end = note.start + 0.012 if include_velocity: # encode the note confidence in place of velocity velocity = np.median(audio['note'][np.argmin(np.abs(audio['time']-note.start)): np.argmin(np.abs(audio['time']-note.end)), note.pitch-self.labeling.midi_centers[0]]) note.velocity = max(1, velocity*127) # velocity should be at least 1 otherwise midi removes the note else: velocity = note.velocity/127 notes.append((note.start, note.end, note.pitch, velocity)) return notes, midi def out2sync(self, out: Dict[str, np.array], midi, include_velocity=False, alignment_padding=50, debug=False): """ Synchronizes the output of the model with the MIDI file. Args: out: Model output dictionary midi: Path to the MIDI file or PrettyMIDI object include_velocity: Whether to encode the note confidence in place of velocity alignment_padding: Number of frames to pad the MIDI features with zeros debug: Visualize the alignment Returns: note events and the aligned PrettyMIDI object """ midi = self.labeling.represent_midi(midi, self.sr/self.hop_length) audio_midi_anchors = self.prepare_for_synchronization(out, midi, feature_rate=self.sr/self.hop_length, pad_length=alignment_padding) if isinstance(audio_midi_anchors, str): print(audio_midi_anchors) return None # the file is corrupted! 
no possible alignment at all else: audio, midi, anchor_pairs = audio_midi_anchors ALPHA = 0.6 # This is the coefficient of onsets, 1 - ALPHA for offsets starts = (np.array(anchor_pairs[0])*self.sr/self.hop_length).astype(int) ends = (np.array(anchor_pairs[1])*self.sr/self.hop_length).astype(int) wp = sync_via_mrmsdtw_with_anchors(f_chroma1=audio['note'].T[:, starts[0]:ends[0]], f_onset1=np.hstack([ALPHA * audio['onset'], (1 - ALPHA) * audio['offset']]).T[:, starts[0]:ends[0]], f_chroma2=midi['note'].T[:, starts[1]:ends[1]], f_onset2=np.hstack([ALPHA * midi['onset'], (1 - ALPHA) * midi['offset']]).T[:, starts[1]:ends[1]], input_feature_rate=self.sr/self.hop_length, step_weights=np.array([1.5, 1.5, 2.0]), threshold_rec=10 ** 6, verbose=debug, normalize_chroma=False, anchor_pairs=None) wp = make_path_strictly_monotonic(wp).astype(int) wp[0] += starts[0] wp[1] += starts[1] wp = np.hstack((wp, ends[:,np.newaxis])) audio_time = np.take(audio['time'], wp[0]) midi_time = np.take(midi['time'], wp[1]) notes = [] for instrument in midi['midi'].instruments: for note in instrument.notes: note.start = np.interp(note.start, midi_time, audio_time) note.end = np.interp(note.end, midi_time, audio_time) if note.end - note.start <= 0.012: # notes should be at least 12 ms (i.e., 2 frames) note.start = note.start - 0.003 note.end = note.start + 0.012 if include_velocity: # encode the note confidence in place of velocity velocity = np.median(audio['note'][np.argmin(np.abs(audio['time']-note.start)): np.argmin(np.abs(audio['time']-note.end)), note.pitch-self.labeling.midi_centers[0]]) note.velocity = max(1, velocity*127) # velocity should be at least 1, otherwise MIDI removes the note else: velocity = note.velocity/127 notes.append((note.start, note.end, note.pitch, velocity)) return notes, midi @staticmethod def pad_representations(dict_of_representations, pad_length=10): """ Pad the representations so that the DTW does not enforce them to encompass the entire duration. Args: dict_of_representations: audio or midi representations pad_length: how many frames to pad Returns: padded representations """ for key, value in dict_of_representations.items(): if key == 'time': padded_time = dict_of_representations[key] padded_time = np.concatenate([padded_time[:2*pad_length], padded_time+padded_time[2*pad_length]]) dict_of_representations[key] = padded_time - padded_time[pad_length] # this is to ensure that the first frame times are negative until the real zero time elif key in ['onset', 'offset', 'note']: dict_of_representations[key] = np.pad(value, ((pad_length, pad_length), (0, 0))) elif key in ['start_anchor', 'end_anchor']: anchor_time = dict_of_representations[key][0][0] anchor_time = np.argmin(np.abs(dict_of_representations['time'] - anchor_time)) dict_of_representations[key][:,0] = anchor_time dict_of_representations[key] = dict_of_representations[key].astype(int) # builtin int: np.int was removed in recent NumPy return dict_of_representations def prepare_for_synchronization(self, audio, midi, feature_rate=44100/256, pad_length=100): """ MrMsDTW works better with start and end anchors. This function finds the start and end anchors for audio based on the midi notes. It also pads the MIDI representations, since MIDI files most often start and end with an active note. Thus, the DTW will try to align the active notes to the entire duration of the audio. This is not desirable. Therefore, we pad the MIDI representations with a few frames of silence at the beginning and end of the audio.
This way, the DTW will not try to align the active notes to the entire duration. Args: audio: midi: feature_rate: pad_length: Returns: """ # first pad the MIDI midi = self.pad_representations(midi, pad_length) # sometimes f0s are more reliable than the notes. So, we use both the f0s and the notes together to find the # start and end anchors. f0 lookup bins is the number of bins to look around the f0 to assign a note to it. f0_lookup_bins = int(100//(2*self.labeling.f0_granularity_c)) # find the start anchor for the audio # first decide on which notes to use for the start anchor (take the entire chord where the MIDI file starts) anchor_notes = midi['start_anchor'][:, 1] - self.labeling.midi_centers[0] # now find which f0 bins to look at for the start anchor anchor_f0s = [self.midi_pitch_to_contour_bin(an+self.labeling.midi_centers[0]) for an in anchor_notes] anchor_f0s = np.array([list(range(f0-f0_lookup_bins, f0+f0_lookup_bins+1)) for f0 in anchor_f0s]).reshape(-1) # first start anchor proposals come from the notes anchor_vals = np.any(audio['note'][:, anchor_notes]>0.5, axis=1) # now the f0s anchor_vals_f0 = np.any(audio['f0'][:, anchor_f0s]>0.5, axis=1) # combine the two anchor_vals = np.logical_or(anchor_vals, anchor_vals_f0) if not any(anchor_vals): return 'corrupted' # do not consider the file if we cannot find the start anchor audio_start = np.argmax(anchor_vals) # now the end anchor (most string instruments use chords in cadences: in general the end anchor is polyphonic) anchor_notes = midi['end_anchor'][:, 1] - self.labeling.midi_centers[0] anchor_f0s = [self.midi_pitch_to_contour_bin(an+self.labeling.midi_centers[0]) for an in anchor_notes] anchor_f0s = np.array([list(range(f0-f0_lookup_bins, f0+f0_lookup_bins+1)) for f0 in anchor_f0s]).reshape(-1) # the same procedure as above anchor_vals = np.any(audio['note'][::-1, anchor_notes]>0.5, axis=1) anchor_vals_f0 = np.any(audio['f0'][::-1, anchor_f0s]>0.5, axis=1) anchor_vals = np.logical_or(anchor_vals, anchor_vals_f0) if not any(anchor_vals): return 'corrupted' # do not consider the file if we cannot find the end anchor audio_end = audio['note'].shape[0] - np.argmax(anchor_vals) if audio_end - audio_start < (midi['end_anchor'][0][0] - midi['start_anchor'][0][0])/10: # no one plays x10 faster return 'corrupted' # do not consider the interval between anchors is too short anchor_pairs = [(audio_start - 5, midi['start_anchor'][0][0] - 5), (audio_end + 5, midi['end_anchor'][0][0] + 5)] if anchor_pairs[0][0] < 1: anchor_pairs[0] = (1, midi['start_anchor'][0][0]) if anchor_pairs[1][0] > audio['note'].shape[0] - 1: anchor_pairs[1] = (audio['note'].shape[0] - 1, midi['end_anchor'][0][0]) return audio, midi, [(anchor_pairs[0][0]/feature_rate, anchor_pairs[0][1]/feature_rate), (anchor_pairs[1][0]/feature_rate, anchor_pairs[1][1]/feature_rate)] class ConvBlock(nn.Module): def __init__(self, f, w, s, d, in_channels): super().__init__() p1 = d*(w - 1) // 2 p2 = d*(w - 1) - p1 self.pad = nn.ZeroPad2d((0, 0, p1, p2)) self.conv2d = nn.Conv2d(in_channels=in_channels, out_channels=f, kernel_size=(w, 1), stride=(s, 1), dilation=(d, 1)) self.relu = nn.ReLU() self.bn = nn.BatchNorm2d(f) self.pool = nn.MaxPool2d(kernel_size=(2, 1)) self.dropout = nn.Dropout(0.25) def forward(self, x): x = self.pad(x) x = self.conv2d(x) x = self.relu(x) x = self.bn(x) x = self.pool(x) x = self.dropout(x) return x class NoPadConvBlock(nn.Module): def __init__(self, f, w, s, d, in_channels): super().__init__() self.conv2d = nn.Conv2d(in_channels=in_channels, out_channels=f, 
kernel_size=(w, 1), stride=(s, 1), dilation=(d, 1)) self.relu = nn.ReLU() self.bn = nn.BatchNorm2d(f) self.pool = nn.MaxPool2d(kernel_size=(2, 1)) self.dropout = nn.Dropout(0.25) def forward(self, x): x = self.conv2d(x) x = self.relu(x) x = self.bn(x) x = self.pool(x) x = self.dropout(x) return x class TinyPathway(nn.Module): def __init__(self, dilation=1, hop=256, localize=False, model_capacity="full", n_layers=6, chunk_size=256): super().__init__() capacity_multiplier = { 'tiny': 4, 'small': 8, 'medium': 16, 'large': 24, 'full': 32 }[model_capacity] self.layers = [1, 2, 3, 4, 5, 6] self.layers = self.layers[:n_layers] filters = [n * capacity_multiplier for n in [32, 8, 8, 8, 8, 8]] filters = [1] + filters widths = [512, 64, 64, 64, 32, 32] strides = self.deter_dilations(hop//(4*(2**n_layers)), localize=localize) strides[0] = strides[0]*4 # apply 4 times more stride at the first layer dilations = self.deter_dilations(dilation) for i in range(len(self.layers)): f, w, s, d, in_channel = filters[i + 1], widths[i], strides[i], dilations[i], filters[i] self.add_module("conv%d" % i, NoPadConvBlock(f, w, s, d, in_channel)) self.chunk_size = chunk_size self.input_window, self.hop = self.find_input_size_for_pathway() self.out_dim = filters[n_layers] def find_input_size_for_pathway(self): def find_input_size(output_size, kernel_size, stride, dilation, padding): num = (stride*(output_size-1)) + 1 input_size = num - 2*padding + dilation*(kernel_size-1) return input_size conv_calc, n = {}, 0 for i in self.layers: layer = self.__getattr__("conv%d" % (i-1)) for mm in layer.modules(): if hasattr(mm, 'kernel_size'): try: d = mm.dilation[0] except TypeError: d = mm.dilation conv_calc[n] = [mm.kernel_size[0], mm.stride[0], 0, d] n += 1 out = self.chunk_size hop = 1 for n in sorted(conv_calc.keys())[::-1]: kernel_size_n, stride_n, padding_n, dilation_n = conv_calc[n] out = find_input_size(out, kernel_size_n, stride_n, dilation_n, padding_n) hop = hop*stride_n return out, hop def deter_dilations(self, total_dilation, localize=False): n_layers = len(self.layers) if localize: # e.g., 32*1023 window and 3 layers -> [1, 1, 32] a = [total_dilation] + [1 for _ in range(n_layers-1)] else: # e.g., 32*1023 window and 3 layers -> [4, 4, 2] total_dilation = int(np.log2(total_dilation)) a = [] for layer in range(n_layers): this_dilation = int(np.ceil(total_dilation/(n_layers-layer))) a.append(2**this_dilation) total_dilation = total_dilation - this_dilation return a[::-1] def forward(self, x): x = x.view(x.shape[0], 1, -1, 1) for i in range(len(self.layers)): x = self.__getattr__("conv%d" % i)(x) x = x.permute(0, 3, 2, 1) return x #@jit(nopython=True) def cosine_distance(f1, f2, cos_meas_max=2.0, cos_meas_min=1.0): """For all pairs of vectors f1' and f2' in f1 and f2, computes 1 - (f1.f2), where '.' is the dot product, and rescales the results to lie in the range [cos_meas_min, cos_meas_max]. 
Corresponds to regular cosine distance if f1' and f2' are normalized and cos_meas_min==0.0 and cos_meas_max==1.0.""" return (1 - f1.T @ f2) * (cos_meas_max - cos_meas_min) + cos_meas_min #@jit(nopython=True) def euclidean_distance(f1, f2, l2_meas_max=1.0, l2_meas_min=0.0): """Computes euclidean distances between the vectors in f1 and f2, and rescales the results to lie in the range [cos_meas_min, cos_meas_max].""" #S1 = np.zeros((f1.shape[1], f2.shape[1])) #for n in range(f2.shape[1]): # S1[:, n] = np.sqrt(np.sum((f1.T - f2[:, n]) ** 2, axis=1)) S1 = euclidean_distances(f1.T, f2.T) return S1 * (l2_meas_max - l2_meas_min) + l2_meas_min def compute_high_res_cost_matrix(f_chroma1: np.ndarray, f_chroma2: np.ndarray, f_onset1: np.ndarray, f_onset2: np.ndarray, weights: np.ndarray = np.array([1.0, 1.0]), cos_meas_min: float = 1.0, cos_meas_max: float = 2.0, l2_meas_min: float = 0.0, l2_meas_max: float = 1.0): """Computes cost matrix of two sequences using two feature matrices for each sequence. Cosine distance is used for the chroma sequences and euclidean distance is used for the DLNCO sequences. Parameters ---------- f_chroma1 : np.ndarray [shape=(12, N)] Chroma feature matrix of the first sequence (assumed to be normalized). f_chroma2 : np.ndarray [shape=(12, M)] Chroma feature matrix of the second sequence (assumed to be normalized). f_onset1 : np.ndarray [shape=(12, N)] DLNCO feature matrix of the first sequence f_onset2 : np.ndarray [shape=(12, M)] DLNCO feature matrix of the second sequence weights : np.ndarray [shape=[2,]] Weights array for the high-resolution cost computation. weights[0] * cosine_distance + weights[1] * euclidean_distance cos_meas_min : float Cosine distances are shifted to be at least ``cos_meas_min`` cos_meas_max : float Cosine distances are scaled to be at most ``cos_meas_max`` l2_meas_min : float Euclidean distances are shifted to be at least ``l2_meas_min`` l2_meas_max : float Euclidean distances are scaled to be at most ``l2_meas_max`` Returns ------- C: np.ndarray [shape=(N, M)] Cost matrix """ cos_dis = cosine_distance(f_chroma1, f_chroma2, cos_meas_min=cos_meas_min, cos_meas_max=cos_meas_max) euc_dis = euclidean_distance(f_onset1, f_onset2, l2_meas_min=l2_meas_min, l2_meas_max=l2_meas_max) return weights[0] * cos_dis + weights[1] * euc_dis @jit(nopython=True, cache=True) def __C_to_DE(C: np.ndarray = None, dn: np.ndarray = np.array([1, 1, 0], np.int64), dm: np.ndarray = np.array([1, 0, 1], np.int64), dw: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64), sub_sequence: bool = False) -> tuple[np.ndarray, np.ndarray]: """This function computes the accumulated cost matrix D and the step index matrix E. Parameters ---------- C : np.ndarray (np.float32 / np.float64) [shape=(N, M)] Cost matrix dn : np.ndarray (np.int64) [shape=(1, S)] Integer array defining valid steps (N direction of C), default: [1, 1, 0] dm : np.ndarray (np.int64) [shape=(1, S)] Integer array defining valid steps (M direction of C), default: [1, 0, 1] dw : np.ndarray (np.float64) [shape=(1, S)] Double array defining the weight of the each step, default: [1.0, 1.0, 1.0] sub_sequence : bool Set `True` for SubSequence DTW, default: False Returns ------- D : np.ndarray (np.float64) [shape=(N, M)] Accumulated cost matrix of type double E : np.ndarray (np.int64) [shape=(N, M)] Step index matrix. E[n, m] holds the index of the step take to determine the value of D[n, m]. If E[n, m] is zero, no valid step was possible. NaNs in the cost matrix are preserved, invalid fields in the cost matrix are NaNs. 
""" if C is None: raise ValueError('C must be a 2D numpy array.') N, M = C.shape S = dn.size if S != dm.size or S != dw.size: raise ValueError('The parameters dn,dm, and dw must be of equal length.') # calc bounding box size of steps sbbn = np.max(dn) sbbm = np.max(dm) # initialize E E = np.zeros((N, M), np.int64) - 1 # initialize extended D matrix D = np.ones((sbbn + N, sbbm + M), np.float64) * np.inf if sub_sequence: for m in range(M): D[sbbn, sbbm + m] = C[0, m] else: D[sbbn, sbbm] = C[0, 0] # accumulate for m in range(sbbm, M + sbbm): for n in range(sbbn, N + sbbn): for s in range(S): cost = D[n - dn[s], m - dm[s]] + C[n - sbbn, m - sbbm] * dw[s] if cost < D[n, m]: D[n, m] = cost E[n - sbbn, m - sbbm] = s D = D[sbbn: N + sbbn, sbbm: M + sbbm] return D, E @jit(nopython=True, cache=True) def __E_to_warping_path(E: np.ndarray, dn: np.ndarray = np.array([1, 1, 0], np.int64), dm: np.ndarray = np.array([1, 0, 1], np.int64), sub_sequence: bool = False, end_index: int = -1) -> np.ndarray: """This function computes a warping path based on the provided matrix E and the allowed steps. Parameters ---------- E : np.ndarray (np.int64) [shape=(N, M)] Step index matrix dn : np.ndarray (np.int64) [shape=(1, S)] Integer array defining valid steps (N direction of C), default: [1, 1, 0] dm : np.ndarray (np.int64) [shape=(1, S)] Integer array defining valid steps (M direction of C), default: [1, 0, 1] sub_sequence : bool Set `True` for SubSequence DTW, default: False end_index : int In case of SubSequence DTW Returns ------- warping_path : np.ndarray (np.int64) [shape=(2, M)] Resulting optimal warping path """ N, M = E.shape if not sub_sequence and end_index == -1: end_index = M - 1 m = end_index n = N - 1 warping_path = np.zeros((2, n + m + 1)) index = 0 def _loop(m, n, index): warping_path[:, index] = np.array([n, m]) step_index = E[n, m] m -= dm[step_index] n -= dn[step_index] index += 1 return m, n, index if sub_sequence: while n > 0: m, n, index = _loop(m, n, index) else: while m > 0 or n > 0: m, n, index = _loop(m, n, index) warping_path[:, index] = np.array([n, m]) warping_path = warping_path[:, index::-1] return warping_path def compute_warping_path(C: np.ndarray, step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int64), step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64), implementation: str = 'synctoolbox'): """Applies DTW on cost matrix C. Parameters ---------- C : np.ndarray (np.float32 / np.float64) [shape=(N, M)] Cost matrix step_sizes : np.ndarray (np.int64) [shape=(2, S)] Array of step sizes step_weights : np.ndarray (np.float64) [shape=(2, S)] Array of step weights implementation: str Choose among ``synctoolbox`` and ``librosa``. 
(default: ``synctoolbox``) Returns ------- D : np.ndarray (np.float64) [shape=(N, M)] Accumulated cost matrix E : np.ndarray (np.int64) [shape=(N, M)] Step index matrix wp : np.ndarray (np.int64) [shape=(2, M)] Warping path """ if implementation == 'librosa': D, wp, E = dtw(C=C, step_sizes_sigma=step_sizes, weights_add=np.array([0, 0, 0]), weights_mul=step_weights, return_steps=True, subseq=False) wp = wp[::-1].T elif implementation == 'synctoolbox': dn = step_sizes[:, 0] dm = step_sizes[:, 1] D, E = __C_to_DE(C, dn=dn, dm=dm, dw=step_weights, sub_sequence=False) wp = __E_to_warping_path(E=E, dn=dn, dm=dm, sub_sequence=False) else: raise NotImplementedError(f'No implementation found called {implementation}') return D, E, wp def compute_warping_paths_from_cost_matrices(cost_matrices: List, step_sizes: np.array = np.array([[1, 0], [0, 1], [1, 1]], int), step_weights: np.array = np.array([1.0, 1.0, 1.0], np.float64), implementation: str = 'synctoolbox') -> List: """Computes a path via DTW on each matrix in cost_matrices Parameters ---------- cost_matrices : list List of cost matrices step_sizes : np.ndarray DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]])) step_weights : np.ndarray DTW step weights (default: np.array([1.0, 1.0, 1.0])) implementation : str Choose among 'synctoolbox' and 'librosa' (default: 'synctoolbox') Returns ------- wp_list : list List of warping paths """ return [compute_warping_path(C=C, step_sizes=step_sizes, step_weights=step_weights, implementation=implementation)[2] for C in cost_matrices] def compute_cost_matrices_between_anchors(f_chroma1: np.ndarray, f_chroma2: np.ndarray, anchors: np.ndarray, f_onset1: np.ndarray = None, f_onset2: np.ndarray = None, alpha: float = 0.5) -> List: """Computes cost matrices for the given features between subsequent pairs of anchors points. Parameters ---------- f_chroma1 : np.ndarray [shape=(12, N)] Chroma feature matrix of the first sequence f_chroma2 : np.ndarray [shape=(12, M)] Chroma feature matrix of the second sequence anchors : np.ndarray [shape=(2, R)] Anchor sequence f_onset1 : np.ndarray [shape=(L, N)] Onset feature matrix of the first sequence f_onset2 : np.ndarray [shape=(L, M)] Onset feature matrix of the second sequence alpha: float Alpha parameter to weight the cost functions. Returns ------- cost_matrices: list List containing cost matrices """ high_res = False if f_onset1 is not None and f_onset2 is not None: high_res = True cost_matrices = list() for k in range(anchors.shape[1] - 1): a1 = np.array(anchors[:, k].astype(int), copy=True) a2 = np.array(anchors[:, k + 1].astype(int), copy=True) if high_res: cost_matrices.append(compute_high_res_cost_matrix(f_chroma1[:, a1[0]: a2[0] + 1], f_chroma2[:, a1[1]: a2[1] + 1], f_onset1[:, a1[0]: a2[0] + 1], f_onset2[:, a1[1]: a2[1] + 1], weights=np.array([alpha, 1-alpha]))) else: cost_matrices.append(cosine_distance(f_chroma1[:, a1[0]: a2[0] + 1], f_chroma2[:, a1[1]: a2[1] + 1])) return cost_matrices def build_path_from_warping_paths(warping_paths: List, anchors: np.ndarray = None) -> np.ndarray: """The function builds a path from a given list of warping paths and the anchors used to obtain these paths. The indices of the original warping paths are adapted such that they cross the anchors. 
Parameters ---------- warping_paths : list List of warping paths anchors : np.ndarray [shape=(2, N)] Anchor sequence Returns ------- path : np.ndarray [shape=(2, M)] Merged path """ if anchors is None: # When no anchor points are given, we can construct them from the # subpaths in the wp_list # To do this, we assume that the first path's element is the starting # anchor anchors = warping_paths[0][:, 0] # Retrieve the last element of each path anchors_tmp = np.zeros(len(warping_paths), np.float32) for idx, x in enumerate(warping_paths): anchors_tmp[idx] = x[:, -1] # Correct indices, such that the indices of the anchors are given on a # common path. Each anchor a_l = [Nnew_[l+1];Mnew_[l+1]] # Nnew_[l+1] = N_l + N_[l+1] -1 # Mnew_[l+1] = M_l + M_[l+1] -1 anchors_tmp = np.cumsum(anchors_tmp, axis=1) anchors_tmp[:, 1:] = anchors_tmp[:, 1:] - [np.arange(1, anchors_tmp.shape[1]), np.arange(1, anchors_tmp.shape[1])] anchors = np.concatenate([anchors, anchors_tmp], axis=1) L = len(warping_paths) + 1 path = None wp = None for anchor_idx in range(1, L): anchor1 = anchors[:, anchor_idx - 1] anchor2 = anchors[:, anchor_idx] wp = np.array(warping_paths[anchor_idx - 1], copy=True) # correct indices in warpingPath wp += np.repeat(anchor1.reshape(-1, 1), wp.shape[1], axis=1).astype(wp.dtype) # consistency checks assert np.array_equal(wp[:, 0], anchor1), 'First entry of warping path does not coincide with anchor point' assert np.array_equal(wp[:, -1], anchor2), 'Last entry of warping path does not coincide with anchor point' if path is None: path = np.array(wp[:, :-1], copy=True) else: path = np.concatenate([path, wp[:, :-1]], axis=1) # append last index of warping path path = np.concatenate([path, wp[:, -1].reshape(-1, 1)], axis=1) return path def find_anchor_indices_in_warping_path(warping_path: np.ndarray, anchors: np.ndarray) -> np.ndarray: """Compute the indices in the warping path that corresponds to the elements in 'anchors' Parameters ---------- warping_path : np.ndarray [shape=(2, N)] Warping path anchors : np.ndarray [shape=(2, M)] Anchor sequence Returns ------- indices : np.ndarray [shape=(2, M)] Anchor indices in the ``warping_path`` """ indices = np.zeros(anchors.shape[1]) for k in range(anchors.shape[1]): a = anchors[:, k] indices[k] = np.where((a[0] == warping_path[0, :]) & (a[1] == warping_path[1, :]))[0] return indices def make_path_strictly_monotonic(P: np.ndarray) -> np.ndarray: """Compute strict alignment path from a warping path Wrapper around "compute_strict_alignment_path_mask" from libfmp. 
Parameters ---------- P: np.ndarray [shape=(2, N)] Warping path Returns ------- P_mod: np.ndarray [shape=(2, M)] Strict alignment path, M <= N """ P_mod = compute_strict_alignment_path_mask(P.T) return P_mod.T def compute_strict_alignment_path_mask(P): """Compute strict alignment path from a warping path Notebook: C3/C3S3_MusicAppTempoCurve.ipynb Args: P (list or np.ndarray): Warping path Returns: P_mod (list or np.ndarray): Strict alignment path """ P = np.array(P, copy=True) N, M = P[-1] # Get indices for strict monotonicity keep_mask = (P[1:, 0] > P[:-1, 0]) & (P[1:, 1] > P[:-1, 1]) # Add first index to enforce start boundary condition keep_mask = np.concatenate(([True], keep_mask)) # Remove all indices of the last row or column keep_mask[(P[:, 0] == N) | (P[:, 1] == M)] = False # Add last index to enforce end boundary condition keep_mask[-1] = True P_mod = P[keep_mask, :] return P_mod def evaluate_synchronized_positions(ground_truth_positions: np.ndarray, synchronized_positions: np.ndarray, tolerances: List = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 150, 250]): """Compute standard evaluation measures for the quality of synchronized (musical) positions. When synchronizing two versions of a piece of music, one can evaluate the quality of the resulting alignment by comparing errors at musical positions (e.g. beats or measures) that appear in both versions. This function implements two measures: mean absolute error at positions and the percentage of correctly transferred measures given a threshold. Parameters ---------- ground_truth_positions: np.ndarray [shape=N] Positions (e.g. beat or measure positions) annotated in the target version of a piece of music, in milliseconds. synchronized_positions: np.ndarray [shape=N] The same musical positions as in 'ground_truth_positions' obtained by transfer using music synchronization, in milliseconds. tolerances: list of integers Tolerances (in milliseconds) used for comparing annotated and synchronized positions. Returns ------- mean_absolute_error: float Mean absolute error for synchronized positions, in milliseconds. accuracy_at_tolerances: list of floats Percentages of correctly transferred measures, for each entry in 'tolerances'. """ absolute_errors_at_positions = np.abs(synchronized_positions - ground_truth_positions) print('Measure transfer from recording 1 to 2 yielded:') mean_absolute_error = np.mean(absolute_errors_at_positions) print('\nMean absolute error (MAE): %.2fms (standard deviation: %.2fms)' % (mean_absolute_error, np.std(absolute_errors_at_positions))) print('\nAccuracy of transferred positions at different tolerances:') print('\t\t\tAccuracy') print('################################') accuracy_at_tolerances = [] for tolerance in tolerances: accuracy = np.mean((absolute_errors_at_positions < tolerance)) * 100.0 accuracy_at_tolerances.append(accuracy) print('Tolerance: {} ms \t{:.2f} %'.format(tolerance, accuracy)) return mean_absolute_error, accuracy_at_tolerances def smooth_downsample_feature(f_feature: np.ndarray, input_feature_rate: float, win_len_smooth: int = 0, downsamp_smooth: int = 1) -> Tuple[np.ndarray, float]: """Temporal smoothing and downsampling of a feature sequence Parameters ---------- f_feature : np.ndarray Input feature sequence, size dxN input_feature_rate : float Input feature rate in Hz win_len_smooth : int Smoothing window length. For 0, no smoothing is applied. downsamp_smooth : int Downsampling factor. For 1, no downsampling is applied.
Returns ------- f_feature_stat : np.ndarray Downsampled & smoothed feature. new_feature_rate : float New feature rate after downsampling """ if win_len_smooth != 0 or downsamp_smooth != 1: # hack to get the same results as on MATLAB stat_window = np.hanning(win_len_smooth+2)[1:-1] stat_window /= np.sum(stat_window) # upfirdn filters and downsamples each column of f_stat_help f_feature_stat = upfirdn(h=stat_window, x=f_feature, up=1, down=downsamp_smooth) seg_num = f_feature.shape[1] stat_num = int(np.ceil(seg_num / downsamp_smooth)) cut = int(np.floor((win_len_smooth - 1) / (2 * downsamp_smooth))) f_feature_stat = f_feature_stat[:, cut: stat_num + cut] else: f_feature_stat = f_feature new_feature_rate = input_feature_rate / downsamp_smooth return f_feature_stat, new_feature_rate @jit(nopython=True) def normalize_feature(feature: np.ndarray, norm_ord: int, threshold: float) -> np.ndarray: """Normalizes a feature sequence according to the l^norm_ord norm. Parameters ---------- feature : np.ndarray Input feature sequence of size d x N d: dimensionality of feature vectors N: number of feature vectors (time in frames) norm_ord : int Norm degree threshold : float If the norm falls below threshold for a feature vector, then the normalized feature vector is set to be the normalized unit vector. Returns ------- f_normalized : np.ndarray Normalized feature sequence """ # TODO rewrite in vectorized fashion d, N = feature.shape f_normalized = np.zeros((d, N)) # normalize the vectors according to the l^norm_ord norm unit_vec = np.ones(d) unit_vec = unit_vec / np.linalg.norm(unit_vec, norm_ord) for k in range(N): cur_norm = np.linalg.norm(feature[:, k], norm_ord) if cur_norm < threshold: f_normalized[:, k] = unit_vec else: f_normalized[:, k] = feature[:, k] / cur_norm return f_normalized class FourHeads(Synchronizer): def __init__( self, pathway_multiscale: int = 32, num_pathway_layers: int = 2, chunk_size: int = 256, hop_length: int = 256, encoder_dim: int = 256, sr: int = 44100, num_heads: int = 4, ffn_dim: int = 128, num_separator_layers: int = 16, num_representation_layers: int = 4, depthwise_conv_kernel_size: int = 31, dropout: float = 0.25, use_group_norm: bool = False, convolution_first: bool = False, labeling=PerformanceLabel(), wiring='tiktok' ): super().__init__(labeling, sr=sr, hop_length=hop_length) self.main = TinyPathway(dilation=1, hop=hop_length, localize=True, n_layers=num_pathway_layers, chunk_size=chunk_size) self.attendant = TinyPathway(dilation=pathway_multiscale, hop=hop_length, localize=False, n_layers=num_pathway_layers, chunk_size=chunk_size) assert self.main.hop == self.attendant.hop # they should output with the same sample rate print('hop in samples:', self.main.hop) self.input_window = self.attendant.input_window self.encoder_dim = encoder_dim self.dropout = nn.Dropout(dropout) # merge two streams into a conformer input self.stream_merger = nn.Sequential(self.dropout, nn.Linear(self.main.out_dim + self.attendant.out_dim, self.encoder_dim)) print('main stream window:', self.main.input_window, ', attendant stream window:', self.attendant.input_window, ', conformer input dim:', self.encoder_dim) center = ((chunk_size - 1) * self.main.hop) # region labeled with pitch track main_overlap = self.main.input_window - center main_overlap = [int(np.floor(main_overlap / 2)), int(np.ceil(main_overlap / 2))] attendant_overlap = self.attendant.input_window - center attendant_overlap = [int(np.floor(attendant_overlap / 2)), int(np.ceil(attendant_overlap / 2))] print('main frame overlap:', 
main_overlap, ', attendant frame overlap:', attendant_overlap) main_crop_relative = [attendant_overlap[0] - main_overlap[0], main_overlap[1] - attendant_overlap[1]] print('crop for main pathway', main_crop_relative) print("Total sequence duration is", self.attendant.input_window, 'samples') print('Main stream receptive field for one frame is', (self.main.input_window - center), 'samples') print('Attendant stream receptive field for one frame is', (self.attendant.input_window - center), 'samples') self.frame_overlap = attendant_overlap self.main_stream_crop = main_crop_relative self.max_window_size = self.attendant.input_window self.chunk_size = chunk_size self.separator_stream = nn.ModuleList( # source-separation, reinvented [ ConformerLayer( input_dim=self.encoder_dim, ffn_dim=ffn_dim, num_attention_heads=num_heads, depthwise_conv_kernel_size=depthwise_conv_kernel_size, dropout=dropout, use_group_norm=use_group_norm, convolution_first=convolution_first, ) for _ in range(num_separator_layers) ] ) self.f0_stream = nn.ModuleList( [ ConformerLayer( input_dim=self.encoder_dim, ffn_dim=ffn_dim, num_attention_heads=num_heads, depthwise_conv_kernel_size=depthwise_conv_kernel_size, dropout=dropout, use_group_norm=use_group_norm, convolution_first=convolution_first, ) for _ in range(num_representation_layers) ] ) self.f0_head = nn.Linear(self.encoder_dim, len(self.labeling.f0_centers_c)) self.note_stream = nn.ModuleList( [ ConformerLayer( input_dim=self.encoder_dim, ffn_dim=ffn_dim, num_attention_heads=num_heads, depthwise_conv_kernel_size=depthwise_conv_kernel_size, dropout=dropout, use_group_norm=use_group_norm, convolution_first=convolution_first, ) for _ in range(num_representation_layers) ] ) self.note_head = nn.Linear(self.encoder_dim, len(self.labeling.midi_centers)) self.onset_stream = nn.ModuleList( [ ConformerLayer( input_dim=self.encoder_dim, ffn_dim=ffn_dim, num_attention_heads=num_heads, depthwise_conv_kernel_size=depthwise_conv_kernel_size, dropout=dropout, use_group_norm=use_group_norm, convolution_first=convolution_first, ) for _ in range(num_representation_layers) ] ) self.onset_head = nn.Linear(self.encoder_dim, len(self.labeling.midi_centers)) self.offset_stream = nn.ModuleList( [ ConformerLayer( input_dim=self.encoder_dim, ffn_dim=ffn_dim, num_attention_heads=num_heads, depthwise_conv_kernel_size=depthwise_conv_kernel_size, dropout=dropout, use_group_norm=use_group_norm, convolution_first=convolution_first, ) for _ in range(num_representation_layers) ] ) self.offset_head = nn.Linear(self.encoder_dim, len(self.labeling.midi_centers)) self.labeling = labeling self.double_merger = nn.Sequential(self.dropout, nn.Linear(2 * self.encoder_dim, self.encoder_dim)) self.triple_merger = nn.Sequential(self.dropout, nn.Linear(3 * self.encoder_dim, self.encoder_dim)) self.wiring = wiring print('Total parameter count: ', self.count_parameters()) def count_parameters(self) -> int: """ Count parameters of encoder """ return sum([p.numel() for p in self.parameters()]) def stream(self, x, representation, key_padding_mask=None): for i, layer in enumerate(self.__getattr__('{}_stream'.format(representation))): x = layer(x, key_padding_mask) return x def head(self, x, representation): return self.__getattr__('{}_head'.format(representation))(x) def forward(self, x, key_padding_mask=None): # two auditory streams followed by the separator stream to ensure timbre-awareness x_attendant = self.attendant(x) x_main = self.main(x[:, self.main_stream_crop[0]:self.main_stream_crop[1]]) x = 
self.stream_merger(torch_cat((x_attendant, x_main), -1).squeeze(1)) x = self.stream(x, 'separator', key_padding_mask) f0 = self.stream(x, 'f0', key_padding_mask) # they say this is a low level feature :) if self.wiring == 'parallel': note = self.stream(x, 'note', key_padding_mask) onset = self.stream(x, 'onset', key_padding_mask) offset = self.stream(x, 'offset', key_padding_mask) elif self.wiring == 'tiktok': onset = self.stream(x, 'onset', key_padding_mask) offset = self.stream(x, 'offset', key_padding_mask) # f0 is disconnected, note relies on separator, onset, and offset note = self.stream(self.triple_merger(torch_cat((x, onset, offset), -1)), 'note', key_padding_mask) elif self.wiring == 'tiktok2': onset = self.stream(x, 'onset', key_padding_mask) offset = self.stream(x, 'offset', key_padding_mask) # note is connected to f0, onset, and offset note = self.stream(self.triple_merger(torch_cat((f0, onset, offset), -1)), 'note', key_padding_mask) elif self.wiring == 'spotify': # note is connected to f0 only note = self.stream(f0, 'note', key_padding_mask) # here onset and onsets are higher-level features informed by the separator and note onset = self.stream(self.double_merger(torch_cat((x, note), -1)), 'onset', key_padding_mask) offset = self.stream(self.double_merger(torch_cat((x, note), -1)), 'offset', key_padding_mask) else: # onset and offset are connected to f0 and separator streams onset = self.stream(self.double_merger(torch_cat((x, f0), -1)), 'onset', key_padding_mask) offset = self.stream(self.double_merger(torch_cat((x, f0), -1)), 'offset', key_padding_mask) # note is connected to f0, onset, and offset streams note = self.stream(self.triple_merger(torch_cat((f0, onset, offset), -1)), 'note', key_padding_mask) return {'f0': self.head(f0, 'f0'), 'note': self.head(note, 'note'), 'onset': self.head(onset, 'onset'), 'offset': self.head(offset, 'offset')} class PretrainedModel(FourHeads): def __init__(self,model_json:dict,model:str,device): super().__init__(pathway_multiscale=model_json['pathway_multiscale'],num_pathway_layers=model_json['num_pathway_layers'], wiring=model_json['wiring'],hop_length=model_json['hop_length'], chunk_size=model_json['chunk_size'],labeling=PerformanceLabel(note_min=model_json['note_low'], note_max=model_json['note_high'],f0_bins_per_semitone=model_json['f0_bins_per_semitone'],f0_tolerance_c=200,f0_smooth_std_c=model_json['f0_smooth_std_c'], onset_smooth_std=model_json['onset_smooth_std']), sr=model_json['sampling_rate']) self.load_state_dict(torch_load(model, map_location=device,weights_only=True)) self.eval() def merge_violin_tracks(self,mid:MidiFile): new_mid = MidiFile(ticks_per_beat=mid.ticks_per_beat) new_track = MidiTrack() new_mid.tracks.append(new_track) events = [] for track in mid.tracks: current_time = 0 for msg in track: current_time += msg.time events.append((current_time, msg)) events.sort(key=lambda x: x[0]) last_time = 0 for event_time, msg in events: delta_time = event_time - last_time new_track.append(msg.copy(time=delta_time)) last_time = event_time for track in mid.tracks: for msg in track: if msg.type == 'set_tempo': new_track.insert(0, msg) return new_mid def transcribe_music(self, audio, batch_size, postprocessing): self.transcribe(audio, batch_size, postprocessing).write("output.mid") self.merge_violin_tracks(MidiFile("output.mid")).save("output.mid") return "output.mid"
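# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the library): running the DTW core defined
# above on a toy cost matrix. The feature sizes and random values are
# arbitrary; only the call signatures of euclidean_distance and
# compute_warping_path are taken from the code above.
def _demo_dtw_core():
    rng = np.random.default_rng(0)
    f1 = rng.random((12, 20))  # 12-dim features, 20 frames
    f2 = rng.random((12, 25))  # 12-dim features, 25 frames
    C = euclidean_distance(f1, f2)  # (20, 25) cost matrix
    D, E, wp = compute_warping_path(C, implementation='synctoolbox')
    print('accumulated cost:', D[-1, -1], '| warping path length:', wp.shape[1])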
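# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the library): the PerformanceLabel f0 round
# trip used for training targets. A pitch in Hz is converted to cents (10 Hz
# reference, matching f0_c2hz above), blurred into a soft label with
# f0_c2label, and decoded back with f0_label2c. The 440 Hz value is arbitrary.
def _demo_performance_label():
    labeling = PerformanceLabel()  # violin defaults
    f0_c = hz2cents(np.array([440.0]))[0]  # A4 in cents
    label = labeling.f0_c2label(f0_c)  # Gaussian-blurred target vector
    decoded_c = labeling.f0_label2c(label)  # weighted average around the argmax bin
    print('target (cents):', round(float(f0_c), 1), '| decoded (cents):', round(float(decoded_c), 1))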
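# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the library): evaluating transferred beat
# positions with evaluate_synchronized_positions. All numbers are made up.
def _demo_sync_evaluation():
    ground_truth_ms = np.array([0.0, 500.0, 1000.0, 1500.0])  # annotated beats (ms)
    transferred_ms = np.array([12.0, 488.0, 1035.0, 1494.0])  # beats transferred via the warping path (ms)
    mae, acc = evaluate_synchronized_positions(ground_truth_ms, transferred_ms, tolerances=[10, 50, 100])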
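# ---------------------------------------------------------------------------
# Usage sketch (assumptions, not shipped defaults): the config file, checkpoint
# path, and audio/MIDI file names below are hypothetical placeholders. The JSON
# is expected to provide the keys read by PretrainedModel.__init__ above
# (pathway_multiscale, num_pathway_layers, wiring, hop_length, chunk_size,
# note_low, note_high, f0_bins_per_semitone, f0_smooth_std_c, onset_smooth_std,
# sampling_rate).
if __name__ == '__main__':
    import json

    device = 'cpu'  # or 'cuda' if available
    with open('violin_model.json') as fp:  # hypothetical config file
        model_json = json.load(fp)
    model = PretrainedModel(model_json, 'violin_model.pt', device)  # hypothetical checkpoint

    # Frame-level f0 estimation (PitchEstimator API).
    times, f0_hz = model.estimate_pitch('performance.wav', batch_size=64, viterbi=True)

    # Score-to-audio synchronization (Synchronizer API): warp the reference MIDI
    # onto the audio timeline. With include_pitch_bends=False and to_midi=True,
    # synchronize returns the aligned pretty_midi.PrettyMIDI object (or None if
    # no usable anchors were found).
    aligned_midi = model.synchronize('performance.wav', 'score.mid', batch_size=64,
                                     include_pitch_bends=False)
    if aligned_midi is not None:
        aligned_midi.write('score_aligned.mid')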