|
import torch |
|
from speechbrain.inference.interfaces import Pretrained |
|
import librosa |
|
import numpy as np |
|
|
|
|
|
class ASR(Pretrained): |
|
def __init__(self, *args, **kwargs):
    """Create the interface by forwarding every argument to the parent class.

    No additional state is set up here; all positional and keyword
    arguments are passed unchanged to the base-class constructor.
    """
    super().__init__(*args, **kwargs)
|
|
|
def encode_batch_w2v2(self, device, wavs, wav_lens=None, normalize=False):
    """Transcribe a batch of waveforms with the wav2vec2 encoder and search.

    Arguments
    ---------
    device : str or torch.device
        Device the inputs are moved to before encoding.
    wavs : torch.Tensor
        Batch of waveforms; dimension 0 is treated as the batch dimension.
    wav_lens : torch.Tensor, optional
        Lengths handed to ``self.hparams.test_search`` (presumably relative
        lengths in [0, 1] -- confirm against the searcher).  When omitted,
        every item is treated as full length (1.0).
    normalize : bool
        Unused; kept for interface compatibility.

    Returns
    -------
    list of list of str
        One list of words per batch item, with repetitions reduced by
        ``self.filter_repetitions(words, 3)``.
    """
    wavs = wavs.to(device)
    if wav_lens is None:
        # The signature advertises None as the default, so it must not crash.
        wav_lens = torch.ones(wavs.size(0))
    wav_lens = wav_lens.to(device)

    # Inference only: do not record an autograd graph.
    with torch.no_grad():
        encoded_outputs = self.mods.encoder_w2v2(wavs.detach())
        # NOTE: an earlier version also ran a BOS embedding through the
        # decoder here and threw the result away; hypotheses come from the
        # searcher alone, so that wasted forward pass is gone.
        predictions = self.hparams.test_search(encoded_outputs, wav_lens)[0]

    predicted_words = []
    for prediction in predictions:
        # Token id 0 is dropped before decoding (presumably blank/padding).
        token_ids = [token for token in prediction if token != 0]
        words = self.hparams.tokenizer.decode_ids(token_ids).split(" ")
        predicted_words.append(self.filter_repetitions(words, 3))
    return predicted_words
|
|
|
|
|
def encode_batch_whisper(self, device, wavs, wav_lens=None, normalize=False):
    """Transcribe a batch of waveforms with the Whisper module and search.

    Arguments
    ---------
    device : str or torch.device
        Device the inputs are moved to before encoding.
    wavs : torch.Tensor
        Batch of waveforms; dimension 0 is treated as the batch dimension.
    wav_lens : torch.Tensor, optional
        Lengths handed to ``self.hparams.test_search`` (presumably relative
        lengths in [0, 1] -- confirm against the searcher).  When omitted,
        every item is treated as full length (1.0).
    normalize : bool
        Unused; kept for interface compatibility.

    Returns
    -------
    list of str
        One stripped transcription per hypothesis returned by the searcher.
    """
    wavs = wavs.to(device)
    if wav_lens is None:
        # The signature advertises None as the default, so it must not crash.
        wav_lens = torch.ones(wavs.size(0))
    wav_lens = wav_lens.to(device)

    whisper = self.mods.whisper
    # Two start tokens per item.  The prompt used to be a fixed (1, 2)
    # tensor, which only matched a batch of one; it now follows the batch.
    tokens = torch.full(
        (wavs.size(0), 2),
        whisper.config.decoder_start_token_id,
        dtype=torch.long,
        device=device,
    )

    # Inference only: do not record an autograd graph.
    with torch.no_grad():
        # Only the encoder output is needed; logits are not used.
        enc_out, _, _ = whisper(wavs.detach(), tokens)
        hyps, _, _, _ = self.hparams.test_search(enc_out.detach(), wav_lens)

    return [
        whisper.tokenizer.decode(hyp, skip_special_tokens=True).strip()
        for hyp in hyps
    ]
