import ray from ray.util.queue import Queue from dotenv import load_dotenv from local_speaker_service import LocalSpeakerService from text_to_speech_service import TextToSpeechService from chat_service import ChatService import asyncio # from ray.actor import ActorHandle from ffmpeg_converter_actor import FFMpegConverterActor from agent_response import AgentResponse from environment_state_actor import EnvironmentStateActor import json @ray.remote class PromptToLLMActor: def __init__( self, environment_state_actor:EnvironmentStateActor, input_queue:Queue, output_queue:Queue): load_dotenv() self.input_queue = input_queue self.output_queue = output_queue self.chat_service = ChatService() self.cancel_event = None self.environment_state_actor = environment_state_actor async def run(self): while True: prompt = await self.input_queue.get_async() self.cancel_event = asyncio.Event() agent_response = AgentResponse(prompt) async for text, is_complete_sentance in self.chat_service.get_responses_as_sentances_async(prompt, self.cancel_event): if self.chat_service.ignore_sentence(text): is_complete_sentance = False if not is_complete_sentance: agent_response['llm_preview'] = text await self.environment_state_actor.set_llm_preview.remote(text) continue agent_response['llm_preview'] = '' agent_response['llm_sentence'] = text agent_response['llm_sentences'].append(text) await self.environment_state_actor.add_llm_response_and_clear_llm_preview.remote(text) print(f"{agent_response['llm_sentence']} id: {agent_response['llm_sentence_id']} from prompt: {agent_response['prompt']}") sentence_response = agent_response.make_copy() await self.output_queue.put_async(sentence_response) agent_response['llm_sentence_id'] += 1 async def cancel(self): if self.cancel_event: self.cancel_event.set() while not self.input_queue.empty(): await self.input_queue.get_async() while not self.output_queue.empty(): await self.output_queue.get_async() @ray.remote class LLMSentanceToSpeechActor: def __init__( self, environment_state_actor:EnvironmentStateActor, input_queue, output_queue, voice_id): load_dotenv() self.input_queue = input_queue self.output_queue = output_queue self.tts_service = TextToSpeechService(voice_id=voice_id) self.cancel_event = None self.environment_state_actor = environment_state_actor async def run(self): while True: sentence_response = await self.input_queue.get_async() self.cancel_event = asyncio.Event() chunk_count = 0 async for chunk_response in self.tts_service.get_speech_chunks_async(sentence_response, self.cancel_event): chunk_response = chunk_response.make_copy() await self.output_queue.put_async(chunk_response) chunk_response = { 'prompt': sentence_response['prompt'], 'llm_sentence_id': sentence_response['llm_sentence_id'], 'chunk_count': chunk_count, } chunk_id_json = json.dumps(chunk_response) await self.environment_state_actor.add_tts_raw_chunk_id.remote(chunk_id_json) chunk_count += 1 async def cancel(self): if self.cancel_event: self.cancel_event.set() while not self.input_queue.empty(): await self.input_queue.get_async() while not self.output_queue.empty(): await self.output_queue.get_async() # legacy code for playing from local speaker # @ray.remote # class SpeechToSpeakerActor: # def __init__(self, input_queue, voice_id): # load_dotenv() # self.input_queue = input_queue # self.speaker_service = LocalSpeakerService() # async def run(self): # while True: # audio_chunk = await self.input_queue.get_async() # # print (f"Got audio chunk {len(audio_chunk)}") # self.speaker_service.add_audio_stream([audio_chunk]) # async def cancel(self): # while not self.input_queue.empty(): # await self.input_queue.get_async() @ray.remote class SpeechToConverterActor: def __init__( self, input_queue:Queue, ffmpeg_converter_actor:FFMpegConverterActor): load_dotenv() self.input_queue = input_queue self.ffmpeg_converter_actor = ffmpeg_converter_actor async def run(self): await self.ffmpeg_converter_actor.start_process.remote() self.ffmpeg_converter_actor.run.remote() while True: chunk_response = await self.input_queue.get_async() audio_chunk_ref = chunk_response['tts_raw_chunk_ref'] audio_chunk = ray.get(audio_chunk_ref) await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk) async def cancel(self): while not self.input_queue.empty(): await self.input_queue.get_async() @ray.remote class RespondToPromptActor: def __init__( self, environment_state_actor:EnvironmentStateActor, audio_output_queue): voice_id="2OviOUQc1JsQRQgNkVBj" self.prompt_queue = Queue(maxsize=100) self.llm_sentence_queue = Queue(maxsize=100) self.speech_chunk_queue = Queue(maxsize=100) self.environment_state_actor = environment_state_actor self.ffmpeg_converter_actor = FFMpegConverterActor.remote(audio_output_queue) self.prompt_to_llm = PromptToLLMActor.remote( self.environment_state_actor, self.prompt_queue, self.llm_sentence_queue) self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote( self.environment_state_actor, self.llm_sentence_queue, self.speech_chunk_queue, voice_id) # self.speech_output = SpeechToSpeakerActor.remote(self.speech_chunk_queue, voice_id) self.speech_output = SpeechToConverterActor.remote( self.speech_chunk_queue, self.ffmpeg_converter_actor) # Start the pipeline components. self.prompt_to_llm.run.remote() self.llm_sentence_to_speech.run.remote() self.speech_output.run.remote() async def enqueue_prompt(self, prompt): print("flush anything queued") prompt_to_llm_future = self.prompt_to_llm.cancel.remote() llm_sentence_to_speech_future = self.llm_sentence_to_speech.cancel.remote() speech_output_future = self.speech_output.cancel.remote() ffmpeg_converter_future = self.ffmpeg_converter_actor.flush_output_queue.remote() await asyncio.gather( prompt_to_llm_future, llm_sentence_to_speech_future, speech_output_future, ffmpeg_converter_future, ) if len(prompt) > 0: # handles case where we just want to flush await self.prompt_queue.put_async(prompt) print("Enqueued prompt")