Spaces:
Sleeping
Sleeping
import os | |
from pyannote.audio import Pipeline | |
from pydub import AudioSegment | |
from transformers import WhisperForConditionalGeneration, WhisperProcessor | |
import torchaudio | |
import torch | |
# ---------------------------------------------------------------------------
# Module-level configuration and model loading (runs once, at import time).
# ---------------------------------------------------------------------------

# Run on CUDA device 0 when CUDA is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = 0
else:
    device = "cpu"
torch_dtype = torch.float32

# Hugging Face access token taken from the environment (None when unset).
HF_TOKEN = os.getenv("HF_TOKEN")

# MODEL_NAME = "openai/whisper-large-v3"
MODEL_NAME = "projecte-aina/whisper-large-v3-ca-es-synth-cs"

# Whisper model moved to the selected device, plus its matching processor.
model = WhisperForConditionalGeneration.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch_dtype,
    token=HF_TOKEN,
).to(device)
processor = WhisperProcessor.from_pretrained(MODEL_NAME)

# pyannote pipeline loaded from a local config file; its output is consumed
# below through ``get_timeline().support()`` to obtain speech regions.
pipeline_vad = Pipeline.from_pretrained("./pyannote/config.yaml")

# Upper bound, in milliseconds, on the length of audio that
# processing_vad_v4 merges into one chunk before transcribing it.
threshold = 15000

# Directory in which the temporary per-chunk WAV file is written.
segments_dir = "."
# Punctuation and symbols that clean_text() turns into spaces, precompiled
# into a translation table so each call is one pass over the text instead of
# a list-membership scan per character.
_CLEAN_TEXT_TABLE = str.maketrans({
    char: " "
    for char in (
        ".", ",", ";", ":", "¿", "?", "«", "»", "-", "¡", "!", "@",
        "*", "{", "}", "[", "]", "=", "/", "\\", "&", "#", "…",
    )
})


def clean_text(input_text):
    """Normalize text by removing punctuation, extra spaces and case.

    Every character listed in ``_CLEAN_TEXT_TABLE`` is replaced by a space,
    runs of whitespace are collapsed to a single space, leading and trailing
    whitespace is dropped, and the result is lower-cased.

    Args:
        input_text: The string to normalize.

    Returns:
        The cleaned, lower-cased string (empty when nothing remains).
    """
    without_symbols = input_text.translate(_CLEAN_TEXT_TABLE)
    # split() with no argument both collapses whitespace runs and trims ends.
    return " ".join(without_symbols.split()).lower()
def convert_forced_to_tokens(forced_decoder_ids):
    """Translate forced decoder token ids into their decoded text form.

    Args:
        forced_decoder_ids: Iterable of ``(position, token_id)`` pairs. A
            ``token_id`` of ``None`` is passed through unchanged.

    Returns:
        A list of ``[position, token]`` lists in the same order as the input,
        where ``token`` is the result of ``processor.tokenizer.decode`` for
        the id, or ``None`` when the id was ``None``.
    """
    return [
        [position, None if token is None else processor.tokenizer.decode(token)]
        for position, token in forced_decoder_ids
    ]
def generate_1st_chunk(audio):
    """Transcribe an audio file without any previous-text prompt.

    The audio is loaded, resampled to 16 kHz, and its first channel is fed to
    the Whisper model with a fixed decoder prefix forced at positions 1-3.

    Side effect: overwrites ``model.generation_config.forced_decoder_ids``.

    Args:
        audio: Path (or other source accepted by ``torchaudio.load``) of the
            audio to transcribe.

    Returns:
        The decoded transcription with special tokens removed and a single
        leading space, if present, stripped.
    """
    waveform, sample_rate = torchaudio.load(audio)
    waveform = torchaudio.transforms.Resample(sample_rate, 16000)(waveform)
    # Only the first channel is transcribed.
    input_features = processor(
        waveform[0],
        sampling_rate=16_000,
        return_tensors="pt",
        torch_dtype=torch_dtype,
    ).input_features.to(device)

    # Decoder prefix forced after the start token (which sits at position 0).
    forced_decoder_ids = [
        [1, 50270],  # <|ca|>
        [2, 50262],  # <|es|>
        [3, 50360],  # <|transcribe|>
    ]
    model.generation_config.forced_decoder_ids = forced_decoder_ids

    pred_ids = model.generate(
        input_features,
        return_timestamps=True,
        max_new_tokens=128,
    )

    # Exclude the prompt from the output: the start token plus every forced
    # position. Only the count matters, so the ids are not decoded.
    prefix_length = len(forced_decoder_ids) + 1
    output = processor.decode(pred_ids[0][prefix_length:], skip_special_tokens=True)

    # Strip the leading space of the decoded text, but never eat a real
    # character when the text is empty or does not start with a space.
    return output[1:] if output.startswith(" ") else output
