sohojoe committed
Commit ac35a95 • Parent: c4fe843

refactor so we have "in" and then I can add "out"

charles_actor.py CHANGED

@@ -56,7 +56,7 @@ class CharlesActor:
             if len(self._debug_queue) > 0:
                 prompt = self._debug_queue.pop(0)
                 self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
-            audio_frames = await self._streamlit_av_queue.get_audio_frames_async()
+            audio_frames = await self._streamlit_av_queue.get_in_audio_frames_async()
             if len(audio_frames) > 0:
                 total_audio_frames += len(audio_frames)
                 # Concatenate all audio frames into a single buffer
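
The call-site change in `CharlesActor` is just the rename. For orientation, here is a minimal sketch, not from this repo, of how a drain-style getter like `get_in_audio_frames_async` is typically polled; the function name and sleep interval are illustrative assumptions reconstructed from the context lines above:

```python
import asyncio

async def poll_audio(streamlit_av_queue):
    """Hypothetical consumer loop around the renamed getter."""
    total_audio_frames = 0
    while True:
        # The getter drains the shared queue and returns a (possibly
        # empty) list rather than blocking until new audio arrives.
        audio_frames = await streamlit_av_queue.get_in_audio_frames_async()
        if audio_frames:
            total_audio_frames += len(audio_frames)
            # ... concatenate the frames into a single buffer, as above ...
        await asyncio.sleep(0.01)  # illustrative pacing; yields the event loop
```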
streamlit_av_queue.py CHANGED

@@ -6,15 +6,15 @@ import threading
 
 import numpy as np
 import ray
-from input_av_queue_actor import InputAVQueueActor
+from webrtc_av_queue_actor import WebRtcAVQueueActor
 import pydub
 import torch
 
 class StreamlitAVQueue:
     def __init__(self, audio_bit_rate=16000):
         self._audio_bit_rate = audio_bit_rate
-        self.queue_actor = InputAVQueueActor.options(
-            name="InputAVQueueActor",
+        self.queue_actor = WebRtcAVQueueActor.options(
+            name="WebRtcAVQueueActor",
             get_if_exists=True,
         ).remote()
 
@@ -26,7 +26,7 @@ class StreamlitAVQueue:
             for frame in frames:
                 shared_tensor = torch.from_numpy(frame.to_ndarray())
                 shared_tensor_ref = ray.put(shared_tensor)
-                self.queue_actor.enqueue_video_frame.remote(shared_tensor_ref)
+                self.queue_actor.enqueue_in_video_frame.remote(shared_tensor_ref)
         except Exception as e:
             print(e)
         return frames
@@ -50,7 +50,7 @@ class StreamlitAVQueue:
                 sound_chunk += sound
             shared_buffer = np.array(sound_chunk.get_array_of_samples())
             shared_buffer_ref = ray.put(shared_buffer)
-            self.queue_actor.enqueue_audio_frame.remote(shared_buffer_ref)
+            self.queue_actor.enqueue_in_audio_frame.remote(shared_buffer_ref)
         except Exception as e:
             print(e)
 
@@ -66,10 +66,10 @@ class StreamlitAVQueue:
             new_frames.append(new_frame)
         return new_frames
 
-    async def get_audio_frames_async(self) -> List[av.AudioFrame]:
-        shared_buffers = await self.queue_actor.get_audio_frames.remote()
+    async def get_in_audio_frames_async(self) -> List[av.AudioFrame]:
+        shared_buffers = await self.queue_actor.get_in_audio_frames.remote()
         return shared_buffers
 
     async def get_video_frames_async(self) -> List[av.AudioFrame]:
-        shared_tensors = await self.queue_actor.get_video_frames.remote()
+        shared_tensors = await self.queue_actor.get_in_video_frames.remote()
         return shared_tensors
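
`options(name=..., get_if_exists=True)` is Ray's get-or-create pattern for named actors: the first caller creates the actor, and every later caller asking for the same name receives a handle to the same instance, which is how the Streamlit side and `CharlesActor` end up sharing one queue. A minimal sketch under that assumption (the demo class and payload are illustrative, not from this repo):

```python
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
class DemoQueueActor:  # hypothetical stand-in for WebRtcAVQueueActor
    def __init__(self):
        self.items = []

    def push(self, item):
        self.items.append(item)

    def size(self):
        return len(self.items)

# The first call creates the named actor; the second call returns a
# handle to the same instance instead of creating a duplicate.
producer = DemoQueueActor.options(name="demo", get_if_exists=True).remote()
consumer = DemoQueueActor.options(name="demo", get_if_exists=True).remote()

ref = ray.put([1, 2, 3])   # payload goes into the shared object store once
producer.push.remote(ref)  # Ray resolves the ref at the actor boundary
print(ray.get(consumer.size.remote()))  # 1 -- both handles hit one actor
```

Passing `ray.put(...)` refs, as the enqueue methods above do, keeps large frames in Ray's shared object store instead of re-serializing them through each caller.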
input_av_queue_actor.py → webrtc_av_queue_actor.py RENAMED

@@ -6,38 +6,38 @@ import numpy as np
 
 
 @ray.remote
-class InputAVQueueActor:
+class WebRtcAVQueueActor:
     def __init__(self):
-        self.audio_queue = Queue(maxsize=100)  # Adjust the size as needed
-        self.video_queue = Queue(maxsize=100)  # Adjust the size as needed
+        self.in_audio_queue = Queue(maxsize=100)  # Adjust the size as needed
+        self.in_video_queue = Queue(maxsize=100)  # Adjust the size as needed
 
-    def enqueue_video_frame(self, shared_tensor_ref):
-        if self.video_queue.full():
-            evicted_item = self.video_queue.get()
+    def enqueue_in_video_frame(self, shared_tensor_ref):
+        if self.in_video_queue.full():
+            evicted_item = self.in_video_queue.get()
             del evicted_item
-        self.video_queue.put(shared_tensor_ref)
+        self.in_video_queue.put(shared_tensor_ref)
 
-    def enqueue_audio_frame(self, shared_buffer_ref):
-        if self.audio_queue.full():
-            evicted_item = self.audio_queue.get()
+    def enqueue_in_audio_frame(self, shared_buffer_ref):
+        if self.in_audio_queue.full():
+            evicted_item = self.in_audio_queue.get()
             del evicted_item
-        self.audio_queue.put(shared_buffer_ref)
+        self.in_audio_queue.put(shared_buffer_ref)
 
 
-    def get_audio_frames(self):
+    def get_in_audio_frames(self):
         audio_frames = []
-        if self.audio_queue.empty():
+        if self.in_audio_queue.empty():
             return audio_frames
-        while not self.audio_queue.empty():
-            shared_tensor_ref = self.audio_queue.get()
+        while not self.in_audio_queue.empty():
+            shared_tensor_ref = self.in_audio_queue.get()
             audio_frames.append(shared_tensor_ref)
         return audio_frames
 
-    def get_video_frames(self):
+    def get_in_video_frames(self):
         video_frames = []
-        if self.video_queue.empty():
+        if self.in_video_queue.empty():
             return video_frames
-        while not self.video_queue.empty():
-            shared_tensor_ref = self.video_queue.get()
+        while not self.in_video_queue.empty():
+            shared_tensor_ref = self.in_video_queue.get()
             video_frames.append(shared_tensor_ref)
         return video_frames
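
The queueing policy itself is unchanged by the rename: bounded queues that evict the oldest entry when full, plus drain-style getters. A standalone sketch of that pattern using the standard-library `queue.Queue` (the repo's `Queue` import sits above this hunk, so its exact origin is an assumption here):

```python
from queue import Queue

def put_drop_oldest(q: Queue, item) -> None:
    """Enqueue, evicting the oldest entry when the queue is full."""
    if q.full():
        evicted = q.get()  # discard the stalest frame
        del evicted        # drop the last local reference so it can be freed
    q.put(item)

def drain(q: Queue) -> list:
    """Return everything currently queued, oldest first."""
    items = []
    while not q.empty():
        items.append(q.get())
    return items

q = Queue(maxsize=3)
for i in range(5):
    put_drop_oldest(q, i)
print(drain(q))  # [2, 3, 4] -- 0 and 1 were evicted as the queue overflowed
```

Note that `full()`/`empty()` checks followed by `get()`/`put()` are not atomic in general, but a Ray actor executes its methods one at a time by default, so these check-then-act sequences are not racy inside the actor.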