Spaces:
Sleeping
Sleeping
File size: 3,346 Bytes
3667c7a 44800eb 3667c7a 44800eb 3667c7a 44800eb 33727a3 44800eb 3667c7a 44800eb 3667c7a 44800eb 3667c7a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
import io
import wave
import requests
from openai import OpenAI
from utils.errors import APIError, AudioConversionError
def numpy_audio_to_bytes(audio_data):
sample_rate = 44100
num_channels = 1
sampwidth = 2
buffer = io.BytesIO()
try:
with wave.open(buffer, "wb") as wf:
wf.setnchannels(num_channels)
wf.setsampwidth(sampwidth)
wf.setframerate(sample_rate)
wf.writeframes(audio_data.tobytes())
except Exception as e:
raise AudioConversionError(f"Error converting numpy array to audio bytes: {e}")
return buffer.getvalue()
class STTManager:
def __init__(self, config):
self.config = config
def speech_to_text(self, audio, convert_to_bytes=True):
if convert_to_bytes:
audio = numpy_audio_to_bytes(audio[1])
try:
if self.config.stt.type == "OPENAI_API":
data = ("temp.wav", audio, "audio/wav")
client = OpenAI(base_url=self.config.stt.url, api_key=self.config.stt.key)
transcription = client.audio.transcriptions.create(model=self.config.stt.name, file=data, response_format="text")
elif self.config.stt.type == "HF_API":
headers = {"Authorization": "Bearer " + self.config.stt.key}
response = requests.post(self.config.stt.url, headers=headers, data=audio)
if response.status_code != 200:
error_details = response.json().get("error", "No error message provided")
raise APIError("STT Error: HF API error", status_code=response.status_code, details=error_details)
transcription = response.json().get("text", None)
if transcription is None:
raise APIError("STT Error: No transcription returned by HF API")
except APIError as e:
raise
except Exception as e:
raise APIError(f"STT Error: Unexpected error: {e}")
return transcription
class TTSManager:
def __init__(self, config):
self.config = config
def text_to_speech(self, text):
try:
if self.config.tts.type == "OPENAI_API":
client = OpenAI(base_url=self.config.tts.url, api_key=self.config.tts.key)
response = client.audio.speech.create(model=self.config.tts.name, voice="alloy", response_format="opus", input=text)
elif self.config.tts.type == "HF_API":
headers = {"Authorization": "Bearer " + self.config.tts.key}
response = requests.post(self.config.tts.url, headers=headers, json={"inputs": text})
if response.status_code != 200:
error_details = response.json().get("error", "No error message provided")
raise APIError("TTS Error: HF API error", status_code=response.status_code, details=error_details)
except APIError as e:
raise
except Exception as e:
raise APIError(f"TTS Error: Unexpected error: {e}")
return response.content
def read_last_message(self, chat_display):
if chat_display:
last_message = chat_display[-1][1]
if last_message is not None:
return self.text_to_speech(last_message)
return None
|