project_charles / respond_to_prompt_actor.py
sohojoe's picture
animate charles
cf5e7f4
raw
history blame
7.6 kB
import ray
from ray.util.queue import Queue
from dotenv import load_dotenv
from local_speaker_service import LocalSpeakerService
from text_to_speech_service import TextToSpeechService
from chat_service import ChatService
import asyncio
# from ray.actor import ActorHandle
from ffmpeg_converter_actor import FFMpegConverterActor
from agent_response import AgentResponse
from environment_state_actor import EnvironmentStateActor
from agent_state_actor import AgentStateActor
from agent_state_actor import AgentState
import json
@ray.remote
class PromptToLLMActor:
    """Pipeline stage that turns prompts into LLM sentences.

    Prompts are read from ``input_queue`` and streamed through
    ``ChatService``.  Incomplete (or ignored) text is only published as a
    preview on the environment state actor; every complete sentence is
    recorded there and a copy of the running ``AgentResponse`` is put on
    ``output_queue``.
    """

    def __init__(
            self,
            environment_state_actor: EnvironmentStateActor,
            input_queue: Queue,
            output_queue: Queue):
        """Store the queues and collaborators used by :meth:`run`.

        Args:
            environment_state_actor: Actor handle that receives LLM previews
                and finished sentences.
            input_queue: Queue of prompts to answer.
            output_queue: Queue that receives one response copy per sentence.
        """
        load_dotenv()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.chat_service = ChatService()
        # Replaced by a fresh asyncio.Event for every prompt handled in run().
        self.cancel_event = None
        self.environment_state_actor = environment_state_actor

    async def run(self):
        """Consume prompts forever, emitting one response per sentence."""
        while True:
            prompt = await self.input_queue.get_async()
            # New event per prompt so cancel() only affects the current one.
            self.cancel_event = asyncio.Event()
            agent_response = AgentResponse(prompt)
            async for text, is_complete_sentance in self.chat_service.get_responses_as_sentances_async(prompt, self.cancel_event):
                # Sentences the chat service wants ignored are treated like
                # incomplete text: shown as a preview, never forwarded.
                if self.chat_service.ignore_sentence(text):
                    is_complete_sentance = False
                if not is_complete_sentance:
                    agent_response['llm_preview'] = text
                    await self.environment_state_actor.set_llm_preview.remote(text)
                    continue
                agent_response['llm_preview'] = ''
                agent_response['llm_sentence'] = text
                agent_response['llm_sentences'].append(text)
                await self.environment_state_actor.add_llm_response_and_clear_llm_preview.remote(text)
                print(f"{agent_response['llm_sentence']} id: {agent_response['llm_sentence_id']} from prompt: {agent_response['prompt']}")
                # Forward a copy: agent_response keeps being mutated for the
                # following sentences of the same prompt.
                sentence_response = agent_response.make_copy()
                await self.output_queue.put_async(sentence_response)
                agent_response['llm_sentence_id'] += 1

    async def cancel(self):
        """Signal the in-flight prompt to stop and discard queued items."""
        if self.cancel_event is not None:
            self.cancel_event.set()
        await self._drain_queue(self.input_queue)
        await self._drain_queue(self.output_queue)

    async def _drain_queue(self, queue):
        """Discard everything currently in *queue* without ever blocking.

        A ``while not queue.empty(): await queue.get_async()`` loop can wait
        forever when another consumer of the same queue takes the last item
        between the emptiness check and the blocking get.  A non-blocking get
        that stops on ``Empty`` has no such window, and it also avoids the
        synchronous ``empty()`` call inside this async actor.
        """
        # Function-scope import keeps this fix self-contained.
        from ray.util.queue import Empty

        while True:
            try:
                await queue.get_async(block=False)
            except Empty:
                return
