File size: 5,469 Bytes
3667c7a
3447ff0
3667c7a
 
ac13632
3667c7a
 
 
 
44800eb
 
3667c7a
 
 
 
 
 
 
44800eb
 
 
 
 
 
 
 
3667c7a
 
 
 
 
 
ac13632
 
3667c7a
ac13632
82598a2
44800eb
 
 
 
33727a3
44800eb
 
 
 
 
 
 
 
 
 
 
 
 
3667c7a
ac13632
 
 
 
 
 
 
 
 
 
 
82598a2
 
3667c7a
 
 
e98b248
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3667c7a
 
e98b248
 
 
 
 
 
 
3447ff0
e98b248
 
 
 
 
 
 
43d5e00
e98b248
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43d5e00
e98b248
 
 
 
 
 
 
 
 
 
 
 
 
55d992f
e98b248
 
 
55d992f
e98b248
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import io
import os
import wave

import numpy as np
import requests

from openai import OpenAI

from utils.errors import APIError, AudioConversionError


def numpy_audio_to_bytes(audio_data):
    sample_rate = 44100
    num_channels = 1
    sampwidth = 2

    buffer = io.BytesIO()
    try:
        with wave.open(buffer, "wb") as wf:
            wf.setnchannels(num_channels)
            wf.setsampwidth(sampwidth)
            wf.setframerate(sample_rate)
            wf.writeframes(audio_data.tobytes())
    except Exception as e:
        raise AudioConversionError(f"Error converting numpy array to audio bytes: {e}")
    return buffer.getvalue()


class STTManager:
    def __init__(self, config):
        self.config = config
        self.status = self.test_stt()
        self.streaming = False

    def speech_to_text(self, audio):
        audio = numpy_audio_to_bytes(audio[1])
        try:
            if self.config.stt.type == "OPENAI_API":
                data = ("temp.wav", audio, "audio/wav")
                client = OpenAI(base_url=self.config.stt.url, api_key=self.config.stt.key)
                transcription = client.audio.transcriptions.create(model=self.config.stt.name, file=data, response_format="text")
            elif self.config.stt.type == "HF_API":
                headers = {"Authorization": "Bearer " + self.config.stt.key}
                response = requests.post(self.config.stt.url, headers=headers, data=audio)
                if response.status_code != 200:
                    error_details = response.json().get("error", "No error message provided")
                    raise APIError("STT Error: HF API error", status_code=response.status_code, details=error_details)
                transcription = response.json().get("text", None)
                if transcription is None:
                    raise APIError("STT Error: No transcription returned by HF API")
        except APIError as e:
            raise
        except Exception as e:
            raise APIError(f"STT Error: Unexpected error: {e}")

        return transcription

    def test_stt(self):
        try:
            self.speech_to_text((48000, np.zeros(10000)))
            return True
        except:
            return False

    def add_user_message(self, audio, chat_display):
        transcription = self.speech_to_text(audio)
        chat_display.append([transcription, None])
        return chat_display


class TTSManager:
    def test_tts(self):
        try:
            self.read_text("Handshake")
            return True
        except:
            return False

    def test_tts_stream(self):
        try:
            for _ in self.read_text_stream("Handshake"):
                pass
            return True
        except:
            return False

    def __init__(self, config):
        self.config = config
        self.status = self.test_tts()
        if self.status:
            self.streaming = self.test_tts_stream()
        else:
            self.streaming = False
        if self.streaming:
            self.read_last_message = self.rlm_stream
        else:
            self.read_last_message = self.rlm

    def read_text(self, text):
        headers = {"Authorization": "Bearer " + self.config.tts.key}
        try:
            if self.config.tts.type == "OPENAI_API":
                data = {"model": self.config.tts.name, "input": text, "voice": "alloy", "response_format": "opus"}
                response = requests.post(self.config.tts.url + "/audio/speech", headers=headers, json=data)
            elif self.config.tts.type == "HF_API":
                response = requests.post(self.config.tts.url, headers=headers, json={"inputs": text})
            if response.status_code != 200:
                error_details = response.json().get("error", "No error message provided")
                raise APIError(f"TTS Error: {self.config.tts.type} error", status_code=response.status_code, details=error_details)
        except APIError as e:
            raise
        except Exception as e:
            raise APIError(f"TTS Error: Unexpected error: {e}")

        return response.content

    def read_text_stream(self, text):
        if self.config.tts.type not in ["OPENAI_API"]:
            raise APIError("TTS Error: Streaming not supported for this TTS type")
        headers = {"Authorization": "Bearer " + self.config.tts.key}
        data = {"model": self.config.tts.name, "input": text, "voice": "alloy", "response_format": "opus"}

        try:
            with requests.post(self.config.tts.url + "/audio/speech", headers=headers, json=data, stream=True) as response:
                if response.status_code != 200:
                    error_details = response.json().get("error", "No error message provided")
                    raise APIError("TTS Error: OPENAI API error", status_code=response.status_code, details=error_details)
                else:
                    yield from response.iter_content(chunk_size=1024)
        except StopIteration:
            pass
        except APIError as e:
            raise
        except Exception as e:
            raise APIError(f"TTS Error: Unexpected error: {e}")

    def rlm(self, chat_history):
        if len(chat_history) > 0 and chat_history[-1][1]:
            return self.read_text(chat_history[-1][1])

    def rlm_stream(self, chat_history):
        if len(chat_history) > 0 and chat_history[-1][1]:
            yield from self.read_text_stream(chat_history[-1][1])