File size: 4,971 Bytes
bcea2ea
 
 
 
 
afc1e50
bcea2ea
 
 
 
 
 
 
 
 
 
afc1e50
bcea2ea
 
 
afc1e50
 
 
bcea2ea
 
 
afc1e50
 
 
 
 
 
 
 
 
bcea2ea
 
 
 
 
 
 
 
 
afc1e50
bcea2ea
 
 
afc1e50
 
 
 
 
 
 
 
 
 
 
 
 
bcea2ea
 
 
 
 
 
 
 
 
 
 
afc1e50
d91a673
bcea2ea
afc1e50
 
 
 
bcea2ea
d91a673
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bcea2ea
 
d91a673
bcea2ea
 
 
 
 
 
 
d91a673
 
bcea2ea
 
 
 
d91a673
bcea2ea
 
afc1e50
 
 
d91a673
afc1e50
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import ray
from ray.util.queue import Queue
from dotenv import load_dotenv
from audio_stream_processor import AudioStreamProcessor
from streaming_chat_service import StreamingChatService
import asyncio
# from ray.actor import ActorHandle

@ray.remote
class PromptToLLMActor:
    """Pipeline stage that turns queued prompts into queued response sentences.

    Prompts are read from ``input_queue``. Every sentence the chat service
    yields for a prompt is written to ``output_queue`` unless the chat service
    reports that it should be ignored.
    """

    def __init__(self, input_queue, output_queue, voice_id):
        """Store the queues and build the chat service.

        Args:
            input_queue: Queue that prompts are read from.
            output_queue: Queue that response sentences are written to.
            voice_id: Forwarded to ``StreamingChatService``.
        """
        load_dotenv()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)
        # Replaced with a fresh asyncio.Event for every prompt handled in run().
        self.cancel_event = None

    async def run(self):
        """Consume prompts forever, forwarding each kept sentence downstream."""
        while True:
            prompt = await self.input_queue.get_async()
            # A new event per prompt, so an earlier cancel() does not affect
            # the response that is about to start.
            self.cancel_event = asyncio.Event()
            async for sentence in self.chat_service.get_responses_as_sentances_async(prompt, self.cancel_event):
                if self.chat_service.ignore_sentence(sentence):
                    continue
                print(f"{sentence}")
                await self.output_queue.put_async(sentence)

    def cancel(self):
        """Set the current cancel event (if any) and discard queued items.

        Both queues are drained with ``get_nowait()`` instead of an
        ``empty()`` check followed by a blocking ``get()``. These queues are
        shared with other actors, so another consumer can take the last item
        between the check and the ``get()``; the blocking call would then
        wait forever and stall this actor.
        """
        # Imported here so the method does not depend on a module-level name.
        from ray.util.queue import Empty

        if self.cancel_event:
            self.cancel_event.set()
        for queue in (self.input_queue, self.output_queue):
            while True:
                try:
                    queue.get_nowait()
                except Empty:
                    break

@ray.remote
class LLMSentanceToSpeechActor:
    """Pipeline stage that turns queued sentences into queued speech chunks.

    Sentences are read from ``input_queue``. Every chunk the chat service
    yields for a sentence is written to ``output_queue``.
    """

    def __init__(self, input_queue, output_queue, voice_id):
        """Store the queues and build the chat service.

        Args:
            input_queue: Queue that sentences are read from.
            output_queue: Queue that speech chunks are written to.
            voice_id: Forwarded to ``StreamingChatService``.
        """
        load_dotenv()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)
        # Replaced with a fresh asyncio.Event for every sentence handled in run().
        self.cancel_event = None

    async def run(self):
        """Consume sentences forever, forwarding each speech chunk downstream."""
        while True:
            sentance = await self.input_queue.get_async()
            # A new event per sentence, so an earlier cancel() does not affect
            # the synthesis that is about to start.
            self.cancel_event = asyncio.Event()
            async for chunk in self.chat_service.get_speech_chunks_async(sentance, self.cancel_event):
                await self.output_queue.put_async(chunk)

    def cancel(self):
        """Set the current cancel event (if any) and discard queued items.

        Both queues are drained with ``get_nowait()`` instead of an
        ``empty()`` check followed by a blocking ``get()``. These queues are
        shared with other actors, so another consumer can take the last item
        between the check and the ``get()``; the blocking call would then
        wait forever and stall this actor.
        """
        # Imported here so the method does not depend on a module-level name.
        from ray.util.queue import Empty

        if self.cancel_event:
            self.cancel_event.set()
        for queue in (self.input_queue, self.output_queue):
            while True:
                try:
                    queue.get_nowait()
                except Empty:
                    break


