File size: 5,209 Bytes
c490c32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
814feb3
 
c490c32
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from asyncio import Queue, TaskGroup
import asyncio
from contextlib import asynccontextmanager

import ray
from chat_service import ChatService
# from local_speaker_service import LocalSpeakerService
from text_to_speech_service import TextToSpeechService
from environment_state_actor import EnvironmentStateActor
from ffmpeg_converter_actor import FFMpegConverterActor
from agent_response import AgentResponse
import json
from asyncio import Semaphore

class RespondToPromptAsync:
    def __init__(
            self, 
            environment_state_actor:EnvironmentStateActor, 
            audio_output_queue):
        voice_id="2OviOUQc1JsQRQgNkVBj"
        self.prompt_queue = Queue(maxsize=100)
        self.llm_sentence_queue = Queue(maxsize=100)
        self.speech_chunk_queue = Queue(maxsize=100)
        self.voice_id = voice_id
        self.audio_output_queue = audio_output_queue
        self.environment_state_actor = environment_state_actor
        self.processing_semaphore = Semaphore(1)
        self.sentence_queues = []
        self.sentence_tasks = []    
        # self.ffmpeg_converter_actor = FFMpegConverterActor.remote(audio_output_queue)

    async def enqueue_prompt(self, prompt):
        # Reset queues and services
        # print("flush anything queued")
        # self.prompt_queue = Queue(maxsize=100)
        # self.llm_sentence_queue = Queue(maxsize=100)
        # self.speech_chunk_queue = Queue(maxsize=100)

        if len(prompt) > 0:  # handles case where we just want to flush
            await self.prompt_queue.put(prompt)
        print("Enqueued prompt")

    # @asynccontextmanager
    # async def task_group(self):
    #     tg = TaskGroup()
    #     try:
    #         yield tg
    #     finally:
    #         await tg.aclose()

    async def prompt_to_llm(self):
        chat_service = ChatService()

        async with TaskGroup() as tg:
            while True:
                prompt = await self.prompt_queue.get()
                agent_response = AgentResponse(prompt)
                async for text, is_complete_sentance in chat_service.get_responses_as_sentances_async(prompt):
                    if chat_service.ignore_sentence(text):
                        is_complete_sentance = False
                    if not is_complete_sentance:
                        agent_response['llm_preview'] = text
                        await self.environment_state_actor.set_llm_preview.remote(text)
                        continue
                    agent_response['llm_preview'] = ''
                    agent_response['llm_sentence'] = text
                    agent_response['llm_sentences'].append(text)
                    await self.environment_state_actor.add_llm_response_and_clear_llm_preview.remote(text)
                    print(f"{agent_response['llm_sentence']} id: {agent_response['llm_sentence_id']} from prompt: {agent_response['prompt']}")
                    sentence_response = agent_response.make_copy()
                    new_queue = Queue()
                    self.sentence_queues.append(new_queue)
                    task = tg.create_task(self.llm_sentence_to_speech(sentence_response, new_queue))
                    self.sentence_tasks.append(task)
                    agent_response['llm_sentence_id'] += 1


    async def llm_sentence_to_speech(self, sentence_response, output_queue):
        tts_service = TextToSpeechService(self.voice_id)
        
        chunk_count = 0
        async for chunk_response in tts_service.get_speech_chunks_async(sentence_response):
            chunk_response = chunk_response.make_copy()
            # await self.output_queue.put_async(chunk_response)
            await output_queue.put(chunk_response)
            chunk_response = {
                'prompt': sentence_response['prompt'],
                'llm_sentence_id': sentence_response['llm_sentence_id'],
                'chunk_count': chunk_count,
            }
            chunk_id_json = json.dumps(chunk_response)
            await self.environment_state_actor.add_tts_raw_chunk_id.remote(chunk_id_json)
            chunk_count += 1

    async def speech_to_converter(self):
        self.ffmpeg_converter_actor = FFMpegConverterActor.remote(self.audio_output_queue)
        await self.ffmpeg_converter_actor.start_process.remote()
        self.ffmpeg_converter_actor.run.remote()
        
        while True:
            for i, task in enumerate(self.sentence_tasks):
                # Skip this task/queue pair if task completed
                queue = self.sentence_queues[i]
                if task.done() and queue.empty():
                    continue 
                while not queue.empty():
                    chunk_response = await queue.get()
                    audio_chunk_ref = chunk_response['tts_raw_chunk_ref']
                    audio_chunk = ray.get(audio_chunk_ref)
                    await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk)
                break

            await asyncio.sleep(0.01) 

    async def run(self):
        async with TaskGroup() as tg:  # Use asyncio's built-in TaskGroup
            tg.create_task(self.prompt_to_llm())
            tg.create_task(self.speech_to_converter())