sohojoe commited on
Commit
730fe87
1 Parent(s): 8c9e2db

created chat_pipeline

Browse files
audio_stream_processor.py CHANGED
@@ -10,6 +10,7 @@ class AudioStreamProcessor:
10
  self.queue = Queue()
11
  self._is_running = threading.Event()
12
  self._is_running.set()
 
13
  self.thread = Thread(target=self._process_audio_streams)
14
  self.thread.start()
15
 
@@ -18,30 +19,34 @@ class AudioStreamProcessor:
18
  self.queue.put(audio_stream)
19
 
20
  def _process_audio_streams(self):
 
21
  while self._is_running.is_set() or not self.queue.empty():
22
  try:
23
  audio_stream = self.queue.get(timeout=1)
24
  except Empty:
25
  continue
26
  self._stream(audio_stream)
 
27
 
28
  def _stream(self, audio_stream: Iterator[bytes]):
 
 
 
 
 
 
29
  mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"]
30
- mpv_process = subprocess.Popen(
31
  mpv_command,
32
  stdin=subprocess.PIPE,
33
  stdout=subprocess.DEVNULL,
34
  stderr=subprocess.DEVNULL,
35
  )
36
 
37
- for chunk in audio_stream:
38
- if chunk is not None:
39
- mpv_process.stdin.write(chunk)
40
- mpv_process.stdin.flush()
41
-
42
- if mpv_process.stdin:
43
- mpv_process.stdin.close()
44
- mpv_process.wait()
45
 
46
  def close(self):
47
  self._is_running.clear()
 
10
  self.queue = Queue()
11
  self._is_running = threading.Event()
12
  self._is_running.set()
13
+ self.mpv_process = None
14
  self.thread = Thread(target=self._process_audio_streams)
15
  self.thread.start()
16
 
 
19
  self.queue.put(audio_stream)
20
 
21
  def _process_audio_streams(self):
22
+ self._start_mpv()
23
  while self._is_running.is_set() or not self.queue.empty():
24
  try:
25
  audio_stream = self.queue.get(timeout=1)
26
  except Empty:
27
  continue
28
  self._stream(audio_stream)
29
+ self._close_mpv()
30
 
31
  def _stream(self, audio_stream: Iterator[bytes]):
32
+ for chunk in audio_stream:
33
+ if chunk is not None:
34
+ self.mpv_process.stdin.write(chunk)
35
+ self.mpv_process.stdin.flush()
36
+
37
+ def _start_mpv(self):
38
  mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"]
39
+ self.mpv_process = subprocess.Popen(
40
  mpv_command,
41
  stdin=subprocess.PIPE,
42
  stdout=subprocess.DEVNULL,
43
  stderr=subprocess.DEVNULL,
44
  )
45
 
46
+ def _close_mpv(self):
47
+ if self.mpv_process.stdin:
48
+ self.mpv_process.stdin.close()
49
+ self.mpv_process.wait()
 
 
 
 
50
 
51
  def close(self):
52
  self._is_running.clear()
chat_pipeline.py ADDED
@@ -0,0 +1,96 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import time
3
+ from clip_transform import CLIPTransform
4
+ from chat_service import ChatService
5
+ from dotenv import load_dotenv
6
+ from speech_service import SpeechService
7
+ from concurrent.futures import ThreadPoolExecutor
8
+ from audio_stream_processor import AudioStreamProcessor
9
+ from streaming_chat_service import StreamingChatService
10
+ from pipeline import Pipeline, Node, Job
11
+ from typing import List
12
+
13
class ChatJob(Job):
    """A pipeline Job that additionally carries the chat service handling it.

    Nodes downstream read job.chat_service to call the streaming chat /
    speech helpers; job.data holds the stage-specific payload.
    """

    def __init__(self, data, chat_service: ChatService):
        super().__init__(data)
        # Shared service instance reused by every node that touches this job.
        self.chat_service = chat_service
17
+
18
class Node1(Node):
    """Stage 1: expand a prompt into individual response sentences.

    Input job.data is the user prompt string; each yielded job's data is the
    next complete sentence of the assistant's streamed reply.
    """

    # Class-level counter: downstream sequential nodes buffer by job.id, so
    # ids must be globally sequential across all Node1 worker instances.
    next_id = 0

    async def process_job(self, job: ChatJob):
        async for sentence in job.chat_service.get_responses_as_sentances_async(job.data):
            if job.chat_service.ignore_sentence(sentence):
                continue
            print(f"{sentence}")
            new_job = ChatJob(sentence, job.chat_service)
            # BUG FIX: `self.next_id += 1` created a per-instance shadow of the
            # class attribute, so with more than one worker each instance
            # restarted at 0 and issued duplicate ids, breaking the in-order
            # buffering downstream. Increment on the class instead.
            new_job.id = Node1.next_id
            Node1.next_id += 1
            yield new_job
