File size: 7,593 Bytes
bcea2ea
 
 
1a63d97
 
 
afc1e50
bcea2ea
9b7ec10
87dcd10
149eeaf
 
bcea2ea
 
 
149eeaf
 
 
 
 
bcea2ea
 
 
1a63d97
afc1e50
149eeaf
bcea2ea
 
 
afc1e50
 
87dcd10
149eeaf
 
 
 
 
 
bcea2ea
149eeaf
 
 
 
 
 
87dcd10
149eeaf
afc1e50
df0ea75
afc1e50
 
 
df0ea75
afc1e50
df0ea75
bcea2ea
 
 
149eeaf
 
 
 
 
 
bcea2ea
 
 
1a63d97
afc1e50
149eeaf
bcea2ea
 
 
87dcd10
afc1e50
149eeaf
87dcd10
 
 
149eeaf
 
 
 
 
 
 
 
afc1e50
df0ea75
afc1e50
 
 
df0ea75
afc1e50
df0ea75
afc1e50
bcea2ea
b6ba8eb
 
 
 
 
 
 
bcea2ea
b6ba8eb
 
 
 
 
afc1e50
b6ba8eb
 
 
bcea2ea
d91a673
 
149eeaf
 
 
 
d91a673
 
 
 
 
b6ba8eb
 
d91a673
87dcd10
cf5e7f4
 
d91a673
 
0d27fd9
d91a673
0d27fd9
d91a673
 
bcea2ea
 
149eeaf
 
 
aec6f97
bcea2ea
 
 
 
149eeaf
b6ba8eb
aec6f97
bcea2ea
149eeaf
 
 
 
 
 
 
 
 
d91a673
149eeaf
 
 
bcea2ea
 
 
 
d91a673
bcea2ea
df0ea75
afc1e50
 
 
0d27fd9
b6ba8eb
df0ea75
afc1e50
 
0d27fd9
ae52b65
df0ea75
795c382
 
afc1e50
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import asyncio
import json

import ray
from dotenv import load_dotenv
from ray.util.queue import Empty, Queue

from local_speaker_service import LocalSpeakerService
from text_to_speech_service import TextToSpeechService
from chat_service import ChatService
# from ray.actor import ActorHandle
from ffmpeg_converter_actor import FFMpegConverterActor
from agent_response import AgentResponse
from environment_state_actor import EnvironmentStateActor

@ray.remote
class PromptToLLMActor:
    """Pipeline stage that turns prompts into LLM sentences.

    Prompts are read from ``input_queue``. Text streamed back by the chat
    service is reported to the environment state actor, and every completed
    sentence is pushed to ``output_queue`` as a copy of the running
    ``AgentResponse``.
    """

    def __init__(
            self,
            environment_state_actor: EnvironmentStateActor,
            input_queue: Queue,
            output_queue: Queue):
        """Store the queues and state actor and create the chat service.

        Args:
            environment_state_actor: Actor handle that receives LLM preview
                and sentence updates.
            input_queue: Queue of incoming prompts.
            output_queue: Queue that receives one response copy per sentence.
        """
        load_dotenv()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.chat_service = ChatService()
        # Replaced with a fresh asyncio.Event for every prompt in run().
        self.cancel_event = None
        self.environment_state_actor = environment_state_actor

    async def run(self):
        """Consume prompts forever, emitting one response per full sentence."""
        while True:
            prompt = await self.input_queue.get_async()
            # New event per prompt so an earlier cancel() cannot affect this one.
            self.cancel_event = asyncio.Event()
            agent_response = AgentResponse(prompt)
            async for text, is_complete_sentence in self.chat_service.get_responses_as_sentances_async(prompt, self.cancel_event):
                # Sentences the chat service asks to ignore are only previewed.
                if self.chat_service.ignore_sentence(text):
                    is_complete_sentence = False
                if not is_complete_sentence:
                    agent_response['llm_preview'] = text
                    await self.environment_state_actor.set_llm_preview.remote(text)
                    continue
                agent_response['llm_preview'] = ''
                agent_response['llm_sentence'] = text
                agent_response['llm_sentences'].append(text)
                await self.environment_state_actor.add_llm_response_and_clear_llm_preview.remote(text)
                print(f"{agent_response['llm_sentence']} id: {agent_response['llm_sentence_id']} from prompt: {agent_response['prompt']}")
                # Queue a copy: agent_response keeps being mutated for the
                # following sentences of the same prompt.
                sentence_response = agent_response.make_copy()
                await self.output_queue.put_async(sentence_response)
                agent_response['llm_sentence_id'] += 1

    async def cancel(self):
        """Signal the in-flight response to stop and discard queued items.

        Both queues are drained with non-blocking gets. A blocking get after
        an ``empty()`` check could wait forever if another consumer (this
        actor's ``run`` loop or the next stage) took the item in between.
        """
        if self.cancel_event:
            self.cancel_event.set()
        for pending in (self.input_queue, self.output_queue):
            while True:
                try:
                    await pending.get_async(block=False)
                except Empty:
                    break

