# coding=utf8
import os
import shutil
import sys
import subprocess
import xml.etree.ElementTree as ET

from transformers import pipeline

from environment import DEFAULT_MODEL_LANGUAGE, DEFAULT_MODEL, DEFAULT_LANGUAGE, DEVICE


# container for one time-aligned annotation segment from the EAF file
class Segment:
    def __init__(self, segment_id: str, start: int, end: int):
        self.segment_id: str = segment_id
        self.start: int = start    # segment start, in milliseconds
        self.end: int = end        # segment end, in milliseconds
        self.transcription: str = ''


# maps TIME_SLOT_ID -> TIME_VALUE (milliseconds) from the EAF TIME_ORDER block
def getTimeSlots(eaf):
    time_slot_dic = {}
    order = eaf.find('TIME_ORDER')
    for slot in order:
        time_slot_dic[slot.get('TIME_SLOT_ID')] = slot.get('TIME_VALUE')
    return time_slot_dic


# collects the alignable annotations of every tier whose linguistic type matches
# tier_type, resolving their time-slot references to millisecond offsets
def getAnnotationSegments(eaf, tier_type):
    segment_list = []
    time_slot_dic = getTimeSlots(eaf)
    for tier in eaf.findall('TIER'):
        if tier.get('LINGUISTIC_TYPE_REF') == tier_type:
            for annotation in tier:
                alignable_annotation = annotation.find('ALIGNABLE_ANNOTATION')
                segment_id = alignable_annotation.get('ANNOTATION_ID')
                start = int(time_slot_dic[alignable_annotation.get('TIME_SLOT_REF1')])
                end = int(time_slot_dic[alignable_annotation.get('TIME_SLOT_REF2')])
                segment_list.append(Segment(segment_id, start, end))
    return segment_list


# cuts one segment out of the source audio with ffmpeg and returns the clip's path
def splice_audio(audio_path, segment_id, start, end, temp_dir):
    file_path = os.path.join(temp_dir, f"{segment_id}.wav")
    if os.path.exists(file_path):
        os.remove(file_path)
    # EAF time values are in milliseconds; ffmpeg expects seconds
    subprocess.run([
        "ffmpeg", "-loglevel", "fatal", "-hide_banner", "-nostdin",
        "-i", audio_path,
        "-ss", f"{int(start) / 1000}",
        "-to", f"{int(end) / 1000}",
        file_path
    ], check=True)
    return file_path


# the ASR pipeline is expensive to build, so cache one instance per model id
# instead of reconstructing it for every segment
_pipeline_cache = {}


# transcribes a single audio file and returns the transcription
def transcribe_audio(model_id, audio_path):
    if model_id not in _pipeline_cache:
        _pipeline_cache[model_id] = pipeline(
            task="automatic-speech-recognition",
            model=model_id,
            chunk_length_s=30,
            device=DEVICE,
        )
    transcribe = _pipeline_cache[model_id]
    # force the decoder to the configured language instead of letting the model auto-detect it
    transcribe.model.config.forced_decoder_ids = transcribe.tokenizer.get_decoder_prompt_ids(
        language=DEFAULT_MODEL_LANGUAGE, task="transcribe")
    result = transcribe(audio_path, max_new_tokens=448)
    return result["text"].strip()


# transcribes every segment on the given tier of an EAF file and writes a copy
# of the file with the transcriptions filled in; returns the new file's path
def transcribe_eaf(eaf_path, audio_path, tier_type):
    eaf_tree = ET.parse(eaf_path)
    eaf_root = eaf_tree.getroot()
    segment_list = getAnnotationSegments(eaf_root, tier_type)
    os.makedirs('temp_dir', exist_ok=True)
    for segment in segment_list:
        # splice out the segment's audio, transcribe it, then discard the clip
        segment_audio_file = splice_audio(audio_path, segment.segment_id,
                                          segment.start, segment.end, 'temp_dir')
        segment.transcription = transcribe_audio(DEFAULT_MODEL, segment_audio_file)
        os.remove(segment_audio_file)
        print(f'{segment.segment_id}\t{segment.transcription}')
    shutil.rmtree('temp_dir')
    # write each transcription back into its annotation's ANNOTATION_VALUE
    transcriptions = {segment.segment_id: segment.transcription for segment in segment_list}
    for e in eaf_root.iter('ALIGNABLE_ANNOTATION'):
        if e.get('ANNOTATION_ID') in transcriptions:
            e.find('ANNOTATION_VALUE').text = transcriptions[e.get('ANNOTATION_ID')]
    new_eaf_path = f'{os.path.splitext(eaf_path)[0]}_autotranscribed.eaf'
    eaf_tree.write(new_eaf_path, encoding='utf-8', xml_declaration=True)
    return new_eaf_path
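

# Example usage (a minimal sketch, not part of the original script): the file
# names and tier type below are hypothetical and depend on how the ELAN project
# is set up; pass the LINGUISTIC_TYPE_REF of the time-aligned tier whose
# annotations should be filled in.
if __name__ == '__main__':
    if len(sys.argv) != 4:
        sys.exit(f'usage: {sys.argv[0]} EAF_PATH AUDIO_PATH TIER_TYPE '
                 '(e.g. session01.eaf session01.wav default-lt)')
    new_eaf = transcribe_eaf(sys.argv[1], sys.argv[2], sys.argv[3])
    print(f'Wrote {new_eaf}')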