# coding=utf8
import os
import shutil
import subprocess
import xml.etree.ElementTree as ET
from transformers import pipeline
from environment import DEFAULT_MODEL, DEFAULT_MODEL_LANGUAGE, DEVICE

# A single time-aligned annotation segment from an ELAN tier.
class Segment:
    def __init__(self, segment_id: str, start: int, end: int):
        self.segment_id: str = segment_id
        self.start: int = start  # onset in milliseconds
        self.end: int = end      # offset in milliseconds
        self.transcription: str = ''

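# For reference, a simplified sketch of the EAF structures the helpers below
# read (illustrative, not copied from a real file):
#
#   <TIME_ORDER>
#     <TIME_SLOT TIME_SLOT_ID="ts1" TIME_VALUE="1200"/>
#     <TIME_SLOT TIME_SLOT_ID="ts2" TIME_VALUE="3400"/>
#   </TIME_ORDER>
#   <TIER LINGUISTIC_TYPE_REF="default-lt" TIER_ID="speaker1">
#     <ANNOTATION>
#       <ALIGNABLE_ANNOTATION ANNOTATION_ID="a1"
#                             TIME_SLOT_REF1="ts1" TIME_SLOT_REF2="ts2">
#         <ANNOTATION_VALUE></ANNOTATION_VALUE>
#       </ALIGNABLE_ANNOTATION>
#     </ANNOTATION>
#   </TIER>
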
# Maps each TIME_SLOT_ID in the document's TIME_ORDER element to its
# TIME_VALUE (a millisecond offset, stored as a string).
def getTimeSlots(eaf):
    time_slot_dic = {}

    order = eaf.find('TIME_ORDER')
    for slot in order:
        time_slot_dic[slot.get('TIME_SLOT_ID')] = slot.get('TIME_VALUE')

    return time_slot_dic

# Collects all time-aligned annotations on tiers of the given
# LINGUISTIC_TYPE_REF as Segment objects with integer millisecond bounds.
def getAnnotationSegments(eaf, tier_type):
    segment_list = []

    time_slot_dic = getTimeSlots(eaf)

    for tier in eaf.findall('TIER'):
        if tier.get('LINGUISTIC_TYPE_REF') == tier_type:
            for annotation in tier:
                alignable_annotation = annotation.find('ALIGNABLE_ANNOTATION')
                segment_id = alignable_annotation.get('ANNOTATION_ID')
                start = int(time_slot_dic[alignable_annotation.get('TIME_SLOT_REF1')])
                end = int(time_slot_dic[alignable_annotation.get('TIME_SLOT_REF2')])
                segment_list.append(Segment(segment_id, start, end))

    return segment_list

# Cuts the [start, end] span (milliseconds) out of audio_path into
# temp_dir/<segment_id>.wav and returns the path of the new clip.
def splice_audio(audio_path, segment_id, start, end, temp_dir):
    file_path = os.path.join(temp_dir, f"{segment_id}.wav")

    subprocess.call([
        "ffmpeg",
        "-y",  # overwrite an existing clip instead of prompting
        "-loglevel", "fatal",
        "-hide_banner",
        "-nostdin",
        "-i", audio_path,
        "-ss", f"{start / 1000}",
        "-to", f"{end / 1000}",
        file_path
    ])
    return file_path


# Builds the ASR pipeline once so the model is not reloaded for every segment.
def build_pipeline(model_id):
    asr = pipeline(
        task="automatic-speech-recognition",
        model=model_id,
        chunk_length_s=30,
        device=DEVICE,
    )
    # Pin the decoder to the configured language and the transcription task
    # instead of letting the model auto-detect them per clip.
    asr.model.config.forced_decoder_ids = asr.tokenizer.get_decoder_prompt_ids(
        language=DEFAULT_MODEL_LANGUAGE, task="transcribe"
    )
    return asr


# Transcribes a single audio file and returns the transcription.
def transcribe_audio(asr, audio_path):
    result = asr(audio_path, max_new_tokens=448)
    return result["text"].strip()


def transcribe_eaf(eaf_path, audio_path, tier_type):

    eaf_tree = ET.parse(eaf_path)
    eaf_root = eaf_tree.getroot()

    segment_list = getAnnotationSegments(eaf_root, tier_type)

    # Load the model once, up front, rather than once per segment.
    asr = build_pipeline(DEFAULT_MODEL)

    if not os.path.exists('temp_dir'):
        os.makedirs('temp_dir')

    for segment in segment_list:
        # Cut the segment out of the audio, transcribe it, then discard the clip.
        segment_audio_file = splice_audio(audio_path, segment.segment_id, segment.start, segment.end, 'temp_dir')
        segment.transcription = transcribe_audio(asr, segment_audio_file)
        os.remove(segment_audio_file)

        print(f'{segment.segment_id}\t{segment.transcription}')
    shutil.rmtree('temp_dir')

    # Write each transcription back into its ALIGNABLE_ANNOTATION element.
    for segment in segment_list:
        for e in eaf_root.iter('ALIGNABLE_ANNOTATION'):
            if e.get('ANNOTATION_ID') == segment.segment_id:
                e.find('ANNOTATION_VALUE').text = segment.transcription

    new_eaf_path = f'{os.path.splitext(eaf_path)[0]}_autotranscribed.eaf'
    eaf_tree.write(new_eaf_path, encoding='utf-8', xml_declaration=True)

    return new_eaf_path
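

# Minimal usage sketch (an assumption, not part of the original script:
# the command-line interface and argument order are illustrative).
if __name__ == '__main__':
    import sys

    if len(sys.argv) != 4:
        sys.exit(f'usage: {sys.argv[0]} <file.eaf> <audio_file> <tier_type>')

    new_path = transcribe_eaf(sys.argv[1], sys.argv[2], sys.argv[3])
    print(f'Wrote {new_path}')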