32
+
33
class Node2(Node):
    """Stage 2: synthesize a sentence into streamed speech byte chunks.

    Input job.data is a sentence string; each yielded job's data is one chunk
    of the synthesized audio bytes.
    """

    # Class-level counter: ids must stay globally sequential across workers
    # because the next sequential node buffers jobs by id.
    next_id = 0

    async def process_job(self, job: ChatJob):
        async for chunk in job.chat_service.get_speech_chunks_async(job.data):
            new_job = ChatJob(chunk, job.chat_service)
            # BUG FIX: `self.next_id += 1` shadowed the class attribute per
            # instance, producing duplicate ids with multiple workers.
            new_job.id = Node2.next_id
            Node2.next_id += 1
            yield new_job
44
+
45
+
46
class Node3(Node):
    """Final stage: hand synthesized speech bytes to the audio player."""

    async def process_job(self, job: ChatJob):
        # job.data is one chunk of streamed speech bytes; wrap it in a list
        # because the player consumes an iterable of chunks.
        job.chat_service.enqueue_speech_bytes_to_play([job.data])
        yield job
60
+
61
class ChatPipeline():
    """Wires prompt -> sentences (Node1) -> speech chunks (Node2) -> playback (Node3).

    Owns the AudioStreamProcessor and the StreamingChatService; usable as a
    context manager so audio resources are released deterministically.
    """

    def __init__(self):
        load_dotenv()
        self.pipeline = Pipeline()
        self.audio_processor = AudioStreamProcessor()
        self.chat_service = StreamingChatService(self.audio_processor, voice_id="2OviOUQc1JsQRQgNkVBj") # Chales003

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.audio_processor.close()
        self.audio_processor = None

    def __del__(self):
        # BUG FIX: __del__ can run even if __init__ raised before setting
        # audio_processor; getattr avoids a secondary AttributeError during
        # interpreter teardown.
        if getattr(self, "audio_processor", None):
            self.audio_processor.close()
            self.audio_processor = None

    async def start(self):
        """Create the inter-node queues and register the three stages."""
        self.node1_queue = asyncio.Queue()
        self.node2_queue = asyncio.Queue()
        self.node3_queue = asyncio.Queue()
        self.sync = []
        # One worker per stage; sequential_node=True preserves job-id order.
        await self.pipeline.add_node(Node1, 1, self.node1_queue, self.node2_queue, sequential_node=True)
        await self.pipeline.add_node(Node2, 1, self.node2_queue, self.node3_queue, sequential_node=True)
        await self.pipeline.add_node(Node3, 1, self.node3_queue, None, sequential_node=True)

    async def enqueue(self, prompt):
        """Submit one user prompt into the head of the pipeline."""
        job = ChatJob(prompt, self.chat_service)
        await self.pipeline.enqueue_job(job)

    async def wait_until_all_jobs_idle(self):
        # TODO - implement this (currently blocks forever; caller must cancel)
        while True:
            await asyncio.sleep(0.1)
debug.py CHANGED
@@ -1,4 +1,6 @@
 
1
  import time
 
2
  from clip_transform import CLIPTransform
3
  from chat_service import ChatService
4
  from dotenv import load_dotenv
@@ -125,8 +127,29 @@ def run_debug_code():
125
  audio_processor.close()
126
  print ("Chat success")
127
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
128
 
129
  if __name__ == '__main__':
130
  # time_sentance_lenghts()
131
  # test_sentance_lenghts()
132
- run_debug_code()
 
 
 
1
+ import asyncio
2
  import time
3
+ from chat_pipeline import ChatPipeline
4
  from clip_transform import CLIPTransform
5
  from chat_service import ChatService
6
  from dotenv import load_dotenv
 
127
  audio_processor.close()
128
  print ("Chat success")
129
 
130
async def run_pipeline():
    """Drive the ChatPipeline end-to-end with a scripted conversation."""
    load_dotenv()

    try:
        chat_pipeline = ChatPipeline()
        await chat_pipeline.start()
        prompts = [
            "hello, how are you today?",
            "tell me about your shadow self?",
            "hmm, interesting, tell me more about that.",
            "wait, that is so interesting, what else?",
        ]
        for prompt in prompts:
            await chat_pipeline.enqueue(prompt)
        await chat_pipeline.wait_until_all_jobs_idle()
    except KeyboardInterrupt:
        print("Pipeline interrupted by user")
    except Exception as e:
        # FIX: printing only the message hid where failures occurred; keep the
        # message but also emit the full traceback for debugging.
        import traceback
        print(f"An error occurred: {e}")
        traceback.print_exc()
