from numba import jit import numpy as np import time from typing import List, Tuple, Optional from .anchor import derive_anchors_from_projected_alignment, derive_neighboring_anchors, \ project_alignment_on_a_new_feature_rate from .utils import build_path_from_warping_paths, compute_cost_matrices_between_anchors, smooth_downsample_feature, normalize_feature, compute_warping_paths_from_cost_matrices, find_anchor_indices_in_warping_path from .visualization import sync_visualize_step1, sync_visualize_step2 def sync_via_mrmsdtw_with_anchors(f_chroma1: np.ndarray, f_chroma2: np.ndarray, f_onset1: np.ndarray = None, f_onset2: np.ndarray = None, input_feature_rate: float = 50, step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int32), step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64), threshold_rec: int = 10000, win_len_smooth: np.ndarray = np.array([201, 101, 21, 1]), downsamp_smooth: np.ndarray = np.array([50, 25, 5, 1]), verbose: bool = False, dtw_implementation: str = 'synctoolbox', normalize_chroma: bool = True, chroma_norm_ord: int = 2, chroma_norm_threshold: float = 0.001, visualization_title: str = "MrMsDTW result", anchor_pairs: List[Tuple] = None, linear_inp_idx: List[int] = [], alpha=0.5) -> np.ndarray: """Compute memory-restricted multi-scale DTW (MrMsDTW) using chroma and (optionally) onset features. MrMsDTW is performed on multiple levels that get progressively finer, with rectangular constraint regions defined by the alignment found on the previous, coarser level. If onset features are provided, these are used on the finest level in addition to chroma to provide higher synchronization accuracy. Parameters ---------- f_chroma1 : np.ndarray [shape=(12, N)] Chroma feature matrix of the first sequence f_chroma2 : np.ndarray [shape=(12, M)] Chroma feature matrix of the second sequence f_onset1 : np.ndarray [shape=(L, N)] Onset feature matrix of the first sequence (optional, default: None) f_onset2 : np.ndarray [shape=(L, M)] Onset feature matrix of the second sequence (optional, default: None) input_feature_rate: int Input feature rate of the chroma features (default: 50) step_sizes: np.ndarray DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]])) step_weights: np.ndarray DTW step weights (np.array([1.0, 1.0, 1.0])) threshold_rec: int Defines the maximum area that is spanned by the rectangle of two consecutive elements in the alignment (default: 10000) win_len_smooth : np.ndarray Window lengths for chroma feature smoothing (default: np.array([201, 101, 21, 1])) downsamp_smooth : np.ndarray Downsampling factors (default: np.array([50, 25, 5, 1])) verbose : bool Set `True` for visualization (default: False) dtw_implementation : str DTW implementation, librosa or synctoolbox (default: synctoolbox) normalize_chroma : bool Set `True` to normalize input chroma features after each downsampling and smoothing operation. chroma_norm_ord: int Order of chroma normalization, relevant if ``normalize_chroma`` is True. (default: 2) chroma_norm_threshold: float If the norm falls below threshold for a feature vector, then the normalized feature vector is set to be the unit vector. Relevant, if ``normalize_chroma`` is True (default: 0.001) visualization_title : str Title for the visualization plots. Only relevant if 'verbose' is True (default: "MrMsDTW result") anchor_pairs: List[Tuple] Anchor pairs given in seconds. Note that * (0, 0) and (, ) are not allowed. * Anchors must be monotonously increasing. linear_inp_idx: List[int] List of the indices of intervals created by anchor pairs, for which MrMsDTW shouldn't be run, e.g., if the interval only involves silence. 0 ap1 ap2 ap3 | | | | | idx0 | idx1 | idx2 | idx3 OR idx-1 | | | | Note that index -1 corresponds to the last interval, which begins with the last anchor pair until the end of the audio files. alpha: float Coefficient for the Chroma cost matrix in the finest scale of the MrMsDTW algorithm. C = alpha * C_Chroma + (1 - alpha) * C_act (default: 0.5) Returns ------- wp : np.ndarray [shape=(2, T)] Resulting warping path which indicates synchronized indices. """ if anchor_pairs is None: wp = sync_via_mrmsdtw(f_chroma1=f_chroma1, f_chroma2=f_chroma2, f_onset1=f_onset1, f_onset2=f_onset2, input_feature_rate=input_feature_rate, step_sizes=step_sizes, step_weights=step_weights, threshold_rec=threshold_rec, win_len_smooth=win_len_smooth, downsamp_smooth=downsamp_smooth, verbose=verbose, dtw_implementation=dtw_implementation, normalize_chroma=normalize_chroma, chroma_norm_ord=chroma_norm_ord, chroma_norm_threshold=chroma_norm_threshold, visualization_title=visualization_title, alpha=alpha) else: # constant_intervals = [((0, x1), (0, y1), False), # ((x1, x2), (y1, y2), True), # ((x2, -1), (y2, -1), False)] wp = None if verbose: print('Anchor points are given!') __check_anchor_pairs(anchor_pairs, f_chroma1.shape[1], f_chroma2.shape[1], input_feature_rate) # Add ending as the anchor point anchor_pairs.append((-1, -1)) prev_a1 = 0 prev_a2 = 0 for idx, anchor_pair in enumerate(anchor_pairs): cur_a1, cur_a2 = anchor_pair # Split the features f_chroma1_split, f_onset1_split, f_chroma2_split, f_onset2_split = __split_features(f_chroma1, f_onset1, f_chroma2, f_onset2, cur_a1, cur_a2, prev_a1, prev_a2, input_feature_rate) if idx in linear_inp_idx or idx == len(anchor_pairs) - 1 and -1 in linear_inp_idx: # Generate a diagonal warping path, if the algorithm is not supposed to executed. # A typical scenario is the silence breaks which are enclosed by two anchor points. if verbose: print('A diagonal warping path is generated for the interval \n\t Feature sequence 1: %.2f - %.2f' '\n\t Feature sequence 2: %.2f - %.2f\n' % (prev_a1, cur_a1, prev_a2, cur_a2)) wp_cur = __diagonal_warping_path(f_chroma1_split, f_chroma2_split) else: if verbose: if cur_a1 != -1 and cur_a2 != -1: print('MrMsDTW is applied for the interval \n\t Feature sequence 1: %.2f - %.2f' '\n\t Feature sequence 2: %.2f - %.2f\n' % (prev_a1, cur_a1, prev_a2, cur_a2)) else: print('MrMsDTW is applied for the interval \n\t Feature sequence 1: %.2f - end' '\n\t Feature sequence 2: %.2f - end\n' % (prev_a1, prev_a2)) wp_cur = sync_via_mrmsdtw(f_chroma1=f_chroma1_split, f_chroma2=f_chroma2_split, f_onset1=f_onset1_split, f_onset2=f_onset2_split, input_feature_rate=input_feature_rate, step_sizes=step_sizes, step_weights=step_weights, threshold_rec=threshold_rec, win_len_smooth=win_len_smooth, downsamp_smooth=downsamp_smooth, verbose=verbose, dtw_implementation=dtw_implementation, normalize_chroma=normalize_chroma, chroma_norm_ord=chroma_norm_ord, chroma_norm_threshold=chroma_norm_threshold, alpha=alpha) if wp is None: wp = np.array(wp_cur, copy=True) # Concatenate warping paths else: wp = np.concatenate([wp, wp_cur + wp[:, -1].reshape(2, 1) + 1], axis=1) prev_a1 = cur_a1 prev_a2 = cur_a2 anchor_pairs.pop() return wp def sync_via_mrmsdtw(f_chroma1: np.ndarray, f_chroma2: np.ndarray, f_onset1: np.ndarray = None, f_onset2: np.ndarray = None, input_feature_rate: float = 50, step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int32), step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64), threshold_rec: int = 10000, win_len_smooth: np.ndarray = np.array([201, 101, 21, 1]), downsamp_smooth: np.ndarray = np.array([50, 25, 5, 1]), verbose: bool = False, dtw_implementation: str = 'synctoolbox', normalize_chroma: bool = True, chroma_norm_ord: int = 2, chroma_norm_threshold: float = 0.001, visualization_title: str = "MrMsDTW result", alpha=0.5) -> np.ndarray: """Compute memory-restricted multi-scale DTW (MrMsDTW) using chroma and (optionally) onset features. MrMsDTW is performed on multiple levels that get progressively finer, with rectangular constraint regions defined by the alignment found on the previous, coarser level. If onset features are provided, these are used on the finest level in addition to chroma to provide higher synchronization accuracy. Parameters ---------- f_chroma1 : np.ndarray [shape=(12, N)] Chroma feature matrix of the first sequence f_chroma2 : np.ndarray [shape=(12, M)] Chroma feature matrix of the second sequence f_onset1 : np.ndarray [shape=(L, N)] Onset feature matrix of the first sequence (optional, default: None) f_onset2 : np.ndarray [shape=(L, M)] Onset feature matrix of the second sequence (optional, default: None) input_feature_rate: int Input feature rate of the chroma features (default: 50) step_sizes: np.ndarray DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]])) step_weights: np.ndarray DTW step weights (np.array([1.0, 1.0, 1.0])) threshold_rec: int Defines the maximum area that is spanned by the rectangle of two consecutive elements in the alignment (default: 10000) win_len_smooth : np.ndarray Window lengths for chroma feature smoothing (default: np.array([201, 101, 21, 1])) downsamp_smooth : np.ndarray Downsampling factors (default: np.array([50, 25, 5, 1])) verbose : bool Set `True` for visualization (default: False) dtw_implementation : str DTW implementation, librosa or synctoolbox (default: synctoolbox) normalize_chroma : bool Set `True` to normalize input chroma features after each downsampling and smoothing operation. chroma_norm_ord: int Order of chroma normalization, relevant if ``normalize_chroma`` is True. (default: 2) chroma_norm_threshold: float If the norm falls below threshold for a feature vector, then the normalized feature vector is set to be the unit vector. Relevant, if ``normalize_chroma`` is True (default: 0.001) visualization_title : str Title for the visualization plots. Only relevant if 'verbose' is True (default: "MrMsDTW result") alpha: float Coefficient for the Chroma cost matrix in the finest scale of the MrMsDTW algorithm. C = alpha * C_Chroma + (1 - alpha) * C_act (default: 0.5) Returns ------- alignment: np.ndarray [shape=(2, T)] Resulting warping path which indicates synchronized indices. """ # If onset features are given as input, high resolution MrMsDTW is activated. high_res = False if f_onset1 is not None and f_onset2 is not None: high_res = True if high_res and (f_chroma1.shape[1] != f_onset1.shape[1] or f_chroma2.shape[1] != f_onset2.shape[1]): raise ValueError('Chroma and onset features must be of the same length.') if downsamp_smooth[-1] != 1 or win_len_smooth[-1] != 1: raise ValueError('The downsampling factor of the last iteration must be equal to 1, i.e.' 'at the last iteration, it is computed at the input feature rate!') num_iterations = win_len_smooth.shape[0] cost_matrix_size_old = tuple() feature_rate_old = input_feature_rate / downsamp_smooth[0] alignment = None total_computation_time = 0.0 # If the area is less than the threshold_rec, don't apply the multiscale DTW. it = (num_iterations - 1) if __compute_area(f_chroma1, f_chroma2) < threshold_rec else 0 while it < num_iterations: tic1 = time.perf_counter() # Smooth and downsample given raw features f_chroma1_cur, _ = smooth_downsample_feature(f_chroma1, input_feature_rate=input_feature_rate, win_len_smooth=win_len_smooth[it], downsamp_smooth=downsamp_smooth[it]) f_chroma2_cur, feature_rate_new = smooth_downsample_feature(f_chroma2, input_feature_rate=input_feature_rate, win_len_smooth=win_len_smooth[it], downsamp_smooth=downsamp_smooth[it]) if normalize_chroma: f_chroma1_cur = normalize_feature(f_chroma1_cur, norm_ord=chroma_norm_ord, threshold=chroma_norm_threshold) f_chroma2_cur = normalize_feature(f_chroma2_cur, norm_ord=chroma_norm_ord, threshold=chroma_norm_threshold) # Project path onto new resolution cost_matrix_size_new = (f_chroma1_cur.shape[1], f_chroma2_cur.shape[1]) if alignment is None: # Initialize the alignment with the start and end frames of the feature sequence anchors = np.array([[0, f_chroma1_cur.shape[1] - 1], [0, f_chroma2_cur.shape[1] - 1]]) else: projected_alignment = project_alignment_on_a_new_feature_rate(alignment=alignment, feature_rate_old=feature_rate_old, feature_rate_new=feature_rate_new, cost_matrix_size_old=cost_matrix_size_old, cost_matrix_size_new=cost_matrix_size_new) anchors = derive_anchors_from_projected_alignment(projected_alignment=projected_alignment, threshold=threshold_rec) # Cost matrix and warping path computation if high_res and it == num_iterations - 1: # Compute cost considering chroma and pitch onset features and alignment only in the last iteration, # where the features are at the finest level. cost_matrices_step1 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur, f_chroma2=f_chroma2_cur, f_onset1=f_onset1, f_onset2=f_onset2, anchors=anchors, alpha=alpha) else: cost_matrices_step1 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur, f_chroma2=f_chroma2_cur, anchors=anchors, alpha=alpha) wp_list = compute_warping_paths_from_cost_matrices(cost_matrices_step1, step_sizes=step_sizes, step_weights=step_weights, implementation=dtw_implementation) # Concatenate warping paths wp = build_path_from_warping_paths(warping_paths=wp_list, anchors=anchors) anchors_step1 = None wp_step1 = None num_rows_step1 = 0 num_cols_step1 = 0 ax = None toc1 = time.perf_counter() if verbose and cost_matrices_step1 is not None: anchors_step1 = np.array(anchors, copy=True) wp_step1 = np.array(wp, copy=True) num_rows_step1, num_cols_step1 = np.sum(np.array([dtw_mat.shape for dtw_mat in cost_matrices_step1], int), axis=0) fig, ax = sync_visualize_step1(cost_matrices_step1, num_rows_step1, num_cols_step1, anchors, wp) tic2 = time.perf_counter() # Compute neighboring anchors and refine alignment using local path between neighboring anchors anchor_indices_in_warping_path = find_anchor_indices_in_warping_path(wp, anchors=anchors) # Compute neighboring anchors for refinement neighboring_anchors, neighboring_anchor_indices = \ derive_neighboring_anchors(wp, anchor_indices=anchor_indices_in_warping_path) if neighboring_anchor_indices.shape[0] > 1 \ and it == num_iterations - 1 and high_res: cost_matrices_step2 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur, f_chroma2=f_chroma2_cur, f_onset1=f_onset1, f_onset2=f_onset2, anchors=neighboring_anchors, alpha=alpha) else: cost_matrices_step2 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur, f_chroma2=f_chroma2_cur, anchors=neighboring_anchors, alpha=alpha) wp_list_refine = compute_warping_paths_from_cost_matrices(cost_matrices=cost_matrices_step2, step_sizes=step_sizes, step_weights=step_weights, implementation=dtw_implementation) wp = __refine_wp(wp, anchors, wp_list_refine, neighboring_anchors, neighboring_anchor_indices) toc2 = time.perf_counter() computation_time_it = toc2 - tic2 + toc1 - tic1 total_computation_time += computation_time_it alignment = wp feature_rate_old = feature_rate_new cost_matrix_size_old = cost_matrix_size_new if verbose and cost_matrices_step2 is not None: sync_visualize_step2(ax, cost_matrices_step2, wp, wp_step1, num_rows_step1, num_cols_step1, anchors_step1, neighboring_anchors, plot_title=f"{visualization_title} - Level {it + 1}") print('Level {} computation time: {:.2f} seconds'.format(it, computation_time_it)) it += 1 if verbose: print('Computation time of MrMsDTW: {:.2f} seconds'.format(total_computation_time)) return alignment def __diagonal_warping_path(f1: np.ndarray, f2: np.ndarray) -> np.ndarray: """Generates a diagonal warping path given two feature sequences. Parameters ---------- f1: np.ndarray [shape=(_, N)] First feature sequence f2: np.ndarray [shape=(_, M)] Second feature sequence Returns ------- np.ndarray: Diagonal warping path [shape=(2, T)] """ max_size = np.maximum(f1.shape[1], f2.shape[1]) min_size = np.minimum(f1.shape[1], f2.shape[1]) if min_size == 1: return np.array([max_size - 1, 0]).reshape(-1, 1) elif max_size == f1.shape[1]: return np.array([np.round(np.linspace(0, max_size - 1, min_size)), np.linspace(0, min_size - 1, min_size)]) else: return np.array([np.linspace(0, min_size-1, min_size), np.round(np.linspace(0, max_size - 1, min_size))]) @jit(nopython=True) def __compute_area(f1, f2): """Computes the area of the cost matrix given two feature sequences Parameters ---------- f1: np.ndarray First feature sequence f2: np.ndarray Second feature sequence Returns ------- int: Area of the cost matrix """ return f1.shape[1] * f2.shape[1] def __split_features(f_chroma1: np.ndarray, f_onset1: np.ndarray, f_chroma2: np.ndarray, f_onset2: np.ndarray, cur_a1: float, cur_a2: float, prev_a1: float, prev_a2: float, feature_rate: int) -> Tuple[np.ndarray, Optional[np.ndarray], np.ndarray, Optional[np.ndarray]]: if cur_a1 == -1: f_chroma1_split = f_chroma1[:, int(prev_a1 * feature_rate):] if f_onset1 is not None: f_onset1_split = f_onset1[:, int(prev_a1 * feature_rate):] else: f_onset1_split = None else: # Split the features f_chroma1_split = f_chroma1[:, int(prev_a1 * feature_rate):int(cur_a1 * feature_rate)] if f_onset1 is not None: f_onset1_split = f_onset1[:, int(prev_a1 * feature_rate):int(cur_a1 * feature_rate)] else: f_onset1_split = None if cur_a2 == -1: f_chroma2_split = f_chroma2[:, int(prev_a2 * feature_rate):] if f_onset2 is not None: f_onset2_split = f_onset2[:, int(prev_a2 * feature_rate):] else: f_onset2_split = None else: f_chroma2_split = f_chroma2[:, int(prev_a2 * feature_rate):int(cur_a2 * feature_rate)] if f_onset2 is not None: f_onset2_split = f_onset2[:, int(prev_a2 * feature_rate):int(cur_a2 * feature_rate)] else: f_onset2_split = None return f_chroma1_split, f_onset1_split, f_chroma2_split, f_onset2_split def __refine_wp(wp: np.ndarray, anchors: np.ndarray, wp_list_refine: List, neighboring_anchors: np.ndarray, neighboring_anchor_indices: np.ndarray) -> np.ndarray: wp_length = wp[:, neighboring_anchor_indices[-1]:].shape[1] last_list = wp[:, neighboring_anchor_indices[-1]:] - np.tile( wp[:, neighboring_anchor_indices[-1]].reshape(-1, 1), wp_length) wp_list_tmp = [wp[:, :neighboring_anchor_indices[0] + 1]] + wp_list_refine + [last_list] A_tmp = np.concatenate([anchors[:, 0].reshape(-1, 1), neighboring_anchors, anchors[:, -1].reshape(-1, 1)], axis=1) wp_res = build_path_from_warping_paths(warping_paths=wp_list_tmp, anchors=A_tmp) return wp_res def __check_anchor_pairs(anchor_pairs: List, f_len1: int, f_len2: int, feature_rate: int): """Ensures that the anchors satisfy the conditions Parameters ---------- anchor_pairs: List[Tuple] List of anchor pairs f_len1: int Length of the first feature sequence f_len2: int Length of the second feature sequence feature_rate: int Input feature rate of the features """ prev_a1 = 0 prev_a2 = 0 for anchor_pair in anchor_pairs: a1, a2 = anchor_pair if a1 <= 0 or a2 <= 0: raise ValueError('Starting point must be a positive number!') if a1 > f_len1 / feature_rate or a2 > f_len2 / feature_rate: raise ValueError('Anchor points cannot be greater than the length of the input audio files!') if a1 == f_len1 and a2 == f_len2: raise ValueError('Both anchor points cannot be equal to the length of the audio files.') if a1 == prev_a1 and a2 == prev_a2: raise ValueError('Duplicate anchor pairs are not allowed!') if a1 < prev_a1 or a2 < prev_a2: raise ValueError('Anchor points must be monotonously increasing.') prev_a1 = a1 prev_a2 = a2