|
|
|
|
|
def filter_repetitions(self, seq, max_repetition_length):
    """Collapse runs of repeated tokens or repeated n-token patterns.

    Pattern lengths are tried from ``len(seq) // 2`` down to 1.  For a
    pattern of length ``n`` at most ``max(max_repetition_length // n, 1)``
    consecutive copies are kept.  Sequences no longer than
    ``max_repetition_length`` are returned as they are.

    Arguments
    ---------
    seq : iterable
        Tokens (for example words) to filter.
    max_repetition_length : int
        Budget, in tokens, for how much repeated material may remain.

    Returns
    -------
    list
        The filtered tokens.
    """
    tokens = list(seq)
    pending = []
    # The range is fixed from the initial length; tokens may shrink later.
    for n in range(len(tokens) // 2, 0, -1):
        allowed_repeats = max(max_repetition_length // n, 1)

        # Skip pattern lengths that cannot repeat in what is left.
        if len(tokens) <= 2 * n or len(tokens) <= max_repetition_length:
            continue

        stream = enumerate(tokens)
        # One buffer per position inside the n-token pattern, each seeded
        # with one token.
        buffers = [[next(stream)[1]] for _ in range(n)]
        for position, token in stream:
            slot = position % n
            if token != buffers[slot][-1]:
                # The pattern broke: flush whole periods from the buffers.
                buffered = sum(len(buf) for buf in buffers)
                first_slot = (slot - buffered) % n
                for k in range(buffered - buffered % n):
                    # The last n-1 tokens stay buffered; their positions
                    # are represented by None instead.
                    if buffered - k > n - 1:
                        flushed = buffers[(k + first_slot) % n].pop(0)
                    else:
                        flushed = None

                    within_limit = k // n < allowed_repeats
                    if within_limit and flushed is not None:
                        pending.append(flushed)
                    elif not within_limit and flushed is None:
                        # A None marker cancels the next real token below.
                        pending.append(flushed)
            buffers[slot].append(token)

        # Final flush of whatever is still buffered.
        slot += 1
        buffered = sum(len(buf) for buf in buffers)
        first_slot = (slot - buffered) % n
        for k in range(buffered):
            flushed = buffers[(k + first_slot) % n].pop(0)
            if k // n < allowed_repeats:
                pending.append(flushed)

        # Rebuild the sequence: each None marker deletes the next token.
        tokens = []
        skip = 0
        for item in pending:
            if item is None:
                skip += 1
            elif skip > 0:
                skip -= 1
            else:
                tokens.append(item)
        pending = []
    return tokens
|
|
|
|
|
def increase_volume(self, waveform, threshold_db=-25):
    """Amplify a waveform whose average RMS loudness is below a threshold.

    The average loudness is the mean of ``librosa.feature.rms`` converted
    with ``librosa.amplitude_to_db``.  If it is below ``threshold_db`` the
    waveform is multiplied by the gain that covers the difference.  The
    loudness is printed before the check and again after amplification.

    Arguments
    ---------
    waveform : array-like
        Audio samples accepted by ``librosa.feature.rms``.
    threshold_db : float
        Minimum average loudness in dB.

    Returns
    -------
    The amplified waveform, or the input unchanged if already loud enough.
    """

    def mean_loudness_db(signal):
        # Mean RMS over all frames, expressed in decibels.
        rms_frames = librosa.feature.rms(y=signal)
        return librosa.amplitude_to_db(np.mean(rms_frames))

    average_loudness_db = mean_loudness_db(waveform)
    print(f"Average Loudness: {average_loudness_db} dB")

    if average_loudness_db < threshold_db:
        # Apply exactly the gain that closes the gap to the threshold.
        gain = librosa.db_to_amplitude(threshold_db - average_loudness_db)
        waveform = waveform * gain

        average_loudness_db = mean_loudness_db(waveform)
        print(f"Average Loudness: {average_loudness_db} dB")

    return waveform
|
|
|
|
|
def classify_file_w2v2(self, waveform, device):
    """Transcribe a waveform with the wav2vec2 pipeline, chunking long audio.

    Durations are computed as ``len(waveform) / 16000`` (i.e. the samples
    are assumed to be 16 kHz).  Audio of 30 s or more is cut into
    consecutive 30 s chunks; a final chunk shorter than 1 s is dropped.

    Arguments
    ---------
    waveform : array-like
        One-dimensional sequence of samples accepted by ``torch.tensor``.
    device : str or torch.device
        Device used for inference.

    Returns
    -------
    list of str
        A single-element list for long audio (chunk texts each followed by
        a space, so the string ends with a space); otherwise one string per
        item returned by ``encode_batch_w2v2``.
    """
    sample_rate = 16000
    chunk_seconds = 30

    # Short audio: a single pass over the whole signal.
    if len(waveform) / sample_rate < chunk_seconds:
        batch = torch.tensor(waveform).to(device).unsqueeze(0)
        rel_length = torch.tensor([1.0]).to(device)
        decoded = self.encode_batch_w2v2(device, batch, rel_length)
        return [" ".join(words) for words in decoded]

    # Long audio: consecutive fixed-size chunks; only the last one can be
    # short, and it is ignored when it lasts less than one second.
    chunk_size = chunk_seconds * sample_rate
    chunks = [
        waveform[offset:offset + chunk_size]
        for offset in range(0, len(waveform), chunk_size)
    ]
    chunks = [chunk for chunk in chunks if len(chunk) / sample_rate >= 1]

    decoded_chunks = []
    for chunk in chunks:
        batch = torch.tensor(chunk).to(device).unsqueeze(0)
        rel_length = torch.tensor([1.0]).to(device)
        words_per_item = self.encode_batch_w2v2(device, batch, rel_length)
        decoded_chunks.append([" ".join(words) for words in words_per_item])

    # Each batch holds one item, so only the first text of a chunk is used.
    return ["".join(texts[0] + " " for texts in decoded_chunks)]
|
|
|
|
|
|
|
|
|
def classify_file_whisper_mkd(self, waveform, device):
    """Transcribe a waveform with the Whisper pipeline, chunking long audio.

    Durations are computed as ``len(waveform) / 16000`` (i.e. the samples
    are assumed to be 16 kHz).  Audio of 30 s or more is cut into
    consecutive 30 s chunks; a final chunk shorter than 1 s is dropped.

    Arguments
    ---------
    waveform : array-like
        One-dimensional sequence of samples accepted by ``torch.tensor``.
    device : str or torch.device
        Device used for inference.

    Returns
    -------
    list of str
        For long audio a single-element list holding the chunk texts, each
        followed by a space; otherwise whatever ``encode_batch_whisper``
        returns for the whole signal.
    """
    sample_rate = 16000
    chunk_seconds = 30

    # Short audio: a single pass over the whole signal.
    if len(waveform) / sample_rate < chunk_seconds:
        batch = torch.tensor(waveform).to(device).unsqueeze(0)
        rel_length = torch.tensor([1.0]).to(device)
        return self.encode_batch_whisper(device, batch, rel_length)

    # Long audio: consecutive fixed-size chunks; only the last one can be
    # short, and it is ignored when it lasts less than one second.
    chunk_size = chunk_seconds * sample_rate
    chunks = [
        waveform[offset:offset + chunk_size]
        for offset in range(0, len(waveform), chunk_size)
    ]
    chunks = [chunk for chunk in chunks if len(chunk) / sample_rate >= 1]

    transcripts = []
    for chunk in chunks:
        batch = torch.tensor(chunk).to(device).unsqueeze(0)
        rel_length = torch.tensor([1.0]).to(device)
        transcripts.append(self.encode_batch_whisper(device, batch, rel_length))

    # Each batch holds one item, so only the first text of a chunk is used.
    return ["".join(texts[0] + " " for texts in transcripts)]
|
|
|
|
|
def classify_file_whisper_mkd_streaming(self, waveform, device):
    """Yield Whisper transcriptions chunk by chunk.

    Durations are computed as ``len(waveform) / 16000`` (i.e. the samples
    are assumed to be 16 kHz).  Audio of 20 s or more is cut into
    consecutive 20 s chunks (a final chunk shorter than 1 s is dropped) and
    one result is yielded per chunk; shorter audio yields a single result.

    Arguments
    ---------
    waveform : array-like
        One-dimensional sequence of samples accepted by ``torch.tensor``.
    device : str or torch.device
        Device used for inference.

    Yields
    ------
    list of str
        The output of ``encode_batch_whisper`` for each processed chunk.
    """
    sample_rate = 16000
    chunk_seconds = 20

    if len(waveform) / sample_rate >= chunk_seconds:
        # Only the last chunk can be short; it is ignored when it lasts
        # less than one second.
        chunk_size = chunk_seconds * sample_rate
        chunks = [
            waveform[offset:offset + chunk_size]
            for offset in range(0, len(waveform), chunk_size)
        ]
        chunks = [chunk for chunk in chunks if len(chunk) / sample_rate >= 1]
    else:
        # Short audio is handled as one chunk covering the whole signal.
        chunks = [waveform]

    for chunk in chunks:
        batch = torch.tensor(chunk).to(device).unsqueeze(0)
        rel_length = torch.tensor([1.0]).to(device)
        yield self.encode_batch_whisper(device, batch, rel_length)
|
|
|
|
|
def classify_file_whisper(self, waveform, pipe, device):
    """Transcribe a waveform with an externally supplied pipeline.

    Arguments
    ---------
    waveform : object
        Audio input passed straight to ``pipe``.
    pipe : callable
        Called as ``pipe(waveform, generate_kwargs={"language":
        "macedonian"})``; its result must support ``["text"]``.
    device : object
        Unused; kept for interface compatibility.

    Returns
    -------
    list
        Single-element list holding the ``"text"`` entry of the result.
    """
    result = pipe(waveform, generate_kwargs={"language": "macedonian"})
    return [result["text"]]
|
|
|
|
|
def classify_file_mms(self, waveform, processor, model, device):
    """Transcribe a waveform with a processor/model pair, chunking long audio.

    Durations are computed as ``len(waveform) / 16000`` and the processor
    is called with ``sampling_rate=16_000``.  Audio of 30 s or more is cut
    into consecutive 30 s chunks; a final chunk shorter than 1 s is dropped.

    Arguments
    ---------
    waveform : array-like
        One-dimensional sequence of samples accepted by ``torch.tensor``.
    processor : callable
        Called as ``processor(tensor, sampling_rate=16_000,
        return_tensors="pt")``; the result must offer ``.to(device)`` and
        be usable as ``**kwargs`` for ``model``.  Must also provide
        ``decode(ids)``.  Presumably a Hugging Face processor -- confirm
        against the caller.
    model : callable
        Returns an object with a ``logits`` attribute.
    device : str or torch.device
        Device used for inference.

    Returns
    -------
    list of str
        Single-element list.  For long audio the chunk texts are each
        followed by a space, so the string ends with a space.
    """

    def transcribe(samples):
        # Greedy decoding: argmax over the last logits axis, first item.
        samples_tensor = torch.tensor(samples).to(device)
        inputs = processor(
            samples_tensor, sampling_rate=16_000, return_tensors="pt"
        ).to(device)
        # Inference only: do not record an autograd graph.
        with torch.no_grad():
            logits = model(**inputs).logits
        ids = torch.argmax(logits, dim=-1)[0]
        return processor.decode(ids)

    sample_rate = 16000
    chunk_seconds = 30

    # Short audio: a single pass over the whole signal.
    if len(waveform) / sample_rate < chunk_seconds:
        return [transcribe(waveform)]

    # Long audio: consecutive fixed-size chunks; only the last one can be
    # short, and it is ignored when it lasts less than one second.
    chunk_size = chunk_seconds * sample_rate
    chunks = [
        waveform[offset:offset + chunk_size]
        for offset in range(0, len(waveform), chunk_size)
    ]
    chunks = [chunk for chunk in chunks if len(chunk) / sample_rate >= 1]

    return ["".join(transcribe(chunk) + " " for chunk in chunks)]
|
|