@ray.remote
class LLMSentanceToSpeechActor:
    """Pipeline stage that turns LLM sentences into speech chunks.

    Sentence responses are read from ``input_queue`` and passed to the
    text-to-speech service. Every chunk response it yields is copied onto
    ``output_queue``, and a JSON id for the chunk is reported to the
    environment state actor.
    """

    def __init__(
            self,
            environment_state_actor: EnvironmentStateActor,
            input_queue,
            output_queue,
            voice_id):
        """Store the queues and state actor and create the TTS service.

        Args:
            environment_state_actor: Actor handle that receives chunk ids.
            input_queue: Queue of sentence responses to synthesize.
            output_queue: Queue that receives the speech chunk responses.
            voice_id: Voice identifier forwarded to ``TextToSpeechService``.
        """
        load_dotenv()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.tts_service = TextToSpeechService(voice_id=voice_id)
        # Replaced with a fresh asyncio.Event for every sentence in run().
        self.cancel_event = None
        self.environment_state_actor = environment_state_actor

    async def run(self):
        """Consume sentences forever, forwarding each synthesized chunk."""
        while True:
            sentence_response = await self.input_queue.get_async()
            # New event per sentence so an earlier cancel() cannot affect it.
            self.cancel_event = asyncio.Event()
            chunk_count = 0
            async for chunk_response in self.tts_service.get_speech_chunks_async(sentence_response, self.cancel_event):
                await self.output_queue.put_async(chunk_response.make_copy())
                # Identify the chunk by prompt, sentence id and its index
                # within the sentence.
                chunk_id = {
                    'prompt': sentence_response['prompt'],
                    'llm_sentence_id': sentence_response['llm_sentence_id'],
                    'chunk_count': chunk_count,
                }
                chunk_id_json = json.dumps(chunk_id)
                await self.environment_state_actor.add_tts_raw_chunk_id.remote(chunk_id_json)
                chunk_count += 1

    async def cancel(self):
        """Signal the in-flight synthesis to stop and discard queued items.

        Both queues are drained with non-blocking gets. A blocking get after
        an ``empty()`` check could wait forever if another consumer (this
        actor's ``run`` loop or the next stage) took the item in between.
        """
        if self.cancel_event:
            self.cancel_event.set()
        for pending in (self.input_queue, self.output_queue):
            while True:
                try:
                    await pending.get_async(block=False)
                except Empty:
                    break


# legacy code for playing from local speaker
# @ray.remote
# class SpeechToSpeakerActor:
#     def __init__(self, input_queue, voice_id):
#         load_dotenv()
#         self.input_queue = input_queue
#         self.speaker_service = LocalSpeakerService()

#     async def run(self):
#         while True:
#             audio_chunk = await self.input_queue.get_async()
#             # print (f"Got audio chunk {len(audio_chunk)}")
#             self.speaker_service.add_audio_stream([audio_chunk])
            
#     async def cancel(self):
#         while not self.input_queue.empty():
#             await self.input_queue.get_async()            

