sohojoe commited on
Commit
0d27fd9
1 Parent(s): 572f2e6

add missing async / await

Browse files
respond_to_prompt_actor.py CHANGED
@@ -9,7 +9,7 @@ import asyncio
9
 
10
  @ray.remote
11
  class PromptToLLMActor:
12
- def __init__(self, input_queue, output_queue):
13
  load_dotenv()
14
  self.input_queue = input_queue
15
  self.output_queue = output_queue
@@ -78,7 +78,7 @@ class SpeechToSpeakerActor:
78
 
79
  @ray.remote
80
  class SpeechToConverterActor:
81
- def __init__(self, input_queue, ffmpeg_converter_actor):
82
  load_dotenv()
83
  self.input_queue = input_queue
84
  self.ffmpeg_converter_actor = ffmpeg_converter_actor
@@ -89,9 +89,9 @@ class SpeechToConverterActor:
89
  # print (f"Got audio chunk {len(audio_chunk)}")
90
  await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk)
91
 
92
- def cancel(self):
93
  while not self.input_queue.empty():
94
- self.input_queue.get()
95
 
96
 
97
  @ray.remote
@@ -117,14 +117,14 @@ class RespondToPromptActor:
117
  print("flush anything queued")
118
  prompt_to_llm_future = self.prompt_to_llm.cancel.remote()
119
  llm_sentence_to_speech_future = self.llm_sentence_to_speech.cancel.remote()
120
- speech_to_speaker_future = self.speech_output.cancel.remote()
121
  ffmpeg_converter_future = self.ffmepg_converter_actor.flush_output_queue.remote()
122
  await asyncio.gather(
123
  prompt_to_llm_future,
124
  llm_sentence_to_speech_future,
125
- speech_to_speaker_future,
126
  ffmpeg_converter_future,
127
  )
128
- self.prompt_queue.put(prompt)
129
  print("Enqueued prompt")
130
 
 
9
 
10
  @ray.remote
11
  class PromptToLLMActor:
12
+ def __init__(self, input_queue:Queue, output_queue:Queue):
13
  load_dotenv()
14
  self.input_queue = input_queue
15
  self.output_queue = output_queue
 
78
 
79
  @ray.remote
80
  class SpeechToConverterActor:
81
+ def __init__(self, input_queue:Queue, ffmpeg_converter_actor:Queue):
82
  load_dotenv()
83
  self.input_queue = input_queue
84
  self.ffmpeg_converter_actor = ffmpeg_converter_actor
 
89
  # print (f"Got audio chunk {len(audio_chunk)}")
90
  await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk)
91
 
92
+ async def cancel(self):
93
  while not self.input_queue.empty():
94
+ await self.input_queue.get_async()
95
 
96
 
97
  @ray.remote
 
117
  print("flush anything queued")
118
  prompt_to_llm_future = self.prompt_to_llm.cancel.remote()
119
  llm_sentence_to_speech_future = self.llm_sentence_to_speech.cancel.remote()
120
+ speech_output_future = self.speech_output.cancel.remote()
121
  ffmpeg_converter_future = self.ffmepg_converter_actor.flush_output_queue.remote()
122
  await asyncio.gather(
123
  prompt_to_llm_future,
124
  llm_sentence_to_speech_future,
125
+ speech_output_future,
126
  ffmpeg_converter_future,
127
  )
128
+ await self.prompt_queue.put_async(prompt)
129
  print("Enqueued prompt")
130
 
streamlit_av_queue.py CHANGED
@@ -27,7 +27,7 @@ class StreamlitAVQueue:
27
  for frame in frames:
28
  shared_tensor = torch.from_numpy(frame.to_ndarray())
29
  shared_tensor_ref = ray.put(shared_tensor)
30
- self.queue_actor.enqueue_in_video_frame.remote(shared_tensor_ref)
31
  except Exception as e:
32
  print (e)
33
  return frames