149
 
150
  if __name__ == '__main__':
151
  # time_sentance_lenghts()
152
  # test_sentance_lenghts()
153
+ # run_debug_code()
154
+ asyncio.run(run_pipeline())
155
+
pipeline.py CHANGED
@@ -1,4 +1,5 @@
1
  import asyncio
 
2
 
3
  class Job:
4
  def __init__(self, data):
@@ -23,28 +24,33 @@ class Node:
23
  raise ValueError('job_sync is not None and sequential_node is False')
24
 
25
  async def run(self):
26
- while True:
27
- job: Job = await self.input_queue.get()
28
- self._jobs_dequeued += 1
29
- if self.sequential_node == False:
30
- async for job in self.process_job(job):
31
- if self.output_queue is not None:
32
- await self.output_queue.put(job)
33
- if self.job_sync is not None:
34
- self.job_sync.append(job)
35
- self._jobs_processed += 1
36
- else:
37
- # ensure that jobs are processed in order
38
- self.buffer[job.id] = job
39
- while self.next_i in self.buffer:
40
- job = self.buffer.pop(self.next_i)
41
  async for job in self.process_job(job):
42
  if self.output_queue is not None:
43
  await self.output_queue.put(job)
44
  if self.job_sync is not None:
45
  self.job_sync.append(job)
 
 
 
 
 
 
 
 
 
 
 
46
  self._jobs_processed += 1
47
- self.next_i += 1
 
 
 
 
48
 
49
  async def process_job(self, job: Job):
50
  raise NotImplementedError()
@@ -79,7 +85,7 @@ class Pipeline:
79
  self.nodes.append(node_name)
80
 
81
  # if input_queue is None then this is the root node
82
- if len(self.input_queues) is 0:
83
  self.root_queue = input_queue
84
 
85
  self.input_queues.append(input_queue)
 
1
  import asyncio
2
+ import traceback
3
 
4
  class Job:
5
  def __init__(self, data):
 
24
  raise ValueError('job_sync is not None and sequential_node is False')
25
 
26
    async def run(self):
        """Worker coroutine: consume jobs from input_queue until cancelled.

        Non-sequential nodes process jobs as they arrive. Sequential nodes
        buffer incoming jobs by id and only process them in ascending id
        order (next_i), so outputs preserve upstream ordering. Any exception
        is logged with a traceback and re-raised so the task fails loudly.

        NOTE(review): nesting of the counter increments was reconstructed
        from a whitespace-mangled diff — confirm _jobs_processed increments
        per produced job and next_i per consumed job against the repository.
        """
        try:
            while True:
                job: Job = await self.input_queue.get()
                self._jobs_dequeued += 1
                if self.sequential_node == False:
                    async for job in self.process_job(job):
                        # Fan each produced job out to the next queue and/or sync list.
                        if self.output_queue is not None:
                            await self.output_queue.put(job)
                        if self.job_sync is not None:
                            self.job_sync.append(job)
                        self._jobs_processed += 1
                else:
                    # ensure that jobs are processed in order
                    self.buffer[job.id] = job
                    while self.next_i in self.buffer:
                        job = self.buffer.pop(self.next_i)
                        async for job in self.process_job(job):
                            if self.output_queue is not None:
                                await self.output_queue.put(job)
                            if self.job_sync is not None:
                                self.job_sync.append(job)
                            self._jobs_processed += 1
                        self.next_i += 1
        except Exception as e:
            print(f"An error occurred in node: {self.__class__.__name__} worker: {self.worker_id}: {e}")
            traceback.print_exc()
            raise # Re-raises the last exception.
54
 
55
  async def process_job(self, job: Job):
56
  raise NotImplementedError()
 
85
  self.nodes.append(node_name)
86
 
87
  # if input_queue is None then this is the root node
88
+ if len(self.input_queues) == 0:
89
  self.root_queue = input_queue
90
 
91
  self.input_queues.append(input_queue)
speech_service.py CHANGED
@@ -38,7 +38,8 @@ class SpeechService:
38
  text=prompt,
39
  voice=self._voice_id,
40
  model=self._model_id,
41
- stream=True
 
42
  )
43
  return audio_stream
44
 
 
38
  text=prompt,