@ray.remote
class SpeechToConverterActor:
    """Pipeline stage that feeds speech chunks to the ffmpeg converter actor."""

    def __init__(
            self,
            input_queue: Queue,
            ffmpeg_converter_actor: FFMpegConverterActor):
        """Store the input queue and the converter actor handle.

        Args:
            input_queue: Queue of chunk responses carrying a
                ``'tts_raw_chunk_ref'`` entry.
            ffmpeg_converter_actor: Actor handle that receives the raw audio.
        """
        load_dotenv()
        self.input_queue = input_queue
        self.ffmpeg_converter_actor = ffmpeg_converter_actor

    async def run(self):
        """Start the converter, then forward audio chunks to it forever."""
        await self.ffmpeg_converter_actor.start_process.remote()
        # Fire-and-forget: the converter's own loop runs alongside this one.
        self.ffmpeg_converter_actor.run.remote()
        while True:
            chunk_response = await self.input_queue.get_async()
            audio_chunk_ref = chunk_response['tts_raw_chunk_ref']
            # Await the ref instead of calling ray.get(), which would block
            # this async actor's event loop (and cancel()) while resolving.
            # NOTE(review): assumes the entry is a single ObjectRef - confirm
            # against TextToSpeechService.
            audio_chunk = await audio_chunk_ref
            await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk)

    async def cancel(self):
        """Discard every chunk still waiting in the input queue.

        Uses non-blocking gets. A blocking get after an ``empty()`` check
        could wait forever if the ``run`` loop took the item in between.
        """
        while True:
            try:
                await self.input_queue.get_async(block=False)
            except Empty:
                break


@ray.remote
class RespondToPromptActor:
    """Owns and wires the prompt -> LLM -> speech -> converter pipeline.

    Creates the intermediate queues and the stage actors, starts their
    ``run`` loops, and exposes ``enqueue_prompt`` to flush the pipeline and
    submit a new prompt.
    """

    def __init__(
            self,
            environment_state_actor: EnvironmentStateActor,
            audio_output_queue,
            voice_id="2OviOUQc1JsQRQgNkVBj"):
        """Build the queues and stage actors and start the pipeline.

        Args:
            environment_state_actor: Actor handle shared with the LLM and
                text-to-speech stages.
            audio_output_queue: Queue handed to ``FFMpegConverterActor``.
            voice_id: Voice identifier passed to the text-to-speech stage.
                Defaults to the previously hard-coded voice.
        """
        self.prompt_queue = Queue(maxsize=100)
        self.llm_sentence_queue = Queue(maxsize=100)
        self.speech_chunk_queue = Queue(maxsize=100)
        self.environment_state_actor = environment_state_actor

        self.ffmpeg_converter_actor = FFMpegConverterActor.remote(audio_output_queue)

        self.prompt_to_llm = PromptToLLMActor.remote(
            self.environment_state_actor,
            self.prompt_queue,
            self.llm_sentence_queue)
        self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(
            self.environment_state_actor,
            self.llm_sentence_queue,
            self.speech_chunk_queue,
            voice_id)
        # self.speech_output = SpeechToSpeakerActor.remote(self.speech_chunk_queue, voice_id)
        self.speech_output = SpeechToConverterActor.remote(
            self.speech_chunk_queue,
            self.ffmpeg_converter_actor)

        # Start the pipeline components; each run() loops forever, so the
        # returned refs are intentionally not awaited.
        self.prompt_to_llm.run.remote()
        self.llm_sentence_to_speech.run.remote()
        self.speech_output.run.remote()

    async def enqueue_prompt(self, prompt):
        """Flush everything in flight, then queue ``prompt`` if it is non-empty.

        Args:
            prompt: Prompt to submit. An empty (or ``None``) prompt only
                flushes the pipeline.
        """
        print("flush anything queued")
        # Cancel every stage concurrently and wait until all have drained.
        await asyncio.gather(
            self.prompt_to_llm.cancel.remote(),
            self.llm_sentence_to_speech.cancel.remote(),
            self.speech_output.cancel.remote(),
            self.ffmpeg_converter_actor.flush_output_queue.remote(),
        )
        if prompt:  # handles case where we just want to flush
            await self.prompt_queue.put_async(prompt)
        print("Enqueued prompt")