@ray.remote
class SpeechToSpeakerActor:
    """Pipeline sink that hands queued audio chunks to the chat service.

    Each chunk read from ``input_queue`` is passed to
    ``StreamingChatService.enqueue_speech_bytes_to_play``.
    """

    def __init__(self, input_queue, voice_id):
        """Store the queue and build the chat service.

        Args:
            input_queue: Queue that audio chunks are read from.
            voice_id: Forwarded to ``StreamingChatService``.
        """
        load_dotenv()
        self.input_queue = input_queue
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)

    async def run(self):
        """Consume audio chunks forever, enqueueing each one for playback."""
        while True:
            audio_chunk = await self.input_queue.get_async()
            # The service call takes a list, so the single chunk is wrapped.
            self.chat_service.enqueue_speech_bytes_to_play([audio_chunk])

    def cancel(self):
        """Discard every audio chunk currently waiting in the input queue.

        The queue is drained with ``get_nowait()`` instead of an ``empty()``
        check followed by a blocking ``get()``. If another consumer takes the
        last item between the check and the ``get()``, the blocking call
        would wait forever and stall this actor.
        """
        # Imported here so the method does not depend on a module-level name.
        from ray.util.queue import Empty

        while True:
            try:
                self.input_queue.get_nowait()
            except Empty:
                break

@ray.remote
class SpeechToConverterActor:
    """Pipeline sink that forwards queued audio chunks to a converter actor.

    Each chunk read from ``input_queue`` is sent to
    ``ffmpeg_converter_actor.push_chunk``.
    """

    def __init__(self, input_queue, ffmpeg_converter_actor):
        """Store the queue and the downstream actor handle.

        Args:
            input_queue: Queue that audio chunks are read from.
            ffmpeg_converter_actor: Actor handle exposing a ``push_chunk``
                remote method.
        """
        load_dotenv()
        self.input_queue = input_queue
        self.ffmpeg_converter_actor = ffmpeg_converter_actor

    async def run(self):
        """Consume audio chunks forever, pushing each one to the converter."""
        while True:
            audio_chunk = await self.input_queue.get_async()
            # Awaiting the remote call means the next chunk is not fetched
            # until the converter has accepted this one.
            await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk)

    def cancel(self):
        """Discard every audio chunk currently waiting in the input queue.

        The queue is drained with ``get_nowait()`` instead of an ``empty()``
        check followed by a blocking ``get()``. If another consumer takes the
        last item between the check and the ``get()``, the blocking call
        would wait forever and stall this actor.
        """
        # Imported here so the method does not depend on a module-level name.
        from ray.util.queue import Empty

        while True:
            try:
                self.input_queue.get_nowait()
            except Empty:
                break


@ray.remote
class RespondToPromptActor:
    """Builds and drives the prompt -> sentence -> speech-chunk pipeline.

    Three bounded queues connect three stage actors. Prompts submitted through
    ``enqueue_prompt`` enter the first queue; speech chunks leaving the last
    queue are forwarded to ``ffmpeg_converter_actor``.
    """

    def __init__(self, ffmpeg_converter_actor, voice_id="2OviOUQc1JsQRQgNkVBj"):
        """Create the queues, create the stage actors, and start their loops.

        Args:
            ffmpeg_converter_actor: Actor handle passed to
                ``SpeechToConverterActor`` as the destination for chunks.
            voice_id: Voice identifier passed to the LLM and speech stages.
                The default is the value that was previously hard-coded.
        """
        self.prompt_queue = Queue(maxsize=100)
        self.llm_sentence_queue = Queue(maxsize=100)
        self.speech_chunk_queue = Queue(maxsize=100)

        self.prompt_to_llm = PromptToLLMActor.remote(self.prompt_queue, self.llm_sentence_queue, voice_id)
        self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(self.llm_sentence_queue, self.speech_chunk_queue, voice_id)
        # Alternative sink:
        # self.speech_output = SpeechToSpeakerActor.remote(self.speech_chunk_queue, voice_id)
        self.speech_output = SpeechToConverterActor.remote(self.speech_chunk_queue, ffmpeg_converter_actor)

        # Start the pipeline components. The run() loops never return, so
        # their object refs are intentionally not kept or awaited.
        self.prompt_to_llm.run.remote()
        self.llm_sentence_to_speech.run.remote()
        self.speech_output.run.remote()

    def enqueue_prompt(self, prompt):
        """Cancel whatever the pipeline is doing, then submit ``prompt``.

        Every stage's ``cancel`` is invoked and waited on before the new
        prompt is put on the prompt queue.

        Args:
            prompt: The prompt to put on the prompt queue.
        """
        print("flush anything queued")
        # Issue all cancels first so they run concurrently, then wait for all.
        cancel_futures = [
            self.prompt_to_llm.cancel.remote(),
            self.llm_sentence_to_speech.cancel.remote(),
            self.speech_output.cancel.remote(),
        ]
        ray.get(cancel_futures)
        self.prompt_queue.put(prompt)
        print("Enqueued prompt")