import asyncio
import itertools
import os

import ray
from elevenlabs import generate, play, set_api_key, stream

from agent_response import AgentResponse


class TextToSpeechService:
    """Thin wrapper around the ElevenLabs text-to-speech API.

    Reads the API key from the ``ELEVENLABS_API_KEY`` environment variable at
    construction time and exposes blocking playback, chunked streaming, and an
    async generator that hands raw audio chunks off to Ray.
    """

    def __init__(self, voice_id="Bella", model_id="eleven_monolingual_v1"):
        """Configure the ElevenLabs client.

        Args:
            voice_id: ElevenLabs voice name/ID used for synthesis.
            model_id: ElevenLabs model identifier.

        Raises:
            KeyError: if ``ELEVENLABS_API_KEY`` is not set in the environment
                (deliberate fail-fast — a missing key should not be deferred
                to the first synthesis call).
        """
        api_key = os.environ["ELEVENLABS_API_KEY"]
        set_api_key(api_key)
        self._voice_id = voice_id
        self._model_id = model_id

    def print_voices(self):
        """Print every voice available to this API key (debug helper)."""
        # Local import: elevenlabs.api is only needed for this diagnostic.
        from elevenlabs.api import Voices

        for voice in Voices.from_api():
            print(voice)

    def speak(self, prompt):
        """Synthesize *prompt* and play it synchronously (blocks until done)."""
        audio = generate(
            text=prompt,
            voice=self._voice_id,
            model=self._model_id,
        )
        play(audio)

    def stream(self, prompt):
        """Return a lazy iterator of raw audio chunks for *prompt*.

        The network request is driven as the returned iterator is consumed;
        nothing is fetched eagerly here.
        """
        return generate(
            text=prompt,
            voice=self._voice_id,
            model=self._model_id,
            stream_chunk_size=2048,
            stream=True,
        )

    async def get_speech_chunks_async(self, sentence_response: AgentResponse, cancel_event=None):
        """Async generator yielding *sentence_response* once per audio chunk.

        For each raw chunk from the TTS stream, the chunk is placed in the Ray
        object store and ``sentence_response['tts_raw_chunk_ref']`` is set to
        its object ref before the response is yielded.
        ``sentence_response['tts_raw_chunk_id']`` is incremented *after* each
        yield, so the id the consumer sees matches the chunk just delivered.

        Args:
            sentence_response: response dict; ``'llm_sentence'`` supplies the
                text to speak (other keys are presumed set by the caller —
                TODO confirm against the producer of AgentResponse).
            cancel_event: optional event; when set, generation stops cleanly
                before the next chunk is stored or yielded.
        """
        audio_chunks = self.stream(sentence_response['llm_sentence'])
        # NOTE(review): the original tee()'d this stream into an unused
        # backup, which silently buffered every chunk in memory for the
        # stream's lifetime; the unused branch was removed.
        while True:
            # next() blocks on the network, so run it in a worker thread to
            # keep the event loop responsive; None sentinel marks exhaustion.
            chunk = await asyncio.to_thread(next, audio_chunks, None)
            if chunk is None:
                break  # stream exhausted
            if cancel_event is not None and cancel_event.is_set():
                # Check cancellation BEFORE ray.put so we don't write a
                # chunk to the object store only to discard it.
                return
            sentence_response['tts_raw_chunk_ref'] = ray.put(chunk)
            yield sentence_response
            sentence_response['tts_raw_chunk_id'] += 1