sohojoe commited on
Commit
245e7f9
·
1 Parent(s): bd435b3

fix: close would lock

Browse files
audio_stream_processor.py CHANGED
@@ -1,22 +1,28 @@
1
  import subprocess
2
  from threading import Thread
3
- from queue import Queue
4
  from typing import Iterator
 
 
5
 
6
  class AudioStreamProcessor:
7
  def __init__(self):
8
  self.queue = Queue()
 
 
9
  self.thread = Thread(target=self._process_audio_streams)
10
  self.thread.start()
11
 
12
  def add_audio_stream(self, audio_stream: Iterator[bytes]):
13
- self.queue.put(audio_stream)
 
14
 
15
  def _process_audio_streams(self):
16
- while True:
17
- audio_stream = self.queue.get()
18
- if audio_stream is None: # We'll use None as a sentinel to mark the end
19
- break
 
20
  self._stream(audio_stream)
21
 
22
  def _stream(self, audio_stream: Iterator[bytes]):
@@ -38,5 +44,7 @@ class AudioStreamProcessor:
38
  mpv_process.wait()
39
 
40
  def close(self):
41
- self.queue.put(None) # Signal the processing thread to terminate
 
 
42
  self.thread.join()
 
1
  import subprocess
2
  from threading import Thread
3
+ from queue import Queue, Empty
4
  from typing import Iterator
5
+ import threading
6
+ import time
7
 
8
  class AudioStreamProcessor:
9
  def __init__(self):
10
  self.queue = Queue()
11
+ self._is_running = threading.Event()
12
+ self._is_running.set()
13
  self.thread = Thread(target=self._process_audio_streams)
14
  self.thread.start()
15
 
16
  def add_audio_stream(self, audio_stream: Iterator[bytes]):
17
+ if self._is_running.is_set():
18
+ self.queue.put(audio_stream)
19
 
20
  def _process_audio_streams(self):
21
+ while self._is_running.is_set() or not self.queue.empty():
22
+ try:
23
+ audio_stream = self.queue.get(timeout=1)
24
+ except Empty:
25
+ continue
26
  self._stream(audio_stream)
27
 
28
  def _stream(self, audio_stream: Iterator[bytes]):
 
44
  mpv_process.wait()
45
 
46
  def close(self):
47
+ self._is_running.clear()
48
+ while not self.queue.empty(): # Wait until the queue is empty
49
+ time.sleep(0.1)
50
  self.thread.join()
streaming_chat_service.py CHANGED
@@ -53,19 +53,19 @@ class StreamingChatService:
53
  chunk_message = chunk['choices'][0]['delta']
54
  if 'content' in chunk_message:
55
  chunk_text = chunk_message['content']
56
- print(chunk_text)
57
  current_sentence += chunk_text
58
  agent_response += chunk_text
59
  text_to_speak = self._should_we_send_to_voice(current_sentence)
60
  if text_to_speak:
61
  stream = self._speech_service.stream(text_to_speak)
62
  self._audio_processor.add_audio_stream(stream)
63
-
64
- # current_sentence should be reset to the text after the last sentence termination character
65
  current_sentence = current_sentence[len(text_to_speak):]
66
 
67
  if len(current_sentence) > 0:
68
  stream = self._speech_service.stream(current_sentence)
69
  self._audio_processor.add_audio_stream(stream)
 
70
  self._messages.append({"role": "assistant", "content": agent_response})
71
  return agent_response
 
53
  chunk_message = chunk['choices'][0]['delta']
54
  if 'content' in chunk_message:
55
  chunk_text = chunk_message['content']
56
+ # print(chunk_text)
57
  current_sentence += chunk_text
58
  agent_response += chunk_text
59
  text_to_speak = self._should_we_send_to_voice(current_sentence)
60
  if text_to_speak:
61
  stream = self._speech_service.stream(text_to_speak)
62
  self._audio_processor.add_audio_stream(stream)
63
+ print(text_to_speak)
 
64
  current_sentence = current_sentence[len(text_to_speak):]
65
 
66
  if len(current_sentence) > 0:
67
  stream = self._speech_service.stream(current_sentence)
68
  self._audio_processor.add_audio_stream(stream)
69
+ print(current_sentence)
70
  self._messages.append({"role": "assistant", "content": agent_response})
71
  return agent_response