Spaces:
Sleeping
Sleeping
import asyncio | |
import time | |
from clip_transform import CLIPTransform | |
from chat_service import ChatService | |
from dotenv import load_dotenv | |
from speech_service import SpeechService | |
from concurrent.futures import ThreadPoolExecutor | |
from audio_stream_processor import AudioStreamProcessor | |
from streaming_chat_service import StreamingChatService | |
from pipeline import Pipeline, Node, Job | |
from typing import List | |
class ChatJob(Job):
    """Pipeline job that carries its payload together with a chat service.

    The payload (``data``) is handed to the ``Job`` base class; the chat
    service is stored on the job so that each pipeline node can reach it
    through the job it is processing.
    """

    def __init__(self, data, chat_service: ChatService) -> None:
        """Create a job for ``data`` bound to ``chat_service``."""
        super().__init__(data)
        self.chat_service = chat_service
class Node1(Node):
    """First stage: turn an input prompt into a stream of sentence jobs."""

    # Counter used to number the jobs emitted by this node, starting at 0.
    next_id = 0

    async def process_job(self, job: ChatJob):
        """Yield one ``ChatJob`` per sentence of the chat response.

        On input, ``job.data`` is the input string. Each yielded job has
        ``data`` set to the next sentence and ``id`` set to a sequential
        number. Sentences for which the chat service's ``ignore_sentence``
        returns a true value are dropped; every kept sentence is printed.
        """
        service = job.chat_service
        async for sentence in service.get_responses_as_sentances_async(job.data):
            if not service.ignore_sentence(sentence):
                print(f"{sentence}")
                sentence_job = ChatJob(sentence, service)
                sentence_job.id = self.next_id
                self.next_id += 1
                yield sentence_job
class Node2(Node):
    """Second stage: turn a sentence into a stream of speech-chunk jobs."""

    # Counter used to number the jobs emitted by this node, starting at 0.
    next_id = 0

    async def process_job(self, job: ChatJob):
        """Yield one ``ChatJob`` per speech chunk generated for the sentence.

        On input, ``job.data`` is the sentence. Each yielded job has
        ``data`` set to one chunk of the streamed speech bytes and ``id``
        set to a sequential number.
        """
        service = job.chat_service
        async for audio_chunk in service.get_speech_chunks_async(job.data):
            chunk_job = ChatJob(audio_chunk, service)
            chunk_job.id = self.next_id
            self.next_id += 1
            yield chunk_job
class Node3(Node):
    """Final stage: hand each speech chunk to the chat service for playback."""

    # NOTE: an earlier, disabled variant of this node accumulated chunks in a
    # class-level ``sync`` list and enqueued them in batches of ``sync_size``
    # (64). The current implementation enqueues every chunk immediately.

    async def process_job(self, job: ChatJob):
        """Enqueue ``job.data`` (streamed speech bytes) and yield the job.

        The chunk is wrapped in a single-element list before being passed
        to ``enqueue_speech_bytes_to_play``; the incoming job is then
        yielded unchanged.
        """
        enqueue_for_playback = job.chat_service.enqueue_speech_bytes_to_play
        enqueue_for_playback([job.data])
        yield job
class ChatPipeline():
    """Three-stage chat pipeline: prompt -> sentences -> speech chunks -> playback.

    Owns the ``Pipeline``, the ``AudioStreamProcessor`` and the
    ``StreamingChatService`` used by the nodes. Can be used as a context
    manager; leaving the context closes the audio processor.
    """

    def __init__(self):
        """Load environment variables and build the pipeline's services."""
        load_dotenv()
        self.pipeline = Pipeline()
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(self.audio_processor, voice_id="2OviOUQc1JsQRQgNkVBj")  # Chales003

    def __enter__(self):
        """Return this pipeline for use inside a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the audio processor; exceptions from the block propagate."""
        self._close_audio_processor()

    def __del__(self):
        """Close the audio processor if it has not been closed already."""
        self._close_audio_processor()

    def _close_audio_processor(self):
        """Close and drop the audio processor; safe to call repeatedly."""
        # getattr: __del__ also runs when __init__ failed before the
        # attribute was assigned, in which case there is nothing to close.
        processor = getattr(self, "audio_processor", None)
        if processor is None:
            return
        try:
            processor.close()
        finally:
            # Drop the reference even if close() raised, so that later
            # calls (e.g. from __del__) do not attempt a second close.
            self.audio_processor = None

    async def start(self):
        """Create the inter-node queues and register the three nodes.

        Each node is registered with ``sequential_node=True``; Node1 is
        given ``node1_queue``/``node2_queue``, Node2 is given
        ``node2_queue``/``node3_queue`` and Node3 is given ``node3_queue``
        and ``None``.
        """
        # NOTE(review): the add_node arguments are presumably
        # (node class, worker count, input queue, output queue) - confirm
        # against Pipeline.add_node.
        self.node1_queue = asyncio.Queue()
        self.node2_queue = asyncio.Queue()
        self.node3_queue = asyncio.Queue()
        self.sync = []
        await self.pipeline.add_node(Node1, 1, self.node1_queue, self.node2_queue, sequential_node=True)
        await self.pipeline.add_node(Node2, 1, self.node2_queue, self.node3_queue, sequential_node=True)
        await self.pipeline.add_node(Node3, 1, self.node3_queue, None, sequential_node=True)

    async def enqueue(self, prompt):
        """Wrap ``prompt`` in a ``ChatJob`` and submit it to the pipeline."""
        job = ChatJob(prompt, self.chat_service)
        await self.pipeline.enqueue_job(job)

    async def wait_until_all_jobs_idle(self):
        """Sleep in 0.1 s steps forever.

        Idle detection is not implemented yet, so this coroutine never
        returns on its own; it only ends when it is cancelled.
        """
        # TODO - implement this
        while True:
            await asyncio.sleep(0.1)