import copy
import glob
import os
import sys

import librosa
import numpy as np
import pretty_midi
import pyrubberband as pyrb
import soundfile as sf
from omegaconf import OmegaConf
from tqdm.auto import tqdm
from synctoolbox.dtw.mrmsdtw import sync_via_mrmsdtw
from synctoolbox.dtw.utils import (
    compute_optimal_chroma_shift,
    shift_chroma_vectors,
    make_path_strictly_monotonic,
)
from synctoolbox.feature.chroma import (
    pitch_to_chroma,
    quantize_chroma,
    quantized_chroma_to_CENS,
)
from synctoolbox.feature.dlnco import pitch_onset_features_to_DLNCO
from synctoolbox.feature.pitch import audio_to_pitch_features
from synctoolbox.feature.pitch_onset import audio_to_pitch_onset_features
from synctoolbox.feature.utils import estimate_tuning

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
print(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.dsp import normalize, get_stereo
from midiaudiopair import MidiAudioPair

Fs = 22050
feature_rate = 50
step_weights = np.array([1.5, 1.5, 2.0])
threshold_rec = 10 ** 6


def save_delayed_song(sample, dry_run):
    import warnings

    warnings.filterwarnings(action="ignore")

    song_audio, _ = librosa.load(sample.original_song, sr=Fs)
    midi_pm = pretty_midi.PrettyMIDI(sample.original_midi)

    if np.power(song_audio, 2).sum() < 1:  # low energy: invalid file
        print("invalid audio :", sample.original_song)
        sample.delete_files_myself()
        return

    rd = get_aligned_results(midi_pm=midi_pm, song_audio=song_audio)
    mix_song = rd["mix_song"]  # stereo mix of song + warped piano (not written here)
    song_pitch_shifted = rd["song_pitch_shifted"]
    midi_warped_pm = rd["midi_warped_pm"]
    pitch_shift_for_song_audio = rd["pitch_shift_for_song_audio"]
    tuning_offset_song = rd["tuning_offset_song"]
    tuning_offset_piano = rd["tuning_offset_piano"]

    try:
        if dry_run:
            print("write audio files: ", sample.song)
        else:
            sf.write(
                file=sample.song,
                data=song_pitch_shifted,
                samplerate=Fs,
                format="wav",
            )
    except Exception:
        print("Fail : ", sample.song)

    try:
        if dry_run:
            print("write warped midi :", sample.midi)
        else:
            midi_warped_pm.write(sample.midi)
    except Exception:
        # Ad-hoc fallback: copy the tick scales from the source MIDI
        # and retry the write once.
        midi_warped_pm._tick_scales = midi_pm._tick_scales
        try:
            if dry_run:
                print("write warped midi2 :", sample.midi)
            else:
                midi_warped_pm.write(sample.midi)
        except Exception:
            print("ad-hoc failed midi : ", sample.midi)
        print("ad-hoc midi : ", sample.midi)

    sample.yaml.song.pitch_shift = pitch_shift_for_song_audio.item()
    sample.yaml.song.tuning_offset = tuning_offset_song.item()
    sample.yaml.piano.tuning_offset = tuning_offset_piano.item()
    OmegaConf.save(sample.yaml, sample.yaml_path)
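# Pipeline overview for get_aligned_results() below: synthesize the MIDI with
# fluidsynth, estimate each recording's tuning, extract quantized chroma and
# DLNCO features, find the optimal chroma (key) shift, align the two feature
# sequences with MrMsDTW, and finally warp the MIDI onto the song's timeline.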
def get_aligned_results(midi_pm, song_audio):
    piano_audio = midi_pm.fluidsynth(Fs)
    song_audio = normalize(song_audio)

    # Estimate the tuning deviation of each recording; for the motivation, see:
    # https://www.audiolabs-erlangen.de/resources/MIR/FMP/C3/C3S1_TranspositionTuning.html
    tuning_offset_1 = estimate_tuning(song_audio, Fs)
    tuning_offset_2 = estimate_tuning(piano_audio, Fs)

    # DLNCO features (Sebastian Ewert, Meinard Müller, and Peter Grosche:
    # "High Resolution Audio Synchronization Using Chroma Onset Features",
    # Proc. ICASSP, pp. 1869-1872, 2009) help increase synchronization
    # accuracy, especially for music with clear onsets. The chroma is
    # quantized and smoothed into CENS features, as required by MrMsDTW.
    f_chroma_quantized_1, f_DLNCO_1 = get_features_from_audio(
        song_audio, tuning_offset_1
    )
    f_chroma_quantized_2, f_DLNCO_2 = get_features_from_audio(
        piano_audio, tuning_offset_2
    )

    # Shift the chroma vectors of the piano rendition onto the song's key;
    # otherwise, a key difference between the two recordings degrades the
    # alignment.
    opt_chroma_shift = compute_optimal_chroma_shift(
        quantized_chroma_to_CENS(f_chroma_quantized_1, 201, 50, feature_rate)[0],
        quantized_chroma_to_CENS(f_chroma_quantized_2, 201, 50, feature_rate)[0],
    )
    f_chroma_quantized_2 = shift_chroma_vectors(f_chroma_quantized_2, opt_chroma_shift)
    f_DLNCO_2 = shift_chroma_vectors(f_DLNCO_2, opt_chroma_shift)

    wp = sync_via_mrmsdtw(
        f_chroma1=f_chroma_quantized_1,
        f_onset1=f_DLNCO_1,
        f_chroma2=f_chroma_quantized_2,
        f_onset2=f_DLNCO_2,
        input_feature_rate=feature_rate,
        step_weights=step_weights,
        threshold_rec=threshold_rec,
        verbose=False,
    )
    wp = make_path_strictly_monotonic(wp)

    # Fold the chroma shift into the smallest equivalent pitch shift in
    # semitones (range -5..+6) and apply it to the song audio.
    pitch_shift_for_song_audio = -opt_chroma_shift % 12
    if pitch_shift_for_song_audio > 6:
        pitch_shift_for_song_audio -= 12
    if pitch_shift_for_song_audio != 0:
        song_audio_shifted = pyrb.pitch_shift(
            song_audio, Fs, pitch_shift_for_song_audio
        )
    else:
        song_audio_shifted = song_audio

    # Convert the warping path from feature indices to seconds and retime the
    # MIDI so that it follows the song's timeline.
    time_map_second = wp / feature_rate
    midi_pm_warped = copy.deepcopy(midi_pm)
    midi_pm_warped = simple_adjust_times(
        midi_pm_warped, time_map_second[1], time_map_second[0]
    )
    piano_audio_warped = midi_pm_warped.fluidsynth(Fs)
    song_audio_shifted = normalize(song_audio_shifted)

    stereo_sonification_piano = get_stereo(song_audio_shifted, piano_audio_warped)

    rd = dict(
        mix_song=stereo_sonification_piano,
        song_pitch_shifted=song_audio_shifted,
        midi_warped_pm=midi_pm_warped,
        pitch_shift_for_song_audio=pitch_shift_for_song_audio,
        tuning_offset_song=tuning_offset_1,
        tuning_offset_piano=tuning_offset_2,
    )
    return rd
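# simple_adjust_times() below retimes MIDI events by linear interpolation
# through the DTW warping path. A minimal sketch of the idea, using
# hypothetical values rather than a real alignment:
#
#   original_times = np.array([0.0, 1.0, 2.0])  # e.g. MIDI-synth timeline
#   new_times = np.array([0.0, 1.2, 2.1])       # e.g. song timeline
#   np.interp(0.5, original_times, new_times)   # -> 0.6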
def simple_adjust_times(pm, original_times, new_times):
    """Remap event times in `pm` from `original_times` to `new_times`.

    Most of this code is adapted from pretty_midi:
    https://github.com/craffel/pretty-midi/blob/main/pretty_midi/pretty_midi.py
    """
    # Keep only notes that lie entirely within the original time range
    for instrument in pm.instruments:
        instrument.notes = [
            copy.deepcopy(note)
            for note in instrument.notes
            if note.start >= original_times[0] and note.end <= original_times[-1]
        ]
    # Get array of note-on locations and correct them
    note_ons = np.array(
        [note.start for instrument in pm.instruments for note in instrument.notes]
    )
    adjusted_note_ons = np.interp(note_ons, original_times, new_times)
    # Same for note-offs
    note_offs = np.array(
        [note.end for instrument in pm.instruments for note in instrument.notes]
    )
    adjusted_note_offs = np.interp(note_offs, original_times, new_times)
    # Correct the notes, clamping negative times to zero
    for n, note in enumerate(
        [note for instrument in pm.instruments for note in instrument.notes]
    ):
        note.start = (adjusted_note_ons[n] > 0) * adjusted_note_ons[n]
        note.end = (adjusted_note_offs[n] > 0) * adjusted_note_offs[n]
    # After performing alignment, some notes may have an end time which is
    # on or before the start time. Remove these!
    pm.remove_invalid_notes()

    def adjust_events(event_getter):
        """Call event_getter with each instrument as the sole argument and
        adjust the events which are returned."""
        # Sort the events by time
        for instrument in pm.instruments:
            event_getter(instrument).sort(key=lambda e: e.time)
        # Correct the events by interpolating
        event_times = np.array(
            [
                event.time
                for instrument in pm.instruments
                for event in event_getter(instrument)
            ]
        )
        adjusted_event_times = np.interp(event_times, original_times, new_times)
        for n, event in enumerate(
            [
                event
                for instrument in pm.instruments
                for event in event_getter(instrument)
            ]
        ):
            event.time = adjusted_event_times[n]
        for instrument in pm.instruments:
            # We want to keep only the final event which has time ==
            # new_times[0]
            valid_events = [
                event
                for event in event_getter(instrument)
                if event.time == new_times[0]
            ]
            if valid_events:
                valid_events = valid_events[-1:]
            # Otherwise only keep events within the new set of times
            valid_events.extend(
                event
                for event in event_getter(instrument)
                if event.time > new_times[0] and event.time < new_times[-1]
            )
            event_getter(instrument)[:] = valid_events

    # Correct pitch bends and control changes
    adjust_events(lambda i: i.pitch_bends)
    adjust_events(lambda i: i.control_changes)
    return pm


def get_features_from_audio(audio, tuning_offset, visualize=False):
    f_pitch = audio_to_pitch_features(
        f_audio=audio,
        Fs=Fs,
        tuning_offset=tuning_offset,
        feature_rate=feature_rate,
        verbose=visualize,
    )
    f_chroma = pitch_to_chroma(f_pitch=f_pitch)
    f_chroma_quantized = quantize_chroma(f_chroma=f_chroma)

    f_pitch_onset = audio_to_pitch_onset_features(
        f_audio=audio, Fs=Fs, tuning_offset=tuning_offset, verbose=visualize
    )
    f_DLNCO = pitch_onset_features_to_DLNCO(
        f_peaks=f_pitch_onset,
        feature_rate=feature_rate,
        feature_sequence_length=f_chroma_quantized.shape[1],
        visualize=visualize,
    )
    return f_chroma_quantized, f_DLNCO


def main(samples, dry_run):
    import multiprocessing

    from joblib import Parallel, delayed

    Parallel(n_jobs=multiprocessing.cpu_count() // 2)(
        delayed(save_delayed_song)(sample=sample, dry_run=dry_run)
        for sample in tqdm(samples)
    )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="align piano-cover MIDI files to their original songs"
    )
    parser.add_argument(
        "data_dir",
        type=str,
        default=None,
        help="""directory containing {id}/{song_filename.wav}""",
    )
    parser.add_argument(
        "--dry_run",
        default=False,
        action="store_true",
        help="print target paths without writing any files",
    )
    args = parser.parse_args()

    def getfiles():
        meta_files = sorted(glob.glob(args.data_dir + "/*.yaml"))
        print("meta ", len(meta_files))

        samples = list()
        for meta_file in tqdm(meta_files):
            m = MidiAudioPair(meta_file, auto_remove_no_song=True)
            if m.error_code != MidiAudioPair.NO_SONG:
                # Touch an auxiliary text file naming the piano/song pair
                aux_txt = os.path.join(
                    m.audio_dir,
                    m.yaml.piano.ytid,
                    f"{m.yaml.piano.title[:50]}___{m.yaml.song.title[:50]}.txt",
                )
                with open(aux_txt, "w") as f:
                    f.write(".")
                samples.append(m)

        print(f"files available {len(samples)}")
        return samples

    samples = getfiles()
    main(samples=samples, dry_run=args.dry_run)
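# Example invocation (hypothetical script and directory names; the actual
# paths depend on your setup):
#
#   python align_song_midi.py /data/piano_covers --dry_run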