# Audio2Melody / utils_violin_transcript.py
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.cm import ScalarMappable
from matplotlib.patches import Rectangle
from librosa.sequence import viterbi_discriminative, dtw
from librosa import note_to_hz, midi_to_hz
from numba import jit
from scipy.stats import norm
from scipy.ndimage import gaussian_filter1d
from scipy.signal import medfilt, upfirdn, argrelmax
from torchaudio.models.conformer import ConformerLayer
from torchaudio import load as torchaudio_load
from torchaudio.functional import resample as torchaudio_functional_resample
from torch import (cat as torch_cat, load as torch_load, Tensor as torch_Tensor, from_numpy as torch_from_numpy,
                   no_grad as torch_no_grad, mean as torch_mean, std as torch_std, sigmoid as torch_sigmoid,
                   nan_to_num as torch_nan_to_num, nn)
from sklearn.metrics.pairwise import euclidean_distances
from mir_eval.melody import hz2cents
from pretty_midi import PrettyMIDI, Instrument, Note, PitchBend, instrument_name_to_program, note_name_to_number
from time import perf_counter
from collections import defaultdict
from typing import DefaultDict, Dict, List, Optional, Tuple
from pathlib import Path
from mido import MidiFile, MidiTrack
class PitchEstimator(nn.Module):
"""
This is the base class that everything else inherits from. The hierarchy is:
PitchEstimator -> Transcriber -> Synchronizer -> AutonomousAgent -> The n-Head Music Performance Analysis Models
PitchEstimator can handle reading the audio, predicting all the features,
estimating a single frame level f0 using viterbi, or
MIDI pitch bend creation for the predicted note events when used inside a Transcriber, or
score-informed f0 estimation when used inside a Synchronizer.
"""
def __init__(self, labeling, instrument='Violin', sr=16000, window_size=1024, hop_length=160):
super().__init__()
self.labeling = labeling
self.sr = sr
self.window_size = window_size
self.hop_length = hop_length
self.instrument = instrument
self.f0_bins_per_semitone = int(np.round(100/self.labeling.f0_granularity_c))
def read_audio(self, audio):
"""
Read and resample an audio file, convert to mono, and unfold into representation frames.
The time array represents the center of each small frame with 5.8ms hop length. This is different than the chunk
level frames. The chunk level frames represent the entire sequence the model sees. Whereas it predicts with the
small frames intervals (5.8ms).
:param audio: str, pathlib.Path, np.ndarray, or torch.Tensor
:return: frames: (n_big_frames, frame_length), times: (n_small_frames,)
"""
if isinstance(audio, str) or isinstance(audio, Path):
audio, sample_rate = torchaudio_load(audio, normalize=True)
audio = audio.mean(axis=0) # convert to mono
if sample_rate != self.sr:
audio = torchaudio_functional_resample(audio, sample_rate, self.sr)
elif isinstance(audio, np.ndarray):
audio = torch_from_numpy(audio)
else:
assert isinstance(audio, torch_Tensor)
len_audio = audio.shape[-1]
n_frames = int(np.ceil((len_audio + sum(self.frame_overlap)) / (self.hop_length * self.chunk_size)))
audio = nn.functional.pad(audio, (self.frame_overlap[0],
self.frame_overlap[1] + (n_frames * self.hop_length * self.chunk_size) - len_audio))
frames = audio.unfold(0, self.max_window_size, self.hop_length*self.chunk_size)
times = np.arange(0, len_audio, self.hop_length) / self.sr # not tensor, we don't compute anything with it
return frames, times
def predict(self, audio, batch_size):
frames, times = self.read_audio(audio)
performance = {'f0': [], 'note': [], 'onset': [], 'offset': []}
self.eval()
device = self.main.conv0.conv2d.weight.device
with torch_no_grad():
for i in range(0, len(frames), batch_size):
f = frames[i:min(i + batch_size, len(frames))].to(device)
f -= (torch_mean(f, axis=1).unsqueeze(-1))
f /= (torch_std(f, axis=1).unsqueeze(-1))
out = self.forward(f)
for key, value in out.items():
value = torch_sigmoid(value)
value = torch_nan_to_num(value) # the model outputs nan when the frame is silent (this is an expected behavior due to normalization)
value = value.view(-1, value.shape[-1])
value = value.detach().cpu().numpy()
performance[key].append(value)
performance = {key: np.concatenate(value, axis=0)[:len(times)] for key, value in performance.items()}
performance['time'] = times
return performance
def estimate_pitch(self, audio, batch_size, viterbi=False):
out = self.predict(audio, batch_size)
f0_hz = self.out2f0(out, viterbi)
return out['time'], f0_hz
def out2f0(self, out, viterbi=False):
"""
Monophonic f0 estimation from the model output. The viterbi postprocessing is specialized for the violin family.
"""
salience = out['f0']
if viterbi == 'constrained':
assert hasattr(self, 'out2note')
notes = spotify_create_notes( out["note"], out["onset"], note_low=self.labeling.midi_centers[0],
note_high=self.labeling.midi_centers[-1], onset_thresh=0.5, frame_thresh=0.3,
infer_onsets=True, melodia_trick=True,
min_note_len=int(np.round(127.70 / 1000 * (self.sr / self.hop_length))))
note_cents = self.get_pitch_bends(salience, notes, to_midi=False, timing_refinement_range=0)
cents = np.zeros_like(out['time'])
cents[note_cents[:,0].astype(int)] = note_cents[:,1]
elif viterbi:
# transition probabilities inducing continuous pitch
# big changes are penalized with one order of magnitude
transition = gaussian_filter1d(np.eye(self.labeling.f0_n_bins), 30) + 99 * gaussian_filter1d(
np.eye(self.labeling.f0_n_bins), 2)
transition = transition / np.sum(transition, axis=1)[:, None]
p = salience / salience.sum(axis=1)[:, None]
p[np.isnan(p.sum(axis=1)), :] = np.ones(self.labeling.f0_n_bins) * 1 / self.labeling.f0_n_bins
path = viterbi_discriminative(p.T, transition)
cents = np.array([self.labeling.f0_label2c(salience[i, :], path[i]) for i in range(len(path))])
else:
cents = self.labeling.f0_label2c(salience, center=None) # use argmax for center
f0_hz = self.labeling.f0_c2hz(cents)
f0_hz[np.isnan(f0_hz)] = 0
return f0_hz
def get_pitch_bends(
self,
contours: np.ndarray, note_events: List[Tuple[int, int, int, float]],
timing_refinement_range: int = 0, to_midi: bool = True,
) -> List[Tuple[int, int, int, float, Optional[List[int]]]]:
"""Modified version of an excellent script from Spotify/basic_pitch!! Thank you!!!!
Given note events and contours, estimate pitch bends per note.
Pitch bends are represented as a sequence of evenly spaced midi pitch bend control units.
The time stamps of each pitch bend can be inferred by computing an evenly spaced grid between
the start and end times of each note event.
Args:
contours: Matrix of estimated pitch contours
note_events: note event tuple
timing_refinement_range: if > 0, refine onset/offset boundaries with f0 confidence
to_midi: whether to convert pitch bends to midi pitch bends. If False, return pitch estimates in the format
[time (index), pitch (Hz), confidence in range [0, 1]].
Returns:
note events with pitch bends
"""
f0_matrix = [] # [time (index), pitch (Hz), confidence in range [0, 1]]
note_events_with_pitch_bends = []
for start_idx, end_idx, pitch_midi, amplitude in note_events:
if timing_refinement_range:
start_idx = np.max([0, start_idx - timing_refinement_range])
end_idx = np.min([contours.shape[0], end_idx + timing_refinement_range])
freq_idx = int(np.round(self.midi_pitch_to_contour_bin(pitch_midi)))
freq_start_idx = np.max([freq_idx - self.labeling.f0_tolerance_bins, 0])
freq_end_idx = np.min([self.labeling.f0_n_bins, freq_idx + self.labeling.f0_tolerance_bins + 1])
trans_start_idx = np.max([0, self.labeling.f0_tolerance_bins - freq_idx])
trans_end_idx = (2 * self.labeling.f0_tolerance_bins + 1) - \
np.max([0, freq_idx - (self.labeling.f0_n_bins - self.labeling.f0_tolerance_bins - 1)])
# apply regional viterbi to estimate the intonation
# observation probabilities come from the f0_roll matrix
observation = contours[start_idx:end_idx, freq_start_idx:freq_end_idx]
observation = observation / observation.sum(axis=1)[:, None]
observation[np.isnan(observation.sum(axis=1)), :] = np.ones(freq_end_idx - freq_start_idx) * 1 / (
freq_end_idx - freq_start_idx)
# transition probabilities assure continuity
transition = self.labeling.f0_transition_matrix[trans_start_idx:trans_end_idx,
trans_start_idx:trans_end_idx] + 1e-6
transition = transition / np.sum(transition, axis=1)[:, None]
path = viterbi_discriminative(observation.T / observation.sum(axis=1), transition) + freq_start_idx
cents = np.array([self.labeling.f0_label2c(contours[i + start_idx, :], path[i]) for i in range(len(path))])
bends = cents - self.labeling.midi_centers_c[pitch_midi - self.labeling.midi_centers[0]]
if to_midi:
bends = (bends * 4096 / 100).astype(int)
bends[bends > 8191] = 8191
bends[bends < -8192] = -8192
if timing_refinement_range:
confidences = np.array([contours[i + start_idx, path[i]] for i in range(len(path))])
threshold = np.median(confidences)
                    threshold = (np.median(confidences > threshold) + threshold) / 2  # blend the threshold with a majority-above-threshold indicator
                    median_kernel = 2 * (timing_refinement_range // 2) + 1  # force an odd median-filter kernel size
confidences = medfilt(confidences, kernel_size=median_kernel)
conf_bool = confidences > threshold
onset_idx = np.argmax(conf_bool)
offset_idx = len(confidences) - np.argmax(conf_bool[::-1])
bends = bends[onset_idx:offset_idx]
start_idx = start_idx + onset_idx
end_idx = start_idx + offset_idx
note_events_with_pitch_bends.append((start_idx, end_idx, pitch_midi, amplitude, bends))
else:
confidences = np.array([contours[i + start_idx, path[i]] for i in range(len(path))])
time_idx = np.arange(len(path)) + start_idx
# f0_hz = self.labeling.f0_c2hz(cents)
possible_f0s = np.array([time_idx, cents, confidences]).T
f0_matrix.append(possible_f0s[np.abs(bends)<100]) # filter out pitch bends that are too large
if not to_midi:
return np.vstack(f0_matrix)
else:
return note_events_with_pitch_bends
def midi_pitch_to_contour_bin(self, pitch_midi: int) -> np.array:
"""Convert midi pitch to corresponding index in contour matrix
Args:
pitch_midi: pitch in midi
Returns:
index in contour matrix
"""
pitch_hz = midi_to_hz(pitch_midi)
return np.argmin(np.abs(self.labeling.f0_centers_hz - pitch_hz))
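# Illustrative, self-contained sketch (not called anywhere in this module): the smoothed
# transition matrix and Viterbi decoding used in PitchEstimator.out2f0 above, run on a
# synthetic salience matrix. The bin count and salience values are assumptions made only for this demo.
def _demo_viterbi_f0_decoding(n_frames: int = 50, n_bins: int = 100) -> np.ndarray:
    rng = np.random.default_rng(0)
    salience = rng.random((n_frames, n_bins)) * 0.1
    salience[:, 40] = 1.0  # a synthetic, stable pitch track sitting in bin 40
    # continuity-inducing transitions: a wide Gaussian plus a heavily weighted narrow one
    transition = gaussian_filter1d(np.eye(n_bins), 30) + 99 * gaussian_filter1d(np.eye(n_bins), 2)
    transition = transition / transition.sum(axis=1)[:, None]
    p = salience / salience.sum(axis=1)[:, None]  # normalize each frame to a distribution
    return viterbi_discriminative(p.T, transition)  # most likely bin per frame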
# SPOTIFY
def get_inferred_onsets(onset_roll: np.array, note_roll: np.array, n_diff: int = 2) -> np.array:
"""
Infer onsets from large changes in note roll matrix amplitudes.
Modified from https://github.com/spotify/basic-pitch/blob/main/basic_pitch/note_creation.py
:param onset_roll: Onset activation matrix (n_times, n_freqs).
:param note_roll: Frame-level note activation matrix (n_times, n_freqs).
    :param n_diff: Number of time-lag differences used to detect onsets.
    :return: Onset activations: the element-wise maximum of the predicted onsets and the rescaled frame differences.
"""
diffs = []
for n in range(1, n_diff + 1):
frames_appended = np.concatenate([np.zeros((n, note_roll.shape[1])), note_roll])
diffs.append(frames_appended[n:, :] - frames_appended[:-n, :])
frame_diff = np.min(diffs, axis=0)
frame_diff[frame_diff < 0] = 0
frame_diff[:n_diff, :] = 0
frame_diff = np.max(onset_roll) * frame_diff / np.max(frame_diff) # rescale to have the same max as onsets
max_onsets_diff = np.max([onset_roll, frame_diff],
axis=0) # use the max of the predicted onsets and the differences
return max_onsets_diff
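# Minimal, self-contained sketch (not called in this module): get_inferred_onsets on a tiny
# synthetic note roll, where a sharp jump in frame activation produces an inferred onset.
# The shapes and values are assumptions chosen only for this demo.
def _demo_inferred_onsets() -> np.ndarray:
    note_roll = np.zeros((10, 3))
    note_roll[4:, 1] = 0.9                  # note activation appears abruptly at frame 4, bin 1
    onset_roll = np.zeros_like(note_roll)
    onset_roll[7, 0] = 0.3                  # a weak onset predicted elsewhere by the onset head
    # the jump at frame 4 is rescaled to the onset head's maximum and kept via the element-wise max
    return get_inferred_onsets(onset_roll, note_roll)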
def spotify_create_notes(
note_roll: np.array,
onset_roll: np.array,
onset_thresh: float,
frame_thresh: float,
min_note_len: int,
infer_onsets: bool,
note_low : int, #self.labeling.midi_centers[0]
note_high : int, #self.labeling.midi_centers[-1],
melodia_trick: bool = True,
energy_tol: int = 11,
) -> List[Tuple[int, int, int, float]]:
"""Decode raw model output to polyphonic note events
Modified from https://github.com/spotify/basic-pitch/blob/main/basic_pitch/note_creation.py
Args:
note_roll: Frame activation matrix (n_times, n_freqs).
onset_roll: Onset activation matrix (n_times, n_freqs).
onset_thresh: Minimum amplitude of an onset activation to be considered an onset.
frame_thresh: Minimum amplitude of a frame activation for a note to remain "on".
min_note_len: Minimum allowed note length in frames.
infer_onsets: If True, add additional onsets when there are large differences in frame amplitudes.
melodia_trick : Whether to use the melodia trick to better detect notes.
        energy_tol: Number of consecutive frames below the frame threshold tolerated before a note is ended.
Returns:
list of tuples [(start_time_frames, end_time_frames, pitch_midi, amplitude)]
representing the note events, where amplitude is a number between 0 and 1
"""
n_frames = note_roll.shape[0]
# use onsets inferred from frames in addition to the predicted onsets
if infer_onsets:
onset_roll = get_inferred_onsets(onset_roll, note_roll)
peak_thresh_mat = np.zeros(onset_roll.shape)
peaks = argrelmax(onset_roll, axis=0)
peak_thresh_mat[peaks] = onset_roll[peaks]
onset_idx = np.where(peak_thresh_mat >= onset_thresh)
onset_time_idx = onset_idx[0][::-1] # sort to go backwards in time
onset_freq_idx = onset_idx[1][::-1] # sort to go backwards in time
remaining_energy = np.zeros(note_roll.shape)
remaining_energy[:, :] = note_roll[:, :]
# loop over onsets
note_events = []
for note_start_idx, freq_idx in zip(onset_time_idx, onset_freq_idx):
# if we're too close to the end of the audio, continue
if note_start_idx >= n_frames - 1:
continue
# find time index at this frequency band where the frames drop below an energy threshold
i = note_start_idx + 1
k = 0 # number of frames since energy dropped below threshold
while i < n_frames - 1 and k < energy_tol:
if remaining_energy[i, freq_idx] < frame_thresh:
k += 1
else:
k = 0
i += 1
i -= k # go back to frame above threshold
# if the note is too short, skip it
if i - note_start_idx <= min_note_len:
continue
remaining_energy[note_start_idx:i, freq_idx] = 0
if freq_idx < note_high:
remaining_energy[note_start_idx:i, freq_idx + 1] = 0
if freq_idx > note_low:
remaining_energy[note_start_idx:i, freq_idx - 1] = 0
# add the note
amplitude = np.mean(note_roll[note_start_idx:i, freq_idx])
note_events.append(
(
note_start_idx,
i,
freq_idx + note_low,
amplitude,
)
)
if melodia_trick:
energy_shape = remaining_energy.shape
while np.max(remaining_energy) > frame_thresh:
i_mid, freq_idx = np.unravel_index(np.argmax(remaining_energy), energy_shape)
remaining_energy[i_mid, freq_idx] = 0
# forward pass
i = i_mid + 1
k = 0
while i < n_frames - 1 and k < energy_tol:
if remaining_energy[i, freq_idx] < frame_thresh:
k += 1
else:
k = 0
remaining_energy[i, freq_idx] = 0
if freq_idx < note_high:
remaining_energy[i, freq_idx + 1] = 0
if freq_idx > note_low:
remaining_energy[i, freq_idx - 1] = 0
i += 1
i_end = i - 1 - k # go back to frame above threshold
# backward pass
i = i_mid - 1
k = 0
while i > 0 and k < energy_tol:
if remaining_energy[i, freq_idx] < frame_thresh:
k += 1
else:
k = 0
remaining_energy[i, freq_idx] = 0
if freq_idx < note_high:
remaining_energy[i, freq_idx + 1] = 0
if freq_idx > note_low:
remaining_energy[i, freq_idx - 1] = 0
i -= 1
i_start = i + 1 + k # go back to frame above threshold
assert i_start >= 0, "{}".format(i_start)
assert i_end < n_frames
if i_end - i_start <= min_note_len:
# note is too short, skip it
continue
# add the note
amplitude = np.mean(note_roll[i_start:i_end, freq_idx])
note_events.append(
(
i_start,
i_end,
freq_idx + note_low,
amplitude,
)
)
return note_events
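# Illustrative sketch (never called here): decoding one synthetic note with spotify_create_notes.
# The thresholds, MIDI range and the 200x48 activation shape are assumptions chosen for the demo only.
def _demo_spotify_note_decoding() -> List[Tuple[int, int, int, float]]:
    n_frames, n_bins = 200, 48
    note_low, note_high = 55, 55 + n_bins - 1
    note_roll = np.zeros((n_frames, n_bins))
    onset_roll = np.zeros((n_frames, n_bins))
    onset_roll[50, 10] = 1.0          # a clear onset at frame 50 in bin 10
    note_roll[50:120, 10] = 0.9       # frames stay active until frame 120
    # expected result: a single note event (50, 120, 65, 0.9)
    return spotify_create_notes(note_roll, onset_roll, onset_thresh=0.5, frame_thresh=0.3,
                                min_note_len=11, infer_onsets=False, note_low=note_low,
                                note_high=note_high, melodia_trick=False)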
# TIKTOK
def note_detection_with_onset_offset_regress(frame_output, onset_output,
onset_shift_output, offset_output, offset_shift_output, velocity_output,
frame_threshold):
"""Process prediction matrices to note events information.
First, detect onsets with onset outputs. Then, detect offsets
with frame and offset outputs.
Args:
frame_output: (frames_num,)
onset_output: (frames_num,)
onset_shift_output: (frames_num,)
offset_output: (frames_num,)
offset_shift_output: (frames_num,)
velocity_output: (frames_num,)
frame_threshold: float
Returns:
output_tuples: list of [bgn, fin, onset_shift, offset_shift, normalized_velocity],
e.g., [
[1821, 1909, 0.47498, 0.3048533, 0.72119445],
[1909, 1947, 0.30730522, -0.45764327, 0.64200014],
...]
"""
output_tuples = []
bgn = None
frame_disappear = None
offset_occur = None
for i in range(onset_output.shape[0]):
if onset_output[i] == 1:
"""Onset detected"""
if bgn:
"""Consecutive onsets. E.g., pedal is not released, but two
consecutive notes being played."""
fin = max(i - 1, 0)
output_tuples.append([bgn, fin, onset_shift_output[bgn],
0, velocity_output[bgn]])
frame_disappear, offset_occur = None, None
bgn = i
if bgn and i > bgn:
"""If onset found, then search offset"""
if frame_output[i] <= frame_threshold and not frame_disappear:
"""Frame disappear detected"""
frame_disappear = i
if offset_output[i] == 1 and not offset_occur:
"""Offset detected"""
offset_occur = i
if frame_disappear:
if offset_occur and offset_occur - bgn > frame_disappear - offset_occur:
"""bgn --------- offset_occur --- frame_disappear"""
fin = offset_occur
else:
"""bgn --- offset_occur --------- frame_disappear"""
fin = frame_disappear
output_tuples.append([bgn, fin, onset_shift_output[bgn],
offset_shift_output[fin], velocity_output[bgn]])
bgn, frame_disappear, offset_occur = None, None, None
if bgn and (i - bgn >= 600 or i == onset_output.shape[0] - 1):
"""Offset not detected"""
fin = i
output_tuples.append([bgn, fin, onset_shift_output[bgn],
offset_shift_output[fin], velocity_output[bgn]])
bgn, frame_disappear, offset_occur = None, None, None
# Sort pairs by onsets
output_tuples.sort(key=lambda pair: pair[0])
return output_tuples
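# Minimal sketch (never called here): one synthetic note run through
# note_detection_with_onset_offset_regress. All array contents are demo assumptions.
def _demo_note_detection_regress() -> List[list]:
    n = 40
    frame_output = np.zeros(n)
    frame_output[10:25] = 0.9          # frames active between 10 and 24
    onset_output = np.zeros(n)
    onset_output[10] = 1               # binarized onset at frame 10
    offset_output = np.zeros(n)
    offset_output[24] = 1              # binarized offset at frame 24
    zeros = np.zeros(n)                # no onset/offset shifts for the demo
    # expected result: [[10, 24, 0.0, 0.0, 0.8]]
    return note_detection_with_onset_offset_regress(
        frame_output, onset_output, zeros, offset_output, zeros, np.full(n, 0.8), frame_threshold=0.5)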
class RegressionPostProcessor(object):
def __init__(self, frames_per_second, classes_num, onset_threshold,
offset_threshold, frame_threshold, pedal_offset_threshold,
begin_note):
"""Postprocess the output probabilities of a transription model to MIDI
events.
Args:
frames_per_second: float
classes_num: int
onset_threshold: float
offset_threshold: float
frame_threshold: float
pedal_offset_threshold: float
"""
self.frames_per_second = frames_per_second
self.classes_num = classes_num
self.onset_threshold = onset_threshold
self.offset_threshold = offset_threshold
self.frame_threshold = frame_threshold
self.pedal_offset_threshold = pedal_offset_threshold
self.begin_note = begin_note
self.velocity_scale = 128
def output_dict_to_midi_events(self, output_dict):
"""Main function. Post process model outputs to MIDI events.
Args:
output_dict: {
'reg_onset_output': (segment_frames, classes_num),
'reg_offset_output': (segment_frames, classes_num),
'frame_output': (segment_frames, classes_num),
'velocity_output': (segment_frames, classes_num),
'reg_pedal_onset_output': (segment_frames, 1),
'reg_pedal_offset_output': (segment_frames, 1),
'pedal_frame_output': (segment_frames, 1)}
Outputs:
est_note_events: list of dict, e.g. [
{'onset_time': 39.74, 'offset_time': 39.87, 'midi_note': 27, 'velocity': 83},
{'onset_time': 11.98, 'offset_time': 12.11, 'midi_note': 33, 'velocity': 88}]
est_pedal_events: list of dict, e.g. [
{'onset_time': 0.17, 'offset_time': 0.96},
            {'onset_time': 1.17, 'offset_time': 2.65}]
"""
output_dict['frame_output'] = output_dict['note']
output_dict['velocity_output'] = output_dict['note']
output_dict['reg_onset_output'] = output_dict['onset']
output_dict['reg_offset_output'] = output_dict['offset']
# Post process piano note outputs to piano note and pedal events information
(est_on_off_note_vels, est_pedal_on_offs) = \
self.output_dict_to_note_pedal_arrays(output_dict)
"""est_on_off_note_vels: (events_num, 4), the four columns are: [onset_time, offset_time, piano_note, velocity],
est_pedal_on_offs: (pedal_events_num, 2), the two columns are: [onset_time, offset_time]"""
# Reformat notes to MIDI events
est_note_events = self.detected_notes_to_events(est_on_off_note_vels)
if est_pedal_on_offs is None:
est_pedal_events = None
else:
est_pedal_events = self.detected_pedals_to_events(est_pedal_on_offs)
return est_note_events, est_pedal_events
def output_dict_to_note_pedal_arrays(self, output_dict):
"""Postprocess the output probabilities of a transription model to MIDI
events.
Args:
output_dict: dict, {
'reg_onset_output': (frames_num, classes_num),
'reg_offset_output': (frames_num, classes_num),
'frame_output': (frames_num, classes_num),
'velocity_output': (frames_num, classes_num),
...}
Returns:
est_on_off_note_vels: (events_num, 4), the 4 columns are onset_time,
offset_time, piano_note and velocity. E.g. [
[39.74, 39.87, 27, 0.65],
[11.98, 12.11, 33, 0.69],
...]
est_pedal_on_offs: (pedal_events_num, 2), the 2 columns are onset_time
and offset_time. E.g. [
[0.17, 0.96],
[1.17, 2.65],
...]
"""
# ------ 1. Process regression outputs to binarized outputs ------
# For example, onset or offset of [0., 0., 0.15, 0.30, 0.40, 0.35, 0.20, 0.05, 0., 0.]
# will be processed to [0., 0., 0., 0., 1., 0., 0., 0., 0., 0.]
# Calculate binarized onset output from regression output
(onset_output, onset_shift_output) = \
self.get_binarized_output_from_regression(
reg_output=output_dict['reg_onset_output'],
threshold=self.onset_threshold, neighbour=2)
output_dict['onset_output'] = onset_output # Values are 0 or 1
output_dict['onset_shift_output'] = onset_shift_output
# Calculate binarized offset output from regression output
(offset_output, offset_shift_output) = \
self.get_binarized_output_from_regression(
reg_output=output_dict['reg_offset_output'],
threshold=self.offset_threshold, neighbour=4)
output_dict['offset_output'] = offset_output # Values are 0 or 1
output_dict['offset_shift_output'] = offset_shift_output
if 'reg_pedal_onset_output' in output_dict.keys():
"""Pedal onsets are not used in inference. Instead, frame-wise pedal
predictions are used to detect onsets. We empirically found this is
more accurate to detect pedal onsets."""
pass
if 'reg_pedal_offset_output' in output_dict.keys():
# Calculate binarized pedal offset output from regression output
(pedal_offset_output, pedal_offset_shift_output) = \
self.get_binarized_output_from_regression(
reg_output=output_dict['reg_pedal_offset_output'],
threshold=self.pedal_offset_threshold, neighbour=4)
output_dict['pedal_offset_output'] = pedal_offset_output # Values are 0 or 1
output_dict['pedal_offset_shift_output'] = pedal_offset_shift_output
# ------ 2. Process matrices results to event results ------
# Detect piano notes from output_dict
est_on_off_note_vels = self.output_dict_to_detected_notes(output_dict)
est_pedal_on_offs = None
return est_on_off_note_vels, est_pedal_on_offs
def get_binarized_output_from_regression(self, reg_output, threshold, neighbour):
"""Calculate binarized output and shifts of onsets or offsets from the
regression results.
Args:
reg_output: (frames_num, classes_num)
threshold: float
neighbour: int
Returns:
binary_output: (frames_num, classes_num)
shift_output: (frames_num, classes_num)
"""
binary_output = np.zeros_like(reg_output)
shift_output = np.zeros_like(reg_output)
(frames_num, classes_num) = reg_output.shape
for k in range(classes_num):
x = reg_output[:, k]
for n in range(neighbour, frames_num - neighbour):
if x[n] > threshold and self.is_monotonic_neighbour(x, n, neighbour):
binary_output[n, k] = 1
"""See Section III-D in [1] for deduction.
[1] Q. Kong, et al., High-resolution Piano Transcription
with Pedals by Regressing Onsets and Offsets Times, 2020."""
if x[n - 1] > x[n + 1]:
shift = (x[n + 1] - x[n - 1]) / (x[n] - x[n + 1]) / 2
else:
shift = (x[n + 1] - x[n - 1]) / (x[n] - x[n - 1]) / 2
shift_output[n, k] = shift
return binary_output, shift_output
def is_monotonic_neighbour(self, x, n, neighbour):
"""Detect if values are monotonic in both side of x[n].
Args:
x: (frames_num,)
n: int
neighbour: int
Returns:
monotonic: bool
"""
monotonic = True
for i in range(neighbour):
if x[n - i] < x[n - i - 1]:
monotonic = False
if x[n + i] < x[n + i + 1]:
monotonic = False
return monotonic
def output_dict_to_detected_notes(self, output_dict):
"""Postprocess output_dict to piano notes.
Args:
output_dict: dict, e.g. {
'onset_output': (frames_num, classes_num),
'onset_shift_output': (frames_num, classes_num),
'offset_output': (frames_num, classes_num),
'offset_shift_output': (frames_num, classes_num),
'frame_output': (frames_num, classes_num),
            'velocity_output': (frames_num, classes_num),
...}
Returns:
est_on_off_note_vels: (notes, 4), the four columns are onsets, offsets,
MIDI notes and velocities. E.g.,
[[39.7375, 39.7500, 27., 0.6638],
[11.9824, 12.5000, 33., 0.6892],
...]
"""
est_tuples = []
est_midi_notes = []
classes_num = output_dict['frame_output'].shape[-1]
for piano_note in range(classes_num):
"""Detect piano notes"""
est_tuples_per_note = note_detection_with_onset_offset_regress(
frame_output=output_dict['frame_output'][:, piano_note],
onset_output=output_dict['onset_output'][:, piano_note],
onset_shift_output=output_dict['onset_shift_output'][:, piano_note],
offset_output=output_dict['offset_output'][:, piano_note],
offset_shift_output=output_dict['offset_shift_output'][:, piano_note],
velocity_output=output_dict['velocity_output'][:, piano_note],
frame_threshold=self.frame_threshold)
est_tuples += est_tuples_per_note
est_midi_notes += [piano_note + self.begin_note] * len(est_tuples_per_note)
est_tuples = np.array(est_tuples) # (notes, 5)
"""(notes, 5), the five columns are onset, offset, onset_shift,
offset_shift and normalized_velocity"""
est_midi_notes = np.array(est_midi_notes) # (notes,)
onset_times = (est_tuples[:, 0] + est_tuples[:, 2]) / self.frames_per_second
offset_times = (est_tuples[:, 1] + est_tuples[:, 3]) / self.frames_per_second
velocities = est_tuples[:, 4]
est_on_off_note_vels = np.stack((onset_times, offset_times, est_midi_notes, velocities), axis=-1)
"""(notes, 3), the three columns are onset_times, offset_times and velocity."""
est_on_off_note_vels = est_on_off_note_vels.astype(np.float32)
return est_on_off_note_vels
def detected_notes_to_events(self, est_on_off_note_vels):
"""Reformat detected notes to midi events.
Args:
            est_on_off_note_vels: (notes, 4), the four columns are onset_times,
                offset_times, midi_notes and velocities (see the example in
                output_dict_to_detected_notes above).
Returns:
midi_events, list, e.g.,
[{'onset_time': 39.7376, 'offset_time': 39.75, 'midi_note': 27, 'velocity': 84},
{'onset_time': 11.9824, 'offset_time': 12.50, 'midi_note': 33, 'velocity': 88},
...]
"""
midi_events = []
for i in range(est_on_off_note_vels.shape[0]):
midi_events.append({
'onset_time': est_on_off_note_vels[i][0],
'offset_time': est_on_off_note_vels[i][1],
'midi_note': int(est_on_off_note_vels[i][2]),
'velocity': int(est_on_off_note_vels[i][3] * self.velocity_scale)})
return midi_events
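# Hedged usage sketch (never called here) for the TikTok-style RegressionPostProcessor:
# a single synthetic note in a 3-class roll is turned into MIDI event dictionaries.
# The thresholds, frame rate and roll shapes are demo assumptions, not model defaults.
def _demo_regression_postprocessor() -> list:
    n_frames, n_classes = 60, 3
    note = np.zeros((n_frames, n_classes))
    onset = np.zeros((n_frames, n_classes))
    offset = np.zeros((n_frames, n_classes))
    note[10:25, 1] = 0.9                                    # active frames for one note
    onset[8:13, 1] = np.array([0.1, 0.4, 0.9, 0.4, 0.1])    # regression-style onset bump peaking at frame 10
    postprocessor = RegressionPostProcessor(frames_per_second=100, classes_num=n_classes,
                                            onset_threshold=0.2, offset_threshold=0.2,
                                            frame_threshold=0.3, pedal_offset_threshold=0.5,
                                            begin_note=60)
    note_events, _ = postprocessor.output_dict_to_midi_events(
        {'note': note, 'onset': onset, 'offset': offset})
    return note_events  # e.g. one event around 0.10 s - 0.25 s with midi_note 61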
def sync_visualize_step1(cost_matrices: List,
num_rows: int,
num_cols: int,
anchors: np.ndarray,
wp: np.ndarray) -> Tuple[plt.Figure, plt.Axes]:
fig, ax = plt.subplots(1, 1, dpi=72)
ax = __visualize_cost_matrices(ax, cost_matrices)
__visualize_constraint_rectangles(anchors[[1, 0], :],
edgecolor='firebrick')
__visualize_path_in_matrix(ax=ax,
wp=wp,
axisX=np.arange(0, num_rows),
axisY=np.arange(0, num_cols),
path_color='firebrick')
return fig, ax
def sync_visualize_step2(ax: plt.Axes,
cost_matrices: list,
wp_step2: np.ndarray,
wp_step1: np.ndarray,
num_rows_step1: int,
num_cols_step1: int,
anchors_step1: np.ndarray,
neighboring_anchors: np.ndarray,
plot_title: str = ""):
offset_x = neighboring_anchors[0, 0] - 1
offset_y = neighboring_anchors[1, 0] - 1
ax = __visualize_cost_matrices(ax=ax,
cost_matrices=cost_matrices,
offset_x=offset_x,
offset_y=offset_y)
__visualize_constraint_rectangles(anchors_step1[[1, 0], :],
edgecolor='firebrick')
__visualize_path_in_matrix(ax=ax,
wp=wp_step1,
axisX=np.arange(0, num_rows_step1),
axisY=np.arange(0, num_cols_step1),
path_color='firebrick')
__visualize_constraint_rectangles(neighboring_anchors[[1, 0], :] - 1,
edgecolor='orangered',
linestyle='--')
__visualize_path_in_matrix(ax=ax,
wp=wp_step2,
axisX=np.arange(0, num_rows_step1),
axisY=np.arange(0, num_cols_step1),
path_color='orangered')
ax.set_title(plot_title)
ax.set_ylabel("Version 1 (frames)")
ax.set_xlabel("Version 2 (frames)")
ax = plt.gca() # get the current axes
pcm = None
for pcm in ax.get_children():
if isinstance(pcm, ScalarMappable):
break
plt.colorbar(pcm, ax=ax)
plt.tight_layout()
plt.show()
def __size_dtw_matrices(dtw_matrices: List) -> Tuple[List[np.ndarray], List[np.ndarray]]:
"""Gives information about the dimensionality of a DTW matrix
given in form of a list matrix
Parameters
----------
dtw_matrices: list
The DTW matrix (cost matrix or accumulated cost matrix) given in form a list.
Returns
-------
    axis_x_list: list
A list containing a horizontal axis for each of the sub matrices
which specifies the horizontal position of the respective submatrix
in the overall cost matrix.
axis_y_list: list
A list containing a vertical axis for each of the
sub matrices which specifies the vertical position of the
respective submatrix in the overall cost matrix.
"""
num_matrices = len(dtw_matrices)
size_list = [dtw_mat.shape for dtw_mat in dtw_matrices]
axis_x_list = list()
axis_y_list = list()
x_acc = 0
y_acc = 0
for i in range(num_matrices):
curr_size_list = size_list[i]
axis_x_list.append(np.arange(x_acc, x_acc + curr_size_list[0]))
axis_y_list.append(np.arange(y_acc, y_acc + curr_size_list[1]))
x_acc += curr_size_list[0] - 1
y_acc += curr_size_list[1] - 1
return axis_x_list, axis_y_list
def __visualize_cost_matrices(ax: plt.Axes,
cost_matrices: list = None,
offset_x: float = 0.0,
offset_y: float = 0.0) -> plt.Axes:
"""Visualizes cost matrices
Parameters
----------
ax : axes
The Axes instance to plot on
cost_matrices : list
List of DTW cost matrices.
offset_x : float
Offset on the x axis.
offset_y : float
Offset on the y axis.
Returns
-------
ax: axes
The Axes instance to plot on
"""
x_ax, y_ax = __size_dtw_matrices(dtw_matrices=cost_matrices)
    for i, cur_cost in enumerate(cost_matrices):
        curr_x_ax = x_ax[i] + offset_x
        curr_y_ax = y_ax[i] + offset_y
ax.imshow(cur_cost, cmap='gray_r', aspect='auto', origin='lower',
extent=[curr_y_ax[0], curr_y_ax[-1], curr_x_ax[0], curr_x_ax[-1]])
return ax
def __visualize_path_in_matrix(ax,
wp: np.ndarray = None,
axisX: np.ndarray = None,
axisY: np.ndarray = None,
path_color: str = 'r'):
"""Plots a warping path on top of a given matrix. The matrix is
usually an accumulated cost matrix.
Parameters
----------
ax : axes
The Axes instance to plot on
wp : np.ndarray
Warping path
axisX : np.ndarray
Array of X axis
axisY : np.ndarray
Array of Y axis
path_color : str
Color of the warping path to be plotted. (default: r)
"""
assert axisX is not None and isinstance(axisX, np.ndarray), 'axisX must be a numpy array!'
assert axisY is not None and isinstance(axisY, np.ndarray), 'axisY must be a numpy array!'
wp = wp.astype(int)
ax.plot(axisY[wp[1, :]], axisX[wp[0, :]], '-k', linewidth=5)
ax.plot(axisY[wp[1, :]], axisX[wp[0, :]], color=path_color, linewidth=3)
def __visualize_constraint_rectangles(anchors: np.ndarray,
linestyle: str = '-',
edgecolor: str = 'royalblue',
linewidth: float = 1.0):
for k in range(anchors.shape[1]-1):
a1 = anchors[:, k]
a2 = anchors[:, k + 1]
# a rectangle is defined by [x y width height]
x = a1[0]
y = a1[1]
w = a2[0] - a1[0] + np.finfo(float).eps
h = a2[1] - a1[1] + np.finfo(float).eps
rect = Rectangle((x, y), w, h,
linewidth=linewidth,
edgecolor=edgecolor,
linestyle=linestyle,
facecolor='none')
plt.gca().add_patch(rect)
def project_alignment_on_a_new_feature_rate(alignment: np.ndarray,
feature_rate_old: int,
feature_rate_new: int,
cost_matrix_size_old: tuple = (),
cost_matrix_size_new: tuple = ()) -> np.ndarray:
"""Projects an alignment computed for a cost matrix on a certain
feature resolution on a cost matrix having a different feature
resolution.
Parameters
----------
alignment : np.ndarray [shape=(2, N)]
Alignment matrix
feature_rate_old : int
Feature rate of the old cost matrix
feature_rate_new : int
Feature rate of the new cost matrix
cost_matrix_size_old : tuple
Size of the old cost matrix. Possibly needed to deal with border cases
cost_matrix_size_new : tuple
Size of the new cost matrix. Possibly needed to deal with border cases
Returns
-------
np.ndarray [shape=(2, N)]
Anchor sequence for the new cost matrix
"""
# Project the alignment on the new feature rate
fac = feature_rate_new / feature_rate_old
anchors = np.round(alignment * fac) + 1
# In case the sizes of the cost matrices are given explicitly and the
# alignment specifies to align the first and last elements, handle this case
# separately since this might cause problems in the general projection
# procedure.
if cost_matrix_size_old is not None and cost_matrix_size_new is not None:
if np.array_equal(alignment[:, 0], np.array([0, 0])):
anchors[:, 0] = np.array([1, 1])
if np.array_equal(alignment[:, -1], np.array(cost_matrix_size_old) - 1):
anchors[:, -1] = np.array(cost_matrix_size_new)
return anchors - 1
def derive_anchors_from_projected_alignment(projected_alignment: np.ndarray,
threshold: int) -> np.ndarray:
"""Derive anchors from a projected alignment such that the area of the rectangle
defined by two subsequent anchors a1 and a2 is below a given threshold.
Parameters
----------
projected_alignment : np.ndarray [shape=(2, N)]
Projected alignment array
threshold : int
Maximum area of the constraint rectangle
Returns
-------
anchors_res : np.ndarray [shape=(2, M)]
Resulting anchor sequence
"""
L = projected_alignment.shape[1]
a1 = np.array(projected_alignment[:, 0], copy=True).reshape(-1, 1)
a2 = np.array(projected_alignment[:, -1], copy=True).reshape(-1, 1)
if __compute_area(a1, a2) <= threshold:
anchors_res = np.concatenate([a1, a2], axis=1)
elif L > 2:
center = int(np.floor(L/2 + 1))
a1 = np.array(projected_alignment[:, 0], copy=True).reshape(-1, 1)
a2 = np.array(projected_alignment[:, center - 1], copy=True).reshape(-1, 1)
a3 = np.array(projected_alignment[:, -1], copy=True).reshape(-1, 1)
if __compute_area(a1, a2) > threshold:
anchors_1 = derive_anchors_from_projected_alignment(projected_alignment[:, 0:center], threshold)
else:
anchors_1 = np.concatenate([a1, a2], axis=1)
if __compute_area(a2, a3) > threshold:
anchors_2 = derive_anchors_from_projected_alignment(projected_alignment[:, center - 1:], threshold)
else:
anchors_2 = np.concatenate([a2, a3], axis=1)
anchors_res = np.concatenate([anchors_1, anchors_2[:, 1:]], axis=1)
else:
if __compute_area(a1, a2) > threshold:
            print('Only two anchor points are given, and they do not fulfill the area constraint.')
anchors_res = np.concatenate([a1, a2], axis=1)
return anchors_res
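# Small runnable sketch (never called here): project a coarse alignment onto a finer feature
# rate, then derive anchors whose constraint rectangles stay below the area threshold.
# The alignment values, feature rates and threshold are assumptions made for the demo.
def _demo_anchor_derivation() -> np.ndarray:
    coarse_alignment = np.array([[0, 10, 20, 30],
                                 [0, 12, 25, 33]])      # shape (2, N), coarse-level warping path
    projected = project_alignment_on_a_new_feature_rate(alignment=coarse_alignment,
                                                        feature_rate_old=10, feature_rate_new=50,
                                                        cost_matrix_size_old=(31, 34),
                                                        cost_matrix_size_new=(151, 166))
    # anchors whose pairwise constraint rectangles each stay below the area threshold
    return derive_anchors_from_projected_alignment(projected, threshold=4000)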
def derive_neighboring_anchors(warping_path: np.ndarray,
anchor_indices: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Compute anchor points in the neighborhood of previous anchor points.
Parameters
----------
warping_path : np.ndarray [shape=(2, N)]
Warping path
anchor_indices : np.ndarray
Indices corresponding to the anchor points in the ``warping_path``
Returns
-------
neighboring_anchors : np.ndarray [shape=(2, N-1)]
Sequence of neighboring anchors
neighboring_anchor_indices : np.ndarray
Indices into ``warping path`` corresponding to ``neighboring_anchors``
"""
L = anchor_indices.shape[0]
neighboring_anchor_indices = np.zeros(L-1, dtype=int)
neighboring_anchors = np.zeros((2, L-1), dtype=int)
for k in range(1, L):
i1 = anchor_indices[k-1]
i2 = anchor_indices[k]
neighboring_anchor_indices[k-1] = i1 + np.floor((i2 - i1) / 2)
neighboring_anchors[:, k-1] = warping_path[:, neighboring_anchor_indices[k - 1]]
return neighboring_anchors, neighboring_anchor_indices
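# Tiny runnable sketch (never called here): midpoints between consecutive anchors on a toy
# warping path, as used for the refinement pass in sync_via_mrmsdtw. Values are demo assumptions.
def _demo_neighboring_anchors():
    wp = np.array([np.arange(11), np.arange(11)])           # a diagonal toy warping path, shape (2, 11)
    anchor_indices = np.array([0, 4, 10])                    # indices of existing anchors on the path
    return derive_neighboring_anchors(wp, anchor_indices)    # new anchors at path indices 2 and 7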
@jit(nopython=True)
def __compute_area(a: tuple,
b: tuple):
"""Computes the area between two points, given as tuples"""
return (b[0] - a[0] + 1) * (b[1] - a[1] + 1)
class Transcriber(PitchEstimator):
def __init__(self, labeling, instrument='Violin', sr=16000, window_size=1024, hop_length=160):
super().__init__(labeling, instrument=instrument, sr=sr, window_size=window_size, hop_length=hop_length)
def transcribe(self, audio, batch_size=128, postprocessing='spotify', include_pitch_bends=True, to_midi=True,
debug=False):
"""
Transcribe an audio file or mono waveform in numpy or torch into MIDI with pitch bends.
:param audio: str, pathlib.Path, np.ndarray, or torch.Tensor
:param batch_size: frames to process at once
        :param postprocessing: note creation method: 'spotify' (default), 'rebab', or 'tiktok'
:param include_pitch_bends: whether to include pitch bends in the MIDI file
:param to_midi: whether to return a MIDI file or a list of note events (as tuple)
:return: transcribed MIDI file as a pretty_midi.PrettyMIDI object
"""
out = self.predict(audio, batch_size)
if debug:
plt.imshow(out['f0'].T, aspect='auto', origin='lower')
plt.show()
plt.imshow(out['note'].T, aspect='auto', origin='lower')
plt.show()
plt.imshow(out['onset'].T, aspect='auto', origin='lower')
plt.show()
plt.imshow(out['offset'].T, aspect='auto', origin='lower')
plt.show()
if to_midi:
return self.out2midi(out, postprocessing, include_pitch_bends)
else:
return self.out2note(out, postprocessing, include_pitch_bends)
def out2note(self, output: Dict[str, np.array], postprocessing='spotify',
include_pitch_bends: bool = True,
) -> List[Tuple[float, float, int, float, Optional[List[int]]]]:
"""Convert model output to notes
"""
if postprocessing == 'spotify':
estimated_notes = spotify_create_notes(
output["note"],
output["onset"],
note_low=self.labeling.midi_centers[0],
note_high=self.labeling.midi_centers[-1],
onset_thresh=0.5,
frame_thresh=0.3,
infer_onsets=True,
min_note_len=int(np.round(127.70 / 1000 * (self.sr / self.hop_length))), #127.70
melodia_trick=True,
)
        elif postprocessing == 'rebab':
estimated_notes = spotify_create_notes(
output["note"],
output["onset"],
note_low=self.labeling.midi_centers[0],
note_high=self.labeling.midi_centers[-1],
onset_thresh=0.2,
frame_thresh=0.2,
infer_onsets=True,
min_note_len=int(np.round(127.70 / 1000 * (self.sr / self.hop_length))), #127.70
melodia_trick=True,
)
elif postprocessing == 'tiktok':
postprocessor = RegressionPostProcessor(
frames_per_second=self.sr / self.hop_length,
classes_num=self.labeling.midi_centers.shape[0],
begin_note=self.labeling.midi_centers[0],
onset_threshold=0.2,
offset_threshold=0.2,
frame_threshold=0.3,
pedal_offset_threshold=0.5,
)
tiktok_note_dict, _ = postprocessor.output_dict_to_midi_events(output)
estimated_notes = []
for list_item in tiktok_note_dict:
if list_item['offset_time'] > 0.6 + list_item['onset_time']:
estimated_notes.append((int(np.floor(list_item['onset_time']/(output['time'][1]))),
int(np.ceil(list_item['offset_time']/(output['time'][1]))),
list_item['midi_note'], list_item['velocity']/128))
if include_pitch_bends:
estimated_notes_with_pitch_bend = self.get_pitch_bends(output["f0"], estimated_notes)
else:
estimated_notes_with_pitch_bend = [(note[0], note[1], note[2], note[3], None) for note in estimated_notes]
times_s = output['time']
estimated_notes_time_seconds = [
(times_s[note[0]], times_s[note[1]], note[2], note[3], note[4]) for note in estimated_notes_with_pitch_bend
]
return estimated_notes_time_seconds
def out2midi(self, output: Dict[str, np.array], postprocessing: str = 'spotify', include_pitch_bends: bool = True,
) -> PrettyMIDI:
"""Convert model output to MIDI
Args:
output: A dictionary with shape
{
'frame': array of shape (n_times, n_freqs),
'onset': array of shape (n_times, n_freqs),
'contour': array of shape (n_times, 3*n_freqs)
}
representing the output of the basic pitch model.
postprocessing: spotify or tiktok postprocessing.
include_pitch_bends: If True, include pitch bends.
Returns:
note_events: A list of note event tuples (start_time_s, end_time_s, pitch_midi, amplitude)
"""
estimated_notes_time_seconds = self.out2note(output, postprocessing, include_pitch_bends)
midi_tempo = 120 # todo: infer tempo from the onsets
return self.note2midi(estimated_notes_time_seconds, midi_tempo)
def note2midi(
self, note_events_with_pitch_bends: List[Tuple[float, float, int, float, Optional[List[int]]]],
midi_tempo: float = 120,
):
"""Create a pretty_midi object from note events
:param note_events_with_pitch_bends: list of tuples
[(start_time_seconds, end_time_seconds, pitch_midi, amplitude)]
:param midi_tempo: #todo: infer tempo from the onsets
:return: transcribed MIDI file as a pretty_midi.PrettyMIDI object
"""
mid = PrettyMIDI(initial_tempo=midi_tempo)
program = instrument_name_to_program(self.instrument)
instruments: DefaultDict[int, Instrument] = defaultdict(
lambda: Instrument(program=program)
)
for start_time, end_time, note_number, amplitude, pitch_bend in note_events_with_pitch_bends:
instrument = instruments[note_number]
note = Note(
velocity=int(np.round(127 * amplitude)),
pitch=note_number,
start=start_time,
end=end_time,
)
instrument.notes.append(note)
if not isinstance(pitch_bend, np.ndarray):
continue
pitch_bend_times = np.linspace(start_time, end_time, len(pitch_bend))
for pb_time, pb_midi in zip(pitch_bend_times, pitch_bend):
instrument.pitch_bends.append(PitchBend(pb_midi, pb_time))
mid.instruments.extend(instruments.values())
return mid
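# Hedged usage sketch (never called here): turning note events into a PrettyMIDI object via
# Transcriber.note2midi. It assumes the PerformanceLabel class defined later in this module
# provides the default violin labeling; no network weights are needed for this code path.
def _demo_note2midi() -> PrettyMIDI:
    transcriber = Transcriber(PerformanceLabel(), instrument='Violin')
    # (start_s, end_s, midi_pitch, amplitude, pitch_bends or None)
    note_events = [(0.0, 0.5, 69, 0.8, None), (0.5, 1.0, 71, 0.7, None)]
    return transcriber.note2midi(note_events, midi_tempo=120)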
def sync_via_mrmsdtw_with_anchors(f_chroma1: np.ndarray,
f_chroma2: np.ndarray,
f_onset1: np.ndarray = None,
f_onset2: np.ndarray = None,
input_feature_rate: float = 50,
step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int32),
step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64),
threshold_rec: int = 10000,
win_len_smooth: np.ndarray = np.array([201, 101, 21, 1]),
downsamp_smooth: np.ndarray = np.array([50, 25, 5, 1]),
verbose: bool = False,
dtw_implementation: str = 'synctoolbox',
normalize_chroma: bool = True,
chroma_norm_ord: int = 2,
chroma_norm_threshold: float = 0.001,
visualization_title: str = "MrMsDTW result",
anchor_pairs: List[Tuple] = None,
linear_inp_idx: List[int] = [],
alpha=0.5) -> np.ndarray:
"""Compute memory-restricted multi-scale DTW (MrMsDTW) using chroma and (optionally) onset features.
MrMsDTW is performed on multiple levels that get progressively finer, with rectangular constraint
regions defined by the alignment found on the previous, coarser level.
If onset features are provided, these are used on the finest level in addition to chroma
to provide higher synchronization accuracy.
Parameters
----------
f_chroma1 : np.ndarray [shape=(12, N)]
Chroma feature matrix of the first sequence
f_chroma2 : np.ndarray [shape=(12, M)]
Chroma feature matrix of the second sequence
f_onset1 : np.ndarray [shape=(L, N)]
Onset feature matrix of the first sequence (optional, default: None)
f_onset2 : np.ndarray [shape=(L, M)]
Onset feature matrix of the second sequence (optional, default: None)
input_feature_rate: int
Input feature rate of the chroma features (default: 50)
step_sizes: np.ndarray
DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]]))
step_weights: np.ndarray
DTW step weights (np.array([1.0, 1.0, 1.0]))
threshold_rec: int
Defines the maximum area that is spanned by the rectangle of two
consecutive elements in the alignment (default: 10000)
win_len_smooth : np.ndarray
Window lengths for chroma feature smoothing (default: np.array([201, 101, 21, 1]))
downsamp_smooth : np.ndarray
Downsampling factors (default: np.array([50, 25, 5, 1]))
verbose : bool
Set `True` for visualization (default: False)
dtw_implementation : str
DTW implementation, librosa or synctoolbox (default: synctoolbox)
normalize_chroma : bool
Set `True` to normalize input chroma features after each downsampling
and smoothing operation.
chroma_norm_ord: int
Order of chroma normalization, relevant if ``normalize_chroma`` is True.
(default: 2)
chroma_norm_threshold: float
If the norm falls below threshold for a feature vector, then the
normalized feature vector is set to be the unit vector. Relevant, if
``normalize_chroma`` is True (default: 0.001)
visualization_title : str
Title for the visualization plots. Only relevant if 'verbose' is True
(default: "MrMsDTW result")
anchor_pairs: List[Tuple]
Anchor pairs given in seconds. Note that
* (0, 0) and (<audio-len1>, <audio-len2>) are not allowed.
            * Anchors must be monotonically increasing.
linear_inp_idx: List[int]
List of the indices of intervals created by anchor pairs, for which
MrMsDTW shouldn't be run, e.g., if the interval only involves silence.
0 ap1 ap2 ap3
| | | |
| idx0 | idx1 | idx2 | idx3 OR idx-1
| | | |
Note that index -1 corresponds to the last interval, which begins with
the last anchor pair until the end of the audio files.
alpha: float
Coefficient for the Chroma cost matrix in the finest scale of the MrMsDTW algorithm.
C = alpha * C_Chroma + (1 - alpha) * C_act (default: 0.5)
Returns
-------
wp : np.ndarray [shape=(2, T)]
Resulting warping path which indicates synchronized indices.
"""
if anchor_pairs is None:
wp = sync_via_mrmsdtw(f_chroma1=f_chroma1,
f_chroma2=f_chroma2,
f_onset1=f_onset1,
f_onset2=f_onset2,
input_feature_rate=input_feature_rate,
step_sizes=step_sizes,
step_weights=step_weights,
threshold_rec=threshold_rec,
win_len_smooth=win_len_smooth,
downsamp_smooth=downsamp_smooth,
verbose=verbose,
dtw_implementation=dtw_implementation,
normalize_chroma=normalize_chroma,
chroma_norm_ord=chroma_norm_ord,
chroma_norm_threshold=chroma_norm_threshold,
visualization_title=visualization_title,
alpha=alpha)
else:
# constant_intervals = [((0, x1), (0, y1), False),
# ((x1, x2), (y1, y2), True),
# ((x2, -1), (y2, -1), False)]
wp = None
if verbose:
print('Anchor points are given!')
__check_anchor_pairs(anchor_pairs, f_chroma1.shape[1], f_chroma2.shape[1], input_feature_rate)
# Add ending as the anchor point
anchor_pairs.append((-1, -1))
prev_a1 = 0
prev_a2 = 0
for idx, anchor_pair in enumerate(anchor_pairs):
cur_a1, cur_a2 = anchor_pair
# Split the features
f_chroma1_split, f_onset1_split, f_chroma2_split, f_onset2_split = __split_features(f_chroma1,
f_onset1,
f_chroma2,
f_onset2,
cur_a1,
cur_a2,
prev_a1,
prev_a2,
input_feature_rate)
if idx in linear_inp_idx or idx == len(anchor_pairs) - 1 and -1 in linear_inp_idx:
                # Generate a diagonal warping path if the algorithm is not supposed to be executed.
# A typical scenario is the silence breaks which are enclosed by two anchor points.
if verbose:
print('A diagonal warping path is generated for the interval \n\t Feature sequence 1: %.2f - %.2f'
'\n\t Feature sequence 2: %.2f - %.2f\n' % (prev_a1, cur_a1, prev_a2, cur_a2))
wp_cur = __diagonal_warping_path(f_chroma1_split, f_chroma2_split)
else:
if verbose:
if cur_a1 != -1 and cur_a2 != -1:
print('MrMsDTW is applied for the interval \n\t Feature sequence 1: %.2f - %.2f'
'\n\t Feature sequence 2: %.2f - %.2f\n' % (prev_a1, cur_a1, prev_a2, cur_a2))
else:
print('MrMsDTW is applied for the interval \n\t Feature sequence 1: %.2f - end'
'\n\t Feature sequence 2: %.2f - end\n' % (prev_a1, prev_a2))
wp_cur = sync_via_mrmsdtw(f_chroma1=f_chroma1_split,
f_chroma2=f_chroma2_split,
f_onset1=f_onset1_split,
f_onset2=f_onset2_split,
input_feature_rate=input_feature_rate,
step_sizes=step_sizes,
step_weights=step_weights,
threshold_rec=threshold_rec,
win_len_smooth=win_len_smooth,
downsamp_smooth=downsamp_smooth,
verbose=verbose,
dtw_implementation=dtw_implementation,
normalize_chroma=normalize_chroma,
chroma_norm_ord=chroma_norm_ord,
chroma_norm_threshold=chroma_norm_threshold,
alpha=alpha)
if wp is None:
wp = np.array(wp_cur, copy=True)
# Concatenate warping paths
else:
wp = np.concatenate([wp, wp_cur + wp[:, -1].reshape(2, 1) + 1], axis=1)
prev_a1 = cur_a1
prev_a2 = cur_a2
anchor_pairs.pop()
return wp
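# Hedged usage sketch (never called in this module): aligning two chroma sequences with one
# anchor pair given in seconds. It assumes the feature/DTW helpers called inside sync_via_mrmsdtw
# (smoothing, cost-matrix and path construction) are available, e.g. from synctoolbox or later in
# this module. The 50 Hz feature rate and the (2.0 s, 2.5 s) anchor are demo assumptions and
# require both sequences to be at least that long.
def _demo_sync_with_anchor_pairs(f_chroma1: np.ndarray, f_chroma2: np.ndarray) -> np.ndarray:
    return sync_via_mrmsdtw_with_anchors(f_chroma1=f_chroma1, f_chroma2=f_chroma2,
                                         input_feature_rate=50,
                                         anchor_pairs=[(2.0, 2.5)],
                                         linear_inp_idx=[],  # run MrMsDTW on every interval
                                         alpha=0.5)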
def sync_via_mrmsdtw(f_chroma1: np.ndarray,
f_chroma2: np.ndarray,
f_onset1: np.ndarray = None,
f_onset2: np.ndarray = None,
input_feature_rate: float = 50,
step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int32),
step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64),
threshold_rec: int = 10000,
win_len_smooth: np.ndarray = np.array([201, 101, 21, 1]),
downsamp_smooth: np.ndarray = np.array([50, 25, 5, 1]),
verbose: bool = False,
dtw_implementation: str = 'synctoolbox',
normalize_chroma: bool = True,
chroma_norm_ord: int = 2,
chroma_norm_threshold: float = 0.001,
visualization_title: str = "MrMsDTW result",
alpha=0.5) -> np.ndarray:
"""Compute memory-restricted multi-scale DTW (MrMsDTW) using chroma and (optionally) onset features.
MrMsDTW is performed on multiple levels that get progressively finer, with rectangular constraint
regions defined by the alignment found on the previous, coarser level.
If onset features are provided, these are used on the finest level in addition to chroma
to provide higher synchronization accuracy.
Parameters
----------
f_chroma1 : np.ndarray [shape=(12, N)]
Chroma feature matrix of the first sequence
f_chroma2 : np.ndarray [shape=(12, M)]
Chroma feature matrix of the second sequence
f_onset1 : np.ndarray [shape=(L, N)]
Onset feature matrix of the first sequence (optional, default: None)
f_onset2 : np.ndarray [shape=(L, M)]
Onset feature matrix of the second sequence (optional, default: None)
input_feature_rate: int
Input feature rate of the chroma features (default: 50)
step_sizes: np.ndarray
DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]]))
step_weights: np.ndarray
DTW step weights (np.array([1.0, 1.0, 1.0]))
threshold_rec: int
Defines the maximum area that is spanned by the rectangle of two
consecutive elements in the alignment (default: 10000)
win_len_smooth : np.ndarray
Window lengths for chroma feature smoothing (default: np.array([201, 101, 21, 1]))
downsamp_smooth : np.ndarray
Downsampling factors (default: np.array([50, 25, 5, 1]))
verbose : bool
Set `True` for visualization (default: False)
dtw_implementation : str
DTW implementation, librosa or synctoolbox (default: synctoolbox)
normalize_chroma : bool
Set `True` to normalize input chroma features after each downsampling
and smoothing operation.
chroma_norm_ord: int
Order of chroma normalization, relevant if ``normalize_chroma`` is True.
(default: 2)
chroma_norm_threshold: float
If the norm falls below threshold for a feature vector, then the
normalized feature vector is set to be the unit vector. Relevant, if
``normalize_chroma`` is True (default: 0.001)
visualization_title : str
Title for the visualization plots. Only relevant if 'verbose' is True
(default: "MrMsDTW result")
alpha: float
Coefficient for the Chroma cost matrix in the finest scale of the MrMsDTW algorithm.
C = alpha * C_Chroma + (1 - alpha) * C_act (default: 0.5)
Returns
-------
alignment: np.ndarray [shape=(2, T)]
Resulting warping path which indicates synchronized indices.
"""
# If onset features are given as input, high resolution MrMsDTW is activated.
high_res = False
if f_onset1 is not None and f_onset2 is not None:
high_res = True
if high_res and (f_chroma1.shape[1] != f_onset1.shape[1] or f_chroma2.shape[1] != f_onset2.shape[1]):
raise ValueError('Chroma and onset features must be of the same length.')
if downsamp_smooth[-1] != 1 or win_len_smooth[-1] != 1:
        raise ValueError('The downsampling factor of the last iteration must be equal to 1, i.e. '
                         'at the last iteration, it is computed at the input feature rate!')
num_iterations = win_len_smooth.shape[0]
cost_matrix_size_old = tuple()
feature_rate_old = input_feature_rate / downsamp_smooth[0]
alignment = None
total_computation_time = 0.0
# If the area is less than the threshold_rec, don't apply the multiscale DTW.
    it = (num_iterations - 1) if __compute_cost_matrix_area(f_chroma1, f_chroma2) < threshold_rec else 0
while it < num_iterations:
tic1 = perf_counter()
# Smooth and downsample given raw features
f_chroma1_cur, _ = smooth_downsample_feature(f_chroma1,
input_feature_rate=input_feature_rate,
win_len_smooth=win_len_smooth[it],
downsamp_smooth=downsamp_smooth[it])
f_chroma2_cur, feature_rate_new = smooth_downsample_feature(f_chroma2,
input_feature_rate=input_feature_rate,
win_len_smooth=win_len_smooth[it],
downsamp_smooth=downsamp_smooth[it])
if normalize_chroma:
f_chroma1_cur = normalize_feature(f_chroma1_cur,
norm_ord=chroma_norm_ord,
threshold=chroma_norm_threshold)
f_chroma2_cur = normalize_feature(f_chroma2_cur,
norm_ord=chroma_norm_ord,
threshold=chroma_norm_threshold)
# Project path onto new resolution
cost_matrix_size_new = (f_chroma1_cur.shape[1], f_chroma2_cur.shape[1])
if alignment is None:
# Initialize the alignment with the start and end frames of the feature sequence
anchors = np.array([[0, f_chroma1_cur.shape[1] - 1], [0, f_chroma2_cur.shape[1] - 1]])
else:
projected_alignment = project_alignment_on_a_new_feature_rate(alignment=alignment,
feature_rate_old=feature_rate_old,
feature_rate_new=feature_rate_new,
cost_matrix_size_old=cost_matrix_size_old,
cost_matrix_size_new=cost_matrix_size_new)
anchors = derive_anchors_from_projected_alignment(projected_alignment=projected_alignment,
threshold=threshold_rec)
# Cost matrix and warping path computation
if high_res and it == num_iterations - 1:
# Compute cost considering chroma and pitch onset features and alignment only in the last iteration,
# where the features are at the finest level.
cost_matrices_step1 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur,
f_chroma2=f_chroma2_cur,
f_onset1=f_onset1,
f_onset2=f_onset2,
anchors=anchors,
alpha=alpha)
else:
cost_matrices_step1 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur,
f_chroma2=f_chroma2_cur,
anchors=anchors,
alpha=alpha)
wp_list = compute_warping_paths_from_cost_matrices(cost_matrices_step1,
step_sizes=step_sizes,
step_weights=step_weights,
implementation=dtw_implementation)
# Concatenate warping paths
wp = build_path_from_warping_paths(warping_paths=wp_list,
anchors=anchors)
anchors_step1 = None
wp_step1 = None
num_rows_step1 = 0
num_cols_step1 = 0
ax = None
toc1 = perf_counter()
if verbose and cost_matrices_step1 is not None:
anchors_step1 = np.array(anchors, copy=True)
wp_step1 = np.array(wp, copy=True)
num_rows_step1, num_cols_step1 = np.sum(np.array([dtw_mat.shape for dtw_mat in cost_matrices_step1], int),
axis=0)
fig, ax = sync_visualize_step1(cost_matrices_step1,
num_rows_step1,
num_cols_step1,
anchors,
wp)
tic2 = perf_counter()
# Compute neighboring anchors and refine alignment using local path between neighboring anchors
anchor_indices_in_warping_path = find_anchor_indices_in_warping_path(wp, anchors=anchors)
# Compute neighboring anchors for refinement
neighboring_anchors, neighboring_anchor_indices = \
derive_neighboring_anchors(wp, anchor_indices=anchor_indices_in_warping_path)
if neighboring_anchor_indices.shape[0] > 1 \
and it == num_iterations - 1 and high_res:
cost_matrices_step2 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur,
f_chroma2=f_chroma2_cur,
f_onset1=f_onset1,
f_onset2=f_onset2,
anchors=neighboring_anchors,
alpha=alpha)
else:
cost_matrices_step2 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur,
f_chroma2=f_chroma2_cur,
anchors=neighboring_anchors,
alpha=alpha)
wp_list_refine = compute_warping_paths_from_cost_matrices(cost_matrices=cost_matrices_step2,
step_sizes=step_sizes,
step_weights=step_weights,
implementation=dtw_implementation)
wp = __refine_wp(wp, anchors, wp_list_refine, neighboring_anchors, neighboring_anchor_indices)
toc2 = perf_counter()
computation_time_it = toc2 - tic2 + toc1 - tic1
total_computation_time += computation_time_it
alignment = wp
feature_rate_old = feature_rate_new
cost_matrix_size_old = cost_matrix_size_new
if verbose and cost_matrices_step2 is not None:
sync_visualize_step2(ax,
cost_matrices_step2,
wp,
wp_step1,
num_rows_step1,
num_cols_step1,
anchors_step1,
neighboring_anchors,
plot_title=f"{visualization_title} - Level {it + 1}")
print('Level {} computation time: {:.2f} seconds'.format(it, computation_time_it))
it += 1
if verbose:
print('Computation time of MrMsDTW: {:.2f} seconds'.format(total_computation_time))
return alignment
def __diagonal_warping_path(f1: np.ndarray,
f2: np.ndarray) -> np.ndarray:
"""Generates a diagonal warping path given two feature sequences.
Parameters
----------
f1: np.ndarray [shape=(_, N)]
First feature sequence
f2: np.ndarray [shape=(_, M)]
Second feature sequence
Returns
-------
np.ndarray: Diagonal warping path [shape=(2, T)]
"""
max_size = np.maximum(f1.shape[1], f2.shape[1])
min_size = np.minimum(f1.shape[1], f2.shape[1])
if min_size == 1:
return np.array([max_size - 1, 0]).reshape(-1, 1)
elif max_size == f1.shape[1]:
return np.array([np.round(np.linspace(0, max_size - 1, min_size)), np.linspace(0, min_size - 1, min_size)])
else:
return np.array([np.linspace(0, min_size-1, min_size), np.round(np.linspace(0, max_size - 1, min_size))])
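# Tiny runnable sketch (never called here): a diagonal warping path between an 8-frame and a
# 5-frame feature sequence. The zero feature contents are irrelevant; only the lengths matter.
def _demo_diagonal_warping_path() -> np.ndarray:
    f1 = np.zeros((12, 8))
    f2 = np.zeros((12, 5))
    return __diagonal_warping_path(f1, f2)  # shape (2, 5): row 0 indexes f1 frames, row 1 indexes f2 frames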
# Renamed from __compute_area: a second module-level __compute_area would shadow the
# anchor-area helper defined earlier and break derive_anchors_from_projected_alignment.
@jit(nopython=True)
def __compute_cost_matrix_area(f1, f2):
"""Computes the area of the cost matrix given two feature sequences
Parameters
----------
f1: np.ndarray
First feature sequence
f2: np.ndarray
Second feature sequence
Returns
-------
int: Area of the cost matrix
"""
return f1.shape[1] * f2.shape[1]
def __split_features(f_chroma1: np.ndarray,
f_onset1: np.ndarray,
f_chroma2: np.ndarray,
f_onset2: np.ndarray,
cur_a1: float,
cur_a2: float,
prev_a1: float,
prev_a2: float,
feature_rate: int) -> Tuple[np.ndarray, Optional[np.ndarray], np.ndarray, Optional[np.ndarray]]:
if cur_a1 == -1:
f_chroma1_split = f_chroma1[:, int(prev_a1 * feature_rate):]
if f_onset1 is not None:
f_onset1_split = f_onset1[:, int(prev_a1 * feature_rate):]
else:
f_onset1_split = None
else:
# Split the features
f_chroma1_split = f_chroma1[:, int(prev_a1 * feature_rate):int(cur_a1 * feature_rate)]
if f_onset1 is not None:
f_onset1_split = f_onset1[:, int(prev_a1 * feature_rate):int(cur_a1 * feature_rate)]
else:
f_onset1_split = None
if cur_a2 == -1:
f_chroma2_split = f_chroma2[:, int(prev_a2 * feature_rate):]
if f_onset2 is not None:
f_onset2_split = f_onset2[:, int(prev_a2 * feature_rate):]
else:
f_onset2_split = None
else:
f_chroma2_split = f_chroma2[:, int(prev_a2 * feature_rate):int(cur_a2 * feature_rate)]
if f_onset2 is not None:
f_onset2_split = f_onset2[:, int(prev_a2 * feature_rate):int(cur_a2 * feature_rate)]
else:
f_onset2_split = None
return f_chroma1_split, f_onset1_split, f_chroma2_split, f_onset2_split
def __refine_wp(wp: np.ndarray,
anchors: np.ndarray,
wp_list_refine: List,
neighboring_anchors: np.ndarray,
neighboring_anchor_indices: np.ndarray) -> np.ndarray:
wp_length = wp[:, neighboring_anchor_indices[-1]:].shape[1]
last_list = wp[:, neighboring_anchor_indices[-1]:] - np.tile(
wp[:, neighboring_anchor_indices[-1]].reshape(-1, 1), wp_length)
wp_list_tmp = [wp[:, :neighboring_anchor_indices[0] + 1]] + wp_list_refine + [last_list]
A_tmp = np.concatenate([anchors[:, 0].reshape(-1, 1), neighboring_anchors, anchors[:, -1].reshape(-1, 1)],
axis=1)
wp_res = build_path_from_warping_paths(warping_paths=wp_list_tmp,
anchors=A_tmp)
return wp_res
def __check_anchor_pairs(anchor_pairs: List,
f_len1: int,
f_len2: int,
feature_rate: int):
"""Ensures that the anchors satisfy the conditions
Parameters
----------
anchor_pairs: List[Tuple]
List of anchor pairs
f_len1: int
Length of the first feature sequence
f_len2: int
Length of the second feature sequence
feature_rate: int
Input feature rate of the features
"""
prev_a1 = 0
prev_a2 = 0
for anchor_pair in anchor_pairs:
a1, a2 = anchor_pair
if a1 <= 0 or a2 <= 0:
raise ValueError('Starting point must be a positive number!')
if a1 > f_len1 / feature_rate or a2 > f_len2 / feature_rate:
raise ValueError('Anchor points cannot be greater than the length of the input audio files!')
if a1 == f_len1 and a2 == f_len2:
raise ValueError('Both anchor points cannot be equal to the length of the audio files.')
if a1 == prev_a1 and a2 == prev_a2:
raise ValueError('Duplicate anchor pairs are not allowed!')
if a1 < prev_a1 or a2 < prev_a2:
            raise ValueError('Anchor points must be monotonically increasing.')
prev_a1 = a1
prev_a2 = a2
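# Quick sanity sketch (illustrative): anchor pairs are given in seconds and must be
# positive, lie inside both feature sequences, be unique, and increase monotonically.
# The feature lengths and rate below are hypothetical.
def _example_check_anchor_pairs():
    feature_rate = 50           # Hz
    f_len1, f_len2 = 500, 600   # 10 s and 12 s of features
    __check_anchor_pairs([(1.0, 1.2), (8.0, 9.5)], f_len1, f_len2, feature_rate)  # passes silently
    try:
        __check_anchor_pairs([(8.0, 9.5), (1.0, 1.2)], f_len1, f_len2, feature_rate)
    except ValueError as err:
        return err  # decreasing anchor pairs are rejected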
class PerformanceLabel:
"""
    The dataset labeling class for performance representations. It currently includes onset, note, and fine-grained
    f0 representations. The note_min, note_max, and f0_bins_per_semitone values should be set per instrument; the
    defaults are for violin performance analysis. Fretted instruments may not require such a fine f0 resolution per
    semitone.
"""
def __init__(self, note_min='F#3', note_max='C8', f0_bins_per_semitone=9, f0_smooth_std_c=None,
onset_smooth_std=0.7, f0_tolerance_c=200):
midi_min = note_name_to_number(note_min)
midi_max = note_name_to_number(note_max)
self.midi_centers = np.arange(midi_min, midi_max)
self.onset_smooth_std=onset_smooth_std # onset smoothing along time axis (compensate for alignment)
f0_hz_range = note_to_hz([note_min, note_max])
f0_c_min, f0_c_max = hz2cents(f0_hz_range)
self.f0_granularity_c = 100/f0_bins_per_semitone
if not f0_smooth_std_c:
f0_smooth_std_c = self.f0_granularity_c * 5/4 # Keep the ratio from the CREPE paper (20 cents and 25 cents)
self.f0_smooth_std_c = f0_smooth_std_c
self.f0_centers_c = np.arange(f0_c_min, f0_c_max, self.f0_granularity_c)
self.f0_centers_hz = 10 * 2 ** (self.f0_centers_c / 1200)
self.f0_n_bins = len(self.f0_centers_c)
self.pdf_normalizer = norm.pdf(0)
self.f0_c2hz = lambda c: 10*2**(c/1200)
self.f0_hz2c = hz2cents
self.midi_centers_c = self.f0_hz2c(midi_to_hz(self.midi_centers))
self.f0_tolerance_bins = int(f0_tolerance_c/self.f0_granularity_c)
self.f0_transition_matrix = gaussian_filter1d(np.eye(2*self.f0_tolerance_bins + 1), 25/self.f0_granularity_c)
def f0_c2label(self, pitch_c):
"""
        Convert a single f0 value in cents to a one-hot label vector with smoothing (i.e., create a gaussian blur
        around the target f0 bin for regularization and training stability). The blur is controlled by
        self.f0_smooth_std_c.
:param pitch_c: a single pitch value in cents
:return: one-hot label vector with frequency blur
"""
result = norm.pdf((self.f0_centers_c - pitch_c) / self.f0_smooth_std_c).astype(np.float32)
result /= self.pdf_normalizer
return result
def f0_label2c(self, salience, center=None):
"""
Convert the salience predictions to monophonic f0 in cents. Only outputs a single f0 value per frame!
:param salience: f0 activations
:param center: f0 center bin to calculate the weighted average. Use argmax if empty
:return: f0 array per frame (in cents).
"""
if salience.ndim == 1:
if center is None:
center = int(np.argmax(salience))
start = max(0, center - 4)
end = min(len(salience), center + 5)
salience = salience[start:end]
product_sum = np.sum(salience * self.f0_centers_c[start:end])
weight_sum = np.sum(salience)
return product_sum / np.clip(weight_sum, 1e-8, None)
if salience.ndim == 2:
return np.array([self.f0_label2c(salience[i, :]) for i in range(salience.shape[0])])
raise Exception("label should be either 1d or 2d ndarray")
def fill_onset_matrix(self, onsets, window, feature_rate):
"""
Create a sparse onset matrix from window and onsets (per-semitone). Apply a gaussian smoothing (along time)
        so that we can better tolerate alignment problems. This is similar to the frequency smoothing for the f0.
The temporal smoothing is controlled by the parameter self.onset_smooth_std
:param onsets: A 2d np.array of individual note onsets with their respective time values
(Nx2: time in seconds - midi number)
:param window: Timestamps for the frame centers of the sparse matrix
        :param feature_rate: Window timestamps are integers; this is used to convert them to seconds
:return: onset_roll: A sparse matrix filled with temporally blurred onsets.
"""
onsets = self.get_window_feats(onsets, window, feature_rate)
onset_roll = np.zeros((len(window), len(self.midi_centers)))
for onset in onsets:
onset, note = onset # it was a pair with time and midi note
if self.midi_centers[0] < note < self.midi_centers[-1]: # midi note should be in the range defined
note = int(note) - self.midi_centers[0] # find the note index in our range
onset = (onset*feature_rate)-window[0] # onset index (as float but in frames, not in seconds!)
start = max(0, int(onset) - 3)
end = min(len(window) - 1, int(onset) + 3)
try:
vals = norm.pdf(np.linspace(start - onset, end - onset, end - start + 1) / self.onset_smooth_std)
# if you increase 0.7 you smooth the peak
# if you decrease it, e.g., 0.1, it becomes too peaky! around 0.5-0.7 seems ok
vals /= self.pdf_normalizer
onset_roll[start:end + 1, note] += vals
except ValueError:
print('start',start, 'onset', onset, 'end', end)
return onset_roll, onsets
def fill_note_matrix(self, notes, window, feature_rate):
"""
Create the note matrix (piano roll) from window timestamps and note values per frame.
:param notes: A 2d np.array of individual notes with their active time values Nx2
:param window: Timestamps for the frame centers of the output
        :param feature_rate: Window timestamps are integers; this is used to convert them to seconds
:return note_roll: The piano roll in the defined range of [note_min, note_max).
"""
notes = self.get_window_feats(notes, window, feature_rate)
# take the notes in the midi range defined
notes = notes[np.logical_and(notes[:,1]>=self.midi_centers[0], notes[:,1]<=self.midi_centers[-1]),:]
times = (notes[:,0]*feature_rate - window[0]).astype(int) # in feature samples (fs:self.hop/self.sr)
notes = (notes[:,1] - self.midi_centers[0]).astype(int)
note_roll = np.zeros((len(window), len(self.midi_centers)))
note_roll[(times, notes)] = 1
return note_roll, notes
def fill_f0_matrix(self, f0s, window, feature_rate):
"""
        Unlike the labels for onsets and notes, the f0 label is only relevant for strictly monophonic regions! Thus,
        this function returns a boolean array which represents where to apply the given values.
        Never back-propagate without the boolean! Empty frames mean that the label is not that reliable.
        :param f0s: A 2d np.array of f0 values with the time they belong to (Nx2: time in seconds - f0 in Hz)
:param window: Timestamps for the frame centers of the output
        :param feature_rate: Window timestamps are integers; this is used to convert them to seconds
:return f0_roll: f0 label matrix and
f0_hz: f0 values in Hz
annotation_bool: A boolean array representing which frames have reliable f0 annotations.
"""
f0s = self.get_window_feats(f0s, window, feature_rate)
f0_cents = np.zeros_like(window, dtype=float)
f0s[:,1] = self.f0_hz2c(f0s[:,1]) # convert f0 in hz to cents
annotation_bool = np.zeros_like(window, dtype=bool)
f0_roll = np.zeros((len(window), len(self.f0_centers_c)))
times_in_frame = f0s[:, 0]*feature_rate - window[0]
for t, f0 in enumerate(f0s):
t = times_in_frame[t]
            if t%1 < 0.25: # only consider it an annotation if the f0 value is really close to the frame center
t = int(np.round(t))
f0_roll[t] = self.f0_c2label(f0[1])
annotation_bool[t] = True
f0_cents[t] = f0[1]
return f0_roll, f0_cents, annotation_bool
@staticmethod
def get_window_feats(time_feature_matrix, window, feature_rate):
"""
Restrict the feature matrix to the features that are inside the window
:param window: Timestamps for the frame centers of the output
        :param time_feature_matrix: A 2d array (Nx2) of time-stamped features covering the entire file.
        :param feature_rate: Window timestamps are integers; this is used to convert them to seconds
:return: window_features: the features inside the given window
"""
start = time_feature_matrix[:,0]>(window[0]-0.5)/feature_rate
end = time_feature_matrix[:,0]<(window[-1]+0.5)/feature_rate
window_features = np.logical_and(start, end)
window_features = np.array(time_feature_matrix[window_features,:])
return window_features
def represent_midi(self, midi, feature_rate):
"""
Represent a midi file as sparse matrices of onsets, offsets, and notes. No f0 is included.
:param midi: A midi file (either a path or a pretty_midi.PrettyMIDI object)
:param feature_rate: The feature rate in Hz
:return: dict {onset, offset, note, time}: Same format with the model's learning and outputs
"""
def _get_onsets_offsets_frames(midi_content):
if isinstance(midi_content, str):
midi_content = PrettyMIDI(midi_content)
onsets = []
offsets = []
frames = []
for instrument in midi_content.instruments:
for note in instrument.notes:
start = int(np.round(note.start * feature_rate))
end = int(np.round(note.end * feature_rate))
note_times = (np.arange(start, end+0.5)/feature_rate)[:, np.newaxis]
note_pitch = np.full_like(note_times, fill_value=note.pitch)
onsets.append([note.start, note.pitch])
offsets.append([note.end, note.pitch])
frames.append(np.hstack([note_times, note_pitch]))
onsets = np.vstack(onsets)
offsets = np.vstack(offsets)
frames = np.vstack(frames)
return onsets, offsets, frames, midi_content
onset_array, offset_array, frame_array, midi_object = _get_onsets_offsets_frames(midi)
window = np.arange(frame_array[0, 0]*feature_rate, frame_array[-1, 0]*feature_rate, dtype=int)
onset_roll, _ = self.fill_onset_matrix(onset_array, window, feature_rate)
offset_roll, _ = self.fill_onset_matrix(offset_array, window, feature_rate)
note_roll, _ = self.fill_note_matrix(frame_array, window, feature_rate)
start_anchor = onset_array[onset_array[:, 0]==np.min(onset_array[:, 0])]
end_anchor = offset_array[offset_array[:, 0]==np.max(offset_array[:, 0])]
return {
'midi': midi_object,
'note': note_roll,
'onset': onset_roll,
'offset': offset_roll,
'time': window/feature_rate,
'start_anchor': start_anchor,
'end_anchor': end_anchor
}
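# A minimal round-trip sketch (illustrative): a pitch in cents is blurred into a soft
# label vector by f0_c2label and recovered by f0_label2c with a local weighted average.
def _example_performance_label_roundtrip():
    labeling = PerformanceLabel()  # default violin range, 9 f0 bins per semitone
    pitch_c = labeling.f0_hz2c(np.array([440.0]))[0]  # A4, in cents above the 10 Hz reference
    label = labeling.f0_c2label(pitch_c)              # soft one-hot over labeling.f0_centers_c
    recovered_c = labeling.f0_label2c(label)          # weighted average around the peak bin
    return pitch_c, recovered_c                       # the two values agree within a few cents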
class Synchronizer(Transcriber):
def __init__(self, labeling, instrument='Violin', sr=16000, window_size=1024, hop_length=160):
super().__init__(labeling, instrument=instrument, sr=sr, window_size=window_size, hop_length=hop_length)
def synchronize(self, audio, midi, batch_size=128, include_pitch_bends=True, to_midi=True, debug=False,
include_velocity=False, alignment_padding=50, timing_refinement_range_with_f0s=0):
"""
Synchronize an audio file or mono waveform in numpy or torch with a MIDI file.
:param audio: str, pathlib.Path, np.ndarray, or torch.Tensor
:param midi: str, pathlib.Path, or pretty_midi.PrettyMIDI
:param batch_size: frames to process at once
:param include_pitch_bends: whether to include pitch bends in the MIDI file
:param to_midi: whether to return a MIDI file or a list of note events (as tuple)
:param debug: whether to plot the alignment path and compare the alignment with the predicted notes
:param include_velocity: whether to embed the note confidence in place of the velocity in the MIDI file
:param alignment_padding: how many frames to pad the audio and MIDI representations with
:param timing_refinement_range_with_f0s: how many frames to refine the alignment with the f0 confidence
:return: aligned MIDI file as a pretty_midi.PrettyMIDI object
        """
audio = self.predict(audio, batch_size)
notes_and_midi = self.out2sync(audio, midi, include_velocity=include_velocity,
alignment_padding=alignment_padding)
if notes_and_midi: # it might be none
notes, midi = notes_and_midi
if debug:
import pandas as pd
estimated_notes = self.out2note(audio, postprocessing='spotify', include_pitch_bends=True)
est_df = pd.DataFrame(estimated_notes).sort_values(by=0)
note_df = pd.DataFrame(notes).sort_values(by=0)
fig, ax = plt.subplots(figsize=(20, 10))
for row in notes:
t_start = row[0] # sec
t_end = row[1] # sec
freq = row[2] # Hz
ax.hlines(freq, t_start, t_end, color='k', linewidth=3, zorder=2, alpha=0.5)
for row in estimated_notes:
t_start = row[0] # sec
t_end = row[1] # sec
freq = row[2] # Hz
ax.hlines(freq, t_start, t_end, color='r', linewidth=3, zorder=2, alpha=0.5)
fig.suptitle('alignment (black) vs. estimated (red)')
fig.show()
if not include_pitch_bends:
if to_midi:
return midi['midi']
else:
return notes
else:
notes = [(np.argmin(np.abs(audio['time']-note[0])),
np.argmin(np.abs(audio['time']-note[1])),
note[2], note[3]) for note in notes]
notes = self.get_pitch_bends(audio["f0"], notes, timing_refinement_range_with_f0s)
notes = [
(audio['time'][note[0]], audio['time'][note[1]], note[2], note[3], note[4]) for note in
notes
]
if to_midi:
return self.note2midi(notes, 120) #int(midi['midi'].estimate_tempo()))
else:
return notes
def out2sync_old(self, out: Dict[str, np.array], midi, include_velocity=False, alignment_padding=50, debug=False):
"""
Synchronizes the output of the model with the MIDI file.
Args:
out: Model output dictionary
midi: Path to the MIDI file or PrettyMIDI object
include_velocity: Whether to encode the note confidence in place of velocity
alignment_padding: Number of frames to pad the MIDI features with zeros
debug: Visualize the alignment
Returns:
note events and the aligned PrettyMIDI object
"""
midi = self.labeling.represent_midi(midi, self.sr/self.hop_length)
audio_midi_anchors = self.prepare_for_synchronization(out, midi, feature_rate=self.sr/self.hop_length,
pad_length=alignment_padding)
if isinstance(audio_midi_anchors, str):
print(audio_midi_anchors)
return None # the file is corrupted! no possible alignment at all
else:
audio, midi, anchor_pairs = audio_midi_anchors
ALPHA = 0.6 # This is the coefficient of onsets, 1 - ALPHA for offsets
wp = sync_via_mrmsdtw_with_anchors(f_chroma1=audio['note'].T,
f_onset1=np.hstack([ALPHA * audio['onset'],
(1 - ALPHA) * audio['offset']]).T,
f_chroma2=midi['note'].T,
f_onset2=np.hstack([ALPHA * midi['onset'],
(1 - ALPHA) * midi['offset']]).T,
input_feature_rate=self.sr/self.hop_length,
step_weights=np.array([1.5, 1.5, 2.0]),
threshold_rec=10 ** 6,
verbose=debug, normalize_chroma=False,
anchor_pairs=anchor_pairs)
wp = make_path_strictly_monotonic(wp).astype(int)
audio_time = np.take(audio['time'], wp[0])
midi_time = np.take(midi['time'], wp[1])
notes = []
for instrument in midi['midi'].instruments:
for note in instrument.notes:
note.start = np.interp(note.start, midi_time, audio_time)
note.end = np.interp(note.end, midi_time, audio_time)
if note.end - note.start <= 0.012: # notes should be at least 12 ms (i.e. 2 frames)
note.start = note.start - 0.003
note.end = note.start + 0.012
if include_velocity: # encode the note confidence in place of velocity
velocity = np.median(audio['note'][np.argmin(np.abs(audio['time']-note.start)):
np.argmin(np.abs(audio['time']-note.end)),
note.pitch-self.labeling.midi_centers[0]])
note.velocity = max(1, velocity*127) # velocity should be at least 1 otherwise midi removes the note
else:
velocity = note.velocity/127
notes.append((note.start, note.end, note.pitch, velocity))
return notes, midi
def out2sync(self, out: Dict[str, np.array], midi, include_velocity=False, alignment_padding=50, debug=False):
"""
Synchronizes the output of the model with the MIDI file.
Args:
out: Model output dictionary
midi: Path to the MIDI file or PrettyMIDI object
include_velocity: Whether to encode the note confidence in place of velocity
alignment_padding: Number of frames to pad the MIDI features with zeros
debug: Visualize the alignment
Returns:
note events and the aligned PrettyMIDI object
"""
midi = self.labeling.represent_midi(midi, self.sr/self.hop_length)
audio_midi_anchors = self.prepare_for_synchronization(out, midi, feature_rate=self.sr/self.hop_length,
pad_length=alignment_padding)
if isinstance(audio_midi_anchors, str):
print(audio_midi_anchors)
return None # the file is corrupted! no possible alignment at all
else:
audio, midi, anchor_pairs = audio_midi_anchors
ALPHA = 0.6 # This is the coefficient of onsets, 1 - ALPHA for offsets
starts = (np.array(anchor_pairs[0])*self.sr/self.hop_length).astype(int)
ends = (np.array(anchor_pairs[1])*self.sr/self.hop_length).astype(int)
wp = sync_via_mrmsdtw_with_anchors(f_chroma1=audio['note'].T[:, starts[0]:ends[0]],
f_onset1=np.hstack([ALPHA * audio['onset'],
(1 - ALPHA) * audio['offset']]).T[:, starts[0]:ends[0]],
f_chroma2=midi['note'].T[:, starts[1]:ends[1]],
f_onset2=np.hstack([ALPHA * midi['onset'],
(1 - ALPHA) * midi['offset']]).T[:, starts[1]:ends[1]],
input_feature_rate=self.sr/self.hop_length,
step_weights=np.array([1.5, 1.5, 2.0]),
threshold_rec=10 ** 6,
verbose=debug, normalize_chroma=False,
anchor_pairs=None)
wp = make_path_strictly_monotonic(wp).astype(int)
wp[0] += starts[0]
wp[1] += starts[1]
wp = np.hstack((wp, ends[:,np.newaxis]))
audio_time = np.take(audio['time'], wp[0])
midi_time = np.take(midi['time'], wp[1])
notes = []
for instrument in midi['midi'].instruments:
for note in instrument.notes:
note.start = np.interp(note.start, midi_time, audio_time)
note.end = np.interp(note.end, midi_time, audio_time)
if note.end - note.start <= 0.012: # notes should be at least 12 ms (i.e. 2 frames)
note.start = note.start - 0.003
note.end = note.start + 0.012
if include_velocity: # encode the note confidence in place of velocity
velocity = np.median(audio['note'][np.argmin(np.abs(audio['time']-note.start)):
np.argmin(np.abs(audio['time']-note.end)),
note.pitch-self.labeling.midi_centers[0]])
note.velocity = max(1, velocity*127) # velocity should be at least 1 otherwise midi removes the note
else:
velocity = note.velocity/127
notes.append((note.start, note.end, note.pitch, velocity))
return notes, midi
@staticmethod
def pad_representations(dict_of_representations, pad_length=10):
"""
Pad the representations so that the DTW does not enforce them to encompass the entire duration.
Args:
dict_of_representations: audio or midi representations
pad_length: how many frames to pad
Returns:
padded representations
"""
for key, value in dict_of_representations.items():
if key == 'time':
padded_time = dict_of_representations[key]
padded_time = np.concatenate([padded_time[:2*pad_length], padded_time+padded_time[2*pad_length]])
dict_of_representations[key] = padded_time - padded_time[pad_length] # this is to ensure that the
# first frame times are negative until the real zero time
elif key in ['onset', 'offset', 'note']:
dict_of_representations[key] = np.pad(value, ((pad_length, pad_length), (0, 0)))
elif key in ['start_anchor', 'end_anchor']:
anchor_time = dict_of_representations[key][0][0]
anchor_time = np.argmin(np.abs(dict_of_representations['time'] - anchor_time))
dict_of_representations[key][:,0] = anchor_time
                dict_of_representations[key] = dict_of_representations[key].astype(int)  # np.int is removed in recent numpy
return dict_of_representations
def prepare_for_synchronization(self, audio, midi, feature_rate=44100/256, pad_length=100):
"""
        MrMsDTW works better with start and end anchors. This function finds the start and end anchors for the audio
        based on the MIDI notes. MIDI files most often start and end with an active note, so without padding the DTW
        would stretch those notes over the leading and trailing silence of the audio. To avoid this, the MIDI
        representations are padded with a few frames of silence at the beginning and the end.
Args:
audio:
midi:
feature_rate:
pad_length:
Returns:
"""
# first pad the MIDI
midi = self.pad_representations(midi, pad_length)
# sometimes f0s are more reliable than the notes. So, we use both the f0s and the notes together to find the
# start and end anchors. f0 lookup bins is the number of bins to look around the f0 to assign a note to it.
f0_lookup_bins = int(100//(2*self.labeling.f0_granularity_c))
# find the start anchor for the audio
# first decide on which notes to use for the start anchor (take the entire chord where the MIDI file starts)
anchor_notes = midi['start_anchor'][:, 1] - self.labeling.midi_centers[0]
# now find which f0 bins to look at for the start anchor
anchor_f0s = [self.midi_pitch_to_contour_bin(an+self.labeling.midi_centers[0]) for an in anchor_notes]
anchor_f0s = np.array([list(range(f0-f0_lookup_bins, f0+f0_lookup_bins+1)) for f0 in anchor_f0s]).reshape(-1)
# first start anchor proposals come from the notes
anchor_vals = np.any(audio['note'][:, anchor_notes]>0.5, axis=1)
# now the f0s
anchor_vals_f0 = np.any(audio['f0'][:, anchor_f0s]>0.5, axis=1)
# combine the two
anchor_vals = np.logical_or(anchor_vals, anchor_vals_f0)
if not any(anchor_vals):
return 'corrupted' # do not consider the file if we cannot find the start anchor
audio_start = np.argmax(anchor_vals)
# now the end anchor (most string instruments use chords in cadences: in general the end anchor is polyphonic)
anchor_notes = midi['end_anchor'][:, 1] - self.labeling.midi_centers[0]
anchor_f0s = [self.midi_pitch_to_contour_bin(an+self.labeling.midi_centers[0]) for an in anchor_notes]
anchor_f0s = np.array([list(range(f0-f0_lookup_bins, f0+f0_lookup_bins+1)) for f0 in anchor_f0s]).reshape(-1)
# the same procedure as above
anchor_vals = np.any(audio['note'][::-1, anchor_notes]>0.5, axis=1)
anchor_vals_f0 = np.any(audio['f0'][::-1, anchor_f0s]>0.5, axis=1)
anchor_vals = np.logical_or(anchor_vals, anchor_vals_f0)
if not any(anchor_vals):
return 'corrupted' # do not consider the file if we cannot find the end anchor
audio_end = audio['note'].shape[0] - np.argmax(anchor_vals)
if audio_end - audio_start < (midi['end_anchor'][0][0] - midi['start_anchor'][0][0])/10: # no one plays x10 faster
            return 'corrupted' # do not consider the file if the interval between anchors is too short
anchor_pairs = [(audio_start - 5, midi['start_anchor'][0][0] - 5),
(audio_end + 5, midi['end_anchor'][0][0] + 5)]
if anchor_pairs[0][0] < 1:
anchor_pairs[0] = (1, midi['start_anchor'][0][0])
if anchor_pairs[1][0] > audio['note'].shape[0] - 1:
anchor_pairs[1] = (audio['note'].shape[0] - 1, midi['end_anchor'][0][0])
return audio, midi, [(anchor_pairs[0][0]/feature_rate, anchor_pairs[0][1]/feature_rate),
(anchor_pairs[1][0]/feature_rate, anchor_pairs[1][1]/feature_rate)]
class ConvBlock(nn.Module):
def __init__(self, f, w, s, d, in_channels):
super().__init__()
p1 = d*(w - 1) // 2
p2 = d*(w - 1) - p1
self.pad = nn.ZeroPad2d((0, 0, p1, p2))
self.conv2d = nn.Conv2d(in_channels=in_channels, out_channels=f, kernel_size=(w, 1), stride=(s, 1), dilation=(d, 1))
self.relu = nn.ReLU()
self.bn = nn.BatchNorm2d(f)
self.pool = nn.MaxPool2d(kernel_size=(2, 1))
self.dropout = nn.Dropout(0.25)
def forward(self, x):
x = self.pad(x)
x = self.conv2d(x)
x = self.relu(x)
x = self.bn(x)
x = self.pool(x)
x = self.dropout(x)
return x
class NoPadConvBlock(nn.Module):
def __init__(self, f, w, s, d, in_channels):
super().__init__()
self.conv2d = nn.Conv2d(in_channels=in_channels, out_channels=f, kernel_size=(w, 1), stride=(s, 1),
dilation=(d, 1))
self.relu = nn.ReLU()
self.bn = nn.BatchNorm2d(f)
self.pool = nn.MaxPool2d(kernel_size=(2, 1))
self.dropout = nn.Dropout(0.25)
def forward(self, x):
x = self.conv2d(x)
x = self.relu(x)
x = self.bn(x)
x = self.pool(x)
x = self.dropout(x)
return x
class TinyPathway(nn.Module):
def __init__(self, dilation=1, hop=256, localize=False,
model_capacity="full", n_layers=6, chunk_size=256):
super().__init__()
capacity_multiplier = {
'tiny': 4, 'small': 8, 'medium': 16, 'large': 24, 'full': 32
}[model_capacity]
self.layers = [1, 2, 3, 4, 5, 6]
self.layers = self.layers[:n_layers]
filters = [n * capacity_multiplier for n in [32, 8, 8, 8, 8, 8]]
filters = [1] + filters
widths = [512, 64, 64, 64, 32, 32]
strides = self.deter_dilations(hop//(4*(2**n_layers)), localize=localize)
strides[0] = strides[0]*4 # apply 4 times more stride at the first layer
dilations = self.deter_dilations(dilation)
for i in range(len(self.layers)):
f, w, s, d, in_channel = filters[i + 1], widths[i], strides[i], dilations[i], filters[i]
self.add_module("conv%d" % i, NoPadConvBlock(f, w, s, d, in_channel))
self.chunk_size = chunk_size
self.input_window, self.hop = self.find_input_size_for_pathway()
self.out_dim = filters[n_layers]
def find_input_size_for_pathway(self):
def find_input_size(output_size, kernel_size, stride, dilation, padding):
num = (stride*(output_size-1)) + 1
input_size = num - 2*padding + dilation*(kernel_size-1)
return input_size
conv_calc, n = {}, 0
for i in self.layers:
layer = self.__getattr__("conv%d" % (i-1))
for mm in layer.modules():
if hasattr(mm, 'kernel_size'):
try:
d = mm.dilation[0]
except TypeError:
d = mm.dilation
conv_calc[n] = [mm.kernel_size[0], mm.stride[0], 0, d]
n += 1
out = self.chunk_size
hop = 1
for n in sorted(conv_calc.keys())[::-1]:
kernel_size_n, stride_n, padding_n, dilation_n = conv_calc[n]
out = find_input_size(out, kernel_size_n, stride_n, dilation_n, padding_n)
hop = hop*stride_n
return out, hop
def deter_dilations(self, total_dilation, localize=False):
n_layers = len(self.layers)
if localize: # e.g., 32*1023 window and 3 layers -> [1, 1, 32]
a = [total_dilation] + [1 for _ in range(n_layers-1)]
else: # e.g., 32*1023 window and 3 layers -> [4, 4, 2]
total_dilation = int(np.log2(total_dilation))
a = []
for layer in range(n_layers):
this_dilation = int(np.ceil(total_dilation/(n_layers-layer)))
a.append(2**this_dilation)
total_dilation = total_dilation - this_dilation
return a[::-1]
def forward(self, x):
x = x.view(x.shape[0], 1, -1, 1)
for i in range(len(self.layers)):
x = self.__getattr__("conv%d" % i)(x)
x = x.permute(0, 3, 2, 1)
return x
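# Shape sanity sketch (illustrative): a small TinyPathway reports the raw-audio window it
# expects (`input_window`) and the hop between output frames; a dummy forward pass yields
# one embedding per chunk frame. The layer/chunk settings below are arbitrary.
def _example_tiny_pathway_shapes():
    pathway = TinyPathway(dilation=1, hop=256, localize=True, n_layers=2, chunk_size=16)
    pathway.eval()
    dummy = torch_from_numpy(np.zeros((1, pathway.input_window), dtype=np.float32))
    with torch_no_grad():
        out = pathway(dummy)
    return out.shape  # (batch, 1, chunk_size, pathway.out_dim)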
#@jit(nopython=True)
def cosine_distance(f1, f2, cos_meas_max=2.0, cos_meas_min=1.0):
"""For all pairs of vectors f1' and f2' in f1 and f2, computes 1 - (f1.f2),
where '.' is the dot product, and rescales the results to lie in the
range [cos_meas_min, cos_meas_max].
Corresponds to regular cosine distance if f1' and f2' are normalized and
cos_meas_min==0.0 and cos_meas_max==1.0."""
return (1 - f1.T @ f2) * (cos_meas_max - cos_meas_min) + cos_meas_min
#@jit(nopython=True)
def euclidean_distance(f1, f2, l2_meas_max=1.0, l2_meas_min=0.0):
"""Computes euclidean distances between the vectors in f1 and f2, and
rescales the results to lie in the range [cos_meas_min, cos_meas_max]."""
#S1 = np.zeros((f1.shape[1], f2.shape[1]))
#for n in range(f2.shape[1]):
# S1[:, n] = np.sqrt(np.sum((f1.T - f2[:, n]) ** 2, axis=1))
S1 = euclidean_distances(f1.T, f2.T)
return S1 * (l2_meas_max - l2_meas_min) + l2_meas_min
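# A small sketch (illustrative): with non-negative unit-norm columns, cosine_distance
# stays inside [cos_meas_min, cos_meas_max] (default [1.0, 2.0]); euclidean_distance
# returns the raw pairwise distances scaled by (l2_meas_max - l2_meas_min) and shifted
# by l2_meas_min.
def _example_pairwise_distances():
    rng = np.random.default_rng(0)
    f1 = rng.random((12, 5))
    f2 = rng.random((12, 7))
    f1 /= np.linalg.norm(f1, axis=0, keepdims=True)
    f2 /= np.linalg.norm(f2, axis=0, keepdims=True)
    cos = cosine_distance(f1, f2)     # shape (5, 7)
    euc = euclidean_distance(f1, f2)  # shape (5, 7)
    return cos, euc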
def compute_high_res_cost_matrix(f_chroma1: np.ndarray,
f_chroma2: np.ndarray,
f_onset1: np.ndarray,
f_onset2: np.ndarray,
weights: np.ndarray = np.array([1.0, 1.0]),
cos_meas_min: float = 1.0,
cos_meas_max: float = 2.0,
l2_meas_min: float = 0.0,
l2_meas_max: float = 1.0):
"""Computes cost matrix of two sequences using two feature matrices
for each sequence. Cosine distance is used for the chroma sequences and
euclidean distance is used for the DLNCO sequences.
Parameters
----------
f_chroma1 : np.ndarray [shape=(12, N)]
Chroma feature matrix of the first sequence (assumed to be normalized).
f_chroma2 : np.ndarray [shape=(12, M)]
Chroma feature matrix of the second sequence (assumed to be normalized).
f_onset1 : np.ndarray [shape=(12, N)]
DLNCO feature matrix of the first sequence
f_onset2 : np.ndarray [shape=(12, M)]
DLNCO feature matrix of the second sequence
weights : np.ndarray [shape=[2,]]
Weights array for the high-resolution cost computation.
weights[0] * cosine_distance + weights[1] * euclidean_distance
cos_meas_min : float
Cosine distances are shifted to be at least ``cos_meas_min``
cos_meas_max : float
Cosine distances are scaled to be at most ``cos_meas_max``
l2_meas_min : float
Euclidean distances are shifted to be at least ``l2_meas_min``
l2_meas_max : float
Euclidean distances are scaled to be at most ``l2_meas_max``
Returns
-------
C: np.ndarray [shape=(N, M)]
Cost matrix
"""
cos_dis = cosine_distance(f_chroma1, f_chroma2, cos_meas_min=cos_meas_min, cos_meas_max=cos_meas_max)
euc_dis = euclidean_distance(f_onset1, f_onset2, l2_meas_min=l2_meas_min, l2_meas_max=l2_meas_max)
return weights[0] * cos_dis + weights[1] * euc_dis
@jit(nopython=True, cache=True)
def __C_to_DE(C: np.ndarray = None,
dn: np.ndarray = np.array([1, 1, 0], np.int64),
dm: np.ndarray = np.array([1, 0, 1], np.int64),
dw: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64),
sub_sequence: bool = False) -> tuple[np.ndarray, np.ndarray]:
"""This function computes the accumulated cost matrix D and the step index
matrix E.
Parameters
----------
C : np.ndarray (np.float32 / np.float64) [shape=(N, M)]
Cost matrix
dn : np.ndarray (np.int64) [shape=(1, S)]
Integer array defining valid steps (N direction of C), default: [1, 1, 0]
dm : np.ndarray (np.int64) [shape=(1, S)]
Integer array defining valid steps (M direction of C), default: [1, 0, 1]
dw : np.ndarray (np.float64) [shape=(1, S)]
        Double array defining the weight of each step, default: [1.0, 1.0, 1.0]
sub_sequence : bool
Set `True` for SubSequence DTW, default: False
Returns
-------
D : np.ndarray (np.float64) [shape=(N, M)]
Accumulated cost matrix of type double
E : np.ndarray (np.int64) [shape=(N, M)]
Step index matrix.
        E[n, m] holds the index of the step taken to determine the value of D[n, m].
If E[n, m] is zero, no valid step was possible.
NaNs in the cost matrix are preserved, invalid fields in the cost matrix are NaNs.
"""
if C is None:
raise ValueError('C must be a 2D numpy array.')
N, M = C.shape
S = dn.size
if S != dm.size or S != dw.size:
        raise ValueError('The parameters dn, dm, and dw must be of equal length.')
# calc bounding box size of steps
sbbn = np.max(dn)
sbbm = np.max(dm)
# initialize E
E = np.zeros((N, M), np.int64) - 1
# initialize extended D matrix
D = np.ones((sbbn + N, sbbm + M), np.float64) * np.inf
if sub_sequence:
for m in range(M):
D[sbbn, sbbm + m] = C[0, m]
else:
D[sbbn, sbbm] = C[0, 0]
# accumulate
for m in range(sbbm, M + sbbm):
for n in range(sbbn, N + sbbn):
for s in range(S):
cost = D[n - dn[s], m - dm[s]] + C[n - sbbn, m - sbbm] * dw[s]
if cost < D[n, m]:
D[n, m] = cost
E[n - sbbn, m - sbbm] = s
D = D[sbbn: N + sbbn, sbbm: M + sbbm]
return D, E
@jit(nopython=True, cache=True)
def __E_to_warping_path(E: np.ndarray,
dn: np.ndarray = np.array([1, 1, 0], np.int64),
dm: np.ndarray = np.array([1, 0, 1], np.int64),
sub_sequence: bool = False,
end_index: int = -1) -> np.ndarray:
"""This function computes a warping path based on the provided matrix E
and the allowed steps.
Parameters
----------
E : np.ndarray (np.int64) [shape=(N, M)]
Step index matrix
dn : np.ndarray (np.int64) [shape=(1, S)]
Integer array defining valid steps (N direction of C), default: [1, 1, 0]
dm : np.ndarray (np.int64) [shape=(1, S)]
Integer array defining valid steps (M direction of C), default: [1, 0, 1]
sub_sequence : bool
Set `True` for SubSequence DTW, default: False
end_index : int
In case of SubSequence DTW
Returns
-------
warping_path : np.ndarray (np.int64) [shape=(2, M)]
Resulting optimal warping path
"""
N, M = E.shape
if not sub_sequence and end_index == -1:
end_index = M - 1
m = end_index
n = N - 1
warping_path = np.zeros((2, n + m + 1))
index = 0
def _loop(m, n, index):
warping_path[:, index] = np.array([n, m])
step_index = E[n, m]
m -= dm[step_index]
n -= dn[step_index]
index += 1
return m, n, index
if sub_sequence:
while n > 0:
m, n, index = _loop(m, n, index)
else:
while m > 0 or n > 0:
m, n, index = _loop(m, n, index)
warping_path[:, index] = np.array([n, m])
warping_path = warping_path[:, index::-1]
return warping_path
def compute_warping_path(C: np.ndarray,
step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int64),
step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64),
implementation: str = 'synctoolbox'):
"""Applies DTW on cost matrix C.
Parameters
----------
C : np.ndarray (np.float32 / np.float64) [shape=(N, M)]
Cost matrix
step_sizes : np.ndarray (np.int64) [shape=(2, S)]
Array of step sizes
step_weights : np.ndarray (np.float64) [shape=(2, S)]
Array of step weights
implementation: str
Choose among ``synctoolbox`` and ``librosa``. (default: ``synctoolbox``)
Returns
-------
D : np.ndarray (np.float64) [shape=(N, M)]
Accumulated cost matrix
E : np.ndarray (np.int64) [shape=(N, M)]
Step index matrix
wp : np.ndarray (np.int64) [shape=(2, M)]
Warping path
"""
if implementation == 'librosa':
D, wp, E = dtw(C=C,
step_sizes_sigma=step_sizes,
weights_add=np.array([0, 0, 0]),
weights_mul=step_weights,
return_steps=True,
subseq=False)
wp = wp[::-1].T
elif implementation == 'synctoolbox':
dn = step_sizes[:, 0]
dm = step_sizes[:, 1]
D, E = __C_to_DE(C,
dn=dn,
dm=dm,
dw=step_weights,
sub_sequence=False)
wp = __E_to_warping_path(E=E,
dn=dn,
dm=dm,
sub_sequence=False)
else:
raise NotImplementedError(f'No implementation found called {implementation}')
return D, E, wp
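# A tiny DTW sketch (illustrative): run the default synctoolbox-style DTW on a 3x3 cost
# matrix; the optimal path follows the zero-cost diagonal from (0, 0) to (2, 2).
def _example_compute_warping_path():
    C = np.array([[0.0, 1.0, 2.0],
                  [1.0, 0.0, 1.0],
                  [2.0, 1.0, 0.0]])
    D, E, wp = compute_warping_path(C, implementation='synctoolbox')
    return wp.astype(int)  # array([[0, 1, 2], [0, 1, 2]])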
def compute_warping_paths_from_cost_matrices(cost_matrices: List,
step_sizes: np.array = np.array([[1, 0], [0, 1], [1, 1]], int),
step_weights: np.array = np.array([1.0, 1.0, 1.0], np.float64),
implementation: str = 'synctoolbox') -> List:
"""Computes a path via DTW on each matrix in cost_matrices
Parameters
----------
cost_matrices : list
List of cost matrices
step_sizes : np.ndarray
DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]]))
step_weights : np.ndarray
DTW step weights (default: np.array([1.0, 1.0, 1.0]))
implementation : str
Choose among 'synctoolbox' and 'librosa' (default: 'synctoolbox')
Returns
-------
wp_list : list
List of warping paths
"""
return [compute_warping_path(C=C,
step_sizes=step_sizes,
step_weights=step_weights,
implementation=implementation)[2] for C in cost_matrices]
def compute_cost_matrices_between_anchors(f_chroma1: np.ndarray,
f_chroma2: np.ndarray,
anchors: np.ndarray,
f_onset1: np.ndarray = None,
f_onset2: np.ndarray = None,
alpha: float = 0.5) -> List:
"""Computes cost matrices for the given features between subsequent
    pairs of anchor points.
Parameters
----------
f_chroma1 : np.ndarray [shape=(12, N)]
Chroma feature matrix of the first sequence
f_chroma2 : np.ndarray [shape=(12, M)]
Chroma feature matrix of the second sequence
anchors : np.ndarray [shape=(2, R)]
Anchor sequence
f_onset1 : np.ndarray [shape=(L, N)]
Onset feature matrix of the first sequence
f_onset2 : np.ndarray [shape=(L, M)]
Onset feature matrix of the second sequence
alpha: float
Alpha parameter to weight the cost functions.
Returns
-------
cost_matrices: list
List containing cost matrices
"""
high_res = False
if f_onset1 is not None and f_onset2 is not None:
high_res = True
cost_matrices = list()
for k in range(anchors.shape[1] - 1):
a1 = np.array(anchors[:, k].astype(int), copy=True)
a2 = np.array(anchors[:, k + 1].astype(int), copy=True)
if high_res:
cost_matrices.append(compute_high_res_cost_matrix(f_chroma1[:, a1[0]: a2[0] + 1],
f_chroma2[:, a1[1]: a2[1] + 1],
f_onset1[:, a1[0]: a2[0] + 1],
f_onset2[:, a1[1]: a2[1] + 1],
weights=np.array([alpha, 1-alpha])))
else:
cost_matrices.append(cosine_distance(f_chroma1[:, a1[0]: a2[0] + 1],
f_chroma2[:, a1[1]: a2[1] + 1]))
return cost_matrices
def build_path_from_warping_paths(warping_paths: List,
anchors: np.ndarray = None) -> np.ndarray:
"""The function builds a path from a given list of warping paths
and the anchors used to obtain these paths. The indices of the original
warping paths are adapted such that they cross the anchors.
Parameters
----------
warping_paths : list
List of warping paths
anchors : np.ndarray [shape=(2, N)]
Anchor sequence
Returns
-------
path : np.ndarray [shape=(2, M)]
Merged path
"""
if anchors is None:
# When no anchor points are given, we can construct them from the
# subpaths in the wp_list
# To do this, we assume that the first path's element is the starting
# anchor
        # Keep the anchors two-dimensional so the column-wise cumsum correction below works
        anchors = warping_paths[0][:, 0].reshape(-1, 1)
        # Retrieve the last element of each path
        anchors_tmp = np.zeros((2, len(warping_paths)), np.float32)
        for idx, x in enumerate(warping_paths):
            anchors_tmp[:, idx] = x[:, -1]
# Correct indices, such that the indices of the anchors are given on a
# common path. Each anchor a_l = [Nnew_[l+1];Mnew_[l+1]]
# Nnew_[l+1] = N_l + N_[l+1] -1
# Mnew_[l+1] = M_l + M_[l+1] -1
anchors_tmp = np.cumsum(anchors_tmp, axis=1)
anchors_tmp[:, 1:] = anchors_tmp[:, 1:] - [np.arange(1, anchors_tmp.shape[1]),
np.arange(1, anchors_tmp.shape[1])]
anchors = np.concatenate([anchors, anchors_tmp], axis=1)
L = len(warping_paths) + 1
path = None
wp = None
for anchor_idx in range(1, L):
anchor1 = anchors[:, anchor_idx - 1]
anchor2 = anchors[:, anchor_idx]
wp = np.array(warping_paths[anchor_idx - 1], copy=True)
# correct indices in warpingPath
wp += np.repeat(anchor1.reshape(-1, 1), wp.shape[1], axis=1).astype(wp.dtype)
# consistency checks
assert np.array_equal(wp[:, 0], anchor1), 'First entry of warping path does not coincide with anchor point'
assert np.array_equal(wp[:, -1], anchor2), 'Last entry of warping path does not coincide with anchor point'
if path is None:
path = np.array(wp[:, :-1], copy=True)
else:
path = np.concatenate([path, wp[:, :-1]], axis=1)
# append last index of warping path
path = np.concatenate([path, wp[:, -1].reshape(-1, 1)], axis=1)
return path
def find_anchor_indices_in_warping_path(warping_path: np.ndarray,
anchors: np.ndarray) -> np.ndarray:
"""Compute the indices in the warping path that corresponds
to the elements in 'anchors'
Parameters
----------
warping_path : np.ndarray [shape=(2, N)]
Warping path
anchors : np.ndarray [shape=(2, M)]
Anchor sequence
Returns
-------
indices : np.ndarray [shape=(2, M)]
Anchor indices in the ``warping_path``
"""
indices = np.zeros(anchors.shape[1])
for k in range(anchors.shape[1]):
a = anchors[:, k]
indices[k] = np.where((a[0] == warping_path[0, :]) & (a[1] == warping_path[1, :]))[0]
return indices
def make_path_strictly_monotonic(P: np.ndarray) -> np.ndarray:
"""Compute strict alignment path from a warping path
Wrapper around "compute_strict_alignment_path_mask" from libfmp.
Parameters
----------
P: np.ndarray [shape=(2, N)]
Warping path
Returns
-------
P_mod: np.ndarray [shape=(2, M)]
Strict alignment path, M <= N
"""
P_mod = compute_strict_alignment_path_mask(P.T)
return P_mod.T
def compute_strict_alignment_path_mask(P):
"""Compute strict alignment path from a warping path
Notebook: C3/C3S3_MusicAppTempoCurve.ipynb
Args:
        P (list or np.ndarray): Warping path
Returns:
P_mod (list or np.ndarray): Strict alignment path
"""
P = np.array(P, copy=True)
N, M = P[-1]
# Get indices for strict monotonicity
keep_mask = (P[1:, 0] > P[:-1, 0]) & (P[1:, 1] > P[:-1, 1])
# Add first index to enforce start boundary condition
keep_mask = np.concatenate(([True], keep_mask))
    # Remove all indices of the last row or column
keep_mask[(P[:, 0] == N) | (P[:, 1] == M)] = False
# Add last index to enforce end boundary condition
keep_mask[-1] = True
P_mod = P[keep_mask, :]
return P_mod
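# A small sketch (illustrative): a warping path containing horizontal/vertical steps
# (repeated indices) is reduced to a strictly monotonic alignment path.
def _example_make_path_strictly_monotonic():
    wp = np.array([[0, 1, 1, 2, 3],
                   [0, 1, 2, 2, 3]])
    return make_path_strictly_monotonic(wp)  # array([[0, 1, 3], [0, 1, 3]])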
def evaluate_synchronized_positions(ground_truth_positions: np.ndarray,
synchronized_positions: np.ndarray,
tolerances: List = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 150, 250]):
"""Compute standard evaluation measures for evaluating the quality of synchronized (musical) positions.
When synchronizing two versions of a piece of music, one can evaluate the quality of the resulting alignment
by comparing errors at musical positions (e.g. beats or measures) that appear in both versions.
This function implements two measures: mean absolute error at positions and the percentage of correctly transferred
measures given a threshold.
Parameters
----------
ground_truth_positions: np.ndarray [shape=N]
Positions (e.g. beat or measure positions) annotated in the target version of a piece of music, in milliseconds.
synchronized_positions: np.ndarray [shape=N]
The same musical positions as in 'ground_truth_positions' obtained by transfer using music synchronization,
in milliseconds.
tolerances: list of integers
        Tolerances (in milliseconds) used for comparing annotated and synchronized positions.
Returns
-------
mean_absolute_error: float
        Mean absolute error for synchronized positions, in milliseconds.
accuracy_at_tolerances: list of floats
Percentages of correctly transferred measures, for each entry in 'tolerances'.
"""
absolute_errors_at_positions = np.abs(synchronized_positions - ground_truth_positions)
print('Measure transfer from recording 1 to 2 yielded:')
mean_absolute_error = np.mean(absolute_errors_at_positions)
print('\nMean absolute error (MAE): %.2fms (standard deviation: %.2fms)' % (mean_absolute_error,
np.std(absolute_errors_at_positions)))
print('\nAccuracy of transferred positions at different tolerances:')
print('\t\t\tAccuracy')
print('################################')
accuracy_at_tolerances = []
for tolerance in tolerances:
accuracy = np.mean((absolute_errors_at_positions < tolerance)) * 100.0
accuracy_at_tolerances.append(accuracy)
print('Tolerance: {} ms \t{:.2f} %'.format(tolerance, accuracy))
return mean_absolute_error, accuracy_at_tolerances
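# A toy evaluation sketch (illustrative, with made-up positions): compare ground-truth
# beat positions (in milliseconds) with positions transferred by the alignment; prints
# the MAE and the accuracy table and returns both.
def _example_evaluate_synchronized_positions():
    ground_truth_ms = np.array([1000.0, 2000.0, 3000.0, 4000.0])
    synchronized_ms = np.array([1010.0, 1985.0, 3040.0, 4005.0])
    return evaluate_synchronized_positions(ground_truth_ms, synchronized_ms)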
def smooth_downsample_feature(f_feature: np.ndarray,
input_feature_rate: float,
win_len_smooth: int = 0,
downsamp_smooth: int = 1) -> Tuple[np.ndarray, float]:
"""Temporal smoothing and downsampling of a feature sequence
Parameters
----------
f_feature : np.ndarray
Input feature sequence, size dxN
input_feature_rate : float
Input feature rate in Hz
win_len_smooth : int
Smoothing window length. For 0, no smoothing is applied.
downsamp_smooth : int
Downsampling factor. For 1, no downsampling is applied.
Returns
-------
f_feature_stat : np.ndarray
Downsampled & smoothed feature.
new_feature_rate : float
New feature rate after downsampling
"""
if win_len_smooth != 0 or downsamp_smooth != 1:
# hack to get the same results as on MATLAB
stat_window = np.hanning(win_len_smooth+2)[1:-1]
stat_window /= np.sum(stat_window)
# upfirdn filters and downsamples each column of f_stat_help
f_feature_stat = upfirdn(h=stat_window, x=f_feature, up=1, down=downsamp_smooth)
seg_num = f_feature.shape[1]
stat_num = int(np.ceil(seg_num / downsamp_smooth))
cut = int(np.floor((win_len_smooth - 1) / (2 * downsamp_smooth)))
f_feature_stat = f_feature_stat[:, cut: stat_num + cut]
else:
f_feature_stat = f_feature
new_feature_rate = input_feature_rate / downsamp_smooth
return f_feature_stat, new_feature_rate
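# A small sketch (illustrative): smooth a 12 x 100 feature sequence given at 50 Hz with a
# 9-frame Hann window and downsample by 5, which yields a 10 Hz feature rate.
def _example_smooth_downsample_feature():
    features = np.random.default_rng(1).random((12, 100))
    smoothed, new_rate = smooth_downsample_feature(features, input_feature_rate=50,
                                                   win_len_smooth=9, downsamp_smooth=5)
    return smoothed.shape, new_rate  # ((12, 20), 10.0)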
@jit(nopython=True)
def normalize_feature(feature: np.ndarray,
norm_ord: int,
threshold: float) -> np.ndarray:
"""Normalizes a feature sequence according to the l^norm_ord norm.
Parameters
----------
feature : np.ndarray
Input feature sequence of size d x N
d: dimensionality of feature vectors
N: number of feature vectors (time in frames)
norm_ord : int
Norm degree
threshold : float
If the norm falls below threshold for a feature vector, then the
normalized feature vector is set to be the normalized unit vector.
Returns
-------
f_normalized : np.ndarray
Normalized feature sequence
"""
# TODO rewrite in vectorized fashion
d, N = feature.shape
f_normalized = np.zeros((d, N))
# normalize the vectors according to the l^norm_ord norm
unit_vec = np.ones(d)
unit_vec = unit_vec / np.linalg.norm(unit_vec, norm_ord)
for k in range(N):
cur_norm = np.linalg.norm(feature[:, k], norm_ord)
if cur_norm < threshold:
f_normalized[:, k] = unit_vec
else:
f_normalized[:, k] = feature[:, k] / cur_norm
return f_normalized
class FourHeads(Synchronizer):
def __init__(
self,
pathway_multiscale: int = 32,
num_pathway_layers: int = 2,
chunk_size: int = 256,
hop_length: int = 256,
encoder_dim: int = 256,
sr: int = 44100,
num_heads: int = 4,
ffn_dim: int = 128,
num_separator_layers: int = 16,
num_representation_layers: int = 4,
depthwise_conv_kernel_size: int = 31,
dropout: float = 0.25,
use_group_norm: bool = False,
convolution_first: bool = False,
labeling=PerformanceLabel(),
wiring='tiktok'
):
super().__init__(labeling, sr=sr, hop_length=hop_length)
self.main = TinyPathway(dilation=1, hop=hop_length, localize=True,
n_layers=num_pathway_layers, chunk_size=chunk_size)
self.attendant = TinyPathway(dilation=pathway_multiscale, hop=hop_length, localize=False,
n_layers=num_pathway_layers, chunk_size=chunk_size)
assert self.main.hop == self.attendant.hop # they should output with the same sample rate
print('hop in samples:', self.main.hop)
self.input_window = self.attendant.input_window
self.encoder_dim = encoder_dim
self.dropout = nn.Dropout(dropout)
# merge two streams into a conformer input
self.stream_merger = nn.Sequential(self.dropout,
nn.Linear(self.main.out_dim + self.attendant.out_dim, self.encoder_dim))
print('main stream window:', self.main.input_window,
', attendant stream window:', self.attendant.input_window,
', conformer input dim:', self.encoder_dim)
center = ((chunk_size - 1) * self.main.hop) # region labeled with pitch track
main_overlap = self.main.input_window - center
main_overlap = [int(np.floor(main_overlap / 2)), int(np.ceil(main_overlap / 2))]
attendant_overlap = self.attendant.input_window - center
attendant_overlap = [int(np.floor(attendant_overlap / 2)), int(np.ceil(attendant_overlap / 2))]
print('main frame overlap:', main_overlap, ', attendant frame overlap:', attendant_overlap)
main_crop_relative = [attendant_overlap[0] - main_overlap[0], main_overlap[1] - attendant_overlap[1]]
print('crop for main pathway', main_crop_relative)
print("Total sequence duration is", self.attendant.input_window, 'samples')
print('Main stream receptive field for one frame is', (self.main.input_window - center), 'samples')
print('Attendant stream receptive field for one frame is', (self.attendant.input_window - center), 'samples')
self.frame_overlap = attendant_overlap
self.main_stream_crop = main_crop_relative
self.max_window_size = self.attendant.input_window
self.chunk_size = chunk_size
self.separator_stream = nn.ModuleList( # source-separation, reinvented
[
ConformerLayer(
input_dim=self.encoder_dim,
ffn_dim=ffn_dim,
num_attention_heads=num_heads,
depthwise_conv_kernel_size=depthwise_conv_kernel_size,
dropout=dropout,
use_group_norm=use_group_norm,
convolution_first=convolution_first,
)
for _ in range(num_separator_layers)
]
)
self.f0_stream = nn.ModuleList(
[
ConformerLayer(
input_dim=self.encoder_dim,
ffn_dim=ffn_dim,
num_attention_heads=num_heads,
depthwise_conv_kernel_size=depthwise_conv_kernel_size,
dropout=dropout,
use_group_norm=use_group_norm,
convolution_first=convolution_first,
)
for _ in range(num_representation_layers)
]
)
self.f0_head = nn.Linear(self.encoder_dim, len(self.labeling.f0_centers_c))
self.note_stream = nn.ModuleList(
[
ConformerLayer(
input_dim=self.encoder_dim,
ffn_dim=ffn_dim,
num_attention_heads=num_heads,
depthwise_conv_kernel_size=depthwise_conv_kernel_size,
dropout=dropout,
use_group_norm=use_group_norm,
convolution_first=convolution_first,
)
for _ in range(num_representation_layers)
]
)
self.note_head = nn.Linear(self.encoder_dim, len(self.labeling.midi_centers))
self.onset_stream = nn.ModuleList(
[
ConformerLayer(
input_dim=self.encoder_dim,
ffn_dim=ffn_dim,
num_attention_heads=num_heads,
depthwise_conv_kernel_size=depthwise_conv_kernel_size,
dropout=dropout,
use_group_norm=use_group_norm,
convolution_first=convolution_first,
)
for _ in range(num_representation_layers)
]
)
self.onset_head = nn.Linear(self.encoder_dim, len(self.labeling.midi_centers))
self.offset_stream = nn.ModuleList(
[
ConformerLayer(
input_dim=self.encoder_dim,
ffn_dim=ffn_dim,
num_attention_heads=num_heads,
depthwise_conv_kernel_size=depthwise_conv_kernel_size,
dropout=dropout,
use_group_norm=use_group_norm,
convolution_first=convolution_first,
)
for _ in range(num_representation_layers)
]
)
self.offset_head = nn.Linear(self.encoder_dim, len(self.labeling.midi_centers))
self.labeling = labeling
self.double_merger = nn.Sequential(self.dropout, nn.Linear(2 * self.encoder_dim, self.encoder_dim))
self.triple_merger = nn.Sequential(self.dropout, nn.Linear(3 * self.encoder_dim, self.encoder_dim))
self.wiring = wiring
print('Total parameter count: ', self.count_parameters())
def count_parameters(self) -> int:
""" Count parameters of encoder """
return sum([p.numel() for p in self.parameters()])
def stream(self, x, representation, key_padding_mask=None):
for i, layer in enumerate(self.__getattr__('{}_stream'.format(representation))):
x = layer(x, key_padding_mask)
return x
def head(self, x, representation):
return self.__getattr__('{}_head'.format(representation))(x)
def forward(self, x, key_padding_mask=None):
# two auditory streams followed by the separator stream to ensure timbre-awareness
x_attendant = self.attendant(x)
x_main = self.main(x[:, self.main_stream_crop[0]:self.main_stream_crop[1]])
x = self.stream_merger(torch_cat((x_attendant, x_main), -1).squeeze(1))
x = self.stream(x, 'separator', key_padding_mask)
f0 = self.stream(x, 'f0', key_padding_mask) # they say this is a low level feature :)
if self.wiring == 'parallel':
note = self.stream(x, 'note', key_padding_mask)
onset = self.stream(x, 'onset', key_padding_mask)
offset = self.stream(x, 'offset', key_padding_mask)
elif self.wiring == 'tiktok':
onset = self.stream(x, 'onset', key_padding_mask)
offset = self.stream(x, 'offset', key_padding_mask)
# f0 is disconnected, note relies on separator, onset, and offset
note = self.stream(self.triple_merger(torch_cat((x, onset, offset), -1)), 'note', key_padding_mask)
elif self.wiring == 'tiktok2':
onset = self.stream(x, 'onset', key_padding_mask)
offset = self.stream(x, 'offset', key_padding_mask)
# note is connected to f0, onset, and offset
note = self.stream(self.triple_merger(torch_cat((f0, onset, offset), -1)), 'note', key_padding_mask)
elif self.wiring == 'spotify':
# note is connected to f0 only
note = self.stream(f0, 'note', key_padding_mask)
            # here onset and offset are higher-level features informed by the separator and note streams
onset = self.stream(self.double_merger(torch_cat((x, note), -1)), 'onset', key_padding_mask)
offset = self.stream(self.double_merger(torch_cat((x, note), -1)), 'offset', key_padding_mask)
else:
# onset and offset are connected to f0 and separator streams
onset = self.stream(self.double_merger(torch_cat((x, f0), -1)), 'onset', key_padding_mask)
offset = self.stream(self.double_merger(torch_cat((x, f0), -1)), 'offset', key_padding_mask)
# note is connected to f0, onset, and offset streams
note = self.stream(self.triple_merger(torch_cat((f0, onset, offset), -1)), 'note', key_padding_mask)
return {'f0': self.head(f0, 'f0'),
'note': self.head(note, 'note'),
'onset': self.head(onset, 'onset'),
'offset': self.head(offset, 'offset')}
class PretrainedModel(FourHeads):
    def __init__(self, model_json: dict, model: str, device):
        labeling = PerformanceLabel(note_min=model_json['note_low'], note_max=model_json['note_high'],
                                    f0_bins_per_semitone=model_json['f0_bins_per_semitone'], f0_tolerance_c=200,
                                    f0_smooth_std_c=model_json['f0_smooth_std_c'],
                                    onset_smooth_std=model_json['onset_smooth_std'])
        super().__init__(pathway_multiscale=model_json['pathway_multiscale'],
                         num_pathway_layers=model_json['num_pathway_layers'], wiring=model_json['wiring'],
                         hop_length=model_json['hop_length'], chunk_size=model_json['chunk_size'],
                         labeling=labeling, sr=model_json['sampling_rate'])
self.load_state_dict(torch_load(model, map_location=device,weights_only=True))
self.eval()
def merge_violin_tracks(self,mid:MidiFile):
new_mid = MidiFile(ticks_per_beat=mid.ticks_per_beat)
new_track = MidiTrack()
new_mid.tracks.append(new_track)
events = []
for track in mid.tracks:
current_time = 0
for msg in track:
current_time += msg.time
events.append((current_time, msg))
events.sort(key=lambda x: x[0])
last_time = 0
for event_time, msg in events:
delta_time = event_time - last_time
new_track.append(msg.copy(time=delta_time))
last_time = event_time
for track in mid.tracks:
for msg in track:
if msg.type == 'set_tempo':
new_track.insert(0, msg)
return new_mid
def transcribe_music(self, audio, batch_size, postprocessing):
self.transcribe(audio, batch_size, postprocessing).write("output.mid")
self.merge_violin_tracks(MidiFile("output.mid")).save("output.mid")
return "output.mid"