def generate_from_2nd_chunk(audio, prev_prompt):
    """Transcribe an audio file using the previous transcription as a prompt.

    The decoder is started with ``<|startofprev|>`` and then forced through,
    in order: the tokens of ``prev_prompt``, the model's regular decoder
    start token, and the fixed language/task prefix.

    Side effect: overwrites ``model.generation_config.forced_decoder_ids``.

    Args:
        audio: Path (or other source accepted by ``torchaudio.load``) of the
            audio to transcribe.
        prev_prompt: Text of the preceding chunk, tokenized without special
            tokens and used as the decoder prompt.

    Returns:
        The decoded transcription with the prompt and special tokens removed
        and a single leading space, if present, stripped.
    """
    waveform, sample_rate = torchaudio.load(audio)
    waveform = torchaudio.transforms.Resample(sample_rate, 16000)(waveform)
    # Only the first channel is transcribed.
    input_features = processor(
        waveform[0],
        sampling_rate=16_000,
        return_tensors="pt",
        torch_dtype=torch_dtype,
    ).input_features.to(device)

    tokenizer = processor.tokenizer
    special_index = tokenizer.all_special_tokens.index("<|startofprev|>")
    start_of_prev_id = tokenizer.all_special_ids[special_index]
    prompt_tokens = tokenizer(prev_prompt, add_special_tokens=False).input_ids

    # Forced positions start at 1; position 0 is the decoder start token,
    # which for this call is <|startofprev|>.
    forced_decoder_ids = [
        [position, token] for position, token in enumerate(prompt_tokens, start=1)
    ]
    # The regular start-of-transcript token follows the prompt.
    forced_decoder_ids.append(
        [len(forced_decoder_ids) + 1, model.generation_config.decoder_start_token_id]
    )
    # Then the language/task prefix, shifted past everything forced so far.
    offset = len(forced_decoder_ids)
    forced_decoder_ids.extend([
        [offset + 1, 50270],  # <|ca|>
        [offset + 2, 50262],  # <|es|>
        [offset + 3, 50360],  # <|transcribe|>
    ])
    model.generation_config.forced_decoder_ids = forced_decoder_ids

    pred_ids = model.generate(
        input_features,
        return_timestamps=True,
        max_new_tokens=128,
        decoder_start_token_id=start_of_prev_id,
    )

    # Exclude the prompt from the output: the start token plus every forced
    # position. Only the count matters, so the ids are not decoded.
    prefix_length = len(forced_decoder_ids) + 1
    output = processor.decode(pred_ids[0][prefix_length:], skip_special_tokens=True)

    # Strip the leading space of the decoded text, but never eat a real
    # character when the text is empty or does not start with a space.
    return output[1:] if output.startswith(" ") else output
def processing_vad_v3(audio, output_vad, prev_prompt):
    """Transcribe each speech region separately, chaining prompts.

    Every region in ``output_vad.get_timeline().support()`` is cut out of
    ``audio``, exported to a temporary WAV file and transcribed. The first
    region is transcribed without a prompt; each later one is prompted with
    the text produced for the region before it.

    Args:
        audio: Sliceable audio object indexed in milliseconds whose slices
            provide ``export`` (a pydub ``AudioSegment`` in this file).
        output_vad: Object exposing ``get_timeline().support()``, yielding
            regions with ``start`` and ``end`` attributes (seconds, given
            the ``* 1000`` conversion to the millisecond index).
        prev_prompt: Initial prompt value; it is replaced after each region
            and is not used for the first one.

    Returns:
        The region transcriptions concatenated, each preceded by one space
        (so a non-empty result starts with a space).
    """
    segment_path = os.path.join(segments_dir, "temp_segment.wav")
    pieces = []
    for index, region in enumerate(output_vad.get_timeline().support()):
        clip = audio[region.start * 1000:region.end * 1000]
        clip.export(segment_path, format="wav")
        if index == 0:
            text = generate_1st_chunk(segment_path)
        else:
            text = generate_from_2nd_chunk(segment_path, prev_prompt)
        prev_prompt = text
        pieces.append(" " + text)
    return "".join(pieces)
