import asyncio
import json
from asyncio import Queue, Semaphore, TaskGroup
from contextlib import asynccontextmanager

import ray

from agent_response import AgentResponse
from chat_service import ChatService
from environment_state_actor import EnvironmentStateActor
from ffmpeg_converter_actor import FFMpegConverterActor
# from local_speaker_service import LocalSpeakerService
from text_to_speech_service import TextToSpeechService


class RespondToPromptAsync:
    """Streams a prompt through LLM -> TTS -> ffmpeg conversion, fanning each
    completed LLM sentence out to its own TTS task while forwarding the audio
    in sentence order."""

    def __init__(
            self,
            environment_state_actor: EnvironmentStateActor,
            audio_output_queue):
        voice_id = "2OviOUQc1JsQRQgNkVBj"
        self.prompt_queue = Queue(maxsize=100)
        self.llm_sentence_queue = Queue(maxsize=100)
        self.speech_chunk_queue = Queue(maxsize=100)
        self.voice_id = voice_id
        self.audio_output_queue = audio_output_queue
        self.environment_state_actor = environment_state_actor
        self.processing_semaphore = Semaphore(1)
        self.sentence_queues = []
        self.sentence_tasks = []
        # self.ffmpeg_converter_actor = FFMpegConverterActor.remote(audio_output_queue)

    async def enqueue_prompt(self, prompt):
        # Reset queues and services
        # print("flush anything queued")
        # self.prompt_queue = Queue(maxsize=100)
        # self.llm_sentence_queue = Queue(maxsize=100)
        # self.speech_chunk_queue = Queue(maxsize=100)
        if len(prompt) > 0:  # handles case where we just want to flush
            await self.prompt_queue.put(prompt)
            print("Enqueued prompt")

    # @asynccontextmanager
    # async def task_group(self):
    #     tg = TaskGroup()
    #     try:
    #         yield tg
    #     finally:
    #         await tg.aclose()

    async def prompt_to_llm(self):
        """Pull prompts off the queue, stream LLM output sentence by sentence,
        and spawn one TTS task (with its own chunk queue) per completed sentence."""
        chat_service = ChatService()
        async with TaskGroup() as tg:
            while True:
                prompt = await self.prompt_queue.get()
                agent_response = AgentResponse(prompt)
                async for text, is_complete_sentance in chat_service.get_responses_as_sentances_async(prompt):
                    if chat_service.ignore_sentence(text):
                        is_complete_sentance = False
                    if not is_complete_sentance:
                        # Partial sentence: surface it as a live preview only.
                        agent_response['llm_preview'] = text
                        await self.environment_state_actor.set_llm_preview.remote(text)
                        continue
                    agent_response['llm_preview'] = ''
                    agent_response['llm_sentence'] = text
                    agent_response['llm_sentences'].append(text)
                    await self.environment_state_actor.add_llm_response_and_clear_llm_preview.remote(text)
                    print(f"{agent_response['llm_sentence']} id: {agent_response['llm_sentence_id']} from prompt: {agent_response['prompt']}")
                    # Each completed sentence gets its own queue and TTS task so
                    # synthesis can overlap with further LLM streaming.
                    sentence_response = agent_response.make_copy()
                    new_queue = Queue()
                    self.sentence_queues.append(new_queue)
                    task = tg.create_task(self.llm_sentence_to_speech(sentence_response, new_queue))
                    self.sentence_tasks.append(task)
                    agent_response['llm_sentence_id'] += 1

    async def llm_sentence_to_speech(self, sentence_response, output_queue):
        """Synthesize one sentence into speech chunks, hand each chunk to
        output_queue, and record its id with the environment state actor."""
        tts_service = TextToSpeechService(self.voice_id)
        chunk_count = 0
        async for chunk_response in tts_service.get_speech_chunks_async(sentence_response):
            chunk_response = chunk_response.make_copy()
            # await self.output_queue.put_async(chunk_response)
            await output_queue.put(chunk_response)
            chunk_response = {
                'prompt': sentence_response['prompt'],
                'llm_sentence_id': sentence_response['llm_sentence_id'],
                'chunk_count': chunk_count,
            }
            chunk_id_json = json.dumps(chunk_response)
            await self.environment_state_actor.add_tts_raw_chunk_id.remote(chunk_id_json)
            chunk_count += 1

    async def speech_to_converter(self):
        """Drain speech chunks in sentence order and push them to the ffmpeg
        converter actor, which writes converted audio to audio_output_queue."""
        self.ffmpeg_converter_actor = FFMpegConverterActor.remote(self.audio_output_queue)
        await self.ffmpeg_converter_actor.start_process.remote()
        self.ffmpeg_converter_actor.run.remote()  # fire-and-forget; loops inside the actor
        while True:
            for i, task in enumerate(self.sentence_tasks):
                # Skip this task/queue pair if task completed
                queue = self.sentence_queues[i]
                if task.done() and queue.empty():
                    continue
                while not queue.empty():
                    chunk_response = await queue.get()
                    audio_chunk_ref = chunk_response['tts_raw_chunk_ref']
                    audio_chunk = ray.get(audio_chunk_ref)
                    await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk)
                # Stop at the first sentence still being synthesized so audio
                # is forwarded strictly in sentence order.
                break
            await asyncio.sleep(0.01)

    async def run(self):
        async with TaskGroup() as tg:  # Use asyncio's built-in TaskGroup
            tg.create_task(self.prompt_to_llm())
            tg.create_task(self.speech_to_converter())
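

# Hypothetical usage sketch (added for illustration; not part of the original
# module). Assumes ray.init() is handled by the surrounding application, that
# EnvironmentStateActor is a Ray actor class constructed with no arguments, and
# that audio_output_queue is a ray.util.queue.Queue so FFMpegConverterActor can
# push converted audio into it; adjust to the real application's wiring.
if __name__ == "__main__":
    from ray.util.queue import Queue as RayQueue

    async def _demo():
        environment_state_actor = EnvironmentStateActor.remote()  # assumed no-arg constructor
        audio_output_queue = RayQueue(maxsize=1000)
        responder = RespondToPromptAsync(environment_state_actor, audio_output_queue)
        pipeline = asyncio.ensure_future(responder.run())  # runs until cancelled
        await responder.enqueue_prompt("Hello! Please introduce yourself.")
        await asyncio.sleep(30)  # give the LLM -> TTS -> ffmpeg chain time to stream audio
        pipeline.cancel()

    asyncio.run(_demo())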