import functools

import gin
import jax
import numpy as np
import seqio
import t5
import t5x
import tensorflow.compat.v2 as tf

from mt3 import metrics_utils
from mt3 import models
from mt3 import network
from mt3 import note_sequences
from mt3 import preprocessors
from mt3 import spectrograms
from mt3 import vocabularies

class InferenceModel(object):
  """Wrapper of T5X model for music transcription."""

  def __init__(self, checkpoint_path, model_type='mt3'):
    # Model constants: 'ismir2021' is the piano-only model with velocity
    # bins, 'mt3' the multi-instrument model with note ties and no velocity.
    if model_type == 'ismir2021':
      num_velocity_bins = 127
      self.encoding_spec = note_sequences.NoteEncodingSpec
      self.inputs_length = 512
    elif model_type == 'mt3':
      num_velocity_bins = 1
      self.encoding_spec = note_sequences.NoteEncodingWithTiesSpec
      self.inputs_length = 256
    else:
      raise ValueError('unknown model_type: %s' % model_type)

    gin_files = ['/home/user/app/mt3/gin/model.gin',
                 '/home/user/app/mt3/gin/mt3.gin']

    self.batch_size = 8
    self.outputs_length = 1024
    self.sequence_length = {'inputs': self.inputs_length,
                            'targets': self.outputs_length}

    self.partitioner = t5x.partitioning.PjitPartitioner(
        model_parallel_submesh=(1, 1, 1, 1), num_partitions=1)

    # Build codecs and vocabularies.
    self.spectrogram_config = spectrograms.SpectrogramConfig()
    self.codec = vocabularies.build_codec(
        vocab_config=vocabularies.VocabularyConfig(
            num_velocity_bins=num_velocity_bins))
    self.vocabulary = vocabularies.vocabulary_from_codec(self.codec)
    self.output_features = {
        'inputs': seqio.ContinuousFeature(dtype=tf.float32, rank=2),
        'targets': seqio.Feature(vocabulary=self.vocabulary),
    }

    # Create a T5X model.
    self._parse_gin(gin_files)
    self.model = self._load_model()

    # Restore from checkpoint.
    self.restore_from_checkpoint(checkpoint_path)

  @property
  def input_shapes(self):
    # `restore_from_checkpoint` reads this as an attribute, so it must be a
    # property rather than a plain method.
    return {
        'encoder_input_tokens': (self.batch_size, self.inputs_length),
        'decoder_input_tokens': (self.batch_size, self.outputs_length)
    }

  def _parse_gin(self, gin_files):
    """Parse gin files used to train the model."""
    gin_bindings = [
        'from __gin__ import dynamic_registration',
        'from mt3 import vocabularies',
        'VOCAB_CONFIG=@vocabularies.VocabularyConfig()',
        'vocabularies.VocabularyConfig.num_velocity_bins=%NUM_VELOCITY_BINS'
    ]
    with gin.unlock_config():
      gin.parse_config_files_and_bindings(
          gin_files, gin_bindings, finalize_config=False)

  def _load_model(self):
    """Load up a T5X `Model` after parsing training gin config."""
    model_config = gin.get_configurable(network.T5Config)()
    module = network.Transformer(config=model_config)
    return models.ContinuousInputsEncoderDecoderModel(
        module=module,
        input_vocabulary=self.output_features['inputs'].vocabulary,
        output_vocabulary=self.output_features['targets'].vocabulary,
        optimizer_def=t5x.adafactor.Adafactor(decay_rate=0.8, step_offset=0),
        input_depth=spectrograms.input_depth(self.spectrogram_config))

  def restore_from_checkpoint(self, checkpoint_path):
    """Restore training state from checkpoint; rebuilds `self._predict_fn`."""
    train_state_initializer = t5x.utils.TrainStateInitializer(
        optimizer_def=self.model.optimizer_def,
        init_fn=self.model.get_initial_variables,
        input_shapes=self.input_shapes,
        partitioner=self.partitioner)
    restore_checkpoint_cfg = t5x.utils.RestoreCheckpointConfig(
        path=checkpoint_path, mode='specific', dtype='float32')
    train_state_axes = train_state_initializer.train_state_axes
    self._predict_fn = self._get_predict_fn(train_state_axes)
    self._train_state = train_state_initializer.from_checkpoint_or_scratch(
        [restore_checkpoint_cfg], init_rng=jax.random.PRNGKey(0))

  def _get_predict_fn(self, train_state_axes):
    """Generate a partitioned prediction function for decoding."""
    def partial_predict_fn(params, batch, decode_rng):
      # `decode_rng` is part of the expected signature but unused; decoding
      # here is deterministic.
      return self.model.predict_batch_with_aux(
          params, batch, decoder_params={'decode_rng': None})
    return self.partitioner.partition(
        partial_predict_fn,
        in_axis_resources=(
            train_state_axes.params,
            t5x.partitioning.PartitionSpec('data',), None),
        out_axis_resources=t5x.partitioning.PartitionSpec('data',)
    )

  def predict_tokens(self, batch, seed=0):
    """Predict tokens from preprocessed dataset batch."""
    prediction, _ = self._predict_fn(
        self._train_state.params, batch, jax.random.PRNGKey(seed))
    return self.vocabulary.decode_tf(prediction).numpy()

  def __call__(self, audio):
    """Infer note sequence from audio samples.

    Args:
      audio: 1-d numpy array of audio samples (16kHz) for a single example.

    Returns:
      A note_sequence of the transcribed audio.
    """
    ds = self.audio_to_dataset(audio)
    ds = self.preprocess(ds)

    model_ds = self.model.FEATURE_CONVERTER_CLS(pack=False)(
        ds, task_feature_lengths=self.sequence_length)
    model_ds = model_ds.batch(self.batch_size)

    inferences = (tokens for batch in model_ds.as_numpy_iterator()
                  for tokens in self.predict_tokens(batch))

    predictions = []
    for example, tokens in zip(ds.as_numpy_iterator(), inferences):
      predictions.append(self.postprocess(tokens, example))

    result = metrics_utils.event_predictions_to_ns(
        predictions, codec=self.codec, encoding_spec=self.encoding_spec)
    return result['est_ns']

  def audio_to_dataset(self, audio):
    """Create a TF Dataset of spectrograms from input audio."""
    frames, frame_times = self._audio_to_frames(audio)
    return tf.data.Dataset.from_tensors({
        'inputs': frames,
        'input_times': frame_times,
    })

  def _audio_to_frames(self, audio):
    """Compute spectrogram frames from audio."""
    frame_size = self.spectrogram_config.hop_width
    # Pad to a multiple of the hop width (a full extra frame when the audio
    # is already aligned), then split into non-overlapping frames.
    padding = [0, frame_size - len(audio) % frame_size]
    audio = np.pad(audio, padding, mode='constant')
    frames = spectrograms.split_audio(audio, self.spectrogram_config)
    num_frames = len(audio) // frame_size
    times = np.arange(num_frames) / self.spectrogram_config.frames_per_second
    return frames, times

  def preprocess(self, ds):
    """Split examples into model-length segments and compute spectrograms."""
    pp_chain = [
        functools.partial(
            t5.data.preprocessors.split_tokens_to_inputs_length,
            sequence_length=self.sequence_length,
            output_features=self.output_features,
            feature_key='inputs',
            additional_feature_keys=['input_times']),
        # Cache occurs here during training.
        preprocessors.add_dummy_targets,
        functools.partial(
            preprocessors.compute_spectrograms,
            spectrogram_config=self.spectrogram_config)
    ]
    for pp in pp_chain:
      ds = pp(ds)
    return ds

  def postprocess(self, tokens, example):
    tokens = self._trim_eos(tokens)
    start_time = example['input_times'][0]
    # Round down to nearest symbolic token step.
    start_time -= start_time % (1 / self.codec.steps_per_second)
    return {
        'est_tokens': tokens,
        'start_time': start_time,
        # Internal MT3 code expects raw inputs, not used here.
        'raw_inputs': []
    }

  @staticmethod
  def _trim_eos(tokens):
    # Called as `self._trim_eos(...)` above, so it must be a static method;
    # truncates the sequence at the first decoded EOS token, if any.
    tokens = np.array(tokens, np.int32)
    if vocabularies.DECODED_EOS_ID in tokens:
      tokens = tokens[:np.argmax(tokens == vocabularies.DECODED_EOS_ID)]
    return tokens
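

# A minimal usage sketch, assuming `librosa` and `note_seq` are installed and
# that a checkpoint exists at the (hypothetical) path below; substitute the
# paths your deployment actually uses. Per `__call__`, the model expects a
# mono 16 kHz sample array.
if __name__ == '__main__':
  import librosa
  import note_seq

  model = InferenceModel('/home/user/app/checkpoints/mt3/', model_type='mt3')
  audio, _ = librosa.load('example.wav', sr=16000, mono=True)
  est_ns = model(audio)  # NoteSequence proto of the transcription
  note_seq.sequence_proto_to_midi_file(est_ns, 'transcribed.mid')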