Spaces:
Sleeping
Sleeping
import ray | |
from ray.util.queue import Queue | |
from dotenv import load_dotenv | |
from audio_stream_processor import AudioStreamProcessor | |
from streaming_chat_service import StreamingChatService | |
import asyncio | |
# from ray.actor import ActorHandle | |
class PromptToLLMActor: | |
def __init__(self, input_queue, output_queue, voice_id): | |
load_dotenv() | |
self.input_queue = input_queue | |
self.output_queue = output_queue | |
self.audio_processor = AudioStreamProcessor() | |
self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id) | |
self.cancel_event = None | |
async def run(self): | |
while True: | |
prompt = await self.input_queue.get_async() | |
self.cancel_event = asyncio.Event() | |
async for sentence in self.chat_service.get_responses_as_sentances_async(prompt, self.cancel_event): | |
if self.chat_service.ignore_sentence(sentence): | |
continue | |
print(f"{sentence}") | |
await self.output_queue.put_async(sentence) | |
async def cancel(self): | |
if self.cancel_event: | |
self.cancel_event.set() | |
while not self.input_queue.empty(): | |
await self.input_queue.get_async() | |
while not self.output_queue.empty(): | |
await self.output_queue.get_async() | |
class LLMSentanceToSpeechActor: | |
def __init__(self, input_queue, output_queue, voice_id): | |
load_dotenv() | |
self.input_queue = input_queue | |
self.output_queue = output_queue | |
self.audio_processor = AudioStreamProcessor() | |
self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id) | |
self.cancel_event = None | |
async def run(self): | |
while True: | |
sentance = await self.input_queue.get_async() | |
self.cancel_event = asyncio.Event() | |
async for chunk in self.chat_service.get_speech_chunks_async(sentance, self.cancel_event): | |
await self.output_queue.put_async(chunk) | |
async def cancel(self): | |
if self.cancel_event: | |
self.cancel_event.set() | |
while not self.input_queue.empty(): | |
await self.input_queue.get_async() | |
while not self.output_queue.empty(): | |
await self.output_queue.get_async() | |
class SpeechToSpeakerActor: | |
def __init__(self, input_queue, voice_id): | |
load_dotenv() | |
self.input_queue = input_queue | |
self.audio_processor = AudioStreamProcessor() | |
self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id) | |
async def run(self): | |
while True: | |
audio_chunk = await self.input_queue.get_async() | |
# print (f"Got audio chunk {len(audio_chunk)}") | |
self.chat_service.enqueue_speech_bytes_to_play([audio_chunk]) | |
async def cancel(self): | |
while not self.input_queue.empty(): | |
await self.input_queue.get_async() | |
class SpeechToConverterActor: | |
def __init__(self, input_queue, ffmpeg_converter_actor): | |
load_dotenv() | |
self.input_queue = input_queue | |
self.ffmpeg_converter_actor = ffmpeg_converter_actor | |
async def run(self): | |
while True: | |
audio_chunk = await self.input_queue.get_async() | |
# print (f"Got audio chunk {len(audio_chunk)}") | |
await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk) | |
def cancel(self): | |
while not self.input_queue.empty(): | |
self.input_queue.get() | |
class RespondToPromptActor: | |
def __init__(self, ffmpeg_converter_actor): | |
voice_id="2OviOUQc1JsQRQgNkVBj" | |
self.prompt_queue = Queue(maxsize=100) | |
self.llm_sentence_queue = Queue(maxsize=100) | |
self.speech_chunk_queue = Queue(maxsize=100) | |
self.ffmepg_converter_actor = ffmpeg_converter_actor | |
self.prompt_to_llm = PromptToLLMActor.remote(self.prompt_queue, self.llm_sentence_queue, voice_id) | |
self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(self.llm_sentence_queue, self.speech_chunk_queue, voice_id) | |
# self.speech_output = SpeechToSpeakerActor.remote(self.speech_chunk_queue, voice_id) | |
self.speech_output = SpeechToConverterActor.remote(self.speech_chunk_queue, ffmpeg_converter_actor) | |
# Start the pipeline components. | |
self.prompt_to_llm.run.remote() | |
self.llm_sentence_to_speech.run.remote() | |
self.speech_output.run.remote() | |
async def enqueue_prompt(self, prompt): | |
print("flush anything queued") | |
prompt_to_llm_future = self.prompt_to_llm.cancel.remote() | |
llm_sentence_to_speech_future = self.llm_sentence_to_speech.cancel.remote() | |
speech_to_speaker_future = self.speech_output.cancel.remote() | |
ffmpeg_converter_future = self.ffmepg_converter_actor.flush_output_queue.remote() | |
await asyncio.gather( | |
prompt_to_llm_future, | |
llm_sentence_to_speech_future, | |
speech_to_speaker_future, | |
ffmpeg_converter_future, | |
) | |
self.prompt_queue.put(prompt) | |
print("Enqueued prompt") | |