@@ -51,7 +51,7 @@ class StreamlitAVQueue:
51
  sound_chunk += sound
52
  shared_buffer = np.array(sound_chunk.get_array_of_samples())
53
  shared_buffer_ref = ray.put(shared_buffer)
54
- self.queue_actor.enqueue_in_audio_frame.remote(shared_buffer_ref)
55
  except Exception as e:
56
  print (e)
57
 
 
27
  for frame in frames:
28
  shared_tensor = torch.from_numpy(frame.to_ndarray())
29
  shared_tensor_ref = ray.put(shared_tensor)
30
+ await self.queue_actor.enqueue_in_video_frame.remote(shared_tensor_ref)
31
  except Exception as e:
32
  print (e)
33
  return frames
 
51
  sound_chunk += sound
52
  shared_buffer = np.array(sound_chunk.get_array_of_samples())
53
  shared_buffer_ref = ray.put(shared_buffer)
54
+ await self.queue_actor.enqueue_in_audio_frame.remote(shared_buffer_ref)
55
  except Exception as e:
56
  print (e)
57
 
webrtc_av_queue_actor.py CHANGED
@@ -13,34 +13,34 @@ class WebRtcAVQueueActor:
13
  self.out_audio_queue = Queue(maxsize=100) # Adjust the size as needed
14
 
15
 
16
- def enqueue_in_video_frame(self, shared_tensor_ref):
17
  if self.in_video_queue.full():
18
- evicted_item = self.in_video_queue.get()
19
  del evicted_item
20
- self.in_video_queue.put(shared_tensor_ref)
21
 
22
- def enqueue_in_audio_frame(self, shared_buffer_ref):
23
  if self.in_audio_queue.full():
24
- evicted_item = self.in_audio_queue.get()
25
  del evicted_item
26
- self.in_audio_queue.put(shared_buffer_ref)
27
 
28
 
29
- def get_in_audio_frames(self):
30
  audio_frames = []
31
  if self.in_audio_queue.empty():
32
  return audio_frames
33
  while not self.in_audio_queue.empty():
34
- shared_tensor_ref = self.in_audio_queue.get()
35
  audio_frames.append(shared_tensor_ref)
36
  return audio_frames
37
 
38
- def get_in_video_frames(self):
39
  video_frames = []
40
  if self.in_video_queue.empty():
41
  return video_frames
42
  while not self.in_video_queue.empty():
43
- shared_tensor_ref = self.in_video_queue.get()
44
  video_frames.append(shared_tensor_ref)
45
  return video_frames
46
 
 
13
  self.out_audio_queue = Queue(maxsize=100) # Adjust the size as needed
14
 
15
 
16
+ async def enqueue_in_video_frame(self, shared_tensor_ref):
17
  if self.in_video_queue.full():
18
+ evicted_item = await self.in_video_queue.get_async()
19
  del evicted_item
20
+ await self.in_video_queue.put_async(shared_tensor_ref)
21
 
22
+ async def enqueue_in_audio_frame(self, shared_buffer_ref):
23
  if self.in_audio_queue.full():
24
+ evicted_item = await self.in_audio_queue.get_async()
25
  del evicted_item
26
+ await self.in_audio_queue.put_async(shared_buffer_ref)
27
 
28
 
29
+ async def get_in_audio_frames(self):
30
  audio_frames = []
31
  if self.in_audio_queue.empty():
32
  return audio_frames
33
  while not self.in_audio_queue.empty():
34
+ shared_tensor_ref = await self.in_audio_queue.get_async()
35
  audio_frames.append(shared_tensor_ref)
36
  return audio_frames
37
 
38
+ async def get_in_video_frames(self):
39
  video_frames = []
40
  if self.in_video_queue.empty():
41
  return video_frames
42
  while not self.in_video_queue.empty():
43
+ shared_tensor_ref = await self.in_video_queue.get_async()
44
  video_frames.append(shared_tensor_ref)
45
  return video_frames
46