def processing_vad_v4(audio, output_vad, threshold, max_duration, prev_prompt, concatenated_segment):
    """Transcribe speech regions after merging them into longer chunks.

    Consecutive regions from ``output_vad.get_timeline().support()`` are
    appended to a running chunk while its accumulated length stays below
    ``threshold``. When the next region would not fit, the running chunk is
    transcribed and a new one is started from that region. Whatever remains
    after the last region is transcribed too.

    The first chunk transcribed is decoded without a prompt; every later one
    is prompted with the text of the chunk before it. This also holds for
    the final leftover chunk, which previously was always sent through the
    prompted path even when nothing had been transcribed yet.

    Args:
        audio: Sliceable audio object indexed in milliseconds (a pydub
            ``AudioSegment`` in this file).
        output_vad: Object exposing ``get_timeline().support()``, yielding
            regions with ``start`` and ``end`` attributes (seconds, given
            the ``* 1000`` conversion to the millisecond index).
        threshold: Maximum accumulated chunk length in milliseconds.
        max_duration: Length in milliseconds already accumulated in
            ``concatenated_segment`` on entry.
        prev_prompt: Initial prompt value; replaced after each transcription.
        concatenated_segment: Audio accumulated so far (may be empty).

    Returns:
        The non-empty chunk transcriptions joined by single spaces, so words
        at chunk boundaries are not fused together.
    """
    temp_segment_path = os.path.join(segments_dir, "temp_segment.wav")
    transcripts = []

    def _transcribe_chunk(chunk, prompt):
        """Export *chunk*, transcribe it and record the resulting text."""
        chunk.export(temp_segment_path, format="wav")
        # An empty ``transcripts`` list means no chunk has been decoded yet.
        if transcripts:
            text = generate_from_2nd_chunk(temp_segment_path, prompt)
        else:
            text = generate_1st_chunk(temp_segment_path)
        transcripts.append(text)
        return text

    for speech in output_vad.get_timeline().support():
        start, end = speech.start, speech.end
        segment_duration = (end - start) * 1000
        segment_audio = audio[start * 1000:end * 1000]

        if max_duration + segment_duration < threshold:
            # Still room in the running chunk: keep accumulating.
            concatenated_segment += segment_audio
            max_duration += segment_duration
            continue

        # The region does not fit: emit what was accumulated, then start a
        # new chunk from this region.
        if len(concatenated_segment) > 0:
            prev_prompt = _transcribe_chunk(concatenated_segment, prev_prompt)
        max_duration = segment_duration
        concatenated_segment = segment_audio

    # Process any remaining audio in the running chunk.
    if len(concatenated_segment) > 0:
        _transcribe_chunk(concatenated_segment, prev_prompt)

    return " ".join(text for text in transcripts if text)
def generate(audio_path, use_v4):
    """Transcribe a WAV file, splitting it by speech regions when it is long.

    Audio shorter than 30 seconds is transcribed in one pass. Longer audio
    is first run through ``pipeline_vad`` and then handed to one of the two
    region-based strategies.

    Args:
        audio_path: Path to the WAV file to transcribe.
        use_v4: When true, use ``processing_vad_v4`` (regions merged into
            chunks bounded by ``threshold``); otherwise use
            ``processing_vad_v3`` (one transcription per region).

    Returns:
        The transcription text produced by the selected strategy.
    """
    # Check the audio length; len() of an AudioSegment is in milliseconds.
    audio = AudioSegment.from_wav(audio_path)
    if len(audio) / 1000.0 < 30:
        # Short audio: transcribe the file directly, no VAD needed.
        return generate_1st_chunk(audio_path)

    # 30 seconds or more: locate speech regions first.
    output_vad = pipeline_vad(audio_path)
    initial_prompt = ""
    if not use_v4:
        return processing_vad_v3(audio, output_vad, initial_prompt)

    # v4 starts from an empty accumulator with zero accumulated duration.
    return processing_vad_v4(
        audio,
        output_vad,
        threshold,
        0,
        initial_prompt,
        AudioSegment.empty(),
    )