@ray.remote
class LLMSentanceToSpeechActor:
    """Pipeline stage that turns LLM sentences into speech chunks.

    Sentence responses are read from ``input_queue`` and streamed through
    ``TextToSpeechService``.  A copy of every chunk response is put on
    ``output_queue``, and a JSON identifier for the chunk is registered with
    the environment state actor.
    """

    def __init__(
            self,
            environment_state_actor: EnvironmentStateActor,
            input_queue,
            output_queue,
            voice_id):
        """Store the queues and create the text-to-speech service.

        Args:
            environment_state_actor: Actor handle that receives the chunk ids.
            input_queue: Queue of sentence responses to synthesise.
            output_queue: Queue that receives the speech chunk responses.
            voice_id: Voice identifier handed to ``TextToSpeechService``.
        """
        load_dotenv()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.tts_service = TextToSpeechService(voice_id=voice_id)
        # Replaced by a fresh asyncio.Event for every sentence handled in run().
        self.cancel_event = None
        self.environment_state_actor = environment_state_actor

    async def run(self):
        """Consume sentences forever, forwarding their speech chunks."""
        while True:
            sentence_response = await self.input_queue.get_async()
            # New event per sentence so cancel() only affects the current one.
            self.cancel_event = asyncio.Event()
            chunk_count = 0
            async for chunk_response in self.tts_service.get_speech_chunks_async(sentence_response, self.cancel_event):
                # Forward a copy of what the service yielded.
                await self.output_queue.put_async(chunk_response.make_copy())
                # Identify the chunk by prompt, sentence id and its position
                # within the sentence.
                chunk_id = {
                    'prompt': sentence_response['prompt'],
                    'llm_sentence_id': sentence_response['llm_sentence_id'],
                    'chunk_count': chunk_count,
                }
                chunk_id_json = json.dumps(chunk_id)
                await self.environment_state_actor.add_tts_raw_chunk_id.remote(chunk_id_json)
                chunk_count += 1

    async def cancel(self):
        """Signal the in-flight sentence to stop and discard queued items."""
        if self.cancel_event is not None:
            self.cancel_event.set()
        await self._drain_queue(self.input_queue)
        await self._drain_queue(self.output_queue)

    async def _drain_queue(self, queue):
        """Discard everything currently in *queue* without ever blocking.

        A ``while not queue.empty(): await queue.get_async()`` loop can wait
        forever when another consumer of the same queue takes the last item
        between the emptiness check and the blocking get.  A non-blocking get
        that stops on ``Empty`` has no such window, and it also avoids the
        synchronous ``empty()`` call inside this async actor.
        """
        # Function-scope import keeps this fix self-contained.
        from ray.util.queue import Empty

        while True:
            try:
                await queue.get_async(block=False)
            except Empty:
                return
# legacy code for playing from local speaker
# @ray.remote
# class SpeechToSpeakerActor:
# def __init__(self, input_queue, voice_id):
# load_dotenv()
# self.input_queue = input_queue
# self.speaker_service = LocalSpeakerService()
# async def run(self):
# while True:
# audio_chunk = await self.input_queue.get_async()
# # print (f"Got audio chunk {len(audio_chunk)}")
# self.speaker_service.add_audio_stream([audio_chunk])
# async def cancel(self):
# while not self.input_queue.empty():
# await self.input_queue.get_async()
@ray.remote
class SpeechToConverterActor:
    """Pipeline stage that feeds speech chunks to the ffmpeg converter actor."""

    def __init__(
            self,
            input_queue: Queue,
            ffmpeg_converter_actor: FFMpegConverterActor):
        """Store the chunk queue and the converter actor handle.

        Args:
            input_queue: Queue of chunk responses; each must carry a
                ``'tts_raw_chunk_ref'`` entry.
            ffmpeg_converter_actor: Actor handle that receives the audio.
        """
        load_dotenv()
        self.input_queue = input_queue
        self.ffmpeg_converter_actor = ffmpeg_converter_actor

    async def run(self):
        """Start the converter, then push queued audio chunks to it forever."""
        await self.ffmpeg_converter_actor.start_process.remote()
        # Not awaited: the converter's run() is left running alongside this
        # loop.
        self.ffmpeg_converter_actor.run.remote()
        while True:
            chunk_response = await self.input_queue.get_async()
            # NOTE(review): assumed to be a Ray ObjectRef (the original
            # resolved it with ray.get) -- confirm against the TTS service.
            audio_chunk_ref = chunk_response['tts_raw_chunk_ref']
            # Await the reference instead of calling the blocking ray.get(),
            # which would stall this actor's event loop (and cancel() with it)
            # while the object is fetched.
            audio_chunk = await audio_chunk_ref
            await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk)

    async def cancel(self):
        """Discard every chunk still waiting in the input queue."""
        # Function-scope import keeps this fix self-contained.
        from ray.util.queue import Empty

        # A non-blocking get that stops on Empty cannot hang.  The previous
        # "check empty(), then blocking get" loop could wait forever if
        # run() consumed the last item in between.
        while True:
            try:
                await self.input_queue.get_async(block=False)
            except Empty:
                return
