|
import torch |
|
from speechbrain.inference.interfaces import Pretrained |
|
import librosa |
|
import numpy as np |
|
|
|
|
|
class ASR(Pretrained): |
|
def __init__(self, *args, **kwargs): |
|
super().__init__(*args, **kwargs) |
|
|
|
def encode_batch(self, device, wavs, wav_lens=None, normalize=False): |
|
wavs = wavs.to(device) |
|
wav_lens = wav_lens.to(device) |
|
|
|
|
|
encoded_outputs = self.mods.encoder_w2v2(wavs.detach()) |
|
|
|
tokens_bos = torch.zeros((wavs.size(0), 1), dtype=torch.long).to(device) |
|
embedded_tokens = self.mods.embedding(tokens_bos) |
|
decoder_outputs, _ = self.mods.decoder(embedded_tokens, encoded_outputs, wav_lens) |
|
|
|
|
|
predictions = self.hparams.test_search(encoded_outputs, wav_lens)[0] |
|
|
|
predicted_words = [] |
|
for prediction in predictions: |
|
prediction = [token for token in prediction if token != 0] |
|
predicted_words.append(self.hparams.tokenizer.decode_ids(prediction).split(" ")) |
|
prediction = [] |
|
for sent in predicted_words: |
|
sent = self.filter_repetitions(sent, 3) |
|
prediction.append(sent) |
|
predicted_words = prediction |
|
return predicted_words |
|
|
|
def filter_repetitions(self, seq, max_repetition_length): |
|
seq = list(seq) |
|
output = [] |
|
max_n = len(seq) // 2 |
|
for n in range(max_n, 0, -1): |
|
max_repetitions = max(max_repetition_length // n, 1) |
|
|
|
|
|
if (len(seq) <= n*2) or (len(seq) <= max_repetition_length): |
|
continue |
|
iterator = enumerate(seq) |
|
|
|
buffers = [[next(iterator)[1]] for _ in range(n)] |
|
for seq_index, token in iterator: |
|
current_buffer = seq_index % n |
|
if token != buffers[current_buffer][-1]: |
|
|
|
buf_len = sum(map(len, buffers)) |
|
flush_start = (current_buffer-buf_len) % n |
|
|
|
for flush_index in range(buf_len - buf_len%n): |
|
if (buf_len - flush_index) > n-1: |
|
to_flush = buffers[(flush_index + flush_start) % n].pop(0) |
|
else: |
|
to_flush = None |
|
|
|
if (flush_index // n < max_repetitions) and to_flush is not None: |
|
output.append(to_flush) |
|
elif (flush_index // n >= max_repetitions) and to_flush is None: |
|
output.append(to_flush) |
|
buffers[current_buffer].append(token) |
|
|
|
current_buffer += 1 |
|
buf_len = sum(map(len, buffers)) |
|
flush_start = (current_buffer-buf_len) % n |
|
for flush_index in range(buf_len): |
|
to_flush = buffers[(flush_index + flush_start) % n].pop(0) |
|
|
|
if flush_index // n < max_repetitions: |
|
output.append(to_flush) |
|
seq = [] |
|
to_delete = 0 |
|
for token in output: |
|
if token is None: |
|
to_delete += 1 |
|
elif to_delete > 0: |
|
to_delete -= 1 |
|
else: |
|
seq.append(token) |
|
output = [] |
|
return seq |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def classify_file(self, path, device): |
|
|
|
|
|
waveform, sr = librosa.load(path, sr=16000) |
|
|
|
|
|
audio_length = len(waveform) / sr |
|
|
|
if audio_length >= 20: |
|
print(f"Audio is too long ({audio_length:.2f} seconds), splitting into segments") |
|
|
|
non_silent_intervals = librosa.effects.split(waveform, top_db=20) |
|
|
|
segments = [] |
|
current_segment = [] |
|
current_length = 0 |
|
max_duration = 20 * sr |
|
|
|
for interval in non_silent_intervals: |
|
start, end = interval |
|
segment_part = waveform[start:end] |
|
|
|
|
|
if current_length + len(segment_part) > max_duration: |
|
segments.append(np.concatenate(current_segment)) |
|
current_segment = [] |
|
current_length = 0 |
|
|
|
current_segment.append(segment_part) |
|
current_length += len(segment_part) |
|
|
|
|
|
if current_segment: |
|
segments.append(np.concatenate(current_segment)) |
|
|
|
|
|
outputs = [] |
|
for i, segment in enumerate(segments): |
|
print(f"Processing segment {i + 1}/{len(segments)}, length: {len(segment) / sr:.2f} seconds") |
|
|
|
segment_tensor = torch.tensor(segment).to(device) |
|
|
|
|
|
batch = segment_tensor.unsqueeze(0).to(device) |
|
rel_length = torch.tensor([1.0]).to(device) |
|
|
|
|
|
segment_output = self.encode_batch(device, batch, rel_length) |
|
yield segment_output |
|
else: |
|
waveform = torch.tensor(waveform).to(device) |
|
waveform = waveform.to(device) |
|
|
|
batch = waveform.unsqueeze(0) |
|
rel_length = torch.tensor([1.0]).to(device) |
|
outputs = self.encode_batch(device, batch, rel_length) |
|
yield outputs |
|
|