39
  voice=self._voice_id,
40
  model=self._model_id,
41
+ stream_chunk_size=2048,
42
+ stream=True,
43
  )
44
  return audio_stream
45
 
streaming_chat_service.py CHANGED
@@ -1,3 +1,5 @@
 
 
1
  import json
2
  import os
3
  import torch
@@ -51,14 +53,19 @@ class StreamingChatService:
51
  text_to_speak = sentence[:last_termination_index+1]
52
  return text_to_speak
53
 
54
- def _safe_enqueue_text_to_speak(self, text_to_speak):
55
  # exit if empty, white space or an single breaket
56
  if text_to_speak.isspace():
57
- return
58
  # exit if not letters or numbers
59
  has_letters = any(char.isalpha() for char in text_to_speak)
60
  has_numbers = any(char.isdigit() for char in text_to_speak)
61
  if not has_letters and not has_numbers:
 
 
 
 
 
62
  return
63
  stream = self._speech_service.stream(text_to_speak)
64
  self._audio_processor.add_audio_stream(stream)
@@ -93,3 +100,47 @@ class StreamingChatService:
93
  print(current_sentence)
94
  self._messages.append({"role": "assistant", "content": agent_response})
95
  return agent_response
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import itertools
3
  import json
4
  import os
5
  import torch
 
53
  text_to_speak = sentence[:last_termination_index+1]
54
  return text_to_speak
55
 
56
+ def ignore_sentence(self, text_to_speak):
57
  # exit if empty, white space or an single breaket
58
  if text_to_speak.isspace():
59
+ return True
60
  # exit if not letters or numbers
61
  has_letters = any(char.isalpha() for char in text_to_speak)
62
  has_numbers = any(char.isdigit() for char in text_to_speak)
63
  if not has_letters and not has_numbers:
64
+ return True
65
+ return False
66
+
67
    def _safe_enqueue_text_to_speak(self, text_to_speak):
        """Synthesize *text_to_speak* and queue it for playback, skipping
        filler that ignore_sentence() rejects."""
        if self.ignore_sentence(text_to_speak):
            return
        # stream is a lazy iterator of audio byte chunks from the TTS service.
        stream = self._speech_service.stream(text_to_speak)
        self._audio_processor.add_audio_stream(stream)
 
100
  print(current_sentence)
101
  self._messages.append({"role": "assistant", "content": agent_response})
102
  return agent_response
103
+
104
    async def get_responses_as_sentances_async(self, prompt):
        """Stream the assistant's reply to *prompt*, yielding one sentence at a time.

        Appends the user prompt to the conversation history up front and the
        full assistant reply once streaming completes, so history stays
        consistent across calls.
        """
        self._messages.append({"role": "user", "content": prompt})
        agent_response = ""
        current_sentence = ""

        response = await openai.ChatCompletion.acreate(
            model=self._model_id,
            messages=self._messages,
            temperature=1.0, # NOTE(review): 1.0 samples randomly; 0.0 is the deterministic setting — original comment claimed the opposite
            stream=True
        )

        async for chunk in response:
            chunk_message = chunk['choices'][0]['delta']
            if 'content' in chunk_message:
                chunk_text = chunk_message['content']
                current_sentence += chunk_text
                agent_response += chunk_text
                # Emit as soon as the helper finds a complete sentence prefix.
                text_to_speak = self._should_we_send_to_voice(current_sentence)
                if text_to_speak:
                    yield text_to_speak
                    current_sentence = current_sentence[len(text_to_speak):]

        # Flush trailing text that never reached a sentence terminator.
        if len(current_sentence) > 0:
            yield current_sentence
        self._messages.append({"role": "assistant", "content": agent_response})
130
+
131
+ async def get_speech_chunks_async(self, text_to_speak):
132
+ stream = self._speech_service.stream(text_to_speak)
133
+ stream, stream_backup = itertools.tee(stream)
134
+ while True:
135
+ # Check if there's a next item in the stream
136
+ next_item = next(stream_backup, None)
137
+ if next_item is None:
138
+ # Stream is exhausted, exit the loop
139
+ break
140
+
141
+ # Run next(stream) in a separate thread to avoid blocking the event loop
142
+ chunk = await asyncio.to_thread(next, stream)
143
+ yield chunk
144
+
145
    def enqueue_speech_bytes_to_play(self, speech_bytes):
        """Queue already-synthesized speech for playback.

        *speech_bytes* is an iterable of byte chunks (callers pass a list),
        the same shape add_audio_stream consumes.
        """
        self._audio_processor.add_audio_stream(speech_bytes)