Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import ray | |
from ray.util.queue import Queue | |
from dotenv import load_dotenv | |
from local_speaker_service import LocalSpeakerService | |
from text_to_speech_service import TextToSpeechService | |
from chat_service import ChatService | |
import asyncio | |
# from ray.actor import ActorHandle | |
from ffmpeg_converter_actor import FFMpegConverterActor | |
from agent_response import AgentResponse | |
from environment_state_actor import EnvironmentStateActor | |
from agent_state_actor import AgentStateActor | |
from agent_state_actor import AgentState | |
import json | |
class PromptToLLMActor: | |
def __init__( | |
self, | |
environment_state_actor:EnvironmentStateActor, | |
input_queue:Queue, | |
output_queue:Queue): | |
load_dotenv() | |
self.input_queue = input_queue | |
self.output_queue = output_queue | |
self.chat_service = ChatService() | |
self.cancel_event = None | |
self.environment_state_actor = environment_state_actor | |
async def run(self): | |
while True: | |
prompt = await self.input_queue.get_async() | |
self.cancel_event = asyncio.Event() | |
agent_response = AgentResponse(prompt) | |
async for text, is_complete_sentance in self.chat_service.get_responses_as_sentances_async(prompt, self.cancel_event): | |
if self.chat_service.ignore_sentence(text): | |
is_complete_sentance = False | |
if not is_complete_sentance: | |
agent_response['llm_preview'] = text | |
await self.environment_state_actor.set_llm_preview.remote(text) | |
continue | |
agent_response['llm_preview'] = '' | |
agent_response['llm_sentence'] = text | |
agent_response['llm_sentences'].append(text) | |
await self.environment_state_actor.add_llm_response_and_clear_llm_preview.remote(text) | |
print(f"{agent_response['llm_sentence']} id: {agent_response['llm_sentence_id']} from prompt: {agent_response['prompt']}") | |
sentence_response = agent_response.make_copy() | |
await self.output_queue.put_async(sentence_response) | |
agent_response['llm_sentence_id'] += 1 | |
async def cancel(self): | |
if self.cancel_event: | |
self.cancel_event.set() | |
while not self.input_queue.empty(): | |
await self.input_queue.get_async() | |
while not self.output_queue.empty(): | |
await self.output_queue.get_async() | |
class LLMSentanceToSpeechActor: | |
def __init__( | |
self, | |
environment_state_actor:EnvironmentStateActor, | |
input_queue, | |
output_queue, | |
voice_id): | |
load_dotenv() | |
self.input_queue = input_queue | |
self.output_queue = output_queue | |
self.tts_service = TextToSpeechService(voice_id=voice_id) | |
self.cancel_event = None | |
self.environment_state_actor = environment_state_actor | |
async def run(self): | |
while True: | |
sentence_response = await self.input_queue.get_async() | |
self.cancel_event = asyncio.Event() | |
chunk_count = 0 | |
async for chunk_response in self.tts_service.get_speech_chunks_async(sentence_response, self.cancel_event): | |
chunk_response = chunk_response.make_copy() | |
await self.output_queue.put_async(chunk_response) | |
chunk_response = { | |
'prompt': sentence_response['prompt'], | |
'llm_sentence_id': sentence_response['llm_sentence_id'], | |
'chunk_count': chunk_count, | |
} | |
chunk_id_json = json.dumps(chunk_response) | |
await self.environment_state_actor.add_tts_raw_chunk_id.remote(chunk_id_json) | |
chunk_count += 1 | |
async def cancel(self): | |
if self.cancel_event: | |
self.cancel_event.set() | |
while not self.input_queue.empty(): | |
await self.input_queue.get_async() | |
while not self.output_queue.empty(): | |
await self.output_queue.get_async() | |
# legacy code for playing from local speaker | |
# @ray.remote | |
# class SpeechToSpeakerActor: | |
# def __init__(self, input_queue, voice_id): | |
# load_dotenv() | |
# self.input_queue = input_queue | |
# self.speaker_service = LocalSpeakerService() | |
# async def run(self): | |
# while True: | |
# audio_chunk = await self.input_queue.get_async() | |
# # print (f"Got audio chunk {len(audio_chunk)}") | |
# self.speaker_service.add_audio_stream([audio_chunk]) | |
# async def cancel(self): | |
# while not self.input_queue.empty(): | |
# await self.input_queue.get_async() | |
class SpeechToConverterActor: | |
def __init__( | |
self, | |
input_queue:Queue, | |
ffmpeg_converter_actor:FFMpegConverterActor): | |
load_dotenv() | |
self.input_queue = input_queue | |
self.ffmpeg_converter_actor = ffmpeg_converter_actor | |
async def run(self): | |
await self.ffmpeg_converter_actor.start_process.remote() | |
self.ffmpeg_converter_actor.run.remote() | |
while True: | |
chunk_response = await self.input_queue.get_async() | |
audio_chunk = chunk_response['tts_raw_chunk'] | |
await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk) | |
async def cancel(self): | |
while not self.input_queue.empty(): | |
await self.input_queue.get_async() | |
class RespondToPromptActor: | |
def __init__( | |
self, | |
environment_state_actor:EnvironmentStateActor, | |
out_audio_queue): | |
voice_id="2OviOUQc1JsQRQgNkVBj" | |
self.prompt_queue = Queue(maxsize=100) | |
self.llm_sentence_queue = Queue(maxsize=100) | |
self.speech_chunk_queue = Queue(maxsize=100) | |
self.environment_state_actor = environment_state_actor | |
self.ffmpeg_converter_actor = FFMpegConverterActor.remote(out_audio_queue) | |
self.prompt_to_llm = PromptToLLMActor.remote( | |
self.environment_state_actor, | |
self.prompt_queue, | |
self.llm_sentence_queue) | |
self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote( | |
self.environment_state_actor, | |
self.llm_sentence_queue, | |
self.speech_chunk_queue, | |
voice_id) | |
# self.speech_output = SpeechToSpeakerActor.remote(self.speech_chunk_queue, voice_id) | |
self.speech_output = SpeechToConverterActor.remote( | |
self.speech_chunk_queue, | |
self.ffmpeg_converter_actor) | |
# Start the pipeline components. | |
self.prompt_to_llm.run.remote() | |
self.llm_sentence_to_speech.run.remote() | |
self.speech_output.run.remote() | |
async def enqueue_prompt(self, prompt): | |
print("flush anything queued") | |
prompt_to_llm_future = self.prompt_to_llm.cancel.remote() | |
llm_sentence_to_speech_future = self.llm_sentence_to_speech.cancel.remote() | |
speech_output_future = self.speech_output.cancel.remote() | |
ffmpeg_converter_future = self.ffmpeg_converter_actor.flush_output_queue.remote() | |
await asyncio.gather( | |
prompt_to_llm_future, | |
llm_sentence_to_speech_future, | |
speech_output_future, | |
ffmpeg_converter_future, | |
) | |
await self.prompt_queue.put_async(prompt) | |
print("Enqueued prompt") | |