@ray.remote
class RespondToPromptActor:
    """Owns and wires the prompt -> LLM -> speech -> converter pipeline.

    Three bounded queues connect the stages:
    ``prompt_queue`` -> ``PromptToLLMActor`` -> ``llm_sentence_queue`` ->
    ``LLMSentanceToSpeechActor`` -> ``speech_chunk_queue`` ->
    ``SpeechToConverterActor`` -> ``FFMpegConverterActor``.
    """

    def __init__(
            self,
            environment_state_actor: EnvironmentStateActor,
            out_audio_queue,
            voice_id: str = "2OviOUQc1JsQRQgNkVBj"):
        """Create the queues and stage actors and start every stage.

        Args:
            environment_state_actor: Actor handle shared with the LLM and
                speech stages.
            out_audio_queue: Queue handed to ``FFMpegConverterActor``.
            voice_id: Voice identifier passed to the speech stage.  Defaults
                to the voice that used to be hard-coded here.
        """
        self.prompt_queue = Queue(maxsize=100)
        self.llm_sentence_queue = Queue(maxsize=100)
        self.speech_chunk_queue = Queue(maxsize=100)
        self.environment_state_actor = environment_state_actor

        self.ffmpeg_converter_actor = FFMpegConverterActor.remote(out_audio_queue)

        self.prompt_to_llm = PromptToLLMActor.remote(
            self.environment_state_actor,
            self.prompt_queue,
            self.llm_sentence_queue)
        self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(
            self.environment_state_actor,
            self.llm_sentence_queue,
            self.speech_chunk_queue,
            voice_id)
        # self.speech_output = SpeechToSpeakerActor.remote(self.speech_chunk_queue, voice_id)
        self.speech_output = SpeechToConverterActor.remote(
            self.speech_chunk_queue,
            self.ffmpeg_converter_actor)

        # Start the pipeline components.  The run() calls are not awaited:
        # each stage loops forever.
        self.prompt_to_llm.run.remote()
        self.llm_sentence_to_speech.run.remote()
        self.speech_output.run.remote()

    async def enqueue_prompt(self, prompt):
        """Cancel and flush all pending work, then queue *prompt*.

        Every stage is cancelled and the converter's output queue is flushed
        concurrently.  The new prompt is enqueued only after all of those
        calls have completed.
        """
        print("flush anything queued")
        prompt_to_llm_future = self.prompt_to_llm.cancel.remote()
        llm_sentence_to_speech_future = self.llm_sentence_to_speech.cancel.remote()
        speech_output_future = self.speech_output.cancel.remote()
        ffmpeg_converter_future = self.ffmpeg_converter_actor.flush_output_queue.remote()
        await asyncio.gather(
            prompt_to_llm_future,
            llm_sentence_to_speech_future,
            speech_output_future,
            ffmpeg_converter_future,
        )
        await self.prompt_queue.put_async(prompt)
        print("Enqueued prompt")