File size: 2,889 Bytes
162d5c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import json
import os
import asyncio
from vosk import SetLogLevel, Model, KaldiRecognizer
from multiprocessing import Process, Queue
from queue import Empty
SetLogLevel(-1)  # mutes vosk verbosity; executed once, as a side effect of importing this module

class SpeechToTextVosk:
    """Streaming speech-to-text that runs a Vosk recognizer in a child process.

    Audio chunks handed to :meth:`add_speech_bytes` travel over a
    multiprocessing queue to the worker process, which feeds them to a
    ``KaldiRecognizer`` and posts ``(text, speaker_finished)`` tuples on a
    result queue. :meth:`get_text` drains that result queue.

    The object is a context manager; leaving the ``with`` block calls
    :meth:`shutdown`, which is safe to call more than once.
    """

    def __init__(self, model='small', audio_bit_rate=16000) -> None:
        """Create the queues and start the recognizer worker process.

        Args:
            model: Name of the model directory, looked up under
                ``models/vosk`` next to this file.
            audio_bit_rate: Passed as the second argument to
                ``KaldiRecognizer``.
                NOTE(review): Vosk documents that argument as the sample
                rate in Hz, not a bit rate -- confirm the naming.
        """
        # Remember which process created the instance: copies of `self` that
        # end up inside the worker process must never try to stop/join it.
        self._owner_pid = os.getpid()

        self.model = model
        self.audio_bit_rate = audio_bit_rate

        # Queues for inter-process communication: audio in, results out.
        self.queue = Queue()
        self.result_queue = Queue()

        # Create and start a new Process with the worker function.
        self.process = Process(target=self.worker)
        self.process.start()

    def worker(self):
        """Worker-process main loop: recognize queued audio until told to stop.

        Loads the Vosk model, then repeatedly takes audio chunks from
        ``self.queue``, runs them through the recognizer and puts the
        ``(text, speaker_finished)`` result on ``self.result_queue``.
        Receiving ``None`` ends the loop.
        """
        # Resolve the model directory relative to this source file.
        current_directory = os.path.dirname(os.path.abspath(__file__))
        _path = os.path.join(current_directory, 'models', 'vosk', self.model)
        model_voice = Model(_path)
        vosk = KaldiRecognizer(model_voice, self.audio_bit_rate)

        while True:
            # Only the queue read can raise Empty, so keep the try minimal.
            try:
                data = self.queue.get(timeout=1)
            except Empty:
                continue

            # Stop the worker if the sentinel None is received.
            if data is None:
                break

            self.result_queue.put(self._process_speech(vosk, data))

    def add_speech_bytes(self, data: bytearray):
        """Queue one chunk of audio bytes for recognition by the worker."""
        self.queue.put(data)

    def _process_speech(self, vosk: 'KaldiRecognizer', data: bytearray) -> tuple[str, bool]:
        """Feed one audio chunk to the recognizer and return its current text.

        Args:
            vosk: Recognizer exposing ``AcceptWaveform``, ``Result`` and
                ``PartialResult``.
            data: Audio bytes to feed.

        Returns:
            ``(text, speaker_finished)``. When ``AcceptWaveform`` returns a
            truthy value the ``'text'`` field of ``Result()`` is returned
            with ``True``; otherwise the ``'partial'`` field of
            ``PartialResult()`` is returned with ``False``.
        """
        if vosk.AcceptWaveform(data):
            return json.loads(vosk.Result())['text'], True
        return json.loads(vosk.PartialResult())['partial'], False

    def get_text(self):
        """Drain pending results without blocking.

        Results are read until the result queue is empty or a result flagged
        ``speaker_finished`` is seen; their texts are concatenated in order.

        Returns:
            ``(text, speaker_finished)`` where ``speaker_finished`` is the
            flag of the last result read (``False`` if nothing was pending).
        """
        # NOTE(review): consecutive partial texts are joined as-is; if Vosk
        # partials are cumulative this repeats words -- confirm with callers.
        parts = []
        speaker_finished = False
        while True:
            # get_nowait() instead of empty()+get(): a blocking get() after a
            # stale empty() check could otherwise hang the caller.
            try:
                result, speaker_finished = self.result_queue.get_nowait()
            except Empty:
                break
            parts.append(result)
            if speaker_finished:
                break
        return (''.join(parts), speaker_finished)

    def get_audio_bit_rate(self):
        """Return the ``audio_bit_rate`` value given at construction."""
        return self.audio_bit_rate

    def shutdown(self):
        """Stop the worker process and wait for it to exit.

        Safe to call repeatedly, on an instance whose ``__init__`` did not
        complete, and from a copy of the instance living inside the worker
        process; in all of those cases it does nothing.
        """
        process = getattr(self, 'process', None)
        if process is None:
            # __init__ never got as far as creating the worker.
            return
        if getattr(self, '_owner_pid', None) != os.getpid():
            # Only the process that started the worker may join it.
            return
        if process.is_alive():
            # Send sentinel value to stop the worker, then wait for it.
            self.queue.put(None)
            process.join()

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Shut the worker down on leaving the ``with`` block."""
        self.shutdown()

    def __del__(self):
        """Shut the worker down on garbage collection as a last resort."